Horizon Monitoring
Horizon provides monitoring with three components: metrics for observability, alerts for real-time notifications, and calibration tracking for model evaluation.
- **Metrics** — thread-safe counters, gauges, and histograms with Prometheus text export.
- **Alerts** — alerting with per-time-window throttling and structured alert payloads.
- **Calibration** — prediction-accuracy tracking with Brier scores and calibration curves across markets.
Metrics
MetricsCollector
The MetricsCollector provides thread-safe metric primitives that render to Prometheus text format.
```python
from horizon.metrics import MetricsCollector

collector = MetricsCollector()
```
Counter
A monotonically increasing counter. Use for event counts (orders submitted, fills received, errors).
```python
orders_counter = collector.counter("horizon_orders_submitted_total")

orders_counter.inc()  # Increment by 1
orders_counter.inc()  # Now 2
```
Gauge
A value that can go up and down. Use for current state (position size, PnL, active orders).
```python
position_gauge = collector.gauge("horizon_position_size")

position_gauge.set(50.0)  # Set to 50
position_gauge.inc()      # Increment by 1 (now 51)
```
Histogram
Records observations and computes distribution statistics. Use for latencies, fill sizes, spreads.
```python
latency_hist = collector.histogram("horizon_tick_latency_seconds")

latency_hist.observe(0.0032)
latency_hist.observe(0.0041)
latency_hist.observe(0.0028)
```
Rendering
All metrics render to Prometheus text exposition format:
```python
output = collector.render()
print(output)
```

```
# TYPE horizon_orders_submitted_total counter
horizon_orders_submitted_total 2
# TYPE horizon_position_size gauge
horizon_position_size 51.0
# TYPE horizon_tick_latency_seconds histogram
horizon_tick_latency_seconds_bucket{le="0.005"} 3
horizon_tick_latency_seconds_bucket{le="0.01"} 3
...
horizon_tick_latency_seconds_sum 0.0101
horizon_tick_latency_seconds_count 3
```
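To make the exposition format concrete, here is a from-scratch sketch in plain Python (not Horizon's implementation) of how the cumulative `le` buckets, `_sum`, and `_count` lines are derived. Note that each bucket counts *all* observations at or below its bound, which is why every bucket above the largest observation shows the same count; real exporters also emit an `le="+Inf"` bucket.

```python
# Sketch: render a histogram in Prometheus text exposition format.
# Buckets are cumulative: each `le` bucket counts observations <= its bound.
def render_histogram(name, observations, bounds):
    lines = [f"# TYPE {name} histogram"]
    for bound in bounds:
        count = sum(1 for v in observations if v <= bound)
        lines.append(f'{name}_bucket{{le="{bound}"}} {count}')
    # Prometheus requires a +Inf bucket equal to the total count
    lines.append(f'{name}_bucket{{le="+Inf"}} {len(observations)}')
    lines.append(f"{name}_sum {sum(observations)}")
    lines.append(f"{name}_count {len(observations)}")
    return "\n".join(lines)

obs = [0.0032, 0.0041, 0.0028]
print(render_histogram("horizon_tick_latency_seconds", obs, [0.005, 0.01]))
```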
Tracking Engine Status
Use the collector’s gauge primitives to record engine status each tick:
```python
from horizon.metrics import MetricsCollector

collector = MetricsCollector()

# Call each tick to update gauges from engine status
def update_metrics(collector, engine):
    status = engine.status()
    collector.gauge("horizon_open_orders").set(status.open_orders)
    collector.gauge("horizon_active_positions").set(status.active_positions)
    collector.gauge("horizon_total_realized_pnl").set(status.total_realized_pnl)
    collector.gauge("horizon_total_unrealized_pnl").set(status.total_unrealized_pnl)
    collector.gauge("horizon_daily_pnl").set(status.daily_pnl)
    collector.gauge("horizon_kill_switch").set(1.0 if status.kill_switch_active else 0.0)
    collector.gauge("horizon_uptime_secs").set(status.uptime_secs)
```
Alerts
AlertManager
The AlertManager routes alerts to channels with time-window throttling. By default, alerts are logged via the Python logger.
```python
from horizon.alerts import AlertManager, AlertType
```
The constructor accepts optional channels and throttling configuration:
```python
manager = AlertManager(
    channels=None,             # defaults to LogChannel (logs via Python logger)
    max_alerts_per_window=10,  # max alerts before throttling kicks in
    window_secs=60.0,          # sliding window in seconds
)
```
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `channels` | `list` or `None` | `None` | Alert delivery channels; defaults to logging |
| `max_alerts_per_window` | `int` | `10` | Maximum alerts allowed per window before throttling |
| `window_secs` | `float` | `60.0` | Sliding time window in seconds |
AlertType
| Type | Description |
| --- | --- |
| `AlertType.FILL` | A fill was received |
| `AlertType.RISK_TRIGGER` | Risk pipeline rejected an order |
| `AlertType.MARKET_RESOLUTION` | A market resolved |
| `AlertType.FEED_STALE` | Feed data is stale |
| `AlertType.ARB_WINDOW` | Arbitrage window detected |
| `AlertType.PRICE_DEVIATION` | Significant price movement detected |
| `AlertType.LIFECYCLE` | Lifecycle event (e.g., kill switch activated) |
| `AlertType.CUSTOM` | User-defined alert |
Sending Alerts
Use the .alert() method to send an alert to all configured channels:
```python
from horizon.alerts import AlertManager, AlertType

manager = AlertManager()

# Send alerts
manager.alert(
    alert_type=AlertType.FILL,
    message="Filled 50 contracts @ 0.55 on election-winner",
)

manager.alert(
    alert_type=AlertType.LIFECYCLE,
    message="Kill switch activated: max drawdown exceeded",
)

manager.alert(
    alert_type=AlertType.CUSTOM,
    message="Model retrained with new data",
    data={"version": "2.1"},
)
```
The .alert() method returns True if the alert was sent, or False if it was throttled.
```python
sent = manager.alert(alert_type=AlertType.FILL, message="New fill received")
if not sent:
    print("Alert was throttled")
```
Throttling
Alerts are throttled based on the max_alerts_per_window and window_secs parameters set in the constructor. Once the limit is reached within the sliding window, subsequent alerts are suppressed until the window advances.
```python
import time

# With default settings (10 alerts per 60 seconds),
# the 11th alert within a 60-second window will be throttled
manager.alert(alert_type=AlertType.PRICE_DEVIATION, message="Price moved +3%")
time.sleep(1)
manager.alert(alert_type=AlertType.PRICE_DEVIATION, message="Price moved +4%")  # Sent if under limit
```
Throttling is global across all alert types (not per-type). If you need higher throughput for certain alerts, create a separate AlertManager instance with a larger max_alerts_per_window.
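The sliding-window behavior can be illustrated with a small stand-alone sketch (Horizon's internals may differ): keep a deque of recent send timestamps, evict those older than the window, and refuse once the per-window limit is reached.

```python
from collections import deque
import time

# Stand-alone sketch of time-window throttling; not Horizon's implementation.
class WindowThrottle:
    def __init__(self, max_per_window=10, window_secs=60.0):
        self.max_per_window = max_per_window
        self.window_secs = window_secs
        self._sent = deque()

    def allow(self, now=None):
        now = time.monotonic() if now is None else now
        # Drop timestamps that have slid out of the window
        while self._sent and now - self._sent[0] > self.window_secs:
            self._sent.popleft()
        if len(self._sent) >= self.max_per_window:
            return False  # throttled
        self._sent.append(now)
        return True

throttle = WindowThrottle(max_per_window=2, window_secs=60.0)
print(throttle.allow(now=0.0), throttle.allow(now=1.0), throttle.allow(now=2.0))  # True True False
print(throttle.allow(now=61.5))  # True: the window has advanced
```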
Calibration Tracking
CalibrationTracker
Track prediction accuracy over time to evaluate your model’s calibration. Uses SQLite for persistence.
```python
from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")
```
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `db_path` | `str` | `":memory:"` | SQLite database path; use a file path for persistence across sessions |
Recording Predictions
Log predictions and resolve markets separately. This supports the typical workflow where you make predictions first and learn outcomes later.
```python
# Log predictions as you make them
tracker.log_prediction("will-it-rain-tomorrow", probability=0.70)
tracker.log_prediction("btc-above-100k", probability=0.45)
tracker.log_prediction("fed-rate-cut", probability=0.80)

# Later, when markets resolve, record the outcomes
tracker.resolve_market("will-it-rain-tomorrow", outcome=True)
tracker.resolve_market("btc-above-100k", outcome=False)
tracker.resolve_market("fed-rate-cut", outcome=True)
```
Brier Score
The Brier score measures the accuracy of probabilistic predictions. Range is 0.0 (perfect) to 1.0 (worst).
```python
score = tracker.brier_score()
print(f"Brier Score: {score:.4f}")
```
| Brier Score | Interpretation |
| --- | --- |
| 0.00 | Perfect predictions |
| 0.10 | Excellent calibration |
| 0.20 | Good calibration |
| 0.25 | Equivalent to always predicting 50% |
| 0.33+ | Poor; the model is harmful |
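The Brier score is simply the mean squared error between predicted probabilities and binary outcomes (1.0 for True, 0.0 for False). A quick hand computation, independent of the tracker, shows how the number arises:

```python
# Brier score = mean((p_i - o_i)^2) over resolved predictions,
# where o_i is 1.0 for a True outcome and 0.0 for False.
def brier(predictions):
    return sum((p - float(o)) ** 2 for p, o in predictions) / len(predictions)

resolved = [(0.70, True), (0.45, False), (0.80, True)]
print(f"{brier(resolved):.4f}")  # -> 0.1108
```

Always predicting 0.5 yields (0.5)^2 = 0.25 for every market, which is why 0.25 is the "no information" baseline in the table above.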
Log Loss
Log loss (cross-entropy) provides another measure of prediction quality, penalizing confident wrong predictions more heavily.
```python
ll = tracker.log_loss()
print(f"Log Loss: {ll:.4f}")
```
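Log loss is the negative mean log-likelihood of the outcomes. A small stand-alone computation (not the tracker's code) illustrates why a confident wrong call is punished far more heavily than a mild miss:

```python
import math

# Log loss = -mean(o*ln(p) + (1-o)*ln(1-p)); probabilities are clamped
# away from 0 and 1 so a certain-but-wrong prediction stays finite.
def log_loss(predictions, eps=1e-15):
    total = 0.0
    for p, outcome in predictions:
        p = min(max(p, eps), 1 - eps)
        total += math.log(p) if outcome else math.log(1 - p)
    return -total / len(predictions)

print(f"{log_loss([(0.70, True)]):.4f}")   # mild miss -> 0.3567
print(f"{log_loss([(0.99, False)]):.4f}")  # confident wrong call -> 4.6052
```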
Calibration Curve
Compute a calibration curve to visualize reliability. Returns a list of CalibrationBucket objects.
```python
curve = tracker.calibration_curve(n_bins=10)

print("Predicted | Actual | Count")
print("-" * 35)
for bucket in curve:
    print(f"{bucket.predicted_mean:.2f} | {bucket.actual_frequency:.2f} | {bucket.count}")
```
Each CalibrationBucket has:
| Attribute | Type | Description |
| --- | --- | --- |
| `bin_start` | `float` | Lower bound of the probability bin |
| `bin_end` | `float` | Upper bound of the probability bin |
| `predicted_mean` | `float` | Mean predicted probability in this bin |
| `actual_frequency` | `float` | Actual outcome frequency in this bin |
| `count` | `int` | Number of predictions in this bin |
The calibration curve groups predictions into bins and compares the average predicted probability against the actual outcome frequency. A perfectly calibrated model produces points along the diagonal (predicted == actual).
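The binning step is straightforward to sketch in plain Python: group resolved predictions by probability range, then compare each bin's mean prediction with its empirical hit rate. This is a stand-in for the tracker's own curve, shown only to make the mechanics concrete.

```python
# Sketch: bin resolved (probability, outcome) pairs and compare the
# predicted mean against the realized frequency per bin.
def calibration_bins(predictions, n_bins=5):
    bins = [[] for _ in range(n_bins)]
    for p, outcome in predictions:
        idx = min(int(p * n_bins), n_bins - 1)  # p == 1.0 lands in the last bin
        bins[idx].append((p, outcome))
    curve = []
    for entries in bins:
        if not entries:
            continue  # skip empty bins, as a curve typically does
        predicted_mean = sum(p for p, _ in entries) / len(entries)
        actual_frequency = sum(1 for _, o in entries if o) / len(entries)
        curve.append((predicted_mean, actual_frequency, len(entries)))
    return curve

resolved = [(0.10, False), (0.15, False), (0.55, True), (0.65, False), (0.90, True)]
for pred, actual, count in calibration_bins(resolved):
    print(f"{pred:.2f} | {actual:.2f} | {count}")
```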
Suggest Adjustment
Get a calibration-adjusted probability based on historical data:
```python
raw_prob = 0.75
adjusted = tracker.suggest_adjustment(raw_prob, n_bins=10)
print(f"Raw: {raw_prob:.2f} -> Adjusted: {adjusted:.2f}")
```
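One plausible way such an adjustment can work (an assumption for illustration, not necessarily Horizon's exact algorithm) is a bin lookup: replace the raw probability with the realized frequency of past predictions that fell in the same bin.

```python
# Assumed bin-lookup adjustment, for illustration only: map a raw
# probability to the realized frequency of its historical bin.
def adjust(raw_prob, resolved, n_bins=10):
    idx = min(int(raw_prob * n_bins), n_bins - 1)
    in_bin = [o for p, o in resolved if min(int(p * n_bins), n_bins - 1) == idx]
    if not in_bin:
        return raw_prob  # no history for this bin; leave the estimate alone
    return sum(1 for o in in_bin if o) / len(in_bin)

history = [(0.72, True), (0.78, False), (0.75, True), (0.71, True)]
print(adjust(0.75, history))  # bin [0.7, 0.8): 3 of 4 resolved True -> 0.75
```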
Calibration Report
Generate a full calibration report with all metrics:
```python
report = tracker.report(n_bins=10)

print(f"Brier Score: {report.brier_score:.4f}")
print(f"Log Loss: {report.log_loss:.4f}")
print(f"Predictions: {report.n_predictions}")
print(f"Resolved: {report.n_resolved}")
print(f"Buckets: {len(report.buckets)}")
```
| Attribute | Type | Description |
| --- | --- | --- |
| `brier_score` | `float` | Overall Brier score |
| `log_loss` | `float` | Overall log loss |
| `n_predictions` | `int` | Total number of logged predictions |
| `n_resolved` | `int` | Number of resolved markets |
| `buckets` | `list[CalibrationBucket]` | Calibration curve buckets |
Cleanup
```python
tracker.clear()  # Clear all calibration data
tracker.close()  # Close the database connection
```
Full Examples
Production Monitoring Setup
Initialize monitoring components
```python
from horizon.metrics import MetricsCollector
from horizon.alerts import AlertManager, AlertType
from horizon.calibration import CalibrationTracker

# Metrics
collector = MetricsCollector()

# Alerts (defaults to logging)
alerts = AlertManager()

# Calibration
calibration = CalibrationTracker(db_path="calibration.db")
```
Create custom metrics
```python
# Strategy-specific metrics
tick_counter = collector.counter("horizon_ticks_total")
edge_gauge = collector.gauge("horizon_current_edge")
fill_size_hist = collector.histogram("horizon_fill_size")
model_latency = collector.histogram("horizon_model_latency_seconds")
```
Wire into the pipeline
```python
import time

import horizon as hz

def monitored_model(ctx):
    start = time.time()

    # Your model logic
    fair = 0.62

    # Record model latency
    elapsed = time.time() - start
    model_latency.observe(elapsed)

    edge = hz.edge(fair, ctx.feed.price)
    edge_gauge.set(edge)

    return fair

def monitored_quoter(ctx, fair):
    tick_counter.inc()
    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

def fill_monitor(ctx):
    """Track fills and update metrics."""
    engine = ctx.params["engine"]
    status = engine.status()

    collector.gauge("horizon_open_orders").set(status.open_orders)
    collector.gauge("horizon_active_positions").set(status.active_positions)
    collector.gauge("horizon_total_realized_pnl").set(status.total_realized_pnl)
    collector.gauge("horizon_kill_switch").set(
        1.0 if status.kill_switch_active else 0.0
    )

    if status.kill_switch_active:
        alerts.alert(
            alert_type=AlertType.LIFECYCLE,
            message=f"Kill switch activated. PnL: ${status.total_realized_pnl:.2f}",
        )
```
Run the strategy
```python
hz.run(
    name="monitored-strategy",
    markets=["election-winner"],
    feeds={"election-winner": "polymarket_book"},
    pipeline=[monitored_model, monitored_quoter, fill_monitor],
)
```
Alert-Driven Risk Management
```python
import horizon as hz
from horizon.alerts import AlertManager, AlertType

alerts = AlertManager()

def risk_alerter(ctx):
    """Send alerts based on portfolio state."""
    engine = ctx.params["engine"]
    status = engine.status()
    positions = engine.positions()

    # Alert on large positions
    for pos in positions:
        if abs(pos.size) > 200:
            alerts.alert(
                alert_type=AlertType.RISK_TRIGGER,
                message=f"Large position in {pos.market_id}: {pos.size:.0f} contracts",
            )

    # Alert on significant drawdown
    if status.total_unrealized_pnl < -100:
        alerts.alert(
            alert_type=AlertType.RISK_TRIGGER,
            message=f"Unrealized loss: ${status.total_unrealized_pnl:.2f}",
        )

    # Alert on fills
    recent_fills = engine.recent_fills()
    for fill in recent_fills:
        alerts.alert(
            alert_type=AlertType.FILL,
            message=f"Fill: {fill.order_side.name} {fill.size} @ {fill.price:.4f} in {fill.market_id}",
        )

def model(ctx):
    return 0.60

def quoter(ctx, fair):
    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

hz.run(
    name="alert-strategy",
    markets=["election-winner"],
    feeds={"election-winner": "polymarket_book"},
    pipeline=[model, quoter, risk_alerter],
)
```
Calibration Tracking Over Time
```python
from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")

# Log predictions for multiple markets
markets = [
    ("market-1", 0.80),
    ("market-2", 0.30),
    ("market-3", 0.65),
    ("market-4", 0.90),
    ("market-5", 0.20),  # Bad prediction: said 20% but it happened
    ("market-6", 0.55),
    ("market-7", 0.75),
    ("market-8", 0.40),
    ("market-9", 0.85),
    ("market-10", 0.10),
]

for market_id, prob in markets:
    tracker.log_prediction(market_id, probability=prob)

# Later, resolve markets as outcomes become known
outcomes = {
    "market-1": True,
    "market-2": False,
    "market-3": True,
    "market-4": True,
    "market-5": True,  # Surprising outcome
    "market-6": False,
    "market-7": True,
    "market-8": False,
    "market-9": True,
    "market-10": False,
}

for market_id, outcome in outcomes.items():
    tracker.resolve_market(market_id, outcome=outcome)

# Overall performance
print(f"Overall Brier Score: {tracker.brier_score():.4f}")
print(f"Log Loss: {tracker.log_loss():.4f}")

# Calibration curve
print("\nCalibration Curve:")
print("Predicted | Actual | Deviation")
print("-" * 40)
curve = tracker.calibration_curve(n_bins=5)
for bucket in curve:
    deviation = abs(bucket.predicted_mean - bucket.actual_frequency)
    marker = " ***" if deviation > 0.10 else ""
    print(f"{bucket.predicted_mean:.2f} | {bucket.actual_frequency:.2f} | {deviation:.2f}{marker}")

# Full report
report = tracker.report(n_bins=5)
print(f"\nTotal predictions: {report.n_predictions}")
print(f"Resolved markets: {report.n_resolved}")

tracker.close()
```
Integrated Monitoring with Backtesting
```python
import horizon as hz
from horizon.calibration import CalibrationTracker

tracker = CalibrationTracker(db_path=":memory:")

def model(ctx):
    # Your calibrated model
    fair = 0.62
    return fair

def quoter(ctx, fair):
    if fair > ctx.feed.price + 0.03:
        return hz.quotes(ctx.feed.price, spread=0.04, size=10)

# Run backtest with outcomes for Brier scoring
data = [
    {"timestamp": t, "price": 0.50 + (t % 10) * 0.01}
    for t in range(1700000000, 1700000500)
]

result = hz.backtest(
    name="calibration-backtest",
    markets=["test-market"],
    data=data,
    pipeline=[model, quoter],
    outcomes={"test-market": 1.0},
)

# Check backtest Brier score
m = result.metrics
if m.brier_score is not None:
    print(f"Backtest Brier Score: {m.brier_score:.4f}")

# Also track in calibration tracker for running statistics
tracker.log_prediction("test-market", probability=0.62)
tracker.resolve_market("test-market", outcome=True)
print(f"Tracker Brier Score: {tracker.brier_score():.4f}")

print(result.summary())
tracker.close()
```
Grafana Dashboard
Example Grafana configuration for Horizon metrics
Use `collector.render()` to expose metrics in Prometheus text format, then scrape them with Prometheus:

```yaml
# prometheus.yml
scrape_configs:
  - job_name: "horizon"
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:9090"]
```
Suggested panels:
- **PnL Over Time**: `horizon_total_realized_pnl + horizon_total_unrealized_pnl`
- **Order Rate**: `rate(horizon_orders_submitted_total[1m])`
- **Position Count**: `horizon_active_positions`
- **Tick Latency (p95)**: `histogram_quantile(0.95, rate(horizon_tick_latency_seconds_bucket[5m]))`
- **Fill Size Distribution**: `horizon_fill_size_bucket`
- **Kill Switch Status**: `horizon_kill_switch` (alert when the value is 1)
If you serve metrics over HTTP, never expose the endpoint to the public internet. Use a reverse proxy with authentication or bind only to localhost.
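One minimal way to follow that advice is a localhost-only endpoint built on Python's standard library. This is a sketch, not part of Horizon: it assumes only that `collector.render()` returns the Prometheus text shown earlier, and it binds `127.0.0.1` so the endpoint is never reachable from outside the host.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

def serve_metrics(render, port=9090):
    """Build an HTTP server that serves Prometheus text from `render()`,
    bound to localhost only (never 0.0.0.0)."""
    class Handler(BaseHTTPRequestHandler):
        def do_GET(self):
            body = render().encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, fmt, *args):  # keep request logs quiet
            pass

    return HTTPServer(("127.0.0.1", port), Handler)

# server = serve_metrics(collector.render)
# then run server.serve_forever() in a background thread
```

Pass `collector.render` as the callback and point the Prometheus scrape config above at the chosen port.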