Polling state endpoints does not scale for reactive systems. MuBit provides event streams so workers can wake on state changes.
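As a rough illustration of the difference, the sketch below contrasts a polling worker with one that blocks until an event arrives. It uses only the Python standard library: `queue.Queue` stands in for an event stream, and `polling_worker`, `event_worker`, and `demo` are hypothetical names, not part of the MuBit SDK.

```python
import queue
import threading
import time


def polling_worker(state, key, interval=0.01, timeout=1.0):
    """Poll a shared dict until `key` appears; return (value, number_of_checks)."""
    checks = 0
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        checks += 1
        if key in state:
            return state[key], checks
        time.sleep(interval)  # every idle check is wasted work
    return None, checks


def event_worker(events, timeout=1.0):
    """Block until one event arrives; no repeated checks while idle."""
    try:
        return events.get(timeout=timeout)
    except queue.Empty:
        return None


def demo():
    """Publish one state change after a short delay and wake the worker on it."""
    events = queue.Queue()
    change = {"name": "ticket_priority", "value": "critical"}
    threading.Timer(0.05, events.put, args=[change]).start()
    return event_worker(events)
```

The polling worker pays for every check even when nothing has changed, while the event worker does no work until the producer publishes.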

Decision model

| Stream type | Method | Best use |
| --- | --- | --- |
| Run-scoped control events | control.subscribe | Workflow coordination inside one run_id |
| Semantic event subscriptions | core.subscribe_events | Topic-triggered automation across broader changes |
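The decision above can be sketched as a small helper that picks the method and builds the request payload. The payload keys mirror the ones used in the example on this page; `build_subscription` itself is a hypothetical helper, not an SDK function, and it only returns data rather than calling the client.

```python
def build_subscription(run_id=None, event_types=None, query_text=None, threshold=None):
    """Return (method_name, payload) for the stream type that fits the inputs.

    Hypothetical helper: a run_id selects the run-scoped control stream,
    otherwise a query_text selects a semantic subscription.
    """
    if run_id is not None:
        payload = {"run_id": run_id}
        if event_types:
            payload["event_types"] = list(event_types)
        return "control.subscribe", payload
    if query_text is not None:
        payload = {"filter_type": "semantic", "query_text": query_text}
        if threshold is not None:
            payload["threshold"] = threshold
        return "core.subscribe_events", payload
    raise ValueError("provide run_id (control stream) or query_text (semantic subscription)")
```

Preferring the control stream whenever a `run_id` is available keeps coordination scoped to a single run, which matches the "best use" column above.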

Minimal implementation example

05-event-driven-worker.py
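# Assumes `client` is an initialized MuBit client and `run_id` identifies an
# existing run; both are set up earlier and are not shown here.

# Subscribe first, so the variable change below is seen by this stream.
stream = client.control.subscribe({
    "run_id": run_id,
    "event_types": ["SetVariable", "SubmitAction"],
})

# Produce a state change inside the same run.
client.control.set_variable({
    "run_id": run_id,
    "name": "ticket_priority",
    "value_json": '"critical"',  # JSON-encoded string
    "source": "retrieval",
})

# Separately, register a semantic subscription that is not tied to one run.
semantic_sub = client.core.subscribe_events({
    "filter_type": "semantic",
    "query_text": "billing escalation",
    "threshold": 0.82,
})
print(semantic_sub)

# Wake on the first non-empty control event, then stop.
for event in stream:
    if event:
        print(event)
        break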
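Beyond printing the first event, a worker usually routes each event to a handler. The sketch below shows one way to do that. The event shape is an assumption: the source does not document it, so events are modeled here as dicts with an `event_type` key and an optional `value_json` field, and a plain list stands in for the stream. `dispatch_events` is a hypothetical helper.

```python
import json


def dispatch_events(stream, handlers, max_events=None):
    """Route events to handlers keyed by event type; return the number handled.

    Assumption: each event is a dict with an "event_type" key.
    Empty events and events without a registered handler are skipped.
    """
    handled = 0
    for event in stream:
        if not event:
            continue
        handler = handlers.get(event.get("event_type"))
        if handler is None:
            continue
        handler(event)
        handled += 1
        if max_events is not None and handled >= max_events:
            break
    return handled


seen = []


def on_set_variable(event):
    # value_json carries a JSON-encoded value, as in the set_variable call above
    seen.append(("SetVariable", json.loads(event["value_json"])))


def on_submit_action(event):
    seen.append(("SubmitAction", None))


# A list stands in for the real stream so the sketch runs without a client.
fake_stream = [
    None,
    {"event_type": "SetVariable", "name": "ticket_priority", "value_json": '"critical"'},
    {"event_type": "Unrelated"},
    {"event_type": "SubmitAction"},
]
handled = dispatch_events(
    fake_stream,
    {"SetVariable": on_set_variable, "SubmitAction": on_submit_action},
)
```

With a real stream, the `max_events` argument gives the loop an exit condition, much like the `break` in the example above.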

Failure modes and troubleshooting

| Symptom | Root cause | Fix |
| --- | --- | --- |
| Too many events | Broad filters | Narrow event types and semantic threshold |
| Missing events | Subscriber on wrong run scope | Verify producer and subscriber run_id |
| Core subscribe denied | Direct-core policy boundary | Use control stream default or enable policy explicitly |
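The last row can be handled in code by falling back to the control stream when the core subscription is refused. This is a minimal sketch under stated assumptions: the source does not say what error a denied subscription raises, so `CoreSubscribeDenied` is a hypothetical stand-in, and the two subscribe callables are passed in so the example runs without a real client.

```python
class CoreSubscribeDenied(Exception):
    """Hypothetical stand-in for whatever error a denied core subscription raises."""


def subscribe_with_fallback(subscribe_core, subscribe_control, core_filter, control_filter):
    """Try the semantic subscription first; fall back to the run-scoped stream.

    Returns (source, subscription), where source is "core" or "control".
    """
    try:
        return "core", subscribe_core(core_filter)
    except CoreSubscribeDenied:
        return "control", subscribe_control(control_filter)


# Stubs standing in for client.core.subscribe_events and client.control.subscribe.
def denied_core(_filter):
    raise CoreSubscribeDenied("direct-core access is not enabled")


def fake_control(control_filter):
    return {"subscribed": control_filter["run_id"]}


source, sub = subscribe_with_fallback(
    denied_core,
    fake_control,
    {"filter_type": "semantic", "query_text": "billing escalation", "threshold": 0.82},
    {"run_id": "run-123", "event_types": ["SetVariable"]},
)
```

Returning the source alongside the subscription lets the caller log which path was taken, which helps when deciding whether to enable the policy explicitly.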

Next steps