Feeds provide live price data to your pipeline functions via ctx.feeds. Each feed runs as an async task on the engine’s tokio runtime.
  • Exchange Orderbooks: PolymarketBook, KalshiBook
  • Crypto Prices: BinanceWS, CoinbaseFeed, RobinhoodFeed
  • Equities & Options: AlpacaFeed, IBKRFeed
  • Prediction Markets: PredictIt, Manifold
  • Alternative Data: ESPN, NWS, CalendarFeed, TreasuryFeed
  • On-Chain Oracles: Chainlink, Mempool
  • Generic: REST, RESTJsonPath

Available Feeds

BinanceWS

Real-time trade stream from Binance WebSocket.

PolymarketBook

Polymarket orderbook snapshots via WebSocket.

KalshiBook

Kalshi orderbook polling via REST.

RESTFeed

Generic REST API polling for any endpoint.

PredictItFeed

PredictIt market contract prices.

ManifoldFeed

Manifold Markets crowd probabilities.

ESPNFeed

Live sports scores from ESPN.

NWSFeed

National Weather Service forecasts and alerts.

RESTJsonPathFeed

Any REST API with dot-notation JSON path extraction.

ChainlinkFeed

On-chain Chainlink price oracle via JSON-RPC.

MempoolFeed

Polygon mempool watcher for CTF exchange order flow.

AlpacaFeed

Real-time stock data from Alpaca Markets via WebSocket.

CoinbaseFeed

Coinbase Advanced Trade ticker data via WebSocket.

RobinhoodFeed

Robinhood crypto quotes via REST polling.

IBKRFeed

Interactive Brokers market data via REST polling.

CalendarFeed

Economic events and earnings calendar feed.

TreasuryFeed

Treasury yield curve data from FRED API.

BinanceWS

Real-time price stream from Binance via WebSocket:
hz.run(
    feeds={"btc": hz.BinanceWS("btcusdt")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["btc"].price
The feed connects to Binance’s trade stream and updates price, bid, ask, and timestamp on each trade.
| Field | Description |
| --- | --- |
| symbol | Binance trading pair (e.g., "btcusdt", "ethusdt") |

PolymarketBook

Polymarket orderbook feed via WebSocket. Streams real-time best bid/ask from the CLOB WebSocket and updates on every orderbook change.
hz.run(
    feeds={"poly": hz.PolymarketBook("will-btc-hit-100k")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    poly = ctx.feeds["poly"]
    return (poly.bid + poly.ask) / 2
| Field | Description |
| --- | --- |
| market_slug | Polymarket market slug or CLOB token ID |
You can pass either a market slug (e.g., "will-btc-hit-100k") or a numeric CLOB token ID. When a slug is provided, Horizon automatically resolves it to the YES token ID via the Gamma API on startup (2 retry attempts, 15s timeout). If resolution fails, the slug is used as-is.
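The resolve-or-fall-back behavior can be sketched roughly as below. This is an illustration only: `resolve_market_id` and the `lookup` callable are hypothetical names rather than Horizon's API, the sketch reads "2 retry attempts" as two attempts in total, and the real resolver talks to the Gamma API with a 15s timeout.

```python
from typing import Callable, Optional


def resolve_market_id(
    market: str,
    lookup: Callable[[str], Optional[str]],
    attempts: int = 2,
) -> str:
    """Return a CLOB token ID for `market`, or `market` unchanged on failure.

    `lookup` is a hypothetical stand-in for the slug -> YES token ID request.
    """
    # A purely numeric string is treated as a token ID already.
    if market.isdecimal():
        return market
    for _ in range(attempts):
        try:
            token_id = lookup(market)
        except Exception:
            continue  # this attempt failed; try again if any attempts remain
        if token_id:
            return token_id
    # Resolution failed: use the slug as-is, as the text describes.
    return market
```

Because a failed resolution silently falls back to the slug, it can be worth logging the resolved value at startup when a feed stays empty.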
The feed handles three Polymarket WebSocket message formats automatically:
  1. Initial book snapshot - full L2 orderbook with bid/ask levels
  2. Price change updates - best bid/ask deltas
  3. Direct book messages - fallback top-of-book format
Asset ID filtering ensures only data for the subscribed YES token is used (Polymarket sends updates for both YES and NO tokens on the same channel).
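As a rough sketch of the asset ID filter applied to a full book snapshot: the message layout used here (`asset_id`, `bids`, `asks`, prices as strings) is an assumption for illustration, and `top_of_book` is a hypothetical helper, not part of Horizon.

```python
from typing import Optional, Tuple


def top_of_book(message: dict, yes_token_id: str) -> Optional[Tuple[float, float]]:
    """Return (best_bid, best_ask) from a book snapshot, or None if skipped.

    The message layout is assumed, not taken from the Polymarket spec.
    """
    # Ignore updates for the NO token, which arrive on the same channel.
    if message.get("asset_id") != yes_token_id:
        return None
    bids = [float(level["price"]) for level in message.get("bids", [])]
    asks = [float(level["price"]) for level in message.get("asks", [])]
    if not bids or not asks:
        return None
    # Best bid is the highest bid; best ask is the lowest ask.
    return max(bids), min(asks)
```

Taking `max` and `min` avoids relying on the order in which levels arrive.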

KalshiBook

Kalshi orderbook feed via REST polling:
hz.run(
    feeds={"kalshi": hz.KalshiBook("KXBTC-25FEB16")},
    ...
)
| Field | Description |
| --- | --- |
| ticker | Kalshi market ticker (e.g., "KXBTC-25FEB16") |

RESTFeed

Generic REST API polling feed for any JSON endpoint:
hz.run(
    feeds={"custom": hz.RESTFeed("https://api.example.com/price", interval=5.0)},
    ...
)
| Field | Default | Description |
| --- | --- | --- |
| url | required | REST endpoint URL |
| interval | 5.0 | Polling interval in seconds |

PredictItFeed

PredictIt market contract prices via REST polling:
hz.run(
    feeds={"election": hz.PredictItFeed(market_id=7456, contract_id=28562)},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    pi = ctx.feeds["election"]
    return pi.price  # lastTradePrice
| Field | Default | Description |
| --- | --- | --- |
| market_id | required | PredictIt market ID (integer) |
| contract_id | None | Specific contract ID. If None, uses the first contract. |
| interval | 5.0 | Polling interval in seconds (minimum 1s) |
Snapshot mapping: price = lastTradePrice, bid = bestBuyYesCost, ask = bestSellYesCost. Handles HTTP 429 with automatic exponential backoff.
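The source does not give the backoff parameters, so the following is only a sketch of one common exponential backoff scheme. `next_delay`, the doubling factor, and the 60-second cap are all assumptions, not Horizon's actual values.

```python
from typing import Tuple


def next_delay(
    status_code: int,
    interval: float,
    failures: int,
    cap: float = 60.0,
) -> Tuple[float, int]:
    """Return (seconds to wait before the next poll, updated 429 streak)."""
    if status_code == 429:
        failures += 1
        # Double the wait for each consecutive 429, up to the (assumed) cap.
        return min(cap, interval * (2 ** failures)), failures
    # Any other response resets the streak and restores the normal interval.
    return interval, 0
```

With `interval=5.0`, consecutive 429 responses would wait 10s, 20s, 40s, and then 60s under these assumed settings.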

ManifoldFeed

Manifold Markets probability feed via REST polling:
hz.run(
    feeds={"manifold": hz.ManifoldFeed("will-btc-hit-100k-by-2026")},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["manifold"].price  # probability (0-1)
| Field | Default | Description |
| --- | --- | --- |
| slug | required | Manifold market slug |
| interval | 5.0 | Polling interval in seconds |
Snapshot mapping: price = probability, volume_24h = volume.

ESPNFeed

Live sports scores from ESPN’s scoreboard API:
hz.run(
    feeds={"nba": hz.ESPNFeed("basketball", "nba")},
    ...
)

def score_signal(ctx: hz.Context) -> float:
    game = ctx.feeds["nba"]
    home_score = game.price
    away_score = game.bid
    period = game.volume_24h
    # game.source contains "espn:basketball:nba:{event_id}:{status}"
    return home_score - away_score
| Field | Default | Description |
| --- | --- | --- |
| sport | required | ESPN sport name (e.g., "basketball", "football") |
| league | required | ESPN league name (e.g., "nba", "nfl") |
| event_id | None | Filter to a specific game. If None, picks the first active game. |
| interval | 10.0 | Polling interval in seconds |
Snapshot mapping: price = home score, bid = away score, volume_24h = period/quarter. The source field encodes the game status ("in_progress", "scheduled", "final").
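Since the game status is only available inside the source string, a small parser can help. The sketch below assumes the `"espn:{sport}:{league}:{event_id}:{status}"` layout shown in the example comment above; `parse_espn_source` and `EspnSource` are hypothetical helpers, not part of Horizon.

```python
from typing import NamedTuple


class EspnSource(NamedTuple):
    sport: str
    league: str
    event_id: str
    status: str


def parse_espn_source(source: str) -> EspnSource:
    """Split "espn:{sport}:{league}:{event_id}:{status}" into its parts."""
    parts = source.split(":")
    if len(parts) != 5 or parts[0] != "espn":
        raise ValueError(f"unexpected ESPN source string: {source!r}")
    _, sport, league, event_id, status = parts
    return EspnSource(sport, league, event_id, status)
```

A pipeline function could then, for example, ignore scores unless `parse_espn_source(game.source).status == "in_progress"`.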

NWSFeed

National Weather Service data in two modes:

Forecast mode (default)

hz.run(
    feeds={"weather": hz.NWSFeed(office="TOP", grid_x=31, grid_y=80)},
    ...
)

def weather_signal(ctx: hz.Context) -> float:
    wx = ctx.feeds["weather"]
    temperature = wx.price
    wind_speed = wx.bid
    precip_chance = wx.ask
    return precip_chance / 100.0

Alerts mode

hz.run(
    feeds={"alerts": hz.NWSFeed(state="FL", mode="alerts")},
    ...
)

def alert_signal(ctx: hz.Context) -> float:
    al = ctx.feeds["alerts"]
    has_alerts = al.price  # 1.0 if active alerts, 0.0 otherwise
    alert_count = al.bid
    # al.source contains severity: "nws:alerts:FL:Severe"
    return has_alerts
| Field | Default | Description |
| --- | --- | --- |
| office | "" | NWS office code (e.g., "TOP", "MIA"). Required for forecast mode. |
| grid_x | 0 | Grid X coordinate. Required for forecast mode. |
| grid_y | 0 | Grid Y coordinate. Required for forecast mode. |
| state | "" | US state abbreviation (e.g., "FL"). Required for alerts mode. |
| mode | "forecast" | "forecast" or "alerts" |
| user_agent | "Horizon-SDK/0.4.5" | User-Agent header (NWS requires one) |
| interval | 60.0 | Polling interval in seconds (minimum 60s per NWS policy) |
Snapshot mapping:
  • Forecast mode: price = temperature, bid = wind speed, ask = precipitation chance
  • Alerts mode: price = 1.0 if there are active alerts, else 0.0; bid = alert count; source encodes severity

RESTJsonPathFeed

Flexible REST feed that extracts values from any JSON API using dot-notation paths:
hz.run(
    feeds={
        "btc_coingecko": hz.RESTJsonPathFeed(
            url="https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
            price_path="bitcoin.usd",
        ),
    },
    ...
)
Path resolution:
  • "price" → json["price"]
  • "data.market.price" → json["data"]["market"]["price"]
  • "markets.0.lastPrice" → json["markets"][0]["lastPrice"]
  • Numeric segments try array index first, then object key
  • String values are auto-parsed to f64
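The resolution rules above can be mirrored in a few lines of Python. This is a sketch of the described behavior, not the engine's implementation (which the `f64` mention suggests is written in Rust); `resolve_path` is a hypothetical name.

```python
from typing import Any, Optional


def resolve_path(doc: Any, path: str) -> Optional[float]:
    """Walk `doc` along a dot-notation path and return the value as a float."""
    node = doc
    for segment in path.split("."):
        # Numeric segments try an array index first...
        if segment.isdecimal() and isinstance(node, list):
            index = int(segment)
            if index >= len(node):
                return None
            node = node[index]
        # ...then fall back to an object key (also used for plain names).
        elif isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return None
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(node, bool):
        return None
    if isinstance(node, (int, float)):
        return float(node)
    if isinstance(node, str):
        try:
            return float(node)  # string values are parsed as numbers
        except ValueError:
            return None
    return None
```

For the CoinGecko response in the example above, `resolve_path(payload, "bitcoin.usd")` would return the USD price as a float.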
| Field | Default | Description |
| --- | --- | --- |
| url | required | REST endpoint URL |
| price_path | None | Dot-notation path to price field. Falls back to json["price"]. |
| bid_path | None | Dot-notation path to bid field. Falls back to json["bid"]. |
| ask_path | None | Dot-notation path to ask field. Falls back to json["ask"]. |
| volume_path | None | Dot-notation path to volume field. Falls back to json["volume"]. |
| interval | 5.0 | Polling interval in seconds |
RESTJsonPathFeed is the most flexible feed - use it to pull data from any public API (CoinGecko, weather services, sports APIs, etc.) without writing custom feed code.

ChainlinkFeed

On-chain Chainlink price oracle feed. Reads latestRoundData() from any Chainlink aggregator proxy contract via JSON-RPC eth_call. Works with any EVM chain - Ethereum, Arbitrum, Polygon, BSC, etc.
hz.run(
    feeds={
        "eth_usd": hz.ChainlinkFeed(
            contract_address="0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
            rpc_url="https://eth.llamarpc.com",
        ),
    },
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["eth_usd"].price  # e.g., 3245.12
| Field | Default | Description |
| --- | --- | --- |
| contract_address | required | Chainlink aggregator proxy address (with 0x prefix) |
| rpc_url | required | JSON-RPC endpoint URL for the target chain |
| decimals | 8 | Price decimals (most Chainlink feeds use 8; some use 18) |
| interval | 10.0 | Polling interval in seconds (minimum 1s) |
Snapshot mapping: price = on-chain price (divided by 10^decimals), timestamp = on-chain updatedAt.
Common contract addresses (Ethereum mainnet):
| Pair | Address |
| --- | --- |
| ETH/USD | 0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419 |
| BTC/USD | 0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c |
| LINK/USD | 0x2c1d072e956AFFC0D435Cb7AC38EF18d24d9127c |
| USDC/USD | 0x8fFfFfd4AfB6115b954Bd326cbe7B4BA576818f6 |
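Find contract addresses for any chain at data.chain.link. Chainlink feeds work on Ethereum, Arbitrum, Polygon, BSC, Avalanche, and more: point rpc_url at the target chain and set contract_address to that chain's aggregator proxy, since addresses differ per chain.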
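To make the snapshot mapping concrete, here is a sketch of decoding an `eth_call` result for `latestRoundData()`. The function returns five 32-byte ABI words (roundId, answer, startedAt, updatedAt, answeredInRound). `decode_latest_round_data` is a hypothetical helper and not necessarily how Horizon decodes the response.

```python
from typing import Tuple

WORD_HEX = 64  # one 32-byte ABI word is 64 hex characters


def decode_latest_round_data(result_hex: str, decimals: int = 8) -> Tuple[float, int]:
    """Return (price, updated_at) from an eth_call result for latestRoundData().

    Word order: roundId, answer, startedAt, updatedAt, answeredInRound.
    """
    data = result_hex[2:] if result_hex.startswith("0x") else result_hex
    if len(data) < 5 * WORD_HEX:
        raise ValueError("result too short for latestRoundData()")
    words = [data[i * WORD_HEX:(i + 1) * WORD_HEX] for i in range(5)]
    # `answer` is a signed int256, so decode it as two's complement.
    answer = int.from_bytes(bytes.fromhex(words[1]), "big", signed=True)
    updated_at = int(words[3], 16)
    # Scale the integer answer by 10^decimals to get the quoted price.
    return answer / 10 ** decimals, updated_at
```

Passing the wrong `decimals` value shifts the price by orders of magnitude, so it is worth checking the feed's decimals when a price looks implausible.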

MempoolFeed

Polygon mempool watcher that monitors pending transactions to Polymarket’s CTF exchange contracts. Provides visibility into large pending orders before they confirm on-chain.
hz.run(
    feeds={
        "mempool": hz.MempoolFeed(
            rpc_url="https://polygon-rpc.com",
        ),
    },
    ...
)

def detect_whale(ctx: hz.Context) -> list:
    mp = ctx.feeds.get("mempool")
    if mp and mp.price > 10:  # >10 pending trades
        print(f"Large flow detected: {mp.volume_24h:.0f} USDC pending")
    return []
| Field | Default | Description |
| --- | --- | --- |
| rpc_url | "" | Polygon JSON-RPC URL (or set POLYGON_RPC_URL env var) |
| ctf_addresses | CTF + Neg Risk CTF | Contract addresses to monitor |
| interval | 2.0 | Polling interval in seconds |
Snapshot mapping:
  • price = number of pending CTF trades
  • volume_24h = total pending USDC volume
  • bid = largest pending buy
  • ask = largest pending sell
  • last_trade_size = largest single trade
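As an illustration of how the snapshot fields relate to each other, the sketch below aggregates a list of already-decoded pending trades. The `PendingTrade` shape and the `summarize` helper are hypothetical; how Horizon decodes pending transactions is not described here.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class PendingTrade:
    side: str    # "buy" or "sell" (hypothetical decoded field)
    usdc: float  # notional size in USDC (hypothetical decoded field)


def summarize(trades: Iterable[PendingTrade]) -> Dict[str, float]:
    """Map pending trades onto the snapshot fields listed above."""
    trades = list(trades)
    buys = [t.usdc for t in trades if t.side == "buy"]
    sells = [t.usdc for t in trades if t.side == "sell"]
    return {
        "price": float(len(trades)),                      # pending trade count
        "volume_24h": float(sum(t.usdc for t in trades)),  # total pending USDC
        "bid": max(buys, default=0.0),                    # largest pending buy
        "ask": max(sells, default=0.0),                   # largest pending sell
        "last_trade_size": max((t.usdc for t in trades), default=0.0),
    }
```

Note that these fields reuse price-oriented names for counts and sizes, so `bid` and `ask` here are not quotes and should not be averaged into a mid price.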
Default CTF addresses (Polygon):
| Contract | Address |
| --- | --- |
| CTF Exchange | 0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E |
| Neg Risk CTF | 0xC5d563A36AE78145C45a50134d48A1215220f80a |
Use a private/premium RPC endpoint for mempool access. Public RPCs may not support pending block queries.

AlpacaFeed

Real-time stock and ETF data from Alpaca Markets via WebSocket, streaming trades and quotes for one or more symbols.
hz.run(
    feeds={"stocks": hz.AlpacaFeed(symbols=["AAPL", "SPY"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["stocks"].price
| Field | Default | Description |
| --- | --- | --- |
| symbols | [] | List of stock/ETF ticker symbols (e.g., ["AAPL", "SPY"]) |
| api_key | None | Alpaca API key (or set ALPACA_API_KEY / APCA_API_KEY_ID env var) |
| api_secret | None | Alpaca API secret (or set ALPACA_API_SECRET / APCA_API_SECRET_KEY env var) |
| data_source | "iex" | Data source: "iex" (free) or "sip" (paid, all exchanges) |
Credentials are resolved from constructor args first, then environment variables. The sip data source provides consolidated tape from all US exchanges but requires a paid Alpaca subscription.
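The constructor-then-environment lookup can be sketched as below. `resolve_credential` is a hypothetical helper, and the order in which the two environment variable names are tried is an assumption; the source only says constructor arguments come first.

```python
import os
from typing import Mapping, Optional, Sequence


def resolve_credential(
    explicit: Optional[str],
    env_names: Sequence[str],
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the constructor value if given, else the first env var that is set."""
    if explicit:
        return explicit
    env = os.environ if env is None else env
    for name in env_names:
        value = env.get(name)
        if value:
            return value
    return None


# Example: look up the Alpaca key the way the table above describes.
api_key = resolve_credential(None, ("ALPACA_API_KEY", "APCA_API_KEY_ID"))
```

Keeping secrets in environment variables rather than constructor arguments also keeps them out of strategy source files.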

CoinbaseFeed

Coinbase Advanced Trade real-time ticker data via WebSocket for crypto trading pairs.
hz.run(
    feeds={"crypto": hz.CoinbaseFeed(product_ids=["BTC-USD", "ETH-USD"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["crypto"].price
| Field | Default | Description |
| --- | --- | --- |
| product_ids | [] | List of Coinbase product IDs (e.g., ["BTC-USD", "ETH-USD"]) |
| api_key | None | Coinbase API key (or set COINBASE_API_KEY env var) |
| api_secret | None | Coinbase API secret (or set COINBASE_API_SECRET env var) |

RobinhoodFeed

Robinhood crypto quotes via REST polling at a configurable interval.
hz.run(
    feeds={"rh": hz.RobinhoodFeed(symbols=["BTC-USD", "ETH-USD"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["rh"].price
| Field | Default | Description |
| --- | --- | --- |
| symbols | [] | List of crypto pair symbols (e.g., ["BTC-USD", "ETH-USD"]) |
| api_key | None | Robinhood API key (or set ROBINHOOD_API_KEY env var) |
| interval | 5.0 | Polling interval in seconds |

IBKRFeed

Interactive Brokers market data feed via REST polling against the Client Portal API. Supports stocks, options, futures, and ForecastEx event contracts.
hz.run(
    feeds={"aapl": hz.IBKRFeed(conids=["265598"])},
    ...
)

def fair_value(ctx: hz.Context) -> float:
    return ctx.feeds["aapl"].price
| Field | Default | Description |
| --- | --- | --- |
| conids | [] | List of IBKR contract IDs (e.g., ["265598"] for AAPL) |
| access_token | None | OAuth2 access token (or set IBKR_ACCESS_TOKEN env var) |
| api_url | None | API base URL (auto-set based on paper flag if None) |
| paper | True | Use paper trading API (paper-api.ibkr.com) vs live (api.ibkr.com) |
| interval | 5.0 | Polling interval in seconds |
Look up contract IDs (conids) via the IBKR Contract Search API or TWS. Set paper=False for live trading data.

CalendarFeed

Economic event and earnings calendar feed. Polls bundled or API-provided event data and maps the nearest upcoming event to FeedSnapshot fields.
hz.run(
    feeds={"events": hz.CalendarFeed()},
    ...
)

def event_signal(ctx: hz.Context) -> float:
    ev = ctx.feeds["events"]
    seconds_to_event = ev.bid
    event_type_code = ev.ask
    return seconds_to_event
| Field | Default | Description |
| --- | --- | --- |
| events_json | "" | JSON string of custom events. If empty, uses bundled FOMC/CPI dates. |
| api_url | "" | Optional API URL to fetch events from |
| interval | 300.0 | Polling interval in seconds |
Snapshot mapping: bid = seconds until next event, ask = event type code.
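A minimal sketch of the "nearest upcoming event" selection follows. The `(unix_timestamp, type_code)` event layout and the `next_event` helper are assumptions for illustration; the actual events_json schema and type codes are not documented here.

```python
from typing import Iterable, Optional, Tuple


def next_event(
    events: Iterable[Tuple[float, int]],
    now: float,
) -> Optional[Tuple[float, int]]:
    """Return (seconds_until_event, type_code) for the nearest upcoming event.

    Each event is (unix_timestamp, type_code); this layout is an assumption.
    """
    # Drop events that have already happened, keeping time-to-event and code.
    upcoming = [(ts - now, code) for ts, code in events if ts >= now]
    if not upcoming:
        return None
    # Tuples compare by their first element, so min() picks the soonest event.
    return min(upcoming)
```

In snapshot terms, the first element would correspond to `bid` and the second to `ask`.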

TreasuryFeed

Treasury yield curve data from the Federal Reserve Economic Data (FRED) API. Polls multiple maturity series and maps the primary series to the price field.
hz.run(
    feeds={"yields": hz.TreasuryFeed()},
    ...
)

def yield_signal(ctx: hz.Context) -> float:
    return ctx.feeds["yields"].price  # 10Y yield by default
| Field | Default | Description |
| --- | --- | --- |
| api_key | None | FRED API key (or set FRED_API_KEY env var). Get one free at fred.stlouisfed.org. |
| series_ids | 9 maturities | FRED series to fetch: 1M, 3M, 6M, 1Y, 2Y, 5Y, 10Y, 20Y, 30Y |
| primary_series | "DGS10" | Series mapped to price field (default: 10-year yield) |
| interval | 300.0 | Polling interval in seconds |
Snapshot mapping: price = yield for primary_series (default 10Y).

Feed Data in Context

Each feed is accessible in pipeline functions via ctx.feeds:
from horizon.context import FeedData

def fair_value(ctx: hz.Context) -> float:
    btc = ctx.feeds.get("btc", FeedData())

    btc.price       # Last trade price
    btc.bid         # Best bid
    btc.ask         # Best ask
    btc.timestamp   # Unix timestamp of last update

    return btc.price

FeedSnapshot (Engine Level)

At the Engine level, feed data is stored as FeedSnapshot objects:
snap = engine.feed_snapshot("btc")

snap.price       # 100250.0
snap.bid         # 100248.0
snap.ask         # 100252.0
snap.timestamp   # 1700000000.0
snap.source      # "binance_ws"
snap.volume_24h  # 50000.0
Other Engine feed methods:
all_snaps = engine.all_feed_snapshots()  # dict[str, FeedSnapshot]
age = engine.feed_age("btc")            # seconds since last update (or None)
engine.stop_feeds()                      # stop all feeds

Feed Staleness

In live mode, the strategy loop checks feed staleness before quoting. If any feed is stale (no update within the threshold), quoting is skipped for that cycle:
# Default threshold: 30 seconds
# Override via params:
hz.run(
    params={"feed_stale_threshold": 60.0},
    ...
)
The FeedData.is_stale() method checks staleness:
from horizon.context import FeedData

feed = ctx.feeds.get("btc", FeedData())
if feed.is_stale(max_age_secs=30.0):
    # Feed hasn't updated, skip quoting
    return []
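The staleness rule can be sketched in plain Python as follows. This is an assumed reading of the behavior, not Horizon's implementation: `is_stale` and `should_skip_quoting` are standalone hypothetical functions, and treating a zero timestamp as "never updated" is an assumption.

```python
import time
from typing import Mapping, Optional


def is_stale(
    timestamp: float,
    max_age_secs: float = 30.0,
    now: Optional[float] = None,
) -> bool:
    """True if the last update is older than `max_age_secs`."""
    now = time.time() if now is None else now
    # A zero timestamp is treated as "never updated" (assumed convention).
    return timestamp <= 0 or (now - timestamp) > max_age_secs


def should_skip_quoting(
    timestamps: Mapping[str, float],
    threshold: float = 30.0,
    now: Optional[float] = None,
) -> bool:
    """True if any feed is stale, mirroring the "any feed" rule above."""
    return any(is_stale(ts, threshold, now) for ts in timestamps.values())
```

Because one stale feed blocks quoting for the whole cycle, slow-polling feeds (for example, one with a 300s interval) may need a larger `feed_stale_threshold`.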

Engine-Level Feed Management

When using the Engine directly, start feeds with start_feed():
# Original feeds
engine.start_feed("btc", "binance_ws", symbol="btcusdt")
engine.start_feed("poly", "polymarket_book", symbol="will-btc-hit-100k")
engine.start_feed("kalshi", "kalshi_book", symbol="KXBTC-25FEB16")
engine.start_feed("custom", "rest", url="https://api.example.com/price", interval=5.0)

# New feeds (v0.4.5) - use config_json for structured config
import json

engine.start_feed("pi", "predictit", config_json=json.dumps({
    "market_id": 7456, "contract_id": 28562
}))
engine.start_feed("mf", "manifold", config_json=json.dumps({
    "slug": "will-btc-hit-100k-by-2026"
}))
engine.start_feed("nba", "espn", config_json=json.dumps({
    "sport": "basketball", "league": "nba"
}))
engine.start_feed("wx", "nws", config_json=json.dumps({
    "mode": "alerts", "state": "FL"
}))
engine.start_feed("btc2", "rest_json_path", config_json=json.dumps({
    "url": "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
    "price_path": "bitcoin.usd"
}))
engine.start_feed("eth_usd", "chainlink", config_json=json.dumps({
    "contract_address": "0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
    "rpc_url": "https://eth.llamarpc.com"
}))

# Exchange data feeds (v0.5.0)
engine.start_feed("stocks", "alpaca", config_json=json.dumps({
    "symbols": ["AAPL", "SPY"],
    "api_key": "...", "api_secret": "...", "data_source": "iex"
}))
engine.start_feed("crypto", "coinbase", config_json=json.dumps({
    "product_ids": ["BTC-USD", "ETH-USD"],
    "api_key": "...", "api_secret": "..."
}))
engine.start_feed("rh", "robinhood", config_json=json.dumps({
    "symbols": ["BTC-USD"], "api_key": "...", "interval": 5.0
}))
engine.start_feed("aapl", "ibkr", config_json=json.dumps({
    "conids": ["265598"], "access_token": "...", "paper": True
}))
engine.start_feed("events", "calendar", config_json=json.dumps({
    "api_url": "", "interval": 300.0
}))
engine.start_feed("yields", "treasury", config_json=json.dumps({
    "api_key": "...", "primary_series": "DGS10", "interval": 300.0
}))

Multiple Feeds

Use multiple feeds to combine data sources across markets:
hz.run(
    feeds={
        # Crypto price
        "btc": hz.BinanceWS("btcusdt"),
        # Prediction market orderbooks
        "poly": hz.PolymarketBook("will-btc-hit-100k"),
        "kalshi": hz.KalshiBook("KXBTC-25FEB16"),
        # Cross-market signals
        "pi_election": hz.PredictItFeed(market_id=7456, contract_id=28562),
        "manifold": hz.ManifoldFeed("will-btc-hit-100k-by-2026"),
        # Sports data for sports-related markets
        "nba": hz.ESPNFeed("basketball", "nba"),
        # Weather for hurricane/climate markets
        "weather_fl": hz.NWSFeed(state="FL", mode="alerts"),
        # Any custom API
        "custom": hz.RESTJsonPathFeed(
            url="https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
            price_path="bitcoin.usd",
        ),
        # On-chain Chainlink oracle
        "eth_usd": hz.ChainlinkFeed(
            contract_address="0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
            rpc_url="https://eth.llamarpc.com",
        ),
        # Equities
        "stocks": hz.AlpacaFeed(symbols=["AAPL", "SPY"]),
        # Macro data
        "yields": hz.TreasuryFeed(),
        "events": hz.CalendarFeed(),
    },
    ...
)

def fair_value(ctx: hz.Context) -> float:
    from horizon.context import FeedData
    btc = ctx.feeds.get("btc", FeedData())
    poly = ctx.feeds.get("poly", FeedData())

    # Combine BTC spot price with Polymarket book
    if poly.bid > 0 and poly.ask > 0:
        return (poly.bid + poly.ask) / 2
    return btc.price * 0.01  # Fallback