Push-down stats with the xorq stat pipeline¶
The pluggable analysis framework (see Using Buckaroo) is built around
ColAnalysis objects that operate on pandas Series. That works well
when the data already fits in memory. XorqStatPipeline is the same
idea — a DAG of typed stat functions, error capture, add_analysis
on the fly — except every stat compiles to a xorq expression and executes on the
backend (DuckDB, Postgres, Snowflake, DataFusion, …). buckaroo never
materialises the table.
Install with pip install 'buckaroo[xorq]'.
Why push the stats down?¶
The pandas DfStats path summarises a 50M-row table by loading 50M
rows into memory. For anything bigger than the laptop, that’s the end
of the road — every column’s null_count, distinct_count, mean, median,
and histogram has to come back out as native data.
A xorq expression knows nothing about pandas. expr.aggregate(...)
compiles to a single SQL query, and the only thing that comes back to
Python is the row of scalars. The histograms are a second pass — per-column
GROUP BY queries, again on the backend — but no row-level data
ever leaves the engine.
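A minimal sketch of what that single aggregate round-trip looks like (xorq follows the ibis expression API; the stat names here are illustrative, not the pipeline's actual output keys):
import xorq.api as xo

t = xo.memtable({'price': [12.5, 18.9, None, 22.1]})
agg = t.aggregate(
    price_null_count=t.price.isnull().sum(),
    price_mean=t.price.mean(),
    price_max=t.price.max(),
)
agg.execute()  # compiles to one query; only this single row of scalars reaches Python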
The pipeline is also the same shape as StatPipeline (pandas) and
PlDfStatsV2 (polars), so DataFlow, CustomizableDataflow, and
the autocleaning code keep working without changes.
In Action¶
Build a xorq expression, hand it to XorqStatPipeline, get the
same {column: {stat: value}} dict you’d get from pandas:
import xorq.api as xo
from buckaroo.pluggable_analysis_framework.xorq_stat_pipeline import XorqStatPipeline
from buckaroo.customizations.xorq_stats_v2 import XORQ_STATS_V2
expr = xo.memtable({
'price': [12.5, 18.9, 7.4, 22.1, None, 14.0, 9.9, 31.2, 11.5, 19.8],
'qty': [1, 2, 1, 3, 1, 2, 4, 1, 2, 5],
'category': ['a', 'b', 'a', 'c', 'a', 'b', 'b', 'c', 'a', 'b'],
'is_promo': [True, False, False, True, False, False, True, True, False, True],
})
pipeline = XorqStatPipeline(XORQ_STATS_V2)
summary, errors = pipeline.process_table(expr)
summary['price']
# {'_type': 'float', 'length': 10, 'null_count': 1, 'distinct_count': 9,
# 'min': 7.4, 'max': 31.2, 'mean': 16.367..., 'std': 7.05..., 'median': 14.0,
# 'non_null_count': 9, 'nan_per': 0.1, 'distinct_per': 0.9, 'histogram': [...]}
XORQ_STATS_V2 is the convenience list of @stat functions in
buckaroo/customizations/xorq_stats_v2.py — drop it in for the
default panel, or compose your own.
Two-phase execution¶
Each @stat function declares what it needs through its parameter
types. XorqStatPipeline looks at those types and decides which
phase the stat belongs to:
1. Batch aggregate. Stats whose only inputs are XorqColumn (or framework-provided keys) return an ibis expression. The pipeline collects every such expression across every column, names each one "<col>|<stat>", and folds them into a single expr.aggregate(...) call. One round-trip to the backend per table.
2. Per-column post-batch. Stats with XorqExpr parameters need their own query — histograms, for instance, can't be folded into the batch because they GROUP BY on the data. They run after the batch lands, with the batch results already in the per-column accumulator (so a histogram stat can read min/max without recomputing them).
Computed stats with no raw deps (non_null_count, distinct_per,
…) run in phase 2 too, but they’re cheap dict math — no query.
Marker types live in
buckaroo.pluggable_analysis_framework.stat_func:
XorqColumn: the column expression for the column under analysis. Stats that take only this (plus framework-provided keys) fold into the batch aggregate.
XorqExpr: the full xorq expression. For stats that issue their own per-column query.
XorqExecute: a 1-arg callable that runs an expression against the pipeline's backend (falling back to the expression's own .execute() when no backend was supplied).
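For illustration, a hypothetical phase-2 stat could combine these markers to issue its own query (the exact signature the framework passes is an assumption here, patterned on the histogram stats):
from buckaroo.pluggable_analysis_framework.stat_func import stat, XorqColumn, XorqExecute

@stat(column_filter=lambda dt: dt.is_string())
def most_common(col: XorqColumn, execute: XorqExecute):
    # Runs its own query through the pipeline's executor instead of
    # joining the batch aggregate. Column.mode() is a standard ibis reduction.
    return execute(col.mode())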
Adding a custom stat¶
Same ergonomics as the pandas @stat decorator: function signature
is the contract. Parameter names become inputs in the DAG; the
function name becomes the output key.
from buckaroo.pluggable_analysis_framework.stat_func import stat, XorqColumn
@stat(column_filter=lambda dt: dt.is_numeric())
def value_range(col: XorqColumn) -> float:
return (col.max() - col.min()).cast('float64')
@stat(column_filter=lambda dt: dt.is_numeric() and not dt.is_boolean())
def cv(mean: float, std: float) -> float:
if mean is None or std is None or mean == 0:
return None
return std / mean
extended = [*XORQ_STATS_V2, value_range, cv]
summary, _ = XorqStatPipeline(extended).process_table(expr)
value_range takes an XorqColumn and returns an ibis expression
— it joins the batch aggregate. cv reads mean and std
(already in the accumulator from the batch) and runs as cheap Python
math in phase 2. The DAG figures out the order; column_filter
keeps both stats from running on string columns.
For stats that should write more than one accumulator key, return a
MultipleProvides (a TypedDict alias) — the pipeline expands
each field into its own StatKey:
from buckaroo.pluggable_analysis_framework.stat_func import MultipleProvides, stat
class TypingResult(MultipleProvides):
is_numeric: bool
is_integer: bool
is_float: bool
@stat()
def typing_stats(dtype: str) -> TypingResult:
    # Illustrative string checks; substitute whatever dtype logic you need.
    is_int = 'int' in dtype
    is_float = 'float' in dtype
    return {'is_numeric': is_int or is_float, 'is_integer': is_int, 'is_float': is_float}
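Each declared field then lands as its own key in the summary. A usage sketch (assuming these keys aren't already provided by another stat in the list):
summary, _ = XorqStatPipeline([*XORQ_STATS_V2, typing_stats]).process_table(expr)
summary['qty']['is_integer']  # each TypingResult field is a separate stat key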
Histograms¶
Numeric columns get ten equal-width buckets between min and
max. Non-numeric columns — and numeric columns with five or fewer
distinct values — get the top-10 categorical histogram instead. The
threshold mirrors pd_stats_v2: ten quantile buckets spread over five
or fewer distinct values yield mostly empty bars.
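Roughly the shape of the bucket query a numeric histogram issues (an illustrative ibis-style expression, not the pipeline's actual code):
lo, hi = summary['price']['min'], summary['price']['max']  # reused from the batch pass
width = (hi - lo) / 10
bucket = ((expr.price - lo) / width).floor().clip(upper=9).name('bucket')
expr.group_by(bucket).agg(n=expr.count())  # one GROUP BY per numeric column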
Both branches run through the execute: XorqExecute callable, so a backend
passed to the pipeline (XorqStatPipeline(stats, backend=con)) is honoured.
DataFlow integration¶
XorqDfStatsV2 mirrors the DfStatsV2 / PlDfStatsV2 interface
(.sdf, .errs, .add_analysis) so anything wired up for
pandas/polars works against a xorq expression unchanged:
from buckaroo.pluggable_analysis_framework.xorq_stat_pipeline import XorqDfStatsV2
stats = XorqDfStatsV2(expr, XORQ_STATS_V2)
stats.sdf # SDType — same shape as DfStatsV2.sdf
stats.errs # ErrDict — same shape as DfStatsV2.errs
@stat(column_filter=lambda dt: dt.is_numeric())
def value_range(col: XorqColumn) -> float:
return (col.max() - col.min()).cast('float64')
stats.add_analysis(value_range)
stats.sdf['price']['value_range']
XorqDfStatsV2 lives in xorq_stat_pipeline rather than
df_stats_v2 so the generic DfStats module never imports xorq —
buckaroo without [xorq] keeps working.
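A sketch of the optional-import pattern this layout enables (illustrative; not buckaroo's actual guard):
try:
    from buckaroo.pluggable_analysis_framework.xorq_stat_pipeline import XorqDfStatsV2
except ImportError:  # buckaroo installed without the [xorq] extra
    XorqDfStatsV2 = None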
Bring your own backend¶
By default, XorqStatPipeline calls query.execute() on each
expression, which uses whatever backend xorq inferred when the
expression was built (DataFusion for xo.memtable). Pass
backend= to route every query — the batch aggregate and every
per-column histogram — through a specific connection:
con = xo.connect('duckdb://')
table = con.read_parquet('s3://.../events.parquet')
pipeline = XorqStatPipeline(XORQ_STATS_V2, backend=con)
summary, errors = pipeline.process_table(table)
Useful when the table is unbound, or when you want to force execution on a specific engine (Postgres for prod data, DuckDB for ad-hoc, …).
Errors and validation¶
XorqStatPipeline follows the same rules as StatPipeline:
- The DAG is validated at construction. A missing dependency or a cycle raises DAGConfigError immediately, not at first call.
- A construction-time smoke test runs the pipeline against PERVERSE_DF (wrapped as a xorq memtable) so typos and wrong-dtype assumptions surface before real data hits.
- Per-column failures land in the errors list as StatError objects with reproduction code; the rest of the pipeline keeps going (see the sketch after this list).
- add_stat / add_analysis revalidate and report; the pipeline state only updates on success.
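A minimal sketch of inspecting captured failures, assuming only what the list above states (errors is a list of StatError objects):
summary, errors = pipeline.process_table(expr)
for stat_error in errors:
    print(stat_error)  # each StatError carries reproduction code for the failing stat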
The notebook example at docs/example-notebooks/Xorq-Stats.ipynb runs the full flow end-to-end and prints the SQL each phase emits.