Pub-Sub & Semantic Subscriptions
Build event-driven workflows with run-scoped control streams and semantic subscriptions on the data plane.
Polling state endpoints does not scale for reactive systems. MuBit exposes two streaming surfaces so workers wake on state changes instead of spinning.
Decision model
| Stream | Scope | Method | Best use |
|---|---|---|---|
| Run-scoped control events | run_id | client.control.subscribe(...) / client.advanced.subscribe(...) (SSE or gRPC) | Workflow coordination inside one run: ingestion completion, checkpoints, lesson validation and promotion, handoffs, outcome recording, drift signals |
| Core data-plane pub-sub | User + filter | client.core.subscribe_events(...) (SSE or gRPC) | Topic-triggered automation across broader state: wake on node insert/update/delete, wake when a newly-written node crosses a semantic-similarity threshold |
Both surfaces stream events as they happen. The subscription lives for the lifetime of the HTTP or gRPC stream — on disconnect the server cleans up the subscription automatically.
Control event stream
Run-scoped events fire for every meaningful control-plane state change: about 24 event types covering ingest, reflection, checkpointing, outcome recording, handoffs, prompt activations, drift, project/agent/skill CRUD, and run-monitor signals. Filter with event_types to keep the stream narrow.
Lesson lifecycle events flow through this stream too, and reflect a gated candidate lifecycle. A freshly reflected lesson enters as a pending candidate and is not trusted immediately — it stays pending until it passes a held-out validation gate. Watch for context.lesson_validation_passed (candidate marked active) and context.lesson_validation_failed (candidate rejected and down-weighted) alongside context.lesson_promoted; promotion now reflects a validated lesson rather than an unconditional one.
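The gated lifecycle above can be mirrored on the client as a small status map. The following is a minimal sketch, not part of the SDK: the event type names come from the paragraph above, while the `lesson_id` payload field and the `apply_lesson_event` helper are hypothetical names chosen for illustration.

```python
from typing import Dict

# Event type names are taken from the text above. The payload field
# "lesson_id" is a hypothetical name used only for this sketch.
STATUS_BY_EVENT = {
    "context.lesson_validation_passed": "active",
    "context.lesson_validation_failed": "rejected",
    "context.lesson_promoted": "promoted",
}


def apply_lesson_event(statuses: Dict[str, str], event: dict) -> Dict[str, str]:
    """Update a local lesson-status map from one control event."""
    new_status = STATUS_BY_EVENT.get(event.get("type"))
    if new_status is None:
        return statuses  # not a lesson lifecycle event; leave the map unchanged
    lesson_id = event.get("payload", {}).get("lesson_id")  # hypothetical field
    if lesson_id is not None:
        statuses[lesson_id] = new_status
    return statuses


def lesson_status(statuses: Dict[str, str], lesson_id: str) -> str:
    """Lessons with no validation event yet are treated as pending."""
    return statuses.get(lesson_id, "pending")
```

Treating an unseen lesson as pending matches the rule that a candidate is not trusted until a validation event arrives.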
events = client.advanced.subscribe({
    "run_id": run_id,
    "event_types": ["context.ingest_completed", "context.checkpoint_created"],
})
for event in events:
    print(event["type"], event["payload"])

const stream = await client.control.subscribe({
  run_id: runId,
  event_types: ["context.ingest_completed", "context.checkpoint_created"],
});
for await (const event of stream) {
  console.log(event.type, event.payload);
}

Events include id, type, run_id, agent_id, payload (event-specific JSON), and created_at. Recent events are replayable from Redis Streams (default retention 10,000 per run, env-configurable).
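If you prefer attribute access over raw dicts, the listed fields map naturally onto a small record type. This is a sketch under assumptions: `ControlEvent` and `parse_control_event` are hypothetical helpers, not SDK types, and the concrete types of `id` and `created_at` are not stated here, so they are left as `Any`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ControlEvent:
    """Hypothetical client-side record for one control event."""
    id: Any                      # type not specified in the docs
    type: str
    run_id: str
    agent_id: Optional[str] = None
    payload: Dict[str, Any] = field(default_factory=dict)
    created_at: Any = None       # type not specified in the docs


def parse_control_event(raw: dict) -> ControlEvent:
    """Build a ControlEvent from the dict yielded by the subscribe iterator."""
    return ControlEvent(
        id=raw["id"],
        type=raw["type"],
        run_id=raw["run_id"],
        agent_id=raw.get("agent_id"),
        payload=raw.get("payload") or {},
        created_at=raw.get("created_at"),
    )
```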
Core semantic subscriptions
core.subscribe_events opens an SSE (or server-streaming gRPC) connection that delivers pub-sub events as they are published by the storage layer. Every event yielded by the SDK has the same shape across Python, JavaScript, and Rust — and across both HTTP and gRPC transports. The first event on the stream is always subscribed; after that, events are node.inserted, node.updated, node.deleted, or memory.added.
Event shape
Every event is a dict / object with a type discriminator plus the fields relevant to that type. The SDK wrappers parse metadata_json / entry_json wire strings into native metadata / entry objects and strip proto3 scalar defaults so callers see a tight shape:
| type | Fields |
|---|---|
| subscribed | subscription_id |
| node.inserted / node.updated | node_id, run_id, metadata, created_at, updated_at |
| node.deleted | node_id |
| memory.added | session_id, entry |
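To make the normalization concrete, here is a rough sketch of what turning a wire-level event into the tight shape could look like. It is an illustration only, not the SDK's implementation: `normalize_event` is a hypothetical helper, and treating empty strings and zero-valued numbers as the proto3 scalar defaults to strip is an assumption.

```python
import json


def _is_scalar_default(value) -> bool:
    """Assumed proto3 scalar defaults: None, empty string, zero, False."""
    if value is None or value == "":
        return True
    return isinstance(value, (int, float)) and value == 0


def normalize_event(wire: dict) -> dict:
    """Parse *_json wire strings and drop default-valued scalar fields."""
    event = {}
    for key, value in wire.items():
        if key == "metadata_json":
            if value:  # skip empty wire strings
                event["metadata"] = json.loads(value)
        elif key == "entry_json":
            if value:
                event["entry"] = json.loads(value)
        elif key == "type" or not _is_scalar_default(value):
            event[key] = value
    return event
```

With this sketch, a `node.deleted` wire message that carries empty `run_id` and zero timestamps collapses to just `type` and `node_id`.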
Example stream:
{"type":"subscribed","subscription_id":42}
{"type":"node.inserted","node_id":7,"run_id":"r1","metadata":{"intent":"fact"},"created_at":1714000000,"updated_at":1714000000}
{"type":"node.deleted","node_id":7}

# Fire only when a node whose vector is near the "billing escalation" embedding lands.
events = client.core.subscribe_events({
    "filter_type": "semantic",
    "query_text": "billing escalation",
    "threshold": 0.82,
})
for event in events:
    if event["type"] == "subscribed":
        print("subscription id:", event["subscription_id"])
        continue
    if event["type"] == "node.inserted":
        node_id = event["node_id"]
        # Wake the agent loop, page an on-call, enqueue a follow-up recall, …
        handle_billing_escalation(node_id)

use serde_json::json;
use tokio_stream::StreamExt;

let mut stream = client.core.subscribe_events(json!({
    "filter_type": "semantic",
    "query_text": "billing escalation",
    "threshold": 0.82
})).await?;
while let Some(event) = stream.next().await {
    let evt = event?;
    if evt["type"] == "node.inserted" {
        let node_id = evt["node_id"].as_u64().unwrap_or(0);
        // react…
    }
}

Filter types
| filter_type | Wakes on |
|---|---|
| "all" | Any node insert/update/delete (use for admin tooling, not production) |
| "node" | Events touching a single node_id (needs node_id in the request) |
| "semantic" | Inserts/updates whose vector exceeds threshold cosine similarity to the encoded query_text. Default threshold 0.8. |
| "session" | memory.added events scoped to one session id |
ACL filtering runs server-side: subscribers only see events for nodes they have permission to read.
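For intuition about what the "semantic" threshold means, the check can be pictured as a cosine-similarity comparison between the node vector and the encoded query. The sketch below uses toy vectors and hypothetical helper names; the actual encoding and matching run server-side, and whether the server compares with `>` or `>=` is an assumption here.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as dissimilar to everything
    return dot / (norm_a * norm_b)


def should_wake(node_vector: Sequence[float],
                query_vector: Sequence[float],
                threshold: float = 0.8) -> bool:
    """True when similarity exceeds the threshold (strict '>' is assumed)."""
    return cosine_similarity(node_vector, query_vector) > threshold
```

Raising the threshold toward 1.0 admits only near-duplicates of the query; lowering it widens the match set and increases event volume.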
Cleanup and lifecycle
- Close the SSE response / drop the gRPC stream to end the subscription. The server unsubscribes automatically; you do not need to call unsubscribe explicitly.
- POST /v2/core/pubsub/unsubscribe { subscription_id } exists for admin tooling or for cleaning up subscriptions whose originating client was killed before it could close the stream.
- POST /v2/core/pubsub/list returns the active subscription IDs for the calling user plus the server-wide count, which is useful for monitoring.
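For admin scripts that need to clear an orphaned subscription, the unsubscribe call can be built with the standard library alone. This is a sketch under assumptions: the path and the `subscription_id` body field come from the list above, while the base URL, the bearer-token `Authorization` header, and the `build_unsubscribe_request` helper are hypothetical and should be replaced with whatever your deployment uses.

```python
import json
import urllib.request


def build_unsubscribe_request(base_url: str, api_key: str,
                              subscription_id: int) -> urllib.request.Request:
    """Prepare (but do not send) a POST to the unsubscribe route."""
    body = json.dumps({"subscription_id": subscription_id}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v2/core/pubsub/unsubscribe",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Hypothetical auth scheme; use your deployment's real one.
            "Authorization": f"Bearer {api_key}",
        },
    )
```

Passing the returned object to `urllib.request.urlopen(...)` would send it; the sketch stops short of that so it can be inspected or logged first.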
Failure modes and troubleshooting
| Symptom | Root cause | Fix |
|---|---|---|
| Too many events | Broad filter ("all") or a low semantic threshold | Narrow filter_type, raise threshold, or filter client-side by event_types on the control stream |
| Missing events | Subscriber scoped to a different run_id (control stream) or subscribed before the data plane had permission to read the emitting node (core stream) | Verify the producer and subscriber share a run_id; check ACLs on the node class you expect to see |
| SSE stream idle for minutes | Proxy / load balancer idle timeout | The server emits SSE keep-alives, but some proxies still cut idle connections — configure the proxy for long-lived SSE or fall back to the gRPC surface |
| Core subscribe returns 404 / denied | Data-plane access policy not enabled for the route | /v2/core/pubsub/* sits under the core route policy. See Core Direct Lanes and Policy for the flag set and rollout guidance |
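For the "Too many events" row, a client-side filter is a cheap stopgap while you tune the server-side filter. A minimal sketch, assuming events are dicts with a `type` key as shown earlier; `only_types` is a hypothetical helper, not an SDK function.

```python
from typing import Iterable, Iterator, Set


def only_types(events: Iterable[dict], wanted: Set[str]) -> Iterator[dict]:
    """Yield only the events whose type is in the wanted set."""
    for event in events:
        if event.get("type") in wanted:
            yield event
```

Wrapping the iterator returned by a subscribe call, for example `only_types(events, {"context.checkpoint_created"})`, keeps the handler loop unchanged. The dropped events still cross the network, so narrowing the subscription itself remains the better long-term fix.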
Next steps
- Add planner/specialist coordination at Sessions and branching.
- Review state APIs at State management endpoints.
- See the exact HTTP request/response shapes at Core Direct Lanes — PubSub.