Market Data Collector

Polymarket and Kalshi don’t retain all historical data indefinitely. Orderbook depth, trade history, and market metadata can disappear. The collector module creates a persistent point-in-time (PIT) archive that captures:
  • Full L2 orderbooks: every bid/ask level at configurable intervals
  • Trade-by-trade history: individual trades with deduplication
  • Market metadata: names, tokens, volumes, liquidity, status
  • Event metadata: event-level data with associated markets

Two Modes

Pipeline Mode

Integrate directly into your hz.run() strategy to record orderbooks alongside trading:
import horizon as hz

hz.run(
    name="mm_with_collector",
    markets=["0x1234..."],
    feeds={"poly": hz.PolymarketBook("0x1234...")},
    pipeline=[
        hz.collector("poly", orderbook_every_n=5),  # record every 5th cycle
        my_strategy,
    ],
)
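
Listing hz.collector ahead of my_strategy records the book each cycle before the strategy runs (assuming pipeline stages execute in list order), and orderbook_every_n=5 thins snapshots to every fifth cycle to keep storage in check.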

Standalone Daemon

Run as a 24/7 passive collector without any trading:
import horizon as hz

hz.collect(
    markets=["0xabc...", "0xdef..."],
    exchange="polymarket",
    db_path="data/markets.db",
)
Or with full configuration:
config = hz.DataCollectorConfig(
    markets=["0xabc...", "0xdef..."],
    exchange="polymarket",
    orderbook_interval=5.0,       # seconds between OB snapshots
    trade_poll_interval=10.0,     # seconds between trade polls
    metadata_interval=300.0,      # seconds between metadata refreshes
    max_book_levels=20,           # 0 means all levels
    db_path="data/markets.db",
    parquet_dir="data/parquet",   # auto-export to Parquet
    parquet_export_interval=3600, # export every hour
    retention_hours=168,          # purge data older than 7 days (168h = 1 week)
)

hz.collect(config=config)
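
With this configuration, the hourly Parquet exports preserve the full history while retention_hours=168 keeps the live SQLite file bounded to roughly one week of raw data.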

Parquet Export

Export collected data to Parquet for batch analysis with pandas, polars, or DuckDB:
files = hz.export_parquet(
    db_path="data/markets.db",
    output_dir="data/parquet",
    tables=["orderbook_snapshots", "market_trades"],
)
Requires pyarrow:
pip install pyarrow
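
The exported files can then be read with any Parquet-aware tool. A minimal pandas sketch, assuming export_parquet returns the list of file paths it wrote:
import pandas as pd

# Combine every exported file into one frame for analysis
df = pd.concat((pd.read_parquet(f) for f in files), ignore_index=True)
print(len(df), "rows,", list(df.columns))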

Query Helpers

Query collected data directly:
# Orderbook snapshots
obs = hz.query_orderbooks("data/markets.db", market_id="0xabc...", limit=100)
for ob in obs:
    print(f"t={ob['timestamp']:.0f} mid={ob['mid_price']:.4f} spread={ob['spread']:.4f}")

# Trades
trades = hz.query_collected_trades("data/markets.db", market_id="0xabc...")
for t in trades:
    print(f"{t['side']} {t['size']} @ {t['price']:.4f}")
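
The helpers return rows that support dict-style access (as the subscripting above shows), so results drop straight into a DataFrame. A quick sketch using the documented mid_price and spread columns:
import pandas as pd
import horizon as hz

obs = hz.query_orderbooks("data/markets.db", market_id="0xabc...", limit=1000)
df = pd.DataFrame(obs)
print(f"mean spread: {df['spread'].mean():.4f}, mean mid: {df['mid_price'].mean():.4f}")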

Programmatic Access (Rust)

For maximum control, use the Rust-backed Collector class directly:
from horizon import Collector, CollectorConfig

config = CollectorConfig(
    market_ids=["0xabc..."],
    exchange="polymarket",
    orderbook_interval_secs=5.0,
    db_path="data/collector.db",
)
c = Collector(config)
c.start()

# Check status
st = c.status()
print(f"Running: {st.is_running}, OB snapshots: {st.ob_snapshots_total}")

# Manual recording (pipeline mode)
c.record_orderbook("0xabc...", "polymarket", bids_json, asks_json, ...)

# Query
import json
obs = json.loads(c.query_orderbooks("0xabc..."))
trades = json.loads(c.query_trades("0xabc..."))
meta = c.query_metadata("0xabc...")

# Cleanup
c.purge(max_age_secs=86400)  # purge data older than 24 hours
c.stop()
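
For an unattended process, the same calls compose into a simple maintenance loop. A sketch using only the methods shown above, with `c` configured as in the previous block (the hourly cadence is an arbitrary choice):
import time

c.start()
try:
    while True:
        st = c.status()
        print(f"OB snapshots so far: {st.ob_snapshots_total}")
        c.purge(max_age_secs=86400)  # keep only the last 24 hours of raw data
        time.sleep(3600)             # re-check every hour (arbitrary cadence)
finally:
    c.stop()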

Schema Reference

orderbook_snapshots

Column      Type    Description
market_id   TEXT    Market identifier
exchange    TEXT    Exchange name
bids_json   TEXT    JSON array [[price, size], ...]
asks_json   TEXT    JSON array [[price, size], ...]
best_bid    REAL    Top-of-book bid price
best_ask    REAL    Top-of-book ask price
mid_price   REAL    (best_bid + best_ask) / 2
spread      REAL    best_ask - best_bid
timestamp   REAL    Source timestamp (UNIX epoch)
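
The bids_json/asks_json columns decode with the standard library. A sketch pulling the latest snapshot straight from SQLite using the columns above (the level ordering inside the arrays is an assumption):
import json
import sqlite3

conn = sqlite3.connect("data/markets.db")
row = conn.execute(
    "SELECT bids_json, asks_json, mid_price FROM orderbook_snapshots "
    "WHERE market_id = ? ORDER BY timestamp DESC LIMIT 1",
    ("0xabc...",),
).fetchone()
bids_json, asks_json, mid = row
bids = json.loads(bids_json)  # [[price, size], ...]; best-bid-first ordering assumed
print(f"mid={mid:.4f} top bid={bids[0]}")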

market_trades

Column      Type    Description
trade_id    TEXT    Unique trade identifier
market_id   TEXT    Market identifier
price       REAL    Trade price
size        REAL    Trade size
side        TEXT    "buy" or "sell"
timestamp   REAL    Trade timestamp

market_metadata

Column      Type    Description
market_id   TEXT    Market identifier (PK)
exchange    TEXT    Exchange name (PK)
question    TEXT    Market question
volume      REAL    Total volume
liquidity   REAL    Current liquidity
status      TEXT    Market status

Storage Estimates

Data Type               Interval     Per Market/Day    10 Markets/Month
Orderbook (20 levels)   5 s          ~35 MB            ~10.5 GB
Orderbook (20 levels)   30 s         ~6 MB             ~1.8 GB
Trades                  10 s polls   ~5 MB             ~1.5 GB
Metadata                5 min        ~0.5 MB           ~150 MB
Use retention_hours and purge() to control storage growth. Parquet export + purge is recommended for long-term archiving.
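
The per-day figures follow directly from the snapshot cadence; a back-of-envelope check of the 5 s row:
# 86,400 s/day at one snapshot per 5 s = 17,280 snapshots per market per day
# ~35 MB/day / 17,280 ≈ 2 KB per snapshot (two 20-level JSON arrays)
# 35 MB/day × 10 markets × 30 days ≈ 10.5 GB/month, matching the table
snapshots_per_day = 86_400 / 5
print(round(35e6 / snapshots_per_day), "bytes per snapshot")  # ≈ 2025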