# Event Bus

> WebSocket pub/sub server. 6 channels, ring buffers, thread-safe publishing.

The event bus is a WebSocket server that connects every component of the swarm. The 13 Hive subsystems publish state changes, agents publish lifecycle events, the circuit breaker publishes alerts, and the kill chain publishes incident phases. Any client can subscribe to any channel and receive events in real time.

## Channels

| Channel     | What Gets Published                                             |
| ----------- | --------------------------------------------------------------- |
| `agents`    | Phase changes, spawned, stopped, errors, heartbeats, directives |
| `hive`      | Proposal verdicts, capital allocated, agents spawned/killed     |
| `risk`      | Circuit breaker triggered, drawdown alerts, risk limit hits     |
| `knowledge` | Insights shared between agents, discoveries                     |
| `system`    | Status updates, metrics, startup/shutdown                       |
| `ui`        | User commands forwarded from external clients                   |

## Usage

```python
import asyncio
import time

from horizon.swarm import EventBus, SwarmEvent, SwarmEventType


# Illustrative helper (hypothetical name) standing in for a threaded caller
def report_breach(bus: EventBus) -> None:
    # Publish from threads (circuit breaker, engine callbacks)
    bus.publish_sync("risk", SwarmEvent(
        type=SwarmEventType.CIRCUIT_BREAKER_TRIGGERED,
        source="circuit_breaker",
        data={"reason": "portfolio drawdown 12% > 10% limit"},
        timestamp=time.time(),
    ))


async def main() -> None:
    bus = EventBus(host="127.0.0.1", port=8765)
    await bus.start()

    # Publish from async context
    await bus.publish("agents", SwarmEvent(
        type=SwarmEventType.AGENT_PHASE_CHANGED,
        source="agent_001",
        data={"phase": "analyzing", "reason": "found 3 candidate markets"},
        timestamp=time.time(),
    ))

    # Run the thread-side publisher in a worker thread, off the event loop
    await asyncio.to_thread(report_breach, bus)


asyncio.run(main())
```

## Ring Buffers

Each channel keeps the last 1,000 events in a ring buffer. Late-connecting clients can request history:

```json
{"get_history": "agents", "limit": 50}
```

The server responds with the most recent events from that channel's buffer.
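
One way to picture the per-channel buffer is a bounded `collections.deque`. The sketch below is an illustration only, not the bus's actual implementation; `ChannelHistory`, `record`, and `recent` are hypothetical names, and the default `limit` of 50 is an assumption.

```python
from collections import deque
from typing import Any

BUFFER_SIZE = 1_000  # events retained per channel, as described above
CHANNELS = ("agents", "hive", "risk", "knowledge", "system", "ui")


class ChannelHistory:
    """Hypothetical per-channel ring buffers; the oldest events drop first."""

    def __init__(self, size: int = BUFFER_SIZE) -> None:
        self._buffers: dict[str, deque[dict[str, Any]]] = {
            name: deque(maxlen=size) for name in CHANNELS
        }

    def record(self, channel: str, event: dict[str, Any]) -> None:
        # Appending to a full deque evicts the oldest entry automatically.
        self._buffers[channel].append(event)

    def recent(self, channel: str, limit: int = 50) -> list[dict[str, Any]]:
        # Return up to `limit` of the newest events, oldest first.
        if limit <= 0:
            return []
        return list(self._buffers[channel])[-limit:]
```

With a bounded deque, memory per channel stays constant no matter how long the bus runs, which is the property a ring buffer is chosen for.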

## Client Protocol

Clients connect via WebSocket to `ws://host:port`. On connect, each client is automatically subscribed to all channels. Clients can then manage their subscriptions and send commands:

```json
// Subscribe to specific channels only
{"subscribe": ["agents", "hive"]}

// Unsubscribe
{"unsubscribe": ["system"]}

// Send a command to the Hive
{"command": "spawn", "args": "Research weather markets on Kalshi"}

// Request history
{"get_history": "risk", "limit": 100}
```

Commands sent via `command` are forwarded to the `hive` channel as `HIVE_DIRECTIVE` events. The Hive picks them up on its next oversight cycle.
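
The four message shapes above can be handled with a small dispatch on the message key. The following is a rough server-side sketch under stated assumptions, not the bus's real code: `ClientState` and `handle_message` are hypothetical names, `subscribe` is assumed to replace the current subscription set, the wire name `hive_directive` is assumed to be the lowercase form of `HIVE_DIRECTIVE`, and the default history limit of 50 is invented for the example.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional

ALL_CHANNELS = frozenset({"agents", "hive", "risk", "knowledge", "system", "ui"})


@dataclass
class ClientState:
    # New connections start subscribed to every channel.
    subscriptions: set[str] = field(default_factory=lambda: set(ALL_CHANNELS))


def handle_message(state: ClientState, raw: str) -> Optional[dict[str, Any]]:
    """Apply one client message; return an action for the caller, if any."""
    msg = json.loads(raw)
    if "subscribe" in msg:
        # Assumption: subscribing narrows the set to the listed channels.
        state.subscriptions = set(msg["subscribe"]) & ALL_CHANNELS
        return None
    if "unsubscribe" in msg:
        state.subscriptions -= set(msg["unsubscribe"])
        return None
    if "command" in msg:
        # Forward the command to the hive channel as a directive event.
        return {
            "action": "publish",
            "channel": "hive",
            "type": "hive_directive",  # assumed wire name
            "data": {"command": msg["command"], "args": msg.get("args")},
        }
    if "get_history" in msg:
        return {
            "action": "history",
            "channel": msg["get_history"],
            "limit": msg.get("limit", 50),  # assumed default
        }
    return None  # unknown messages are ignored in this sketch
```

Returning an action instead of performing I/O keeps the parsing logic easy to test in isolation from the WebSocket layer.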

## Event Format

Every event follows the `SwarmEvent` structure:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class SwarmEvent:
    type: SwarmEventType     # enum: AGENT_SPAWNED, FILL_RECEIVED, etc.
    source: str              # "agent_001", "hive", "circuit_breaker"
    data: dict[str, Any]     # event-specific payload
    timestamp: float         # time.time()
```

Over the wire, events are JSON:

```json
{
  "channel": "agents",
  "event": {
    "type": "agent_phase_changed",
    "source": "agent_003",
    "data": {"phase": "backtesting", "reason": "edge estimate 3.2% on KSH-FED-RATE"},
    "timestamp": 1710768000.0
  }
}
```
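
To make the mapping from `SwarmEvent` to the wire format concrete, here is a minimal serialization sketch. It defines stand-in copies of `SwarmEvent` and `SwarmEventType` so that it runs on its own; `to_wire` is a hypothetical helper, and the assumption that each enum value equals its lowercase wire name is inferred from the JSON example rather than confirmed.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any


class SwarmEventType(Enum):
    # Stand-in with two members; the real enum lives in horizon.swarm.
    AGENT_PHASE_CHANGED = "agent_phase_changed"
    CIRCUIT_BREAKER_TRIGGERED = "circuit_breaker_triggered"


@dataclass
class SwarmEvent:
    type: SwarmEventType
    source: str
    data: dict[str, Any]
    timestamp: float


def to_wire(channel: str, event: SwarmEvent) -> str:
    """Wrap an event in the channel envelope and encode it as JSON."""
    return json.dumps({
        "channel": channel,
        "event": {
            "type": event.type.value,  # assumed: enum value is the wire name
            "source": event.source,
            "data": event.data,
            "timestamp": event.timestamp,
        },
    })
```

The `data` payload must itself be JSON-serializable (strings, numbers, lists, dicts), or `json.dumps` raises a `TypeError`.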

## Event Types

| Type                        | Channel   | When                                      |
| --------------------------- | --------- | ----------------------------------------- |
| `agent_spawned`             | agents    | Agent created by AgentFactory             |
| `agent_promoted`            | agents    | Agent promoted: paper → shadow → live     |
| `agent_culled`              | agents    | Agent removed by SwarmCoordinator         |
| `agent_phase_changed`       | agents    | Agent transitions to new phase            |
| `agent_stopped`             | agents    | Agent finished or errored                 |
| `order_submitted`           | agents    | Agent places an order                     |
| `fill_received`             | agents    | Order matched                             |
| `position_opened`           | agents    | New position created                      |
| `position_closed`           | agents    | Position fully closed                     |
| `pheromone_deposit`         | hive      | Agent deposits pheromone signal           |
| `consensus_update`          | hive      | ConsensusEngine view updated              |
| `capital_rebalanced`        | hive      | SwarmCoordinator reallocated capital      |
| `evolution_complete`        | hive      | EvolutionEngine finished a generation     |
| `autonomy_graduated`        | hive      | AutonomyController promoted to new mode   |
| `circuit_breaker_triggered` | risk      | Hard limit breached, emergency stop       |
| `criticality_alert`         | risk      | CriticalityMonitor detected systemic risk |
| `kill_chain_triggered`      | risk      | KillChain incident response started       |
| `kill_chain_phase`          | risk      | KillChain advanced to next phase          |
| `drawdown_alert`            | risk      | Drawdown approaching limit                |
| `knowledge_shared`          | knowledge | Agent shared insight to knowledge graph   |
| `causal_edge_detected`      | knowledge | CausalEngine found new relationship       |
| `heartbeat`                 | agents    | Periodic agent state snapshot             |
| `metrics_update`            | system    | Hive oversight cycle metrics              |
| `system_status`             | system    | Startup, shutdown, status change          |

## Thread Safety

The event bus runs as an asyncio WebSocket server. Agents and the Hive publish from the async event loop using `await bus.publish()`. The circuit breaker and engine callbacks run in threads, so they use `bus.publish_sync()`, which calls `asyncio.run_coroutine_threadsafe()` to cross the thread/async boundary safely.
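
The thread hand-off can be illustrated with a toy bus. This is a sketch of the general `asyncio.run_coroutine_threadsafe()` pattern, not the real `EventBus`; `MiniBus` is a hypothetical stand-in, and whether the real `publish_sync()` waits for the result is not stated here, so the blocking `future.result()` call is an assumption.

```python
import asyncio


class MiniBus:
    """Toy stand-in for the event bus, only to show the thread hand-off."""

    def __init__(self) -> None:
        self.loop = None
        self.received = []

    async def start(self) -> None:
        # Remember the loop the bus runs on so other threads can target it.
        self.loop = asyncio.get_running_loop()

    async def publish(self, channel: str, message: str) -> None:
        self.received.append((channel, message))

    def publish_sync(self, channel: str, message: str) -> None:
        # Schedule the coroutine on the bus's loop from a foreign thread,
        # then block this thread until it has run (assumed behavior).
        future = asyncio.run_coroutine_threadsafe(
            self.publish(channel, message), self.loop
        )
        future.result(timeout=5)


async def demo() -> list:
    bus = MiniBus()
    await bus.start()
    # Run the blocking call in a worker thread, as a circuit breaker would.
    await asyncio.to_thread(bus.publish_sync, "risk", "drawdown 12% > 10% limit")
    return bus.received
```

In a design like this one, calling the synchronous method from the event loop's own thread would block the loop while waiting on a coroutine that only the loop can run, which is why the async path uses `await bus.publish()` instead.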

## Building a Custom Client

Any WebSocket client can connect:

```python
import asyncio
import json

import websockets

async def monitor():
    async with websockets.connect("ws://127.0.0.1:8765") as ws:
        # Subscribe to risk events only
        await ws.send(json.dumps({"subscribe": ["risk"]}))

        async for msg in ws:
            event = json.loads(msg)
            if event.get("event", {}).get("type") == "circuit_breaker_triggered":
                print("CIRCUIT BREAKER:", event["event"]["data"])

asyncio.run(monitor())
```

Clients written in JavaScript/TypeScript, Go, Rust, or any other language with WebSocket support work the same way: the protocol is plain JSON over WebSocket.
