Horizon Monitoring

Horizon provides monitoring with three components: metrics for observability, alerts for real-time notifications, and calibration tracking for model evaluation.

Metrics

Thread-safe counters, gauges, and histograms with Prometheus text export.

Alerts

Alerting with throttling per time window and structured alert payloads.

Calibration

Track prediction accuracy with Brier scores and calibration curves across markets.

Metrics

MetricsCollector

The MetricsCollector provides thread-safe metric primitives that render to Prometheus text format.
from horizon.metrics import MetricsCollector

collector = MetricsCollector()

Counter

A monotonically increasing counter. Use for event counts (orders submitted, fills received, errors).
orders_counter = collector.counter("horizon_orders_submitted_total")
orders_counter.inc()       # Increment by 1
orders_counter.inc()       # Now 2

Gauge

A value that can go up and down. Use for current state (position size, PnL, active orders).
position_gauge = collector.gauge("horizon_position_size")
position_gauge.set(50.0)   # Set to 50
position_gauge.inc()       # Increment by 1 (now 51)

Histogram

Records observations and computes distribution statistics. Use for latencies, fill sizes, spreads.
latency_hist = collector.histogram("horizon_tick_latency_seconds")
latency_hist.observe(0.0032)
latency_hist.observe(0.0041)
latency_hist.observe(0.0028)

Rendering

All metrics render to Prometheus text exposition format:
output = collector.render()
print(output)
# TYPE horizon_orders_submitted_total counter
horizon_orders_submitted_total 2

# TYPE horizon_position_size gauge
horizon_position_size 51.0

# TYPE horizon_tick_latency_seconds histogram
horizon_tick_latency_seconds_bucket{le="0.005"} 3
horizon_tick_latency_seconds_bucket{le="0.01"} 3
...
horizon_tick_latency_seconds_sum 0.0101
horizon_tick_latency_seconds_count 3

Tracking Engine Status

Use the collector’s gauge primitives to record engine status each tick:
from horizon.metrics import MetricsCollector

collector = MetricsCollector()

# Call each tick to update gauges from engine status
def update_metrics(collector, engine):
    status = engine.status()
    collector.gauge("horizon_open_orders").set(status.open_orders)
    collector.gauge("horizon_active_positions").set(status.active_positions)
    collector.gauge("horizon_total_realized_pnl").set(status.total_realized_pnl)
    collector.gauge("horizon_total_unrealized_pnl").set(status.total_unrealized_pnl)
    collector.gauge("horizon_daily_pnl").set(status.daily_pnl)
    collector.gauge("horizon_kill_switch").set(1.0 if status.kill_switch_active else 0.0)
    collector.gauge("horizon_uptime_secs").set(status.uptime_secs)

Alerts

AlertManager

The AlertManager routes alerts to channels with time-window throttling. By default, alerts are logged via the Python logger.
from horizon.alerts import AlertManager, AlertType
The constructor accepts optional channels and throttling configuration:
manager = AlertManager(
    channels=None,            # defaults to LogChannel (logs via Python logger)
    max_alerts_per_window=10, # max alerts before throttling kicks in
    window_secs=60.0,         # sliding window in seconds
)
Parameter              Type          Default  Description
channels               list or None  None     Alert delivery channels; defaults to logging
max_alerts_per_window  int           10       Maximum alerts allowed per window before throttling
window_secs            float         60.0     Sliding time window in seconds

AlertType

Type                         Description
AlertType.FILL               A fill was received
AlertType.RISK_TRIGGER       Risk pipeline rejected an order
AlertType.MARKET_RESOLUTION  A market resolved
AlertType.FEED_STALE         Feed data is stale
AlertType.ARB_WINDOW         Arbitrage window detected
AlertType.PRICE_DEVIATION    Significant price movement detected
AlertType.LIFECYCLE          Lifecycle event (e.g., kill switch activated)
AlertType.CUSTOM             User-defined alert

Sending Alerts

Use the .alert() method to send an alert to all configured channels:
from horizon.alerts import AlertManager, AlertType

manager = AlertManager()

# Send alerts
manager.alert(
    alert_type=AlertType.FILL,
    message="Filled 50 contracts @ 0.55 on election-winner",
)

manager.alert(
    alert_type=AlertType.LIFECYCLE,
    message="Kill switch activated: max drawdown exceeded",
)

manager.alert(
    alert_type=AlertType.CUSTOM,
    message="Model retrained with new data",
    data={"version": "2.1"},
)
The .alert() method returns True if the alert was sent, or False if it was throttled.
sent = manager.alert(alert_type=AlertType.FILL, message="New fill received")
if not sent:
    print("Alert was throttled")

Throttling

Alerts are throttled based on the max_alerts_per_window and window_secs parameters set in the constructor. Once the limit is reached within the sliding window, subsequent alerts are suppressed until the window advances.
import time

# With default settings (10 alerts per 60 seconds),
# the 11th alert within a 60-second window will be throttled
manager.alert(alert_type=AlertType.PRICE_DEVIATION, message="Price moved +3%")
time.sleep(1)
manager.alert(alert_type=AlertType.PRICE_DEVIATION, message="Price moved +4%")  # Sent if under limit
Throttling is global across all alert types (not per-type). If you need higher throughput for certain alerts, create a separate AlertManager instance with a larger max_alerts_per_window.
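The sliding-window mechanism described above can be illustrated with a timestamp deque. This is a standalone sketch of the technique, not Horizon's internal code:

```python
import time
from collections import deque

class SlidingWindowThrottle:
    """Allow at most `max_events` within the trailing `window_secs`."""

    def __init__(self, max_events=10, window_secs=60.0):
        self.max_events = max_events
        self.window_secs = window_secs
        self._times = deque()

    def allow(self, now=None):
        now = time.monotonic() if now is None else now
        # Drop timestamps that have aged out of the window
        while self._times and now - self._times[0] > self.window_secs:
            self._times.popleft()
        if len(self._times) >= self.max_events:
            return False  # throttled
        self._times.append(now)
        return True
```

Once the deque holds `max_events` recent timestamps, `allow()` returns False until the oldest timestamp falls outside the window, matching the sent/throttled behavior of `.alert()`.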

Calibration Tracking

CalibrationTracker

Track prediction accuracy over time to evaluate your model’s calibration. Uses SQLite for persistence.
from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")
Parameter  Type  Default     Description
db_path    str   ":memory:"  SQLite database path; use a file path for persistence across sessions

Recording Predictions

Log predictions and resolve markets separately. This supports the typical workflow where you make predictions first and learn outcomes later.
# Log predictions as you make them
tracker.log_prediction("will-it-rain-tomorrow", probability=0.70)
tracker.log_prediction("btc-above-100k", probability=0.45)
tracker.log_prediction("fed-rate-cut", probability=0.80)

# Later, when markets resolve, record the outcomes
tracker.resolve_market("will-it-rain-tomorrow", outcome=True)
tracker.resolve_market("btc-above-100k", outcome=False)
tracker.resolve_market("fed-rate-cut", outcome=True)

Brier Score

The Brier score measures the accuracy of probabilistic predictions. Range is 0.0 (perfect) to 1.0 (worst).
score = tracker.brier_score()
print(f"Brier Score: {score:.4f}")
Brier Score  Interpretation
0.00         Perfect predictions
0.10         Excellent calibration
0.20         Good calibration
0.25         Equivalent to always predicting 50%
0.33+        Poor; model is harmful
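For reference, the Brier score is simply the mean squared error between predicted probabilities and 0/1 outcomes. A hand computation in plain Python (independent of the tracker), using the three predictions logged earlier:

```python
def brier_score(preds):
    """Mean squared error between probabilities and 0/1 outcomes.

    preds: iterable of (probability, outcome) pairs, outcome True/False.
    """
    preds = list(preds)
    return sum((p - float(o)) ** 2 for p, o in preds) / len(preds)

# The three predictions from the earlier example:
print(round(brier_score([(0.70, True), (0.45, False), (0.80, True)]), 4))  # 0.1108

# Always predicting 50% scores exactly 0.25, the "no skill" baseline in the table above:
print(brier_score([(0.5, True), (0.5, False)]))  # 0.25
```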

Log Loss

Log loss (cross-entropy) provides another measure of prediction quality, penalizing confident wrong predictions more heavily.
ll = tracker.log_loss()
print(f"Log Loss: {ll:.4f}")
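Concretely, log loss is the mean negative log-likelihood of the observed outcomes. A standalone computation (plain Python) showing why confident mistakes dominate:

```python
import math

def log_loss(preds, eps=1e-15):
    """Mean negative log-likelihood of the observed outcomes."""
    total = 0.0
    for p, o in preds:
        p = min(max(p, eps), 1.0 - eps)  # clamp away from 0 and 1
        total += -math.log(p) if o else -math.log(1.0 - p)
    return total / len(preds)

# A confidently wrong prediction costs far more than a mildly wrong one:
print(round(log_loss([(0.95, False)]), 2))  # 3.0
print(round(log_loss([(0.60, False)]), 2))  # 0.92
```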

Calibration Curve

Compute a calibration curve to visualize reliability. Returns a list of CalibrationBucket objects.
curve = tracker.calibration_curve(n_bins=10)

print("Predicted | Actual  | Count")
print("-" * 35)
for bucket in curve:
    print(f"  {bucket.predicted_mean:.2f}    |  {bucket.actual_frequency:.2f}   | {bucket.count}")
Each CalibrationBucket has:
Attribute         Type   Description
bin_start         float  Lower bound of the probability bin
bin_end           float  Upper bound of the probability bin
predicted_mean    float  Mean predicted probability in this bin
actual_frequency  float  Actual outcome frequency in this bin
count             int    Number of predictions in this bin
The calibration curve groups predictions into bins and compares the average predicted probability against the actual outcome frequency. A perfectly calibrated model produces points along the diagonal (predicted == actual).
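The grouping described above can be sketched in plain Python. This illustrates the binning idea only; it is not Horizon's internal implementation:

```python
def calibration_bins(preds, n_bins=5):
    """Group (probability, outcome) pairs into equal-width bins and compare
    mean predicted probability with actual outcome frequency per bin."""
    bins = [[] for _ in range(n_bins)]
    for p, o in preds:
        i = min(int(p * n_bins), n_bins - 1)  # p == 1.0 falls in the last bin
        bins[i].append((p, o))
    curve = []
    for i, b in enumerate(bins):
        if not b:
            continue  # empty bins produce no point on the curve
        curve.append({
            "bin": (i / n_bins, (i + 1) / n_bins),
            "predicted_mean": sum(p for p, _ in b) / len(b),
            "actual_frequency": sum(1 for _, o in b if o) / len(b),
            "count": len(b),
        })
    return curve
```

A well-calibrated model yields `predicted_mean` close to `actual_frequency` in every populated bin.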

Suggest Adjustment

Get a calibration-adjusted probability based on historical data:
raw_prob = 0.75
adjusted = tracker.suggest_adjustment(raw_prob, n_bins=10)
print(f"Raw: {raw_prob:.2f} -> Adjusted: {adjusted:.2f}")
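One simple way such an adjustment can work is to map the raw probability onto the historical outcome frequency of the calibration bin it falls into. The sketch below illustrates that idea only; Horizon's suggest_adjustment may use a different scheme, and the bucket numbers here are invented:

```python
# Historical calibration buckets as (bin_start, bin_end, actual_frequency);
# these values are made up for illustration.
buckets = [(0.0, 0.5, 0.35), (0.5, 1.0, 0.68)]

def adjust(raw_prob, buckets):
    """Replace a raw probability with the historical outcome frequency of
    its bin; fall back to the raw value when there is no history."""
    for lo, hi, freq in buckets:
        if lo <= raw_prob < hi or (raw_prob == 1.0 and hi == 1.0):
            return freq
    return raw_prob

print(adjust(0.75, buckets))  # 0.68
```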

Calibration Report

Generate a full calibration report with all metrics:
report = tracker.report(n_bins=10)

print(f"Brier Score: {report.brier_score:.4f}")
print(f"Log Loss: {report.log_loss:.4f}")
print(f"Predictions: {report.n_predictions}")
print(f"Resolved: {report.n_resolved}")
print(f"Buckets: {len(report.buckets)}")
Attribute      Type                     Description
brier_score    float                    Overall Brier score
log_loss       float                    Overall log loss
n_predictions  int                      Total number of logged predictions
n_resolved     int                      Number of resolved markets
buckets        list[CalibrationBucket]  Calibration curve buckets

Cleanup

tracker.clear()  # Clear all calibration data
tracker.close()  # Close the database connection

Full Examples

Production Monitoring Setup

Step 1: Initialize monitoring components

from horizon.metrics import MetricsCollector
from horizon.alerts import AlertManager, AlertType
from horizon.calibration import CalibrationTracker

# Metrics
collector = MetricsCollector()

# Alerts (defaults to logging)
alerts = AlertManager()

# Calibration
calibration = CalibrationTracker(db_path="calibration.db")
Step 2: Create custom metrics

# Strategy-specific metrics
tick_counter = collector.counter("horizon_ticks_total")
edge_gauge = collector.gauge("horizon_current_edge")
fill_size_hist = collector.histogram("horizon_fill_size")
model_latency = collector.histogram("horizon_model_latency_seconds")
Step 3: Wire into the pipeline

import time
import horizon as hz

def monitored_model(ctx):
    start = time.time()

    # Your model logic
    fair = 0.62

    # Record model latency
    elapsed = time.time() - start
    model_latency.observe(elapsed)

    edge = hz.edge(fair, ctx.feed.price)
    edge_gauge.set(edge)

    return fair

def monitored_quoter(ctx, fair):
    tick_counter.inc()

    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

def fill_monitor(ctx):
    """Track fills and update metrics."""
    engine = ctx.params["engine"]
    status = engine.status()
    collector.gauge("horizon_open_orders").set(status.open_orders)
    collector.gauge("horizon_active_positions").set(status.active_positions)
    collector.gauge("horizon_total_realized_pnl").set(status.total_realized_pnl)
    collector.gauge("horizon_kill_switch").set(
        1.0 if status.kill_switch_active else 0.0
    )

    if status.kill_switch_active:
        alerts.alert(
            alert_type=AlertType.LIFECYCLE,
            message=f"Kill switch activated. PnL: ${status.total_realized_pnl:.2f}",
        )
Step 4: Run the strategy

hz.run(
    name="monitored-strategy",
    markets=["election-winner"],
    feeds={"election-winner": "polymarket_book"},
    pipeline=[monitored_model, monitored_quoter, fill_monitor],
)

Alert-Driven Risk Management

import horizon as hz
from horizon.alerts import AlertManager, AlertType

alerts = AlertManager()

def risk_alerter(ctx):
    """Send alerts based on portfolio state."""
    engine = ctx.params["engine"]
    status = engine.status()
    positions = engine.positions()

    # Alert on large positions
    for pos in positions:
        if abs(pos.size) > 200:
            alerts.alert(
                alert_type=AlertType.RISK_TRIGGER,
                message=f"Large position in {pos.market_id}: {pos.size:.0f} contracts",
            )

    # Alert on significant drawdown
    if status.total_unrealized_pnl < -100:
        alerts.alert(
            alert_type=AlertType.RISK_TRIGGER,
            message=f"Unrealized loss: ${status.total_unrealized_pnl:.2f}",
        )

    # Alert on fills
    recent_fills = engine.recent_fills()
    for fill in recent_fills:
        alerts.alert(
            alert_type=AlertType.FILL,
            message=f"Fill: {fill.order_side.name} {fill.size} @ {fill.price:.4f} in {fill.market_id}",
        )

def model(ctx):
    return 0.60

def quoter(ctx, fair):
    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

hz.run(
    name="alert-strategy",
    markets=["election-winner"],
    feeds={"election-winner": "polymarket_book"},
    pipeline=[model, quoter, risk_alerter],
)

Calibration Tracking Over Time

from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")

# Log predictions for multiple markets
markets = [
    ("market-1", 0.80),
    ("market-2", 0.30),
    ("market-3", 0.65),
    ("market-4", 0.90),
    ("market-5", 0.20),  # Bad prediction: said 20% but it happened
    ("market-6", 0.55),
    ("market-7", 0.75),
    ("market-8", 0.40),
    ("market-9", 0.85),
    ("market-10", 0.10),
]

for market_id, prob in markets:
    tracker.log_prediction(market_id, probability=prob)

# Later, resolve markets as outcomes become known
outcomes = {
    "market-1": True,
    "market-2": False,
    "market-3": True,
    "market-4": True,
    "market-5": True,   # Surprising outcome
    "market-6": False,
    "market-7": True,
    "market-8": False,
    "market-9": True,
    "market-10": False,
}

for market_id, outcome in outcomes.items():
    tracker.resolve_market(market_id, outcome=outcome)

# Overall performance
print(f"Overall Brier Score: {tracker.brier_score():.4f}")
print(f"Log Loss: {tracker.log_loss():.4f}")

# Calibration curve
print("\nCalibration Curve:")
print("Predicted | Actual  | Deviation")
print("-" * 40)
curve = tracker.calibration_curve(n_bins=5)
for bucket in curve:
    deviation = abs(bucket.predicted_mean - bucket.actual_frequency)
    marker = " ***" if deviation > 0.10 else ""
    print(f"  {bucket.predicted_mean:.2f}     | {bucket.actual_frequency:.2f}   | {deviation:.2f}{marker}")

# Full report
report = tracker.report(n_bins=5)
print(f"\nTotal predictions: {report.n_predictions}")
print(f"Resolved markets: {report.n_resolved}")

tracker.close()

Integrated Monitoring with Backtesting

import horizon as hz
from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")

def model(ctx):
    # Your calibrated model
    fair = 0.62
    return fair

def quoter(ctx, fair):
    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

# Run backtest with outcomes for Brier scoring
data = [
    {"timestamp": t, "price": 0.50 + (t % 10) * 0.01}
    for t in range(1700000000, 1700000500)
]

result = hz.backtest(
    name="calibration-backtest",
    markets=["test-market"],
    data=data,
    pipeline=[model, quoter],
    outcomes={"test-market": 1.0},
)

# Check backtest Brier score
m = result.metrics
if m.brier_score is not None:
    print(f"Backtest Brier Score: {m.brier_score:.4f}")

# Also track in calibration tracker for running statistics
tracker.log_prediction("test-market", probability=0.62)
tracker.resolve_market("test-market", outcome=True)
print(f"Tracker Brier Score:  {tracker.brier_score():.4f}")

print(result.summary())
tracker.close()

Grafana Dashboard

Use collector.render() to expose metrics in Prometheus text format, then scrape them with Prometheus:
# prometheus.yml
scrape_configs:
  - job_name: "horizon"
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:9090"]
Suggested panels:
  • PnL Over Time: horizon_total_realized_pnl + horizon_total_unrealized_pnl
  • Order Rate: rate(horizon_orders_submitted_total[1m])
  • Position Count: horizon_active_positions
  • Tick Latency: histogram_quantile(0.95, rate(horizon_tick_latency_seconds_bucket[5m]))
  • Fill Size Distribution: horizon_fill_size_bucket
  • Kill Switch Status: horizon_kill_switch (alert on value == 1)
If you serve metrics over HTTP, never expose the endpoint to the public internet. Use a reverse proxy with authentication or bind only to localhost.
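If Horizon does not ship an HTTP exporter, a minimal endpoint can be built with only the Python standard library, serving the collector's render output at /metrics and binding to localhost per the warning above. This is a sketch; wiring `collector.render` as the render callable is an assumption about your setup:

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

def make_metrics_handler(render):
    """Build a request handler that serves the string returned by
    `render()` at /metrics in Prometheus text format."""
    class MetricsHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/metrics":
                self.send_error(404)
                return
            body = render().encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, fmt, *args):
            pass  # keep request logging quiet

    return MetricsHandler

# Bind to localhost only, matching the scrape config above:
# server = HTTPServer(("127.0.0.1", 9090), make_metrics_handler(collector.render))
# threading.Thread(target=server.serve_forever, daemon=True).start()
```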