Feeds provide live price data to your pipeline functions via ctx.feeds. Each feed runs as an async task on the engine’s tokio runtime.
Exchange Orderbooks: PolymarketBook, KalshiBook
Crypto Prices: BinanceWS, CoinbaseFeed, RobinhoodFeed
Equities & Options: AlpacaFeed, IBKRFeed
Prediction Markets: PredictItFeed, ManifoldFeed
Alternative Data: ESPNFeed, NWSFeed, CalendarFeed, TreasuryFeed
On-Chain Oracles: ChainlinkFeed, MempoolFeed
Generic: RESTFeed, RESTJsonPathFeed
Available Feeds
BinanceWS: Real-time trade stream from Binance WebSocket.
PolymarketBook: Polymarket orderbook snapshots via WebSocket.
KalshiBook: Kalshi orderbook polling via REST.
RESTFeed: Generic REST API polling for any endpoint.
PredictItFeed: PredictIt market contract prices.
ManifoldFeed: Manifold Markets crowd probabilities.
ESPNFeed: Live sports scores from ESPN.
NWSFeed: National Weather Service forecasts and alerts.
RESTJsonPathFeed: Any REST API with dot-notation JSON path extraction.
ChainlinkFeed: On-chain Chainlink price oracle via JSON-RPC.
MempoolFeed: Polygon mempool watcher for CTF exchange order flow.
AlpacaFeed: Real-time stock data from Alpaca Markets via WebSocket.
CoinbaseFeed: Coinbase Advanced Trade ticker data via WebSocket.
RobinhoodFeed: Robinhood crypto quotes via REST polling.
IBKRFeed: Interactive Brokers market data via REST polling.
CalendarFeed: Economic events and earnings calendar feed.
TreasuryFeed: Treasury yield curve data from FRED API.
BinanceWS
Real-time price stream from Binance via WebSocket:
hz.run(
    feeds={"btc": hz.BinanceWS("btcusdt")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["btc"].price
The feed connects to Binance’s trade stream and updates price, bid, ask, and timestamp on each trade.
| Field | Description |
| --- | --- |
| symbol | Binance trading pair (e.g., "btcusdt", "ethusdt") |
PolymarketBook
Polymarket orderbook feed via WebSocket. Streams real-time best bid/ask from the CLOB WebSocket and updates on every orderbook change.
hz.run(
    feeds={"poly": hz.PolymarketBook("will-btc-hit-100k")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    poly = ctx.feeds["poly"]
    return (poly.bid + poly.ask) / 2
| Field | Description |
| --- | --- |
| market_slug | Polymarket market slug or CLOB token ID |
You can pass either a market slug (e.g., "will-btc-hit-100k") or a numeric CLOB token ID. When a slug is provided, Horizon automatically resolves it to the YES token ID via the Gamma API on startup (2 retry attempts, 15s timeout). If resolution fails, the slug is used as-is.
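The resolve-then-fall-back behavior described above can be sketched roughly as follows. This is an illustration only: `resolve_slug` and the injected `lookup` callable are hypothetical names, not part of Horizon, and the Gamma API request is replaced by a plain function so the sketch runs offline. It also assumes "2 retry attempts" means two lookup attempts in total.

```python
from typing import Callable, Optional


def resolve_slug(slug_or_id: str,
                 lookup: Callable[[str], Optional[str]],
                 attempts: int = 2) -> str:
    """Return a token ID for a slug, or the input unchanged if lookup fails."""
    # A purely numeric input is assumed to be a CLOB token ID already.
    if slug_or_id.isdigit():
        return slug_or_id
    for _ in range(attempts):
        try:
            token_id = lookup(slug_or_id)
        except Exception:
            continue  # count any lookup error as a failed attempt
        if token_id:
            return token_id
    # Every attempt failed: fall back to using the slug as-is.
    return slug_or_id
```

With a lookup that fails once and then succeeds, the second attempt returns the token ID; with a lookup that never succeeds, the original slug comes back unchanged.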
The feed handles three Polymarket WebSocket message formats automatically:
Initial book snapshot - full L2 orderbook with bid/ask levels
Price change updates - best bid/ask deltas
Direct book messages - fallback top-of-book format
Asset ID filtering ensures only data for the subscribed YES token is used (Polymarket sends updates for both YES and NO tokens on the same channel).
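To make the asset ID filtering concrete, here is a minimal sketch for the snapshot case. The message shape (`asset_id`, `bids`, `asks`, and a string `price` on each level) is an assumption made for illustration and may not match the real Polymarket payload; `top_of_book` is a hypothetical helper, not a Horizon function.

```python
from typing import Optional, Tuple


def top_of_book(msg: dict, yes_token_id: str) -> Optional[Tuple[float, float]]:
    """Return (best_bid, best_ask) for the subscribed YES token, else None."""
    # Drop updates for any other token (e.g. the NO side of the same market).
    if msg.get("asset_id") != yes_token_id:
        return None
    bids = msg.get("bids") or []
    asks = msg.get("asks") or []
    if not bids or not asks:
        return None
    # Best bid is the highest bid price; best ask is the lowest ask price.
    best_bid = max(float(level["price"]) for level in bids)
    best_ask = min(float(level["price"]) for level in asks)
    return best_bid, best_ask
```

Filtering before parsing keeps NO-token updates from overwriting the YES-token quote, which is the failure mode the asset ID check guards against.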
KalshiBook
Kalshi orderbook feed via REST polling:
hz.run(
    feeds={"kalshi": hz.KalshiBook("KXBTC-25FEB16")},
    ...
)
| Field | Description |
| --- | --- |
| ticker | Kalshi market ticker (e.g., "KXBTC-25FEB16") |
RESTFeed
Generic REST API polling feed for any JSON endpoint:
hz.run(
    feeds={"custom": hz.RESTFeed("https://api.example.com/price", interval=5.0)},
    ...
)
| Field | Default | Description |
| --- | --- | --- |
| url | required | REST endpoint URL |
| interval | 5.0 | Polling interval in seconds |
PredictItFeed
PredictIt market contract prices via REST polling:
hz.run(
    feeds={"election": hz.PredictItFeed(market_id=7456, contract_id=28562)},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    pi = ctx.feeds["election"]
    return pi.price  # lastTradePrice
| Field | Default | Description |
| --- | --- | --- |
| market_id | required | PredictIt market ID (integer) |
| contract_id | None | Specific contract ID. If None, uses the first contract. |
| interval | 5.0 | Polling interval in seconds (minimum 1s) |
Snapshot mapping: price = lastTradePrice, bid = bestBuyYesCost, ask = bestSellYesCost.
Handles HTTP 429 with automatic exponential backoff.
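As an illustration of what exponential backoff on HTTP 429 means in practice, the sketch below computes a capped delay schedule. The base delay, growth factor, cap, and retry count are assumed values chosen for the example; the source does not state what the feed actually uses, and `backoff_delays` is a hypothetical helper.

```python
from typing import List


def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   cap: float = 60.0, retries: int = 5) -> List[float]:
    """Delay in seconds before each retry: base * factor**i, capped at `cap`."""
    return [min(cap, base * factor ** i) for i in range(retries)]


# Example: wait 1s, 2s, 4s, 8s, then 16s between successive 429 responses.
delays = backoff_delays()
```

The cap matters for a polling feed: without it, a long run of 429 responses would push the wait far beyond any useful polling interval.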
ManifoldFeed
Manifold Markets probability feed via REST polling:
hz.run(
    feeds={"manifold": hz.ManifoldFeed("will-btc-hit-100k-by-2026")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["manifold"].price  # probability (0-1)
| Field | Default | Description |
| --- | --- | --- |
| slug | required | Manifold market slug |
| interval | 5.0 | Polling interval in seconds |
Snapshot mapping: price = probability, volume_24h = volume.
ESPNFeed
Live sports scores from ESPN’s scoreboard API:
hz.run(
    feeds={"nba": hz.ESPNFeed("basketball", "nba")},
    ...
)

def score_signal(ctx: hz.Context) -> float:
    game = ctx.feeds["nba"]
    home_score = game.price
    away_score = game.bid
    period = game.volume_24h
    # game.source contains "espn:basketball:nba:{event_id}:{status}"
    return home_score - away_score
| Field | Default | Description |
| --- | --- | --- |
| sport | required | ESPN sport name (e.g., "basketball", "football") |
| league | required | ESPN league name (e.g., "nba", "nfl") |
| event_id | None | Filter to a specific game. If None, picks the first active game. |
| interval | 10.0 | Polling interval in seconds |
Snapshot mapping: price = home score, bid = away score, volume_24h = period/quarter. The source field encodes the game status ("in_progress", "scheduled", "final").
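Because the game status lives inside the source string rather than in its own field, a strategy usually has to split it out. A minimal sketch, assuming the "espn:{sport}:{league}:{event_id}:{status}" layout shown in the example comment; `parse_espn_source` and `EspnSource` are hypothetical helpers and the event ID below is made up.

```python
from typing import NamedTuple


class EspnSource(NamedTuple):
    sport: str
    league: str
    event_id: str
    status: str


def parse_espn_source(source: str) -> EspnSource:
    """Split 'espn:sport:league:event_id:status' into named parts."""
    # Unpacking raises ValueError if the string has fewer than five parts.
    prefix, sport, league, event_id, status = source.split(":", 4)
    if prefix != "espn":
        raise ValueError(f"not an ESPN source string: {source!r}")
    return EspnSource(sport, league, event_id, status)


info = parse_espn_source("espn:basketball:nba:401000000:in_progress")
is_live = info.status == "in_progress"
```

A signal function could use `is_live` to ignore scheduled or final games and react only to score changes during play.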
NWSFeed
National Weather Service data in two modes:
Forecast mode (default)
hz.run(
    feeds={"weather": hz.NWSFeed(office="TOP", grid_x=31, grid_y=80)},
    ...
)

def weather_signal(ctx: hz.Context) -> float:
    wx = ctx.feeds["weather"]
    temperature = wx.price
    wind_speed = wx.bid
    precip_chance = wx.ask
    return precip_chance / 100.0
Alerts mode
hz.run(
    feeds={"alerts": hz.NWSFeed(state="FL", mode="alerts")},
    ...
)

def alert_signal(ctx: hz.Context) -> float:
    al = ctx.feeds["alerts"]
    has_alerts = al.price  # 1.0 if active alerts, 0.0 otherwise
    alert_count = al.bid
    # al.source contains severity: "nws:alerts:FL:Severe"
    return has_alerts
| Field | Default | Description |
| --- | --- | --- |
| office | "" | NWS office code (e.g., "TOP", "MIA"). Required for forecast mode. |
| grid_x | 0 | Grid X coordinate. Required for forecast mode. |
| grid_y | 0 | Grid Y coordinate. Required for forecast mode. |
| state | "" | US state abbreviation (e.g., "FL"). Required for alerts mode. |
| mode | "forecast" | "forecast" or "alerts" |
| user_agent | "Horizon-SDK/0.4.5" | User-Agent header (NWS requires one) |
| interval | 60.0 | Polling interval in seconds (minimum 60s per NWS policy) |
Forecast mapping: price = temperature, bid = wind speed, ask = precipitation chance.
Alerts mapping: price = 1.0 if active alerts else 0.0, bid = alert count. Source encodes severity.
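The mode-specific requirements in the table can be checked before a feed is started. The sketch below is one way to do that, not the SDK's own validation: `check_nws_config` is a hypothetical helper, and clamping the interval up to 60 seconds (rather than rejecting it) is an assumption.

```python
def check_nws_config(mode: str = "forecast", office: str = "",
                     state: str = "", interval: float = 60.0) -> float:
    """Validate mode-specific fields and return the effective polling interval."""
    if mode == "forecast":
        if not office:
            raise ValueError("forecast mode requires an office code")
    elif mode == "alerts":
        if not state:
            raise ValueError("alerts mode requires a state abbreviation")
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    # Assumed behavior: intervals below the 60s minimum are raised to 60s.
    return max(interval, 60.0)
```

For example, an alerts configuration for "FL" with a requested 10-second interval would be accepted but polled every 60 seconds under this assumption.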
RESTJsonPathFeed
Flexible REST feed that extracts values from any JSON API using dot-notation paths:
hz.run(
    feeds={
        "btc_coingecko": hz.RESTJsonPathFeed(
            url="https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
            price_path="bitcoin.usd",
        ),
    },
    ...
)
Path resolution:
"price" → json["price"]
"data.market.price" → json["data"]["market"]["price"]
"markets.0.lastPrice" → json["markets"][0]["lastPrice"]
Numeric segments try array index first, then object key
String values are auto-parsed to f64
| Field | Default | Description |
| --- | --- | --- |
| url | required | REST endpoint URL |
| price_path | None | Dot-notation path to price field. Falls back to json["price"]. |
| bid_path | None | Dot-notation path to bid field. Falls back to json["bid"]. |
| ask_path | None | Dot-notation path to ask field. Falls back to json["ask"]. |
| volume_path | None | Dot-notation path to volume field. Falls back to json["volume"]. |
| interval | 5.0 | Polling interval in seconds |
RESTJsonPathFeed is the most flexible feed - use it to pull data from any public API (CoinGecko, weather services, sports APIs, etc.) without writing custom feed code.
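The path-resolution rules listed earlier (dot-separated segments, numeric segments tried as an array index before an object key, string values parsed to a float) can be sketched in a few lines of Python. This is an illustrative reimplementation, not the engine's code; `resolve_path` is a hypothetical name, and returning None for a missing or non-numeric value is an assumption.

```python
from typing import Any, Optional


def resolve_path(data: Any, path: str) -> Optional[float]:
    """Follow a dot-notation path through parsed JSON and return a float."""
    node = data
    for seg in path.split("."):
        # Numeric segment on a list: treat it as an array index first.
        if isinstance(node, list) and seg.isdigit() and int(seg) < len(node):
            node = node[int(seg)]
        # Otherwise fall back to an object key lookup.
        elif isinstance(node, dict) and seg in node:
            node = node[seg]
        else:
            return None
    if isinstance(node, bool):
        return None  # JSON true/false is not a price
    if isinstance(node, (int, float)):
        return float(node)
    if isinstance(node, str):
        try:
            return float(node)  # numeric strings are parsed automatically
        except ValueError:
            return None
    return None
```

Given the CoinGecko response shape used above, `resolve_path({"bitcoin": {"usd": 97000.5}}, "bitcoin.usd")` walks two object keys and returns the nested number.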
ChainlinkFeed
On-chain Chainlink price oracle feed. Reads latestRoundData() from any Chainlink aggregator proxy contract via JSON-RPC eth_call. Works with any EVM chain - Ethereum, Arbitrum, Polygon, BSC, etc.
hz.run(
    feeds={
        "eth_usd": hz.ChainlinkFeed(
            contract_address="0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
            rpc_url="https://eth.llamarpc.com",
        ),
    },
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["eth_usd"].price  # e.g., 3245.12
| Field | Default | Description |
| --- | --- | --- |
| contract_address | required | Chainlink aggregator proxy address (with 0x prefix) |
| rpc_url | required | JSON-RPC endpoint URL for the target chain |
| decimals | 8 | Price decimals (most Chainlink feeds use 8; some use 18) |
| interval | 10.0 | Polling interval in seconds (minimum 1s) |
Snapshot mapping: price = on-chain price (divided by 10^decimals), timestamp = on-chain updatedAt.
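To show where the division by 10^decimals comes from, here is a sketch that decodes the hex result of an eth_call to latestRoundData(). It relies on the standard ABI layout of that function's return value (five 32-byte words: roundId, answer, startedAt, updatedAt, answeredInRound). `decode_latest_round_data` is a hypothetical helper, not Horizon's decoder, and the sample payload is constructed by hand rather than fetched from a node.

```python
from typing import Tuple


def decode_latest_round_data(result_hex: str, decimals: int = 8) -> Tuple[float, int]:
    """Return (price, updated_at) from an ABI-encoded latestRoundData() result."""
    raw = bytes.fromhex(result_hex[2:] if result_hex.startswith("0x") else result_hex)
    if len(raw) < 5 * 32:
        raise ValueError("expected at least five 32-byte words")
    words = [raw[i:i + 32] for i in range(0, 5 * 32, 32)]
    # Word 1 is `answer`, a signed int256 scaled by 10**decimals.
    answer = int.from_bytes(words[1], "big", signed=True)
    # Word 3 is `updatedAt`, a Unix timestamp in seconds.
    updated_at = int.from_bytes(words[3], "big")
    return answer / 10 ** decimals, updated_at


# Hand-built payload: answer = 324512000000 with 8 decimals.
sample = "0x" + "".join(f"{w:064x}" for w in (1, 324512000000, 1700000000, 1700000000, 1))
price, updated_at = decode_latest_round_data(sample)
```

Passing the wrong `decimals` (for example 8 on an 18-decimal feed) scales the price by ten orders of magnitude, which is why the parameter is exposed on the feed.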
Common contract addresses (Ethereum mainnet):
| Pair | Address |
| --- | --- |
| ETH/USD | 0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419 |
| BTC/USD | 0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c |
| LINK/USD | 0x2c1d072e956AFFC0D435Cb7AC38EF18d24d9127c |
| USDC/USD | 0x8fFfFfd4AfB6115b954Bd326cbe7B4BA576818f6 |
Find contract addresses for any chain at data.chain.link. Chainlink feeds work on Ethereum, Arbitrum, Polygon, BSC, Avalanche, and more - just change the rpc_url.
MempoolFeed
Polygon mempool watcher that monitors pending transactions to Polymarket’s CTF exchange contracts. Provides visibility into large pending orders before they confirm on-chain.
hz.run(
    feeds={
        "mempool": hz.MempoolFeed(
            rpc_url="https://polygon-rpc.com",
        ),
    },
    ...
)

def detect_whale(ctx: hz.Context) -> list:
    mp = ctx.feeds.get("mempool")
    if mp and mp.price > 10:  # >10 pending trades
        print(f"Large flow detected: {mp.volume_24h:.0f} USDC pending")
    return []
| Field | Default | Description |
| --- | --- | --- |
| rpc_url | "" | Polygon JSON-RPC URL (or set POLYGON_RPC_URL env var) |
| ctf_addresses | CTF + Neg Risk CTF | Contract addresses to monitor |
| interval | 2.0 | Polling interval in seconds |
Snapshot mapping:
price = number of pending CTF trades
volume_24h = total pending USDC volume
bid = largest pending buy
ask = largest pending sell
last_trade_size = largest single trade
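The mapping above packs five aggregates into generic snapshot fields. The sketch below shows how such a summary could be computed from a list of decoded pending trades. `PendingTrade` and `summarize_pending` are hypothetical names, and the way the real feed decodes transactions into sides and sizes is not described in the source.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class PendingTrade:
    side: str    # "buy" or "sell" (assumed labels)
    usdc: float  # trade size in USDC


def summarize_pending(trades: Iterable[PendingTrade]) -> Dict[str, float]:
    """Aggregate pending trades into the snapshot fields listed above."""
    trades = list(trades)
    buys = [t.usdc for t in trades if t.side == "buy"]
    sells = [t.usdc for t in trades if t.side == "sell"]
    return {
        "price": float(len(trades)),                       # pending trade count
        "volume_24h": sum((t.usdc for t in trades), 0.0),  # total pending USDC
        "bid": max(buys, default=0.0),                     # largest pending buy
        "ask": max(sells, default=0.0),                    # largest pending sell
        "last_trade_size": max((t.usdc for t in trades), default=0.0),
    }
```

Note that price, bid, and ask here are counts and sizes rather than prices, so code that treats every feed uniformly (for example, computing a mid-price) should skip this feed.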
Default CTF addresses (Polygon):
| Contract | Address |
| --- | --- |
| CTF Exchange | 0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E |
| Neg Risk CTF | 0xC5d563A36AE78145C45a50134d48A1215220f80a |
Use a private/premium RPC endpoint for mempool access. Public RPCs may not support pending block queries.
AlpacaFeed
Real-time stock and ETF data from Alpaca Markets via WebSocket, streaming trades and quotes for one or more symbols.
hz.run(
    feeds={"stocks": hz.AlpacaFeed(symbols=["AAPL", "SPY"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["stocks"].price
| Field | Default | Description |
| --- | --- | --- |
| symbols | [] | List of stock/ETF ticker symbols (e.g., ["AAPL", "SPY"]) |
| api_key | None | Alpaca API key (or set ALPACA_API_KEY / APCA_API_KEY_ID env var) |
| api_secret | None | Alpaca API secret (or set ALPACA_API_SECRET / APCA_API_SECRET_KEY env var) |
| data_source | "iex" | Data source: "iex" (free) or "sip" (paid, all exchanges) |
Credentials are resolved from constructor args first, then environment variables. The sip data source provides consolidated tape from all US exchanges but requires a paid Alpaca subscription.
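The resolution order described above (constructor argument first, then environment variables) amounts to a short fallback chain. A minimal sketch, assuming the environment variable names are tried in the order the table lists them; `resolve_credential` is a hypothetical helper, not an SDK function.

```python
import os
from typing import Mapping, Optional, Sequence


def resolve_credential(explicit: Optional[str],
                       env_names: Sequence[str],
                       environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the explicit value if given, else the first non-empty env var."""
    if explicit:
        return explicit
    for name in env_names:
        value = environ.get(name)
        if value:
            return value
    return None


# Example with an in-memory environment instead of the real one.
fake_env = {"APCA_API_KEY_ID": "from-env"}
key = resolve_credential(None, ["ALPACA_API_KEY", "APCA_API_KEY_ID"], fake_env)
```

Keeping secrets in environment variables and leaving the constructor arguments as None avoids committing keys alongside strategy code.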
CoinbaseFeed
Coinbase Advanced Trade real-time ticker data via WebSocket for crypto trading pairs.
hz.run(
    feeds={"crypto": hz.CoinbaseFeed(product_ids=["BTC-USD", "ETH-USD"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["crypto"].price
| Field | Default | Description |
| --- | --- | --- |
| product_ids | [] | List of Coinbase product IDs (e.g., ["BTC-USD", "ETH-USD"]) |
| api_key | None | Coinbase API key (or set COINBASE_API_KEY env var) |
| api_secret | None | Coinbase API secret (or set COINBASE_API_SECRET env var) |
RobinhoodFeed
Robinhood crypto quotes via REST polling at a configurable interval.
hz.run(
    feeds={"rh": hz.RobinhoodFeed(symbols=["BTC-USD", "ETH-USD"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["rh"].price
| Field | Default | Description |
| --- | --- | --- |
| symbols | [] | List of crypto pair symbols (e.g., ["BTC-USD", "ETH-USD"]) |
| api_key | None | Robinhood API key (or set ROBINHOOD_API_KEY env var) |
| interval | 5.0 | Polling interval in seconds |
IBKRFeed
Interactive Brokers market data feed via REST polling against the Client Portal API. Supports stocks, options, futures, and ForecastEx event contracts.
hz.run(
    feeds={"aapl": hz.IBKRFeed(conids=["265598"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["aapl"].price
| Field | Default | Description |
| --- | --- | --- |
| conids | [] | List of IBKR contract IDs (e.g., ["265598"] for AAPL) |
| access_token | None | OAuth2 access token (or set IBKR_ACCESS_TOKEN env var) |
| api_url | None | API base URL (auto-set based on paper flag if None) |
| paper | True | Use paper trading API (paper-api.ibkr.com) vs live (api.ibkr.com) |
| interval | 5.0 | Polling interval in seconds |
Look up contract IDs (conids) via the IBKR Contract Search API or TWS. Set paper=False for live trading data.
CalendarFeed
Economic event and earnings calendar feed. Polls bundled or API-provided event data and maps the nearest upcoming event to FeedSnapshot fields.
hz.run(
    feeds={"events": hz.CalendarFeed()},
    ...
)

def event_signal(ctx: hz.Context) -> float:
    ev = ctx.feeds["events"]
    seconds_to_event = ev.bid
    event_type_code = ev.ask
    return seconds_to_event
| Field | Default | Description |
| --- | --- | --- |
| events_json | "" | JSON string of custom events. If empty, uses bundled FOMC/CPI dates. |
| api_url | "" | Optional API URL to fetch events from |
| interval | 300.0 | Polling interval in seconds |
Snapshot mapping: bid = seconds until next event, ask = event type code.
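The "nearest upcoming event" selection behind this mapping can be illustrated with a small helper. The sketch assumes events are (unix_timestamp, type_code) pairs; the actual event schema and the meaning of each type code are not given in the source, and `next_event` is a hypothetical name.

```python
from typing import Iterable, Optional, Tuple


def next_event(events: Iterable[Tuple[float, int]],
               now: float) -> Optional[Tuple[float, int]]:
    """Return (seconds_until_event, type_code) for the nearest future event."""
    # Keep only events at or after `now`, expressed as a time-to-event.
    upcoming = [(ts - now, code) for ts, code in events if ts >= now]
    # Tuples compare by first element, so min() picks the soonest event.
    return min(upcoming, default=None)


# Three events; at now=20 the first has passed and the one at t=50 is next.
result = next_event([(10.0, 3), (50.0, 2), (100.0, 1)], now=20.0)
```

A strategy might widen its quotes or pause entirely once the returned time-to-event drops below some threshold before a scheduled release.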
TreasuryFeed
Treasury yield curve data from the Federal Reserve Economic Data (FRED) API. Polls multiple maturity series and maps the primary series to the price field.
hz.run(
    feeds={"yields": hz.TreasuryFeed()},
    ...
)

def yield_signal(ctx: hz.Context) -> float:
    return ctx.feeds["yields"].price  # 10Y yield by default
| Field | Default | Description |
| --- | --- | --- |
| api_key | None | FRED API key (or set FRED_API_KEY env var). Get one free at fred.stlouisfed.org. |
| series_ids | 9 maturities | FRED series to fetch: 1M, 3M, 6M, 1Y, 2Y, 5Y, 10Y, 20Y, 30Y |
| primary_series | "DGS10" | Series mapped to price field (default: 10-year yield) |
| interval | 300.0 | Polling interval in seconds |
Snapshot mapping: price = yield for primary_series (default 10Y).
Feed Data in Context
Each feed is accessible in pipeline functions via ctx.feeds:
from horizon.context import FeedData

def fair_value(ctx: hz.Context) -> float:
    btc = ctx.feeds.get("btc", FeedData())
    btc.price      # Last trade price
    btc.bid        # Best bid
    btc.ask        # Best ask
    btc.timestamp  # Unix timestamp of last update
    return btc.price
FeedSnapshot (Engine Level)
At the Engine level, feed data is stored as FeedSnapshot objects:
snap = engine.feed_snapshot("btc")
snap.price       # 100250.0
snap.bid         # 100248.0
snap.ask         # 100252.0
snap.timestamp   # 1700000000.0
snap.source      # "binance_ws"
snap.volume_24h  # 50000.0
Other Engine feed methods:
all_snaps = engine.all_feed_snapshots()  # dict[str, FeedSnapshot]
age = engine.feed_age("btc")             # seconds since last update (or None)
engine.stop_feeds()                      # stop all feeds
Feed Staleness
In live mode, the strategy loop checks feed staleness before quoting. If any feed is stale (no update within the threshold), quoting is skipped for that cycle:
# Default threshold: 30 seconds
# Override via params:
hz.run(
    params={"feed_stale_threshold": 60.0},
    ...
)
The FeedData.is_stale() method checks staleness:
from horizon.context import FeedData

# Inside a pipeline function:
feed = ctx.feeds.get("btc", FeedData())
if feed.is_stale(max_age_secs=30.0):
    # Feed hasn't updated, skip quoting
    return []
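The staleness rule (skip quoting if any feed has gone longer than the threshold without an update) reduces to a timestamp comparison. The sketch below illustrates that logic outside the SDK; `stale_feeds` is a hypothetical helper, and treating a zero timestamp as "never updated, therefore stale" is an assumption.

```python
import time
from typing import Dict, List, Optional


def stale_feeds(last_updates: Dict[str, float],
                threshold_secs: float = 30.0,
                now: Optional[float] = None) -> List[str]:
    """Return the names of feeds whose last update is older than the threshold."""
    if now is None:
        now = time.time()
    return [
        name for name, ts in last_updates.items()
        # A timestamp of 0 is treated as "never updated" (assumption).
        if ts <= 0 or (now - ts) > threshold_secs
    ]


# Two feeds at t=1000: "btc" updated 5s ago, "poly" updated 45s ago.
stale = stale_feeds({"btc": 995.0, "poly": 955.0}, threshold_secs=30.0, now=1000.0)
should_quote = not stale
```

Returning the names rather than a single boolean makes it easy to log which feed caused a skipped quoting cycle.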
Engine-Level Feed Management
When using the Engine directly, start feeds with start_feed():
import json

# Original feeds
engine.start_feed("btc", "binance_ws", symbol="btcusdt")
engine.start_feed("poly", "polymarket_book", symbol="will-btc-hit-100k")
engine.start_feed("kalshi", "kalshi_book", symbol="KXBTC-25FEB16")
engine.start_feed("custom", "rest", url="https://api.example.com/price", interval=5.0)

# New feeds (v0.4.5) - use config_json for structured config
engine.start_feed("pi", "predictit", config_json=json.dumps({
    "market_id": 7456, "contract_id": 28562
}))
engine.start_feed("mf", "manifold", config_json=json.dumps({
    "slug": "will-btc-hit-100k-by-2026"
}))
engine.start_feed("nba", "espn", config_json=json.dumps({
    "sport": "basketball", "league": "nba"
}))
engine.start_feed("wx", "nws", config_json=json.dumps({
    "mode": "alerts", "state": "FL"
}))
engine.start_feed("btc2", "rest_json_path", config_json=json.dumps({
    "url": "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
    "price_path": "bitcoin.usd"
}))
engine.start_feed("eth_usd", "chainlink", config_json=json.dumps({
    "contract_address": "0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
    "rpc_url": "https://eth.llamarpc.com"
}))

# Exchange data feeds (v0.5.0)
engine.start_feed("stocks", "alpaca", config_json=json.dumps({
    "symbols": ["AAPL", "SPY"],
    "api_key": "...", "api_secret": "...", "data_source": "iex"
}))
engine.start_feed("crypto", "coinbase", config_json=json.dumps({
    "product_ids": ["BTC-USD", "ETH-USD"],
    "api_key": "...", "api_secret": "..."
}))
engine.start_feed("rh", "robinhood", config_json=json.dumps({
    "symbols": ["BTC-USD"], "api_key": "...", "interval": 5.0
}))
engine.start_feed("aapl", "ibkr", config_json=json.dumps({
    "conids": ["265598"], "access_token": "...", "paper": True
}))
engine.start_feed("events", "calendar", config_json=json.dumps({
    "api_url": "", "interval": 300.0
}))
engine.start_feed("yields", "treasury", config_json=json.dumps({
    "api_key": "...", "primary_series": "DGS10", "interval": 300.0
}))
Multiple Feeds
Use multiple feeds to combine data sources across markets:
from horizon.context import FeedData

hz.run(
    feeds={
        # Crypto price
        "btc": hz.BinanceWS("btcusdt"),
        # Prediction market orderbooks
        "poly": hz.PolymarketBook("will-btc-hit-100k"),
        "kalshi": hz.KalshiBook("KXBTC-25FEB16"),
        # Cross-market signals
        "pi_election": hz.PredictItFeed(market_id=7456, contract_id=28562),
        "manifold": hz.ManifoldFeed("will-btc-hit-100k-by-2026"),
        # Sports data for sports-related markets
        "nba": hz.ESPNFeed("basketball", "nba"),
        # Weather for hurricane/climate markets
        "weather_fl": hz.NWSFeed(state="FL", mode="alerts"),
        # Any custom API
        "custom": hz.RESTJsonPathFeed(
            url="https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
            price_path="bitcoin.usd",
        ),
        # On-chain Chainlink oracle
        "eth_usd": hz.ChainlinkFeed(
            contract_address="0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
            rpc_url="https://eth.llamarpc.com",
        ),
        # Equities
        "stocks": hz.AlpacaFeed(symbols=["AAPL", "SPY"]),
        # Macro data
        "yields": hz.TreasuryFeed(),
        "events": hz.CalendarFeed(),
    },
    ...
)

def fair_value(ctx: hz.Context) -> float:
    btc = ctx.feeds.get("btc", FeedData())
    poly = ctx.feeds.get("poly", FeedData())
    # Combine BTC spot price with Polymarket book
    if poly.bid > 0 and poly.ask > 0:
        return (poly.bid + poly.ask) / 2
    return btc.price * 0.01  # Fallback