Pub/Sub

Subscribe to topics, publish events, and stream data in real time. The system is designed for fan-out scenarios where multiple consumers need the same data stream.

Overview

Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.

For one-to-one messaging, use stream connections or data exchange.

Architecture

Each daemon runs its own independent broker inside the daemon process. It manages subscriptions for that node only. There is no central message server.

When an agent subscribes to topics on another agent, its daemon opens a connection to the remote agent's event stream port (1002). The remote broker registers the subscription and pushes matching events over that connection. When an agent publishes to another agent, its daemon sends the event to the remote broker, which fans it out to all active subscribers.

Subscribing

To collect a fixed number of events, use a bounded subscription with `--count`. The collected events are returned together as a single JSON result rather than as a stream.

pilotctl subscribe other-agent status --count 5 --timeout 60s

The command returns an `events` array, where each entry contains `topic`, `data`, and `bytes`, together with a `timeout` boolean.
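As a minimal sketch of consuming that result from Python, assuming the output is one JSON object with an `events` array and a `timeout` boolean as described above. The helper name `summarize_bounded` and the sample payload are illustrative and not part of pilotctl.

```python
import json

def summarize_bounded(raw):
    """Summarize the output of a bounded subscription (shape assumed from the text above)."""
    result = json.loads(raw)
    events = result.get("events", [])
    return {
        "count": len(events),
        "topics": sorted({e["topic"] for e in events}),
        "total_bytes": sum(e["bytes"] for e in events),
        "timed_out": bool(result.get("timeout", False)),
    }

# Hypothetical sample output, for illustration only.
sample = '{"events":[{"topic":"status","data":"online","bytes":6}],"timeout":false}'
summary = summarize_bounded(sample)
```

Checking `timed_out` before trusting `count` is a reasonable habit when fewer events than requested may have arrived.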

To stream events indefinitely as NDJSON (one JSON object per line), use an unbounded subscription.

pilotctl subscribe other-agent status

Each line is a standalone JSON object, for example: {"topic":"status","data":"online","bytes":6}

Publishing

pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'

Events are delivered to all active subscribers of the topic on the target node. The command returns the `target`, `topic`, and `bytes`.
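When publishing structured data from a script, the payload has to be serialized to a JSON string before it is passed to `--data`. The following Python sketch builds the argument list using only the command form shown above; the helper name `publish_argv` is hypothetical, and the example stops short of running the command.

```python
import json

def publish_argv(target, topic, payload):
    """Build the argument list for `pilotctl publish <target> <topic> --data <payload>`."""
    # Strings are passed through unchanged; anything else is encoded as compact JSON.
    if isinstance(payload, str):
        data = payload
    else:
        data = json.dumps(payload, separators=(",", ":"))
    return ["pilotctl", "publish", target, topic, "--data", data]

argv = publish_argv("other-agent", "metrics", {"cpu": 42, "mem": 1024})
# To actually send it: subprocess.run(argv, check=True)
```

Passing an argument list instead of a shell string avoids quoting problems when the payload itself contains quotes.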

Wildcards

Use `*` as the topic to subscribe to all topics on a broker.

pilotctl subscribe other-agent "*" --count 10

`*` is a full wildcard that matches every topic, and it is the only wildcard form available. It is not a prefix glob, so a pattern such as `events.*` is not supported.

NDJSON streaming

Without `--count`, subscriptions stream newline-delimited JSON (NDJSON) indefinitely. Because each line is a complete JSON object, the output can be piped directly into line-oriented tools such as jq.

# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'

# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl

# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
  echo "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done
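The same stream can be consumed from a program instead of jq. This Python sketch assumes the event shape shown earlier (`topic`, `data`, `bytes`) and, as in the jq example, that `data` carries a JSON-encoded string for metrics events. The function names are illustrative.

```python
import io
import json

def iter_events(stream):
    """Yield one event dict per non-empty NDJSON line, skipping lines that are not valid JSON."""
    for line in stream:
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue

def cpu_readings(stream):
    """Yield the cpu value from metrics events whose data field is a JSON-encoded object."""
    for event in iter_events(stream):
        if event.get("topic") != "metrics":
            continue
        try:
            payload = json.loads(event["data"])
        except (KeyError, TypeError, json.JSONDecodeError):
            continue
        if isinstance(payload, dict) and "cpu" in payload:
            yield payload["cpu"]

# Stand-in for sys.stdin when run as: pilotctl subscribe other-agent metrics | python consumer.py
demo = io.StringIO('{"topic":"metrics","data":"{\\"cpu\\":42,\\"mem\\":1024}","bytes":21}\n')
readings = list(cpu_readings(demo))
```

Skipping undecodable lines keeps a long-running consumer alive if a partial line arrives when the stream is interrupted.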

Delivery guarantees

Pub/sub is for real-time streaming where dropping an occasional event is acceptable. For guaranteed delivery, use data exchange or stream connections.
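If a consumer needs to know when events were dropped, one option is for publishers to embed a counter in the payload. The sketch below assumes a hypothetical `seq` field inside the JSON-encoded `data` string; nothing in pilotctl adds or checks such a field, so this is an application-level convention only.

```python
import json

class GapDetector:
    """Track a per-topic sequence number carried in the payload (hypothetical `seq` field)."""

    def __init__(self):
        self.last = {}  # topic -> last sequence number seen

    def observe(self, event):
        """Return how many events were missed just before this one (0 if none or no seq)."""
        try:
            seq = json.loads(event["data"])["seq"]
        except (KeyError, TypeError, ValueError):
            return 0  # payload is not JSON or has no seq field
        prev = self.last.get(event["topic"])
        self.last[event["topic"]] = seq
        if prev is None or seq <= prev:
            return 0  # first event on this topic, or the publisher restarted its counter
        return seq - prev - 1

def make_event(topic, seq):
    """Build an illustrative event in the shape shown in this document."""
    data = json.dumps({"seq": seq})
    return {"topic": topic, "data": data, "bytes": len(data)}

detector = GapDetector()
missed = [detector.observe(make_event("metrics", n)) for n in (1, 2, 5)]
```

This only detects loss. Recovering the missing events still requires one of the guaranteed-delivery mechanisms mentioned above.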

Limits

Topic conventions

Topic names are arbitrary strings. A common convention is to use dot-separated namespaces, such as `task.completed`.

Since `*` is the only wildcard, prefix-based filtering (like `metrics.*`) is not supported. Subscribe to a specific topic or use `*` and filter on the client side.
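Client-side filtering can be sketched as follows: subscribe with `*`, then keep only events whose topic falls under a dot-separated namespace. The function name `filter_by_prefix` is illustrative, and the event shape is assumed to match the NDJSON example earlier in this document.

```python
import json

def filter_by_prefix(lines, prefix):
    """Yield events whose topic equals `prefix` or sits below it in a dot-separated namespace."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        topic = event.get("topic", "")
        # Require a dot boundary so "metrics" does not match "metricsX".
        if topic == prefix or topic.startswith(prefix + "."):
            yield event

# Stand-in for the output of: pilotctl subscribe other-agent "*"
lines = [
    '{"topic":"metrics.cpu","data":"42","bytes":2}',
    '{"topic":"status","data":"online","bytes":6}',
    '{"topic":"metrics.mem","data":"1024","bytes":4}',
    '{"topic":"metricsX","data":"1","bytes":1}',
]
kept = [e["topic"] for e in filter_by_prefix(lines, "metrics")]
```

Note that with `*` every event still crosses the network, and the filter only reduces what the consumer processes.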

How it works under the hood

The event stream uses a simple wire protocol.

The broker is an in-memory fan-out system with no queues, disk I/O, or acknowledgments.
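To make those semantics concrete, here is a toy in-process model of an in-memory fan-out broker. It is a conceptual sketch, not the daemon's implementation: the class `ToyBroker`, the callback-based subscribers, and computing `bytes` as the UTF-8 length of `data` are all assumptions made for illustration.

```python
class ToyBroker:
    """Toy model: deliver each event to current subscribers only, with no queue or acks."""

    def __init__(self):
        self._subs = {}  # topic (or "*") -> list of subscriber callbacks

    def subscribe(self, topic, callback):
        """Register a callback and return a function that removes it again."""
        self._subs.setdefault(topic, []).append(callback)

        def unsubscribe():
            self._subs[topic].remove(callback)

        return unsubscribe

    def publish(self, topic, data):
        """Fan the event out and return how many subscribers received it."""
        event = {"topic": topic, "data": data, "bytes": len(data.encode("utf-8"))}
        targets = list(self._subs.get("*", []))  # wildcard subscribers see every topic
        if topic != "*":
            targets += self._subs.get(topic, [])
        for callback in targets:
            callback(event)
        return len(targets)  # nothing is stored, so 0 means the event is gone

broker = ToyBroker()
seen_status, seen_all = [], []
early = broker.publish("status", "starting")  # no subscribers yet, so this is dropped
stop_status = broker.subscribe("status", seen_status.append)
broker.subscribe("*", seen_all.append)
delivered = broker.publish("status", "online")
broker.publish("metrics", "42")
stop_status()
after_stop = broker.publish("status", "offline")
```

The event published before anyone subscribed is never replayed, which is the practical consequence of having no queues or disk I/O.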

Use cases

For real-time monitoring, an agent publishes system metrics. A dashboard agent subscribes and renders them. The `publish` command always targets a peer, not the local agent.

# From any peer with metrics to share
pilotctl publish dashboard-agent metrics --data '{"cpu":42,"mem":1024,"disk":80}'

# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl

For coordination, a controller publishes tasks, and workers subscribe to pick them up.

# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1

# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'

For event-driven workflows, actions can be triggered in response to events from other agents.

# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
  echo "Task done, starting next stage..."
done
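For more than one event type, the shell loop can be replaced by a small dispatcher that maps topics to handlers. This Python sketch assumes the NDJSON event shape shown earlier; the handler table and the name `dispatch` are illustrative.

```python
import json

def dispatch(lines, handlers):
    """Call handlers[topic](event) for each NDJSON event and collect the handler results."""
    results = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        handler = handlers.get(event.get("topic"))
        if handler is not None:  # events on topics without a handler are ignored
            results.append(handler(event))
    return results

handlers = {
    "task.completed": lambda event: "next stage after " + event["data"],
}

# Stand-in for the output of: pilotctl subscribe pipeline-agent "*"
lines = [
    '{"topic":"task.completed","data":"batch-42","bytes":8}',
    '{"topic":"status","data":"online","bytes":6}',
]
results = dispatch(lines, handlers)
```

In a real consumer, `lines` would be `sys.stdin` and each handler would start the next stage instead of returning a string.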

The daemon fires webhook events for pub/sub activity: `pubsub.subscribed`, `pubsub.unsubscribed`, and `pubsub.published`.
