The event bus is a WebSocket server that connects every component of the swarm. The 13 Hive subsystems publish state changes, agents publish lifecycle events, the circuit breaker publishes alerts, and the kill chain publishes incident phases. Any client can subscribe to any channel and receive events in real time.

Channels

| Channel | What Gets Published |
| --- | --- |
| agents | Phase changes, spawned, stopped, errors, heartbeats, directives |
| hive | Proposal verdicts, capital allocated, agents spawned/killed |
| risk | Circuit breaker triggered, drawdown alerts, risk limit hits |
| knowledge | Insights shared between agents, discoveries |
| system | Status updates, metrics, startup/shutdown |
| ui | User commands forwarded from external clients |

Usage

import asyncio
import time

from horizon.swarm import EventBus, SwarmEvent, SwarmEventType


async def main():
    bus = EventBus(host="127.0.0.1", port=8765)
    await bus.start()

    # Publish from async context
    await bus.publish("agents", SwarmEvent(
        type=SwarmEventType.AGENT_PHASE_CHANGED,
        source="agent_001",
        data={"phase": "analyzing", "reason": "found 3 candidate markets"},
        timestamp=time.time(),
    ))

    # Publish from threads (circuit breaker, engine callbacks).
    # publish_sync is meant for code running outside the event loop,
    # so this example calls it from a worker thread.
    await asyncio.to_thread(bus.publish_sync, "risk", SwarmEvent(
        type=SwarmEventType.CIRCUIT_BREAKER_TRIGGERED,
        source="circuit_breaker",
        data={"reason": "portfolio drawdown 12% > 10% limit"},
        timestamp=time.time(),
    ))


asyncio.run(main())

Ring Buffers

Each channel keeps the last 1,000 events in a ring buffer. Late-connecting clients can request history:
{"get_history": "agents", "limit": 50}
The server responds with the most recent events from that channel’s buffer.
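A per-channel ring buffer of this kind can be sketched with collections.deque. This is only an illustration of the idea, not the EventBus internals: the ChannelHistory class and its method names are hypothetical, and events are represented as plain dicts.

```python
from collections import deque


class ChannelHistory:
    """Hypothetical per-channel ring buffer; not the actual EventBus internals."""

    def __init__(self, capacity: int = 1000) -> None:
        self._capacity = capacity
        self._buffers: dict[str, deque] = {}

    def append(self, channel: str, event: dict) -> None:
        # deque(maxlen=...) drops the oldest item automatically once full.
        buf = self._buffers.setdefault(channel, deque(maxlen=self._capacity))
        buf.append(event)

    def recent(self, channel: str, limit: int = 50) -> list:
        # Return up to `limit` of the newest events, oldest first.
        if limit <= 0:
            return []
        return list(self._buffers.get(channel, ()))[-limit:]


# Small capacity so the eviction is visible.
history = ChannelHistory(capacity=3)
for i in range(5):
    history.append("agents", {"seq": i})
```

With a capacity of 3, the first two events have been evicted by the time the loop finishes, so a history request can only ever return what is still in the buffer.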

Client Protocol

Clients connect via WebSocket to ws://host:port. On connect, each client is automatically subscribed to all channels. Clients can then manage their subscriptions and send commands:
// Subscribe to specific channels only
{"subscribe": ["agents", "hive"]}

// Unsubscribe
{"unsubscribe": ["system"]}

// Send a command to the Hive
{"command": "spawn", "args": "Research weather markets on Kalshi"}

// Request history
{"get_history": "risk", "limit": 100}
Commands sent with the command key are forwarded to the hive channel as HIVE_DIRECTIVE events. The Hive picks them up on its next oversight cycle.
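The four message shapes above are small enough to build with helper functions. The sketch below assumes nothing beyond the JSON keys shown in this section; the helper names are hypothetical. Note that the // comments in the listing are annotations only: standard JSON has no comment syntax, so they must not be sent over the wire.

```python
import json


def subscribe_msg(channels: list) -> str:
    """Build a subscribe request for the given channel names."""
    return json.dumps({"subscribe": channels})


def unsubscribe_msg(channels: list) -> str:
    """Build an unsubscribe request for the given channel names."""
    return json.dumps({"unsubscribe": channels})


def command_msg(command: str, args: str) -> str:
    """Build a command message, e.g. command_msg("spawn", "Research ...")."""
    return json.dumps({"command": command, "args": args})


def history_msg(channel: str, limit: int = 50) -> str:
    """Build a history request for one channel."""
    return json.dumps({"get_history": channel, "limit": limit})
```

Inside a WebSocket session, each helper's return value can be passed directly to the client's send call, for example await ws.send(history_msg("risk", 100)).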

Event Format

Every event follows the SwarmEvent structure:
from dataclasses import dataclass
from typing import Any

@dataclass
class SwarmEvent:
    type: SwarmEventType     # enum: AGENT_SPAWNED, FILL_RECEIVED, etc.
    source: str              # "agent_001", "hive", "circuit_breaker"
    data: dict[str, Any]     # event-specific payload
    timestamp: float         # time.time()
Over the wire, events are JSON:
{
  "channel": "agents",
  "event": {
    "type": "agent_phase_changed",
    "source": "agent_003",
    "data": {"phase": "backtesting", "reason": "edge estimate 3.2% on KSH-FED-RATE"},
    "timestamp": 1710768000.0
  }
}
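The mapping between the dataclass and the wire envelope can be sketched as a pair of conversion functions. This is a hedged illustration, not the library's serializer: SwarmEvent and SwarmEventType are redefined here as stand-ins so the block runs on its own, the enum carries only two members, and its values are assumed to equal the lowercase strings seen on the wire. The to_wire and from_wire names are hypothetical.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any


class SwarmEventType(Enum):
    # Stand-in with two members; values assumed to match the wire strings.
    AGENT_PHASE_CHANGED = "agent_phase_changed"
    CIRCUIT_BREAKER_TRIGGERED = "circuit_breaker_triggered"


@dataclass
class SwarmEvent:
    # Stand-in mirroring the fields documented above.
    type: SwarmEventType
    source: str
    data: dict
    timestamp: float


def to_wire(channel: str, event: SwarmEvent) -> str:
    """Wrap an event in the {"channel", "event"} envelope as a JSON string."""
    payload: dict[str, Any] = asdict(event)
    payload["type"] = event.type.value  # enums are not JSON-serializable as-is
    return json.dumps({"channel": channel, "event": payload})


def from_wire(message: str) -> tuple:
    """Parse an envelope back into (channel, SwarmEvent)."""
    envelope = json.loads(message)
    raw = envelope["event"]
    event = SwarmEvent(
        type=SwarmEventType(raw["type"]),  # raises ValueError on unknown types
        source=raw["source"],
        data=raw["data"],
        timestamp=raw["timestamp"],
    )
    return envelope["channel"], event
```

A client that only filters on the type string can skip from_wire and work with the parsed dict directly; the typed round trip is mainly useful when the same code both publishes and consumes events.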

Event Types

| Type | Channel | When |
| --- | --- | --- |
| agent_spawned | agents | Agent created by AgentFactory |
| agent_promoted | agents | Agent moved paper to shadow to live |
| agent_culled | agents | Agent removed by SwarmCoordinator |
| agent_phase_changed | agents | Agent transitions to new phase |
| agent_stopped | agents | Agent finished or errored |
| order_submitted | agents | Agent places an order |
| fill_received | agents | Order matched |
| position_opened | agents | New position created |
| position_closed | agents | Position fully closed |
| pheromone_deposit | hive | Agent deposits pheromone signal |
| consensus_update | hive | ConsensusEngine view updated |
| capital_rebalanced | hive | SwarmCoordinator reallocated capital |
| evolution_complete | hive | EvolutionEngine finished a generation |
| autonomy_graduated | hive | AutonomyController promoted to new mode |
| circuit_breaker_triggered | risk | Hard limit breached, emergency stop |
| criticality_alert | risk | CriticalityMonitor detected systemic risk |
| kill_chain_triggered | risk | KillChain incident response started |
| kill_chain_phase | risk | KillChain advanced to next phase |
| drawdown_alert | risk | Drawdown approaching limit |
| knowledge_shared | knowledge | Agent shared insight to knowledge graph |
| causal_edge_detected | knowledge | CausalEngine found new relationship |
| heartbeat | agents | Periodic agent state snapshot |
| metrics_update | system | Hive oversight cycle metrics |
| system_status | system | Startup, shutdown, status change |
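Because every event carries its type as a string, a client can route messages with a simple lookup table. The following is a minimal sketch of that pattern, assuming the wire envelope shown under Event Format; the Dispatcher class is hypothetical and not part of horizon.swarm.

```python
import json
from typing import Callable

Handler = Callable[[dict], None]


class Dispatcher:
    """Hypothetical client-side router keyed on the event's type string."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def on(self, event_type: str, handler: Handler) -> None:
        # Several handlers may be registered for the same type.
        self._handlers.setdefault(event_type, []).append(handler)

    def dispatch(self, message: str) -> int:
        """Route one wire message; return how many handlers ran."""
        envelope = json.loads(message)
        if not isinstance(envelope, dict):
            return 0  # not an event envelope, ignore it
        event = envelope.get("event") or {}
        handlers = self._handlers.get(event.get("type"), [])
        for handler in handlers:
            handler(event)
        return len(handlers)


alerts = []
router = Dispatcher()
router.on("drawdown_alert", alerts.append)
router.on("circuit_breaker_triggered", alerts.append)
```

Messages whose type has no registered handler are ignored, so a client can subscribe broadly and still react only to the rows of the table it cares about.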

Thread Safety

The event bus runs as an asyncio WebSocket server. Agents and the Hive publish from the async event loop using await bus.publish(). The circuit breaker and engine callbacks run in threads, so they use bus.publish_sync() which calls asyncio.run_coroutine_threadsafe() to cross the thread/async boundary safely.
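The thread-to-loop handoff described above can be sketched in isolation. MiniBus below is a hypothetical stand-in, not the EventBus source: it records events in a list instead of sending them to WebSocket clients, and it assumes the synchronous method blocks until the loop has processed the event, which the real publish_sync may or may not do.

```python
import asyncio
import threading


class MiniBus:
    """Hypothetical stand-in showing only the thread-to-loop handoff."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self.delivered: list = []

    async def publish(self, channel: str, event: dict) -> None:
        # Always runs on the event loop thread.
        self.delivered.append((channel, event))

    def publish_sync(self, channel: str, event: dict, timeout: float = 5.0) -> None:
        # Schedule publish() on the loop from another thread, then wait for it.
        future = asyncio.run_coroutine_threadsafe(
            self.publish(channel, event), self._loop
        )
        future.result(timeout)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    bus = MiniBus(loop)
    worker = threading.Thread(
        target=bus.publish_sync, args=("risk", {"reason": "drawdown"})
    )
    worker.start()
    # Join in the default executor so the loop stays free to run publish().
    await loop.run_in_executor(None, worker.join)
    return bus.delivered


delivered = asyncio.run(demo())
```

In this sketch, calling publish_sync from the event loop thread itself would block the loop while waiting on the future and never complete, which is why the demo calls it from a separate thread.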

Building a Custom Client

Any WebSocket client can connect:
import asyncio
import json

import websockets

async def monitor():
    async with websockets.connect("ws://127.0.0.1:8765") as ws:
        # Subscribe to risk events only
        await ws.send(json.dumps({"subscribe": ["risk"]}))

        async for msg in ws:
            event = json.loads(msg)
            if event.get("event", {}).get("type") == "circuit_breaker_triggered":
                print("CIRCUIT BREAKER:", event["event"]["data"])

asyncio.run(monitor())
JavaScript/TypeScript, Go, Rust, or any other language with WebSocket support works the same way, because the protocol is plain JSON over WebSocket.