# Data Collector

> Persistent point-in-time market data archiving

# Market Data Collector

Neither Polymarket nor Kalshi retains all historical data indefinitely: orderbook depth, trade history, and market metadata can disappear. The collector module builds a persistent **point-in-time (PIT)** data archive that captures:

* **Full L2 orderbooks**: every bid/ask level at configurable intervals
* **Trade-by-trade history**: individual trades with deduplication
* **Market metadata**: names, tokens, volumes, liquidity, status
* **Event metadata**: event-level data with associated markets

## Two Modes

### Pipeline Mode

Add `hz.collector()` to the `pipeline` of your `hz.run()` strategy to record orderbooks while trading:

```python theme={null}
import horizon as hz

hz.run(
    name="mm_with_collector",
    markets=["0x1234..."],
    feeds={"poly": hz.PolymarketBook("0x1234...")},
    pipeline=[
        hz.collector("poly", orderbook_every_n=5),  # record every 5th cycle
        my_strategy,
    ],
)
```

### Standalone Daemon

Run as a 24/7 passive collector without any trading:

```python theme={null}
import horizon as hz

hz.collect(
    markets=["0xabc...", "0xdef..."],
    exchange="polymarket",
    db_path="data/markets.db",
)
```

Or with full configuration:

```python theme={null}
config = hz.DataCollectorConfig(
    markets=["0xabc...", "0xdef..."],
    exchange="polymarket",
    orderbook_interval=5.0,       # seconds between OB snapshots
    trade_poll_interval=10.0,     # seconds between trade polls
    metadata_interval=300.0,      # seconds between metadata refreshes
    max_book_levels=20,           # 0 means all levels
    db_path="data/markets.db",
    parquet_dir="data/parquet",   # auto-export to Parquet
    parquet_export_interval=3600, # export every hour
    retention_hours=168,          # purge data older than 7 days (168 h)
)

hz.collect(config=config)
```

## Parquet Export

Export collected data to Parquet for batch analysis with pandas, polars, or DuckDB:

```python theme={null}
files = hz.export_parquet(
    db_path="data/markets.db",
    output_dir="data/parquet",
    tables=["orderbook_snapshots", "market_trades"],
)
```

Requires `pyarrow`:

```bash theme={null}
pip install pyarrow
```

## Query Helpers

Query collected data directly:

```python theme={null}
# Orderbook snapshots
obs = hz.query_orderbooks("data/markets.db", market_id="0xabc...", limit=100)
for ob in obs:
    print(f"t={ob['timestamp']:.0f} mid={ob['mid_price']:.4f} spread={ob['spread']:.4f}")

# Trades
trades = hz.query_collected_trades("data/markets.db", market_id="0xabc...")
for t in trades:
    print(f"{t['side']} {t['size']} @ {t['price']:.4f}")
```
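
Once rows are loaded, they can be summarized with plain Python. The sketch below assumes each orderbook row is a dict exposing the `timestamp`, `mid_price`, and `spread` keys used in the loop above. The helper name `summarize_spreads` and the sample rows are illustrative only and are not part of the library.

```python
from statistics import mean


def summarize_spreads(rows):
    """Return snapshot count, covered time span, and mean/max spread."""
    if not rows:
        return {"count": 0, "span_secs": 0.0, "mean_spread": None, "max_spread": None}
    timestamps = [r["timestamp"] for r in rows]
    spreads = [r["spread"] for r in rows]
    return {
        "count": len(rows),
        "span_secs": max(timestamps) - min(timestamps),
        "mean_spread": mean(spreads),
        "max_spread": max(spreads),
    }


# Made-up rows shaped like the query results above (not real market data)
sample = [
    {"timestamp": 1700000000.0, "mid_price": 0.52, "spread": 0.02},
    {"timestamp": 1700000005.0, "mid_price": 0.53, "spread": 0.04},
]
summary = summarize_spreads(sample)
print(summary)
```

In practice `sample` would be replaced by the list returned from `hz.query_orderbooks(...)`, provided its rows have the shape assumed here.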

## Programmatic Access (Rust)

For maximum control, use the Rust-backed `Collector` class directly from Python:

```python theme={null}
import json

from horizon import Collector, CollectorConfig

config = CollectorConfig(
    market_ids=["0xabc..."],
    exchange="polymarket",
    orderbook_interval_secs=5.0,
    db_path="data/collector.db",
)
c = Collector(config)
c.start()

# Check status
st = c.status()
print(f"Running: {st.is_running}, OB snapshots: {st.ob_snapshots_total}")

# Manual recording (pipeline mode).
# bids_json and asks_json are JSON strings of the form [[price, size], ...];
# the remaining arguments are elided here.
c.record_orderbook("0xabc...", "polymarket", bids_json, asks_json, ...)

# Query: orderbooks and trades are parsed from JSON strings
obs = json.loads(c.query_orderbooks("0xabc..."))
trades = json.loads(c.query_trades("0xabc..."))
meta = c.query_metadata("0xabc...")
meta = c.query_metadata("0xabc...")

# Cleanup
c.purge(max_age_secs=86400)  # purge data older than 24 hours
c.stop()
```
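
To make the age-based purge concrete, here is a minimal sketch of the idea using the standard-library `sqlite3` module. It assumes the archive is a SQLite file and that purging means deleting rows whose `timestamp` is more than `max_age_secs` in the past. Neither assumption is confirmed here, and `purge_older_than` is a hypothetical helper, not the collector's own implementation.

```python
import sqlite3
import time


def purge_older_than(conn, table, max_age_secs, now=None):
    """Delete rows whose timestamp is more than max_age_secs before `now`."""
    # Table names cannot be bound as SQL parameters, so validate explicitly.
    if table not in {"orderbook_snapshots", "market_trades"}:
        raise ValueError(f"unexpected table: {table}")
    now = time.time() if now is None else now
    cutoff = now - max_age_secs
    cur = conn.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
    conn.commit()
    return cur.rowcount  # number of rows deleted


# Tiny in-memory stand-in for the archive (only the columns needed here)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orderbook_snapshots (market_id TEXT, timestamp REAL)")
now = 1_700_000_000.0
conn.executemany(
    "INSERT INTO orderbook_snapshots VALUES (?, ?)",
    [
        ("0xabc...", now - 8 * 24 * 3600),  # 8 days old: beyond retention
        ("0xabc...", now - 3600),           # 1 hour old: kept
    ],
)

retention_hours = 168  # same value as in the configuration example
deleted = purge_older_than(conn, "orderbook_snapshots", retention_hours * 3600, now=now)
remaining = conn.execute("SELECT COUNT(*) FROM orderbook_snapshots").fetchone()[0]
print(deleted, remaining)
```

The conversion `retention_hours * 3600` shows how an hours-based retention setting relates to a seconds-based `max_age_secs` argument.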

## Schema Reference

### `orderbook_snapshots`

| Column      | Type | Description                       |
| ----------- | ---- | --------------------------------- |
| `market_id` | TEXT | Market identifier                 |
| `exchange`  | TEXT | Exchange name                     |
| `bids_json` | TEXT | JSON array `[[price, size], ...]` |
| `asks_json` | TEXT | JSON array `[[price, size], ...]` |
| `best_bid`  | REAL | Top-of-book bid price             |
| `best_ask`  | REAL | Top-of-book ask price             |
| `mid_price` | REAL | `(best_bid + best_ask) / 2`       |
| `spread`    | REAL | `best_ask - best_bid`             |
| `timestamp` | REAL | Source timestamp (UNIX epoch)     |
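
The derived columns can be recomputed from the raw level arrays, which is useful for sanity-checking a snapshot. The sketch below follows the formulas in the table; `top_of_book` is a hypothetical helper and the sample levels are made up. It takes the maximum bid and minimum ask rather than assuming the arrays are sorted.

```python
import json


def top_of_book(bids_json, asks_json):
    """Recompute best_bid, best_ask, mid_price, and spread from JSON levels."""
    bids = json.loads(bids_json)
    asks = json.loads(asks_json)
    if not bids or not asks:
        return None  # one-sided or empty book: no mid price or spread
    best_bid = max(price for price, _size in bids)
    best_ask = min(price for price, _size in asks)
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "mid_price": (best_bid + best_ask) / 2,
        "spread": best_ask - best_bid,
    }


# Made-up levels in the [[price, size], ...] layout described above
book = top_of_book(
    "[[0.48, 100.0], [0.47, 250.0]]",
    "[[0.52, 80.0], [0.53, 300.0]]",
)
print(book)
```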

### `market_trades`

| Column      | Type | Description             |
| ----------- | ---- | ----------------------- |
| `trade_id`  | TEXT | Unique trade identifier |
| `market_id` | TEXT | Market identifier       |
| `price`     | REAL | Trade price             |
| `size`      | REAL | Trade size              |
| `side`      | TEXT | `buy` or `sell`         |
| `timestamp` | REAL | Trade timestamp         |
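
Because trades are polled on an interval, consecutive polls can return the same trade more than once. One way to deduplicate, sketched here with the standard-library `sqlite3` module, is to make `trade_id` a primary key and use `INSERT OR IGNORE`. This assumes SQLite storage and is not necessarily how the collector deduplicates internally; `store_trades` and the sample trades are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE market_trades (
        trade_id  TEXT PRIMARY KEY,
        market_id TEXT,
        price     REAL,
        size      REAL,
        side      TEXT,
        timestamp REAL
    )
    """
)


def count_trades(conn):
    return conn.execute("SELECT COUNT(*) FROM market_trades").fetchone()[0]


def store_trades(conn, trades):
    """Insert trades, skipping any trade_id already stored. Returns rows added."""
    before = count_trades(conn)
    conn.executemany(
        "INSERT OR IGNORE INTO market_trades VALUES (?, ?, ?, ?, ?, ?)",
        trades,
    )
    conn.commit()
    return count_trades(conn) - before


# Two overlapping polls: trade "t2" appears in both (made-up data)
poll_1 = [
    ("t1", "0xabc...", 0.50, 10.0, "buy", 1700000000.0),
    ("t2", "0xabc...", 0.52, 30.0, "sell", 1700000004.0),
]
poll_2 = [
    ("t2", "0xabc...", 0.52, 30.0, "sell", 1700000004.0),
    ("t3", "0xabc...", 0.51, 20.0, "buy", 1700000009.0),
]
added_first = store_trades(conn, poll_1)
added_second = store_trades(conn, poll_2)
print(added_first, added_second, count_trades(conn))
```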

### `market_metadata`

| Column      | Type | Description            |
| ----------- | ---- | ---------------------- |
| `market_id` | TEXT | Market identifier (PK) |
| `exchange`  | TEXT | Exchange name (PK)     |
| `question`  | TEXT | Market question        |
| `volume`    | REAL | Total volume           |
| `liquidity` | REAL | Current liquidity      |
| `status`    | TEXT | Market status          |

## Storage Estimates

| Data Type             | Interval  | Per Market/Day | 10 Markets/Month |
| --------------------- | --------- | -------------- | ---------------- |
| Orderbook (20 levels) | 5s        | \~35 MB        | \~10.5 GB        |
| Orderbook (20 levels) | 30s       | \~6 MB         | \~1.8 GB         |
| Trades                | 10s polls | \~5 MB         | \~1.5 GB         |
| Metadata              | 5 min     | \~0.5 MB       | \~150 MB         |

Use `retention_hours` and `purge()` to control storage growth. For long-term archiving, export to Parquet first, then purge old rows from the database.
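
The monthly column follows from the daily column by simple multiplication. The sketch below reproduces that arithmetic, assuming a 30-day month and decimal units (1 GB = 1000 MB), which is what the table's figures imply. The function name and dictionary keys are illustrative.

```python
def monthly_storage_gb(mb_per_market_day, markets=10, days=30):
    """Scale a per-market daily estimate (MB) to a monthly total (GB)."""
    return mb_per_market_day * markets * days / 1000


# Per-market daily estimates (MB) taken from the table above
daily_mb = {
    "orderbook_5s": 35,
    "orderbook_30s": 6,
    "trades": 5,
    "metadata": 0.5,
}
monthly_gb = {name: monthly_storage_gb(mb) for name, mb in daily_mb.items()}

# Snapshots per market per day at a 5-second interval
snapshots_per_day = 86400 // 5

for name, gb in monthly_gb.items():
    print(f"{name}: {gb:.2f} GB/month for 10 markets")
print(f"{snapshots_per_day} snapshots per market per day at 5s")
```

Changing `markets` or `days` gives a rough budget for other deployments; actual sizes depend on book depth and trading activity.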
