Clinker
Clinker is a pure-Rust, memory-bounded CLI ETL engine for streaming transformation of CSV, JSON, XML, and fixed-width data. It ships as a single static binary with no interpreter, no runtime, and no install dependencies.
Pipelines are declared in YAML. Data transformation logic is written in CXL, a custom expression language purpose-built for ETL. Together they replace legacy tools like Informatica, SSIS, Talend, and NiFi with something deterministic, lightweight, and easy to reason about.
Why Clinker?
Single binary, zero dependencies. Download it, run it. No JVM, no Python, no package manager. Works on any Linux server out of the box.
Good neighbor on busy servers. Clinker enforces a strict memory ceiling (default 256 MB) so it can run alongside JVM applications, databases, and other services without competing for RAM. Aggregation spills to disk when memory pressure rises.
Reproducible output. Given the same input and pipeline, Clinker produces byte-identical output across runs. No nondeterminism from thread scheduling, hash randomization, or floating-point reordering.
Operability-first design. Per-stage metrics, dead-letter queues for error records, explain plans for understanding execution, and structured exit codes for scripting. Built for production from day one.
Two binaries:
| Binary | Purpose |
|---|---|
| clinker | Run pipelines against real data |
| cxl | Check, evaluate, and format CXL expressions interactively |
A taste of Clinker
Here is a complete pipeline that reads a customer CSV, filters to active customers, classifies them into tiers, and writes the result:
pipeline:
name: customer_etl
nodes:
- type: source
name: customers
config:
name: customers
type: csv
path: "./data/customers.csv"
schema:
- { name: customer_id, type: int }
- { name: first_name, type: string }
- { name: last_name, type: string }
- { name: status, type: string }
- { name: lifetime_value, type: float }
- type: transform
name: enrich
input: customers
config:
cxl: |
filter status == "active"
emit customer_id = customer_id
emit full_name = first_name + " " + last_name
emit tier = if lifetime_value >= 10000 then "gold" else "standard"
- type: output
name: result
input: enrich
config:
name: enriched
type: csv
path: "./output/enriched_customers.csv"
Run it:
clinker run customer_etl.yaml
That is the entire workflow. No project scaffolding, no extra configuration, no compile step. One YAML file, one command.
Next steps
- Installation – download the binary and verify it works
- Your First Pipeline – build and run a pipeline step by step
- Key Concepts – understand the mental model behind Clinker pipelines
Installation
Clinker is a single static binary with no runtime dependencies. Download it,
put it on your PATH, and you are ready to go.
Binaries
Clinker ships two binaries:
- clinker – the pipeline executor. This is the main tool you use to validate and run pipelines against data.
- cxl – the CXL expression checker, evaluator, and formatter. Use it during development to test expressions interactively, check types, and format CXL blocks.
Verify installation
After placing the binaries on your PATH, confirm they work:
clinker --version
clinker 0.1.0
cxl --version
cxl 0.1.0
Both commands should print a version string and exit. If you see
command not found, check that the directory containing the binaries is in
your PATH.
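If the binaries live in a directory that is not yet on your PATH, add it in your shell profile. A minimal sketch, assuming you placed the binaries in ~/.local/bin (substitute your own directory):

```shell
# Hypothetical install directory -- adjust to wherever you put the binaries.
mkdir -p "$HOME/.local/bin"
export PATH="$HOME/.local/bin:$PATH"

# Confirm the directory is now on PATH:
case ":$PATH:" in
  *":$HOME/.local/bin:"*) echo "on PATH" ;;
  *) echo "missing" ;;
esac
```

Add the export line to ~/.bashrc or ~/.zshrc to make the change permanent.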
Building from source
Clinker requires Rust 1.91+ (edition 2024). If you have a Rust toolchain installed, build and install both binaries directly from the repository:
# Clone the repository
git clone https://github.com/rustpunk/clinker.git
cd clinker
# Install the pipeline executor
cargo install --path crates/clinker
# Install the CXL expression tool
cargo install --path crates/cxl-cli
This compiles release-optimized binaries and places them in ~/.cargo/bin/,
which is typically already on your PATH.
To verify the build:
cargo test --workspace
This runs the full test suite (approximately 1100 tests) and confirms everything is working correctly on your system.
Rust toolchain
The repository includes a rust-toolchain.toml that pins the exact Rust
version. If you use rustup, it will automatically download the correct
toolchain when you build.
| Requirement | Value |
|---|---|
| Rust edition | 2024 |
| Minimum version | 1.91 |
| C dependencies | None |
Your First Pipeline
This walkthrough builds a pipeline from scratch, runs it, and explores the tools Clinker provides for validating and understanding pipelines before they touch real data.
1. Create sample data
Save the following as employees.csv:
id,name,department,salary
1,Alice Chen,Engineering,95000
2,Bob Martinez,Marketing,62000
3,Carol Johnson,Engineering,88000
4,Dave Williams,Sales,71000
2. Write the pipeline
Save the following as my_first_pipeline.yaml:
pipeline:
name: salary_report
nodes:
- type: source
name: employees
config:
name: employees
type: csv
path: "./employees.csv"
schema:
- { name: id, type: int }
- { name: name, type: string }
- { name: department, type: string }
- { name: salary, type: int }
- type: transform
name: classify
input: employees
config:
cxl: |
emit id = id
emit name = name
emit department = department
emit salary = salary
emit level = if salary >= 90000 then "senior" else "junior"
- type: output
name: report
input: classify
config:
name: salary_report
type: csv
path: "./salary_report.csv"
This pipeline has three nodes:
- employees (source) – reads the CSV file and declares the schema.
- classify (transform) – passes all fields through and adds a level field based on salary.
- report (output) – writes the result to a new CSV file.
The input: field on each consumer node wires the DAG together. Data flows
from employees through classify to report.
3. Validate before running
Before processing any data, check that the pipeline is well-formed:
clinker run my_first_pipeline.yaml --dry-run
Dry-run parses the YAML, resolves the DAG, and type-checks all CXL expressions
against the declared schemas. If there are errors – a typo in a field name, a
type mismatch, a missing input: reference – Clinker reports them with
source-location diagnostics and stops. By itself, a dry run reads no data.
4. Preview records
To see what the output will look like without writing files, preview a few records:
clinker run my_first_pipeline.yaml --dry-run -n 2
Combined with --dry-run, the -n flag reads just the first 2 records from the source, runs them through the pipeline, and prints the results to the terminal instead of writing output files. Useful for sanity-checking transformations before committing to a full run.
5. Understand the execution plan
To see how Clinker will execute the pipeline:
clinker run my_first_pipeline.yaml --explain
The explain plan shows the DAG topology, the order nodes will execute, per-node parallelism strategy, and schema propagation through the pipeline. This is valuable for understanding complex pipelines with routes, merges, and aggregations.
6. Run it
clinker run my_first_pipeline.yaml
Clinker reads employees.csv, applies the transform, and writes
salary_report.csv. The output:
id,name,department,salary,level
1,Alice Chen,Engineering,95000,senior
2,Bob Martinez,Marketing,62000,junior
3,Carol Johnson,Engineering,88000,junior
4,Dave Williams,Sales,71000,junior
Alice’s salary of 95,000 meets the threshold, so she is classified as
senior. Everyone else is junior.
What just happened
The pipeline executed as a streaming process:
- The source node read employees.csv one record at a time.
- Each record flowed through the classify transform, which evaluated the CXL block to produce the output fields.
- The output node wrote each transformed record to salary_report.csv.
At no point was the entire dataset loaded into memory. This is how Clinker processes files of any size under its memory ceiling.
Next steps
- Key Concepts – understand the building blocks of Clinker pipelines
- Pipeline YAML Structure – full reference for pipeline configuration
- CXL Overview – learn the expression language in depth
Key Concepts
This page covers the mental model behind Clinker pipelines. If you have experience with other ETL tools, most of this will feel familiar – but pay attention to where Clinker diverges, especially around CXL and streaming execution.
Pipelines are DAGs
A pipeline is a directed acyclic graph of nodes. Data flows from sources, through processing nodes, to outputs. There are no cycles – a node cannot consume its own output, directly or indirectly.
You define the graph by setting input: on each consumer node, naming the
upstream node it reads from. Clinker resolves these references, validates that
the graph is acyclic, and determines execution order automatically.
The nodes: list
Every pipeline has a single flat list of nodes. Each node has a type:
discriminator that determines its behavior. The seven node types are:
| Type | Purpose |
|---|---|
| source | Read data from a file (CSV, JSON, XML, fixed-width) |
| transform | Apply CXL logic to reshape, filter, or enrich records |
| aggregate | Group records and compute summary values (sum, count, etc.) |
| route | Split a stream into named ports based on conditions |
| merge | Combine multiple streams into one |
| output | Write data to a file |
| composition | Embed a reusable sub-pipeline |
You can have as many nodes of each type as your pipeline requires. The only constraint is that the resulting graph must be a valid DAG.
CXL is not SQL
CXL is a per-record expression language. Each record flows through a CXL block
independently – there is no table-level context, no SELECT, no FROM, no
JOIN. Think of it as a programmable row mapper.
The core statements:
- emit name = expr – produce a field in the output record. Only emitted fields appear downstream. If you want to pass a field through unchanged, you must emit it explicitly: emit id = id.
- let name = expr – bind a local variable for use in later expressions. Local variables do not appear in the output.
- filter condition – discard the record if the condition is false. A filtered record produces no output and is not counted as an error.
- distinct / distinct by field – deduplicate records. distinct deduplicates on all output fields; distinct by field deduplicates on a specific field.
CXL uses and, or, and not for boolean logic – not && or ||. String
concatenation uses +. Conditional expressions use
if ... then ... else ... syntax.
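Putting those pieces together, here is a small illustrative CXL block (the field names are hypothetical) that combines boolean logic, a conditional, and deduplication:

```yaml
cxl: |
  filter (status == "active" or status == "trial") and not suspended
  emit customer_id = customer_id
  emit segment = if lifetime_value >= 10000 then "gold" else "standard"
  distinct by customer_id
```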
System namespaces use a $ prefix: $pipeline.*, $window.*, $meta.*.
These provide access to pipeline metadata, window function state, and record
metadata respectively.
Streaming execution
Records flow through the pipeline one at a time. Clinker does not load an entire file into memory before processing it. A source reads one record, pushes it through the downstream nodes, and then reads the next.
This design keeps memory usage bounded regardless of file size. A 100 GB CSV is processed with the same memory footprint as a 100 KB CSV.
The exception is aggregation. Aggregate nodes must accumulate state across records (to compute sums, counts, averages, etc.). Clinker uses hash aggregation by default and spills to disk when memory pressure exceeds configured limits. When the input is already sorted by the group key, Clinker can use streaming aggregation, which requires only constant memory.
Input wiring
Consumer nodes reference their upstream via the input: field:
- type: transform
name: enrich
input: customers # reads from the node named "customers"
Route nodes produce named output ports. Downstream nodes reference a specific port using dot notation:
- type: route
name: split_by_region
input: customers
config:
routes:
us: region == "US"
eu: region == "EU"
default: other
- type: output
name: us_output
input: split_by_region.us # reads from the "us" port
Merge nodes accept multiple inputs using inputs: (plural):
- type: merge
name: combined
inputs:
- us_transform
- eu_transform
Schema declaration
Source nodes require an explicit schema: that declares every column’s name
and type:
config:
schema:
- { name: customer_id, type: int }
- { name: email, type: string }
- { name: balance, type: float }
- { name: created_at, type: date }
Clinker uses these declarations to type-check CXL expressions at compile time, before any data is read. If a CXL block references a field that does not exist in the upstream schema, or applies an operation to an incompatible type, the error is caught during validation – not at row 5 million of a production run.
Supported types include int, float, string, bool, date, and date_time. The full list, including array, numeric, any, and nullable(T), is covered in the Source Nodes reference.
Error handling
Each node can specify an error handling strategy:
| Strategy | Behavior |
|---|---|
| fail_fast | Stop the pipeline on the first error (default) |
| continue | Route error records to a dead-letter queue file and continue |
| best_effort | Log errors and continue without writing error records |
When using continue, Clinker writes rejected records to a DLQ file alongside
the output. Each DLQ entry includes the original record, the error category,
the error message, and the node that rejected it. This makes diagnosing
production issues straightforward: check the DLQ, fix the data or the
pipeline, and rerun.
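For example, to keep a run going while capturing bad records, a sketch using the top-level error_handling block from the YAML reference:

```yaml
error_handling:
  strategy: continue   # rejected records go to the DLQ file; the run continues
```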
Pipeline YAML Structure
A Clinker pipeline is a single YAML file with three top-level sections: pipeline (metadata), nodes (the processing graph), and optionally error_handling.
Top-level shape
pipeline:
name: my_pipeline # Required — pipeline identifier
memory_limit: "256M" # Optional (K/M/G suffixes)
vars: # Optional key-value pairs
threshold: 500
label: "Monthly Report"
date_formats: ["%Y-%m-%d"] # Optional — custom date parsing formats
rules_path: "./rules/" # Optional — CXL module search path
concurrency: # Optional
threads: 4
chunk_size: 1000
metrics: # Optional
spool_dir: "./metrics/"
nodes: # Required — flat list of pipeline nodes
- type: source
name: raw_data
config:
name: raw_data
type: csv
path: "./data/input.csv"
schema:
- { name: id, type: int }
- { name: value, type: string }
- type: transform
name: clean
input: raw_data
config:
cxl: |
emit id = id
emit value = value.trim()
- type: output
name: result
input: clean
config:
name: result
type: csv
path: "./output/result.csv"
error_handling: # Optional
strategy: fail_fast
Pipeline metadata
The pipeline: block carries global settings that apply to the entire run.
| Field | Required | Description |
|---|---|---|
| name | Yes | Pipeline identifier. Used in logs and metrics. |
| memory_limit | No | Soft RSS budget. Accepts K, M, G suffixes (e.g. "512M"). |
| vars | No | Scalar constants accessible in CXL via $vars.*. |
| date_formats | No | List of strftime-style patterns for date parsing. |
| rules_path | No | Directory for CXL use module resolution. |
| concurrency | No | threads and chunk_size for parallel chunk processing. |
| metrics | No | spool_dir for per-run JSON metric files. |
| date_locale | No | Locale for date formatting. |
| include_provenance | No | Attach provenance metadata to records. |
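As an example of vars in practice, a constant declared under the pipeline: block is read in CXL through the $vars namespace (the node and field names here are illustrative):

```yaml
pipeline:
  name: thresholded
  vars:
    threshold: 500
nodes:
  - type: transform
    name: flag_large
    input: orders
    config:
      cxl: |
        emit order_id = order_id
        emit is_large = amount > $vars.threshold
```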
The nodes list
Every pipeline has a flat nodes: list. Each entry is a node with a type: discriminator that determines its kind:
| Type | Role |
|---|---|
| source | Reads data from a file |
| transform | Applies CXL expressions to each record |
| aggregate | Groups and summarizes records |
| route | Splits records into named branches by condition |
| merge | Combines multiple upstream branches |
| output | Writes records to a file |
| composition | Imports a reusable transform fragment |
Node naming
Every node must have a name: field. Names must be unique within the pipeline and must not contain dots – the dot character is reserved for port syntax (see below). Names are used for wiring, logging, and diagnostics.
Wiring: input and inputs
Nodes connect to each other through input: (singular) and inputs: (plural) fields that live at the node’s top level, alongside name: and type:.
Single upstream – used by transform, aggregate, route, and output nodes:
- type: transform
name: clean
input: raw_data # References the source node named "raw_data"
config: ...
Port syntax – for consuming a specific branch from a route node, use node.port:
- type: output
name: high_value_out
input: split.high # Consumes the "high" branch of route node "split"
config: ...
Multiple upstreams – merge nodes use inputs: (plural) instead of input::
- type: merge
name: combined
inputs:
- east_processed
- west_processed
config: {}
Source nodes have no input field. They are entry points – adding an input: field to a source is a parse error.
Using inputs: on a non-merge node (or input: on a merge node) is caught at parse time by deny_unknown_fields.
Optional fields on all nodes
Every node type supports these optional fields:
- description: – human-readable text for documentation. Ignored by the engine.
- _notes: – arbitrary metadata (JSON object). Ignored by the engine; used by the Kiln IDE for visual annotations and inspector panels.
- type: transform
name: enrich
description: "Add customer tier based on lifetime value"
_notes:
color: "#4a9eff"
position: { x: 300, y: 200 }
input: customers
config:
cxl: |
emit tier = if lifetime_value >= 10000 then "gold" else "standard"
Strict parsing
All config structs use deny_unknown_fields. If you misspell a field name – for example, writing inputt: instead of input: or stratgy: instead of strategy: – the YAML parser rejects it immediately with a diagnostic pointing to the typo. This catches configuration errors before any data processing begins.
Environment variable: CLINKER_ENV
The CLINKER_ENV environment variable can be used for conditional logic outside of pipelines (e.g., selecting channel directories or controlling CLI behavior). It is not directly referenced within pipeline YAML but is available to the channel and workspace systems.
Source Nodes
Source nodes read data from files and are the entry points of every pipeline. They have no input: field – they produce records, they do not consume them.
Basic structure
- type: source
name: customers
config:
name: customers
type: csv
path: "./data/customers.csv"
schema:
- { name: customer_id, type: int }
- { name: name, type: string }
- { name: email, type: string }
- { name: status, type: string }
- { name: amount, type: float }
Schema declaration
The schema: field is required on every source node. Clinker does not infer types from data – you must declare each column’s name and CXL type explicitly. This schema drives compile-time type checking across the entire pipeline.
Each entry is a { name, type } pair:
schema:
- { name: employee_id, type: string }
- { name: salary, type: int }
- { name: hired_at, type: date_time }
- { name: is_active, type: bool }
- { name: notes, type: nullable(string) }
Available types
| Type | Description |
|---|---|
| string | UTF-8 text |
| int | 64-bit signed integer |
| float | 64-bit IEEE 754 floating point |
| bool | Boolean (true / false) |
| date | Calendar date |
| date_time | Date with time component |
| array | Ordered sequence of values |
| numeric | Union of int and float – resolved during type unification |
| any | Unknown type – field used in type-agnostic contexts |
| nullable(T) | Nullable wrapper around any inner type (e.g. nullable(int)) |
Format types
The type: field inside config: selects the file format. Supported values: csv, json, xml, fixed_width.
CSV
- type: source
name: orders
config:
name: orders
type: csv
path: "./data/orders.csv"
schema:
- { name: order_id, type: int }
- { name: customer_id, type: int }
- { name: amount, type: float }
- { name: order_date, type: date }
options:
delimiter: "," # Default: ","
quote_char: "\"" # Default: "\""
has_header: true # Default: true
encoding: "utf-8" # Default: "utf-8"
All CSV options are optional. With no options: block, Clinker uses standard RFC 4180 defaults.
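European-style exports often use semicolons as the separator; the same options block handles that. A sketch reusing only the options documented above:

```yaml
options:
  delimiter: ";"
  has_header: true
  encoding: "utf-8"
```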
JSON
- type: source
name: events
config:
name: events
type: json
path: "./data/events.json"
schema:
- { name: event_id, type: string }
- { name: timestamp, type: date_time }
- { name: payload, type: string }
options:
format: ndjson # array | ndjson | object (auto-detect if omitted)
record_path: "$.data" # JSONPath to records array
- array – the file is a single JSON array of objects.
- ndjson – one JSON object per line (newline-delimited JSON).
- object – single top-level object; use record_path to locate the records array within it.
If format is omitted, Clinker auto-detects based on file content.
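For a wrapped API-style payload, the object format plus record_path pulls records out of a nested array. A sketch, assuming a file shaped like {"meta": ..., "data": [...]}:

```yaml
options:
  format: object
  record_path: "$.data"   # records live in the top-level "data" array
```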
XML
- type: source
name: catalog
config:
name: catalog
type: xml
path: "./data/catalog.xml"
schema:
- { name: product_id, type: int }
- { name: name, type: string }
- { name: price, type: float }
options:
record_path: "//product" # XPath to record elements
attribute_prefix: "@" # Prefix for XML attribute fields
namespace_handling: strip # strip | qualify
- strip (default) – removes namespace prefixes from element and attribute names.
- qualify – preserves namespace-qualified names.
Fixed-width
- type: source
name: legacy_data
config:
name: legacy_data
type: fixed_width
path: "./data/mainframe.dat"
schema:
- { name: account_id, type: string }
- { name: balance, type: float }
- { name: status_code, type: string }
options:
line_separator: crlf # Line ending style
Fixed-width sources require a separate format schema (.schema.yaml file) that defines field positions, widths, and padding. The schema: on the source body declares CXL types for compile-time checking; the format schema defines the physical layout.
Sort order
If your source data is pre-sorted, declare the sort order so the optimizer can use streaming aggregation instead of hash aggregation:
- type: source
name: sorted_transactions
config:
name: sorted_transactions
type: csv
path: "./data/transactions_sorted.csv"
schema:
- { name: account_id, type: string }
- { name: txn_date, type: date }
- { name: amount, type: float }
sort_order:
- { field: "account_id", order: asc }
- { field: "txn_date", order: asc }
Sort order declarations are trusted – Clinker does not verify that the data is actually sorted. If the data violates the declared order, downstream streaming aggregation may produce incorrect results.
The shorthand form is also accepted – a bare string defaults to ascending:
sort_order:
- "account_id"
- { field: "txn_date", order: desc }
Array paths
For nested data (JSON/XML sources with embedded arrays), array_paths controls how nested arrays are handled:
- type: source
name: invoices
config:
name: invoices
type: json
path: "./data/invoices.json"
schema:
- { name: invoice_id, type: int }
- { name: customer, type: string }
- { name: line_item, type: string }
- { name: line_amount, type: float }
array_paths:
- path: "$.line_items"
mode: explode # One output record per array element
- path: "$.tags"
mode: join # Concatenate array elements into a string
separator: ","
- explode (default) – produces one output record per array element, with parent fields repeated.
- join – concatenates array elements into a single string using the specified separator.
Transform Nodes
Transform nodes apply CXL expressions to each record, producing new fields, filtering records, or both. They process one record at a time in streaming fashion with constant memory overhead.
Basic structure
- type: transform
name: enrich
input: customers
config:
cxl: |
emit full_name = first_name + " " + last_name
emit tier = if lifetime_value >= 10000 then "gold" else "standard"
filter status == "active"
The cxl: field is required and contains a CXL program. The three core CXL statements for transforms are:
- emit – produces an output field. Only emitted fields appear in downstream nodes.
- filter – drops records that do not match the boolean condition.
- let – binds a local variable for use in subsequent expressions (not emitted).
cxl: |
let margin = revenue - cost
emit product_id = product_id
emit margin = margin
emit margin_pct = if revenue > 0 then margin / revenue * 100 else 0
filter margin > 0
Analytic window
The analytic_window field enables cross-source lookups by joining a secondary dataset into the transform. The secondary source is loaded into memory and indexed by the join key.
- type: transform
name: enrich_orders
input: orders
config:
analytic_window:
source: products
on: product_id
group_by: [product_id]
cxl: |
emit order_id = order_id
emit product_name = $window.first()
emit quantity = quantity
emit line_total = quantity * price
The $window.* namespace provides access to the windowed data. Functions like $window.first(), $window.last(), and $window.count() operate over the matched group.
Validations
Declarative validation checks can be attached to a transform. They run against each record and either route failures to the DLQ (severity error) or log a warning and continue (severity warn).
- type: transform
name: validate_orders
input: raw_orders
config:
cxl: |
emit order_id = order_id
emit amount = amount
emit email = email
validations:
- field: email
check: "not_empty"
severity: error
message: "Email is required"
- check: "amount > 0"
severity: warn
message: "Non-positive amount"
- field: order_id
check: "not_empty"
severity: error
Validation fields
| Field | Required | Description |
|---|---|---|
| field | No | Restrict the check to a single field |
| check | Yes | Validation name (e.g. "not_empty") or CXL boolean expression |
| severity | No | error (default) routes to DLQ; warn logs and continues |
| message | No | Custom error message for DLQ entries |
| name | No | Validation name for DLQ reporting. Auto-derived from field + check if omitted |
| args | No | Additional arguments as key-value pairs |
Log directives
Log directives control diagnostic output during transform execution:
- type: transform
name: process
input: validated
config:
cxl: |
emit id = id
emit result = compute(value)
log:
- level: info
when: per_record
every: 1000
message: "Processed record"
- level: warn
when: on_error
message: "Record failed processing"
- level: debug
when: before_transform
message: "Starting transform"
Log directive fields
| Field | Required | Description |
|---|---|---|
| level | Yes | trace, debug, info, warn, or error |
| when | Yes | before_transform, after_transform, per_record, or on_error |
| message | Yes | Log message text |
| every | No | Only log every N records (for per_record timing) |
| condition | No | CXL boolean expression – only log when true |
| fields | No | List of field names to include in the log output |
| log_rule | No | Reference to an external log rule definition |
Complete example
- type: source
name: employees
config:
name: employees
type: csv
path: "./data/employees.csv"
schema:
- { name: employee_id, type: string }
- { name: first_name, type: string }
- { name: last_name, type: string }
- { name: department, type: string }
- { name: salary, type: int }
- { name: hire_date, type: date }
- type: transform
name: enrich_employees
description: "Compute display name and tenure"
input: employees
config:
cxl: |
emit employee_id = employee_id
emit display_name = last_name + ", " + first_name
emit department = department.upper()
emit salary = salary
emit annual_bonus = if salary >= 80000 then salary * 0.15
else salary * 0.10
validations:
- field: employee_id
check: "not_empty"
severity: error
message: "Employee ID is required"
- check: "salary > 0"
severity: warn
message: "Salary should be positive"
log:
- level: info
when: per_record
every: 5000
message: "Processing employees"
Aggregate Nodes
Aggregate nodes group records by one or more fields and compute summary values using CXL aggregate functions. They consume all input records in a group before emitting a single summary record per group.
Basic structure
- type: aggregate
name: dept_totals
input: employees
config:
group_by: [department]
cxl: |
emit total_salary = sum(salary)
emit headcount = count(*)
emit avg_salary = avg(salary)
Group-by fields pass through automatically – you do not need to emit them. In this example, the output records contain department, total_salary, headcount, and avg_salary.
Group-by fields
The group_by: field is a list of field names from the input schema. Records sharing the same values for all group-by fields are placed in the same group.
group_by: [region, department]
cxl: |
emit total_salary = sum(salary)
emit max_salary = max(salary)
This produces one output record per unique (region, department) combination.
Global aggregation
An empty group_by list treats the entire input as a single group, producing exactly one output record:
- type: aggregate
name: grand_totals
input: orders
config:
group_by: []
cxl: |
emit grand_total = sum(amount)
emit record_count = count(*)
emit avg_order = avg(amount)
Aggregate functions
The following aggregate functions are available in CXL:
| Function | Description |
|---|---|
| sum(field) | Sum of all values in the group |
| count(*) | Number of records in the group |
| avg(field) | Arithmetic mean |
| min(field) | Minimum value |
| max(field) | Maximum value |
| collect(field) | Collect all values into an array |
| weighted_avg(value, weight) | Weighted average |
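For instance, a sketch combining the two less common functions from the table (the field names are illustrative):

```yaml
config:
  group_by: [product_id]
  cxl: |
    emit all_skus = collect(sku)
    emit avg_unit_price = weighted_avg(unit_price, quantity)
```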
Strategy hint
The strategy: field controls how aggregation is executed:
- type: aggregate
name: totals
input: sorted_data
config:
group_by: [account_id]
strategy: streaming
cxl: |
emit total = sum(amount)
| Strategy | Behavior |
|---|---|
| auto | Default. The optimizer chooses based on whether the input is provably sorted for the group-by keys. |
| hash | Force hash aggregation. Works on any input ordering. Holds all groups in memory (with disk spill if memory budget is exceeded). |
| streaming | Require streaming aggregation. Processes one group at a time with O(1) memory per group. Compile-time error if the input is not provably sorted for the group-by keys. |
When to use streaming
If your source declares a sort_order: that covers the group-by fields, the optimizer will automatically choose streaming aggregation. Use strategy: streaming as an explicit assertion – it turns a silent fallback to hash aggregation into a compile error, which is useful for catching sort-order regressions.
When to use hash
Hash aggregation works on unsorted input and is the safe default. It uses more memory but handles any data ordering. Memory-aware disk spill kicks in when RSS approaches the pipeline’s memory_limit.
Complete example
- type: source
name: transactions
config:
name: transactions
type: csv
path: "./data/transactions.csv"
schema:
- { name: account_id, type: string }
- { name: txn_date, type: date }
- { name: amount, type: float }
- { name: category, type: string }
sort_order:
- { field: "account_id", order: asc }
- type: aggregate
name: account_summary
input: transactions
config:
group_by: [account_id]
strategy: streaming
cxl: |
emit total_amount = sum(amount)
emit txn_count = count(*)
emit avg_amount = avg(amount)
emit max_amount = max(amount)
emit categories = collect(category)
- type: output
name: summary_output
input: account_summary
config:
name: summary_output
type: csv
path: "./output/account_summary.csv"
Route Nodes
Route nodes split a stream of records into named branches based on CXL boolean conditions. Each branch becomes an independent output port that downstream nodes can wire to using port syntax.
Basic structure
- type: route
name: split_by_value
input: orders
config:
mode: exclusive
conditions:
high: "amount.to_int() > 1000"
medium: "amount.to_int() > 100"
default: low
This creates three output ports: split_by_value.high, split_by_value.medium, and split_by_value.low.
Conditions
The conditions: field is an ordered map of branch names to CXL boolean expressions. Each expression is evaluated against the incoming record.
conditions:
priority: "urgency == \"high\" and amount > 500"
standard: "urgency == \"medium\""
bulk: "quantity > 100"
default: other
Condition keys become the port names used in downstream input: wiring.
Default branch
The default: field is required. Records that match no condition are routed to the default branch. The default branch name must not collide with any condition key.
Routing modes
Exclusive (default)
In exclusive mode, conditions are evaluated in declaration order and the first matching condition wins. A record appears in exactly one branch. Order matters – put more specific conditions first.
mode: exclusive
conditions:
vip: "lifetime_value > 100000"
high: "lifetime_value > 10000"
medium: "lifetime_value > 1000"
default: standard
A customer with lifetime_value = 50000 does not match vip (50000 is not greater than 100000) but matches both high and medium. Because exclusive mode stops at the first match, the record is routed to high, and the medium condition is never evaluated for it.
Inclusive
In inclusive mode, all matching conditions route the record. A single record can appear in multiple branches simultaneously.
mode: inclusive
conditions:
needs_review: "amount > 10000"
flagged: "status == \"flagged\""
international: "country != \"US\""
default: standard
A flagged international order over 10000 would appear in needs_review, flagged, and international – three copies routed to three branches.
Downstream wiring
Downstream nodes reference route branches using port syntax: route_name.branch_name.
- type: route
name: classify
input: transactions
config:
mode: exclusive
conditions:
high: "amount > 1000"
medium: "amount > 100"
default: low
- type: transform
name: high_value_processing
input: classify.high
config:
cxl: |
emit txn_id = txn_id
emit amount = amount
emit review_flag = true
- type: transform
name: standard_processing
input: classify.medium
config:
cxl: |
emit txn_id = txn_id
emit amount = amount
- type: output
name: low_value_out
input: classify.low
config:
name: low_value_out
type: csv
path: "./output/low_value.csv"
Constraints
- At least 1 condition is required.
- Maximum 256 branches (conditions + default).
- Branch names must be unique.
- The default name must not collide with any condition key.
Complete example
pipeline:
name: order_routing
nodes:
- type: source
name: orders
config:
name: orders
type: csv
path: "./data/orders.csv"
schema:
- { name: order_id, type: int }
- { name: region, type: string }
- { name: amount, type: float }
- { name: priority, type: string }
- type: route
name: by_region
input: orders
config:
mode: exclusive
conditions:
domestic: "region == \"US\" or region == \"CA\""
emea: "region == \"UK\" or region == \"DE\" or region == \"FR\""
apac: "region == \"JP\" or region == \"AU\" or region == \"SG\""
default: other
- type: output
name: domestic_orders
input: by_region.domestic
config:
name: domestic_orders
type: csv
path: "./output/domestic.csv"
- type: output
name: emea_orders
input: by_region.emea
config:
name: emea_orders
type: csv
path: "./output/emea.csv"
- type: output
name: apac_orders
input: by_region.apac
config:
name: apac_orders
type: csv
path: "./output/apac.csv"
- type: output
name: other_orders
input: by_region.other
config:
name: other_orders
type: csv
path: "./output/other_regions.csv"
Merge Nodes
Merge nodes combine multiple upstream branches into a single stream. They are the counterpart to route nodes – where a route splits one stream into many, a merge joins many streams back into one.
Basic structure
- type: merge
name: combined
inputs:
- east_data
- west_data
config: {}
Note the key differences from other node types:
- Uses inputs: (plural), not input: (singular).
- The config: block is empty – all wiring is on the node header.
- Using input: (singular) on a merge node is a parse error.
Wiring
The inputs: field is a list of upstream node references. These can be bare node names or port references from route nodes:
- type: merge
name: rejoin
inputs:
- process_high
- process_medium
- classify.low # Port syntax for a route branch
config: {}
Downstream nodes wire to the merge as a normal single-input reference:
- type: output
name: final_output
input: rejoin
config:
name: final_output
type: csv
path: "./output/combined.csv"
Record ordering
Records arrive in the order they are produced by upstream nodes. There is no guaranteed interleaving order between upstream branches. If you need sorted output, use a sort_order on the downstream output node.
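For example, to make merged output deterministic regardless of interleaving, the downstream output node can sort before writing (a sketch; node and field names are illustrative):

```yaml
- type: output
  name: sorted_out
  input: all_sales          # e.g. a merge node upstream
  config:
    name: sorted_out
    type: csv
    path: "./output/sorted.csv"
    sort_order:
      - { field: "sale_id", order: asc }   # stable order regardless of branch interleaving
```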
Use cases
Reuniting route branches
The most common pattern is routing records through different processing paths and then merging them back together:
- type: route
name: classify
input: orders
config:
mode: exclusive
conditions:
high: "amount > 1000"
default: standard
- type: transform
name: process_high
input: classify.high
config:
cxl: |
emit order_id = order_id
emit amount = amount
emit surcharge = amount * 0.02
emit tier = "premium"
- type: transform
name: process_standard
input: classify.standard
config:
cxl: |
emit order_id = order_id
emit amount = amount
emit surcharge = 0
emit tier = "standard"
- type: merge
name: all_orders
inputs:
- process_high
- process_standard
config: {}
- type: output
name: result
input: all_orders
config:
name: result
type: csv
path: "./output/all_orders.csv"
Unioning multiple sources
Merge nodes can combine records from multiple source files that share the same schema:
- type: source
name: jan_sales
config:
name: jan_sales
type: csv
path: "./data/sales_jan.csv"
schema:
- { name: sale_id, type: int }
- { name: amount, type: float }
- { name: region, type: string }
- type: source
name: feb_sales
config:
name: feb_sales
type: csv
path: "./data/sales_feb.csv"
schema:
- { name: sale_id, type: int }
- { name: amount, type: float }
- { name: region, type: string }
- type: merge
name: all_sales
inputs:
- jan_sales
- feb_sales
config: {}
- type: aggregate
name: totals
input: all_sales
config:
group_by: [region]
cxl: |
emit total = sum(amount)
emit count = count(*)
Output Nodes
Output nodes write processed records to files. They are the terminal nodes of a pipeline – every pipeline path must end at an output (or records are silently dropped).
Basic structure
- type: output
name: result
input: transform_node
config:
name: output_stage
type: csv
path: "./output/result.csv"
The type: field selects the output format: csv, json, xml, or fixed_width.
Field control
By default, output nodes write only the fields explicitly emitted by upstream transforms. Several options control which fields appear and how they are named.
Include unmapped fields
include_unmapped: true # Default: false
When true, fields that were not explicitly emitted by transforms but exist on the record are included in the output. Useful for pass-through pipelines where you want all original fields plus a few computed ones.
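For instance, a pass-through pipeline that keeps every original column while adding a few computed ones could configure its output like this (a sketch; names are illustrative):

```yaml
- type: output
  name: enriched_out
  input: enrich
  config:
    name: enriched_out
    type: csv
    path: "./output/enriched.csv"
    include_unmapped: true   # original fields pass through alongside emitted ones
```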
Field mapping
Rename fields at output time without changing upstream CXL:
mapping:
"Customer Name": "full_name"
"Order Total": "amount"
Keys are output column names; values are the source field names from upstream.
Excluding fields
Remove specific fields from output:
exclude: [internal_id, _debug_flag, temp_calc]
Header control (CSV)
include_header: true # Default: true
Set to false to omit the CSV header row.
Null handling
preserve_nulls: false # Default: false
When false, null values are written as empty strings. When true, nulls are preserved in the output format’s native null representation (e.g., null in JSON).
Metadata inclusion
Control whether per-record $meta.* metadata fields appear in output:
include_metadata: all # Include all metadata fields
include_metadata: none # Default -- strip all metadata
include_metadata:
- source_file # Include only listed metadata keys
- source_row
Metadata fields are prefixed with meta. in the output.
Output format options
CSV
- type: output
name: csv_out
input: processed
config:
name: csv_out
type: csv
path: "./output/result.csv"
options:
delimiter: "|"
JSON
- type: output
name: json_out
input: processed
config:
name: json_out
type: json
path: "./output/result.json"
options:
format: ndjson # array | ndjson
pretty: true # Pretty-print JSON
- array (default) – writes a single JSON array containing all records.
- ndjson – writes one JSON object per line.
XML
- type: output
name: xml_out
input: processed
config:
name: xml_out
type: xml
path: "./output/result.xml"
options:
root_element: "data"
record_element: "row"
Fixed-width
- type: output
name: fw_out
input: processed
config:
name: fw_out
type: fixed_width
path: "./output/result.dat"
schema: "./schemas/output.schema.yaml"
options:
line_separator: crlf
Fixed-width output requires a format schema defining field positions and widths.
Sort order
Sort records before writing:
sort_order:
- { field: "name", order: asc }
- { field: "amount", order: desc, null_order: last }
| Sort option | Values | Default |
|---|---|---|
order | asc, desc | asc |
null_order | first, last, drop | last |
- first – nulls sort before all non-null values.
- last – nulls sort after all non-null values.
- drop – records with null sort keys are excluded from output.
Shorthand: a bare string defaults to ascending with nulls last:
sort_order:
- "name"
- { field: "amount", order: desc }
File splitting
Split output into multiple files based on record count, byte size, or group boundaries:
- type: output
name: split_output
input: processed
config:
name: split_output
type: csv
path: "./output/result.csv"
split:
max_records: 10000
max_bytes: 10485760 # 10 MB
group_key: "department" # Never split mid-group
naming: "{stem}_{seq:04}.{ext}"
repeat_header: true # Repeat CSV header in each file
oversize_group: warn # warn | error | allow
Split configuration fields
| Field | Required | Default | Description |
|---|---|---|---|
max_records | No | – | Soft record count limit per file |
max_bytes | No | – | Soft byte size limit per file |
group_key | No | – | Field name – never split within a group sharing this key value |
naming | No | "{stem}_{seq:04}.{ext}" | File naming pattern. {stem} is the base name, {seq:04} is a zero-padded sequence number, {ext} is the file extension |
repeat_header | No | true | Repeat CSV header row in each split file |
oversize_group | No | warn | What to do when a single key group exceeds file limits |
At least one of max_records or max_bytes should be specified for splitting to have any effect.
Oversize group policies
- warn (default) – log a warning and allow the oversized file.
- error – stop the pipeline.
- allow – silently allow the oversized file.
When group_key is set, the split point is the first group boundary after the threshold is reached (greedy). Without group_key, files are split at the exact limit.
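A sketch of the greedy behavior, assuming four input records grouped A, A, B, B:

```yaml
split:
  max_records: 3
  group_key: "dept"
# The 3-record threshold is crossed on group B's first record, so the
# split waits for the next group boundary: file 1 holds all 4 records.
# max_records is a soft limit for exactly this reason.
```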
Complete example
- type: output
name: department_reports
input: enriched_employees
config:
name: department_reports
type: csv
path: "./output/employees.csv"
mapping:
"Employee ID": "employee_id"
"Full Name": "display_name"
"Department": "department"
"Annual Salary": "salary"
exclude: [internal_flags]
include_header: true
sort_order:
- { field: "department", order: asc }
- { field: "display_name", order: asc }
split:
max_records: 5000
group_key: "department"
naming: "employees_{seq:03}.csv"
repeat_header: true
Error Handling & DLQ
Clinker provides structured error handling with a dead-letter queue (DLQ) for records that fail processing. The error_handling: block at the top level of the pipeline YAML controls the behavior.
Configuration
error_handling:
strategy: continue
dlq:
path: "./output/errors.csv"
include_reason: true
include_source_row: true
Strategies
The strategy: field controls what happens when a record fails:
| Strategy | Behavior |
|---|---|
fail_fast | Default. Stop the pipeline on the first error. |
continue | Route bad records to the DLQ and keep processing good records. |
best_effort | Continue processing with partial results, even if some stages produce incomplete output. |
fail_fast
The safest strategy. Any record-level error (type coercion failure, validation error, missing required field) halts the pipeline immediately. Use this when data quality is critical and you prefer to fix issues before reprocessing.
continue
The production workhorse. Bad records are written to the DLQ file with diagnostic metadata, and the pipeline continues processing remaining records. After the run completes, inspect the DLQ to understand and correct failures.
A pipeline that completes with DLQ entries exits with code 2 – this signals “pipeline completed successfully but some records were rejected.” It is not a crash or internal error.
best_effort
The most lenient strategy. Processing continues even with partial results. Use this for exploratory data analysis where completeness is less important than progress.
DLQ configuration
The DLQ is always written as CSV, regardless of the pipeline’s input/output formats.
dlq:
path: "./output/errors.csv"
include_reason: true
include_source_row: true
| Field | Required | Default | Description |
|---|---|---|---|
path | No | – | File path for DLQ output. If omitted, DLQ records are logged but not written to file. |
include_reason | No | – | Include _cxl_dlq_error_category and _cxl_dlq_error_detail columns. |
include_source_row | No | – | Include original source fields alongside DLQ metadata. |
DLQ columns
Every DLQ record includes these metadata columns:
| Column | Description |
|---|---|
_cxl_dlq_id | UUID v7 (time-ordered unique identifier) |
_cxl_dlq_timestamp | RFC 3339 timestamp of when the error occurred |
_cxl_dlq_source_file | Input filename that produced the failing record |
_cxl_dlq_source_row | 1-based row number in the source file |
_cxl_dlq_stage | Name of the transform or aggregate node where the error occurred |
_cxl_dlq_route | Route branch name (if the error occurred after routing) |
_cxl_dlq_trigger | Validation rule name that triggered the rejection |
When include_reason: true is set, two additional columns appear:
| Column | Description |
|---|---|
_cxl_dlq_error_category | Machine-readable error classification |
_cxl_dlq_error_detail | Human-readable error description |
Error categories
The _cxl_dlq_error_category column contains one of these values:
| Category | Description |
|---|---|
missing_required_field | A required field is absent from the record |
type_coercion_failure | A value could not be converted to the expected type |
required_field_conversion_failure | A required field exists but its value cannot be converted |
nan_in_output_field | A computation produced NaN |
aggregate_type_error | An aggregate function received an incompatible type |
validation_failure | A declarative validation check failed |
aggregate_finalize | An aggregate function failed during finalization |
Advanced options
Type error threshold
Abort the pipeline if the fraction of failing records exceeds a threshold:
type_error_threshold: 0.05 # Abort if >5% of records fail
This acts as a circuit breaker – if your input data is unexpectedly corrupt, the pipeline stops early rather than filling the DLQ with millions of entries.
Correlation key
Group DLQ rejections by a key field. When any record in a correlation group fails, all records in that group are routed to the DLQ:
correlation_key: order_id
For compound keys:
correlation_key: [order_id, customer_id]
This is useful for transactional data where partial processing of a group is worse than rejecting the entire group. For example, if one line item in an order fails validation, you may want to reject the entire order.
Max group buffer
Limit the number of records buffered per correlation group:
max_group_buffer: 100000 # Default: 100,000
Groups exceeding this limit are DLQ’d entirely with a group_size_exceeded summary entry.
Exit codes
| Code | Meaning |
|---|---|
| 0 | Pipeline completed successfully, no errors |
| 1 | Pipeline failed (internal error, config error, or fail_fast triggered) |
| 2 | Pipeline completed, but DLQ entries were produced |
Exit code 2 is not a failure – it means the pipeline ran to completion and handled errors according to the configured strategy. Check the DLQ file for details.
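A wrapper script can branch on these codes; a minimal sketch (paths are illustrative):

```shell
#!/bin/sh
clinker run pipeline.yaml
case $? in
  0) echo "clean run" ;;
  2) echo "completed with rejected records; see DLQ"
     wc -l ./output/errors.csv ;;
  *) echo "pipeline failed" >&2
     exit 1 ;;
esac
```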
Complete example
pipeline:
name: order_processing
memory_limit: "512M"
nodes:
- type: source
name: orders
config:
name: orders
type: csv
path: "./data/orders.csv"
schema:
- { name: order_id, type: int }
- { name: customer_id, type: int }
- { name: amount, type: float }
- { name: email, type: string }
- type: transform
name: validate_orders
input: orders
config:
cxl: |
emit order_id = order_id
emit customer_id = customer_id
emit amount = amount
emit email = email
validations:
- field: email
check: "not_empty"
severity: error
message: "Customer email is required"
- check: "amount > 0"
severity: error
message: "Order amount must be positive"
- type: output
name: valid_orders
input: validate_orders
config:
name: valid_orders
type: csv
path: "./output/valid_orders.csv"
error_handling:
strategy: continue
dlq:
path: "./output/rejected_orders.csv"
include_reason: true
include_source_row: true
type_error_threshold: 0.10
correlation_key: order_id
Pipeline Variables
Pipeline variables are constants defined in the YAML pipeline: header and accessible in CXL expressions. They provide a way to parameterize pipeline behavior without modifying CXL logic.
Defining variables
Variables are declared in the vars: block of the pipeline: section:
pipeline:
name: order_processing
vars:
high_value_threshold: 500
express_surcharge: 5.99
report_title: "Monthly Orders"
apply_discount: true
tax_rate: 0.085
Variables are scalar values only: strings, numbers (integers and floats), and booleans. Complex types (arrays, objects) are not supported.
Accessing variables in CXL
Use the $vars.* namespace to reference variables in CXL expressions:
- type: transform
name: classify_orders
input: orders
config:
cxl: |
emit order_id = order_id
emit amount = amount
emit is_high_value = amount > $vars.high_value_threshold
emit surcharge = if shipping == "express" then $vars.express_surcharge else 0
emit total = amount + surcharge
The $pipeline.* namespace also provides access to pipeline-level metadata.
Variable semantics
Variables are frozen at pipeline start and constant across all records. They cannot be modified during execution. This makes them suitable for:
- Thresholds and cutoff values
- Fixed surcharges, tax rates, and multipliers
- Labels and titles for report headers
- Feature flags that control conditional logic
Using variables with channels
Variables defined in pipeline.vars serve as defaults. The channel system can override these values per client or environment without modifying the pipeline YAML:
# Pipeline: pipeline.yaml
pipeline:
name: invoice_processing
vars:
express_surcharge: 5.99
late_fee: 25.00
# Channel: channels/acme-corp/channel.yaml
_channel:
id: acme-corp
name: "Acme Corp"
active: true
variables:
EXPRESS_SURCHARGE: "8.99"
When run with --channel acme-corp, the channel variable overrides the pipeline default.
Complete example
pipeline:
name: sales_report
vars:
min_amount: 100
commission_rate: 0.12
region_label: "North America"
include_pending: false
nodes:
- type: source
name: sales
config:
name: sales
type: csv
path: "./data/sales.csv"
schema:
- { name: sale_id, type: int }
- { name: rep_name, type: string }
- { name: amount, type: float }
- { name: status, type: string }
- type: transform
name: compute
input: sales
config:
cxl: |
filter amount >= $vars.min_amount
filter if $vars.include_pending then true else status == "closed"
emit sale_id = sale_id
emit rep_name = rep_name
emit amount = amount
emit commission = amount * $vars.commission_rate
emit region = $vars.region_label
- type: aggregate
name: rep_totals
input: compute
config:
group_by: [rep_name]
cxl: |
emit total_sales = sum(amount)
emit total_commission = sum(commission)
emit deal_count = count(*)
- type: output
name: report
input: rep_totals
config:
name: report
type: csv
path: "./output/sales_report.csv"
sort_order:
- { field: "total_sales", order: desc }
Channels
Channels enable multi-tenant pipeline customization. A single pipeline definition can be run with different configurations per client, environment, or business unit – without duplicating or modifying the base YAML.
Channel structure
Each channel lives in its own directory under a channels/ folder, with a channel.yaml manifest:
project/
pipeline.yaml
channels/
acme-corp/
channel.yaml
globex/
channel.yaml
Channel manifest
# channels/acme-corp/channel.yaml
_channel:
id: acme-corp
name: "Acme Corporation"
description: "Custom config for Acme Corp"
contact: "ops@acme.example.com"
active: true
tags: [enterprise, priority]
tier: gold
variables:
EXPRESS_SURCHARGE: "8.99"
TAX_RATE: "0.095"
OUTPUT_DIR: "/data/acme/output"
Channel metadata fields
| Field | Required | Description |
|---|---|---|
id | Yes | Unique channel identifier (used in CLI and logs) |
name | Yes | Human-readable display name |
description | No | Channel purpose and notes |
contact | No | Responsible team or person |
active | No | Whether the channel is enabled (default: true) |
tags | No | Arbitrary tags for filtering and grouping |
tier | No | Service tier classification |
Running with a channel
Use the --channel flag to run a pipeline with a specific channel’s configuration:
clinker run pipeline.yaml --channel acme-corp
Clinker looks for the channel manifest in the workspace’s channels/ directory, loads the variable overrides, and applies them before pipeline execution begins.
Variable overrides
The variables: section in a channel manifest provides values that override pipeline-level vars: defaults. This lets each tenant customize thresholds, paths, labels, and other parameters:
# Pipeline defaults
pipeline:
name: invoice_processing
vars:
late_fee: 25.00
output_path: "./output/"
# Channel override
variables:
LATE_FEE: "50.00"
OUTPUT_PATH: "/data/acme/invoices/"
Channel variables support ${ENV_VAR} syntax for referencing system environment variables – these are resolved at channel load time.
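For example (DATA_ROOT is an assumed environment variable):

```yaml
variables:
  OUTPUT_DIR: "${DATA_ROOT}/acme/output"   # ${DATA_ROOT} resolves at channel load time
```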
Workspace discovery
Channels are part of the broader workspace system. Clinker discovers workspaces via clinker.toml files, which can define the channel directory layout and other workspace-level settings.
Current status
Note: The channel system is being rebuilt in Phase 16c. The current implementation supports variable overrides. Full channel documentation – including path overrides, schema overrides, and channel inheritance – will be expanded when the rebuild is complete.
Complete example
# channels/globex/channel.yaml
_channel:
id: globex
name: "Globex Industries"
active: true
tier: standard
variables:
COMMISSION_RATE: "0.15"
MIN_ORDER: "250"
REPORT_LABEL: "Globex Monthly"
# Run the pipeline with Globex's configuration
clinker run sales_pipeline.yaml --channel globex
Compositions
Compositions are reusable pipeline fragments that can be imported into multiple pipelines. They encapsulate common transform patterns – date derivations, address normalization, currency conversion – into self-contained, testable units.
Using a composition
A composition node in your pipeline references an external .comp.yaml file:
- type: composition
name: fiscal_dates
input: invoices
use: "./compositions/fiscal_date.comp.yaml"
config:
start_month: 4
The use: field points to the composition definition file. The config: block passes parameters that customize the composition’s behavior for this specific invocation.
Composition definition file
A .comp.yaml file declares the composition’s interface – what fields it requires from upstream and what fields it produces:
# compositions/fiscal_date.comp.yaml
composition:
name: fiscal_date
description: "Derive fiscal year, quarter, and period from a date field"
requires:
- { name: invoice_date, type: date }
produces:
- { name: fiscal_year, type: int }
- { name: fiscal_quarter, type: string }
- { name: fiscal_period, type: int }
params:
- name: start_month
type: int
default: 1
description: "First month of the fiscal year (1-12)"
Composition fields
| Field | Required | Description |
|---|---|---|
name | Yes | Composition identifier |
description | No | Human-readable purpose |
requires | Yes | Input fields the composition needs from upstream (name + type) |
produces | Yes | Output fields the composition adds to the record (name + type) |
params | No | Configurable parameters with optional defaults |
Advanced wiring
For compositions with multiple input or output ports, the node supports explicit port bindings:
- type: composition
name: enrich_address
input: customers
use: "./compositions/address_normalize.comp.yaml"
inputs:
primary: customers
reference: zip_lookup
outputs:
normalized: next_stage
config:
country_code: "US"
resources:
zip_database: "./data/zipcodes.csv"
Port and resource fields
| Field | Required | Description |
|---|---|---|
inputs | No | Map of composition input ports to upstream node references |
outputs | No | Map of composition output ports to downstream node references |
config | No | Parameter overrides (key-value pairs) |
resources | No | External resource bindings (file paths, connection strings) |
alias | No | Namespace prefix for expanded node names (avoids collisions) |
Complete example
pipeline:
name: invoice_pipeline
nodes:
- type: source
name: invoices
config:
name: invoices
type: csv
path: "./data/invoices.csv"
schema:
- { name: invoice_id, type: int }
- { name: customer_id, type: int }
- { name: invoice_date, type: date }
- { name: amount, type: float }
- type: composition
name: fiscal_dates
input: invoices
use: "./compositions/fiscal_date.comp.yaml"
config:
start_month: 4
- type: transform
name: final_enrich
input: fiscal_dates
config:
cxl: |
emit invoice_id = invoice_id
emit customer_id = customer_id
emit amount = amount
emit fiscal_year = fiscal_year
emit fiscal_quarter = fiscal_quarter
- type: output
name: result
input: final_enrich
config:
name: result
type: csv
path: "./output/invoices_enriched.csv"
Current status
Note: Composition support is being built in Phase 16c. The YAML shape parses and validates, but compilation currently returns a diagnostic (E100) per composition node. The documentation above reflects the intended design. Full compilation and expansion will land when Phase 16c is complete.
CXL Overview
CXL (Clinker Expression Language) is a per-record expression language designed for ETL transformations. Every CXL program operates on one record at a time, producing output fields, filtering records, or computing derived values.
CXL is not SQL. There are no SELECT, FROM, or WHERE keywords. CXL programs are sequences of statements – emit, let, filter, distinct – that execute top to bottom against the current record.
Key differences from SQL
| SQL | CXL |
|---|---|
SELECT col AS alias | emit alias = col |
WHERE condition | filter condition |
AND / OR / NOT | and / or / not (keywords) |
&& / || / ! | Not supported – use keywords |
COALESCE(a, b) | a ?? b |
CASE WHEN ... THEN ... END | if ... then ... else ... or match { } |
Boolean operators are keywords
CXL uses English keywords for boolean logic, not symbols:
$ cxl eval -e 'emit result = true and false' --field dummy=1
{
"result": false
}
The operators &&, ||, and ! are syntax errors in CXL. Always use and, or, and not.
System namespaces use $ prefix
CXL provides built-in namespaces for accessing pipeline state, metadata, and window functions. All system namespaces are prefixed with $:
- $pipeline.* – pipeline execution context (name, counters, provenance)
- $meta.* – per-record metadata
- $window.* – window function calls
- $vars.* – user-defined pipeline variables
$ cxl eval -e 'emit name = $pipeline.name'
{
"name": "cxl-eval"
}
Compile-time type checking
CXL catches type errors before data processing begins. The compilation pipeline runs four phases:
- Parse – tokenize and build an AST from CXL source text
- Resolve – bind field references, validate method names, check arity
- Typecheck – infer types, validate operator compatibility, check method receiver types
- Eval – execute the typed program against each record
Errors at any phase produce rich diagnostics with source locations and fix suggestions via miette.
$ cxl check transform.cxl
ok: transform.cxl is valid
If there are type errors, the checker reports them with spans:
error[typecheck]: cannot apply '+' to String and Int (at transform.cxl:12)
help: convert one operand — use .to_int() or .to_string()
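Following that suggestion and converting one operand clears the error; for example, in the doc's eval format:

```
$ cxl eval -e 'emit label = "order-" + order_id.to_string()' --field order_id=42
{
  "label": "order-42"
}
```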
A minimal CXL program
emit greeting = "hello"
emit doubled = amount * 2
filter amount > 0
This program:
- Emits a constant string field greeting
- Emits doubled as twice the input amount
- Filters out records where amount is not positive
Try it:
$ cxl eval -e 'emit greeting = "hello"' -e 'emit doubled = amount * 2' \
--field amount=5
{
"greeting": "hello",
"doubled": 10
}
Statement order matters
CXL statements execute sequentially. Later statements can reference fields produced by earlier emit or let statements:
$ cxl eval -e 'let tax_rate = 0.21' -e 'emit tax = price * tax_rate' \
--field price=100
{
"tax": 21.0
}
A filter statement short-circuits execution – if the condition is false, remaining statements do not run and the record is excluded from output.
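For example, in the following invocation the filter fails for amount = -5, so the emit never runs and the record is excluded (the exact CLI presentation of a filtered record is an assumption here):

```
$ cxl eval -e 'filter amount > 0' -e 'emit doubled = amount * 2' --field amount=-5
```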
Types & Literals
CXL has 9 value types. Every field value, literal, and expression result is one of these types.
Value types
| Type | Rust backing | Description |
|---|---|---|
| Null | Value::Null | Missing or absent value |
| Bool | bool | true or false |
| Integer | i64 | 64-bit signed integer |
| Float | f64 | 64-bit double-precision float |
| String | Box<str> | UTF-8 text |
| Date | NaiveDate | Calendar date without timezone |
| DateTime | NaiveDateTime | Date and time without timezone |
| Array | Vec<Value> | Ordered collection of values |
| Map | IndexMap<Box<str>, Value> | Key-value pairs |
Literal syntax
Integers
Standard decimal notation. Negative values use the unary minus operator.
$ cxl eval -e 'emit a = 42' -e 'emit b = -5' -e 'emit c = 0'
{
"a": 42,
"b": -5,
"c": 0
}
Floats
Decimal notation with a dot. Must have digits on both sides of the decimal point.
$ cxl eval -e 'emit a = 3.14' -e 'emit b = -0.5'
{
"a": 3.14,
"b": -0.5
}
Strings
Double-quoted or single-quoted. Supports escape sequences: \\, \", \', \n, \t, \r.
$ cxl eval -e 'emit greeting = "hello world"'
{
"greeting": "hello world"
}
Booleans
The keywords true and false.
$ cxl eval -e 'emit flag = true' -e 'emit neg = not flag'
{
"flag": true,
"neg": false
}
Dates
Hash-delimited ISO 8601 format: #YYYY-MM-DD#.
$ cxl eval -e 'emit d = #2024-01-15#'
{
"d": "2024-01-15"
}
Null
The keyword null.
$ cxl eval -e 'emit nothing = null'
{
"nothing": null
}
Schema types
When declaring column types in YAML pipeline schemas, use these type names:
| Schema type | CXL type | Description |
|---|---|---|
string | String | Text values |
int | Integer | 64-bit integers |
float | Float | 64-bit floats |
bool | Bool | Boolean values |
date | Date | Calendar dates |
date_time | DateTime | Date and time |
array | Array | Ordered collections |
numeric | Int or Float | Union type – accepts either |
any | Any | Unknown type – no type constraints |
nullable(T) | Nullable(T) | Wrapper – value may be null |
Example YAML schema declaration:
schema:
employee_id: int
name: string
salary: nullable(float)
start_date: date
Type promotion
CXL automatically promotes types in mixed expressions:
Int + Float promotes to Float:
$ cxl eval -e 'emit result = 2 + 3.5'
{
"result": 5.5
}
Null + T produces Nullable(T): Any operation involving null produces a nullable result.
$ cxl eval -e 'emit result = null + 5'
{
"result": null
}
Nullable(A) + B unifies to Nullable(unified): When a nullable value meets a non-nullable value, the result type wraps the unified inner type in Nullable.
Type unification rules
The type checker follows these rules when two types meet in an expression:
- Same types unify to themselves: Int + Int produces Int
- Any unifies with anything: Any + T produces T
- Numeric resolves to the concrete type: Numeric + Int produces Int, Numeric + Float produces Float
- Int promotes to Float: Int + Float produces Float
- Null wraps: Null + T produces Nullable(T)
- Nullable propagates: Nullable(A) + B produces Nullable(unified(A, B))
- Incompatible types fail: String + Int is a type error
Operators & Expressions
CXL provides arithmetic, comparison, boolean, null coalescing, and string operators. Boolean logic uses keywords (and, or, not), not symbols.
Arithmetic operators
| Operator | Description | Example |
|---|---|---|
+ | Addition (or string concatenation) | 2 + 3 |
- | Subtraction | 10 - 4 |
* | Multiplication | 3 * 5 |
/ | Division | 10 / 3 |
% | Modulo (remainder) | 10 % 3 |
$ cxl eval -e 'emit result = 2 + 3 * 4'
{
"result": 14
}
Multiplication binds tighter than addition, so 2 + 3 * 4 is 2 + (3 * 4) = 14, not (2 + 3) * 4 = 20.
$ cxl eval -e 'emit result = 10 % 3'
{
"result": 1
}
Comparison operators
| Operator | Description | Example |
|---|---|---|
== | Equal | x == 0 |
!= | Not equal | x != 0 |
> | Greater than | x > 10 |
< | Less than | x < 10 |
>= | Greater than or equal | x >= 10 |
<= | Less than or equal | x <= 10 |
$ cxl eval -e 'emit result = 5 > 3' --field dummy=1
{
"result": true
}
Boolean operators
CXL uses keywords for boolean logic. The symbols &&, ||, and ! are not valid CXL syntax.
| Operator | Description | Example |
|---|---|---|
and | Logical AND | a and b |
or | Logical OR | a or b |
not | Logical NOT (unary) | not a |
$ cxl eval -e 'emit result = true and not false'
{
"result": true
}
$ cxl eval -e 'emit result = 5 > 3 or 10 < 2'
{
"result": true
}
Null coalesce operator
The ?? operator returns its left operand if non-null, otherwise its right operand.
$ cxl eval -e 'emit result = null ?? "default"'
{
"result": "default"
}
$ cxl eval -e 'emit result = "present" ?? "default"'
{
"result": "present"
}
String concatenation
The + operator concatenates strings when both operands are strings.
$ cxl eval -e 'emit result = "hello" + " " + "world"'
{
"result": "hello world"
}
Unary operators
| Operator | Description | Example |
|---|---|---|
- | Numeric negation | -x |
not | Boolean negation | not done |
$ cxl eval -e 'emit result = -42'
{
"result": -42
}
Method calls
Methods are called on a receiver using dot notation:
$ cxl eval -e 'emit result = "hello".upper()'
{
"result": "HELLO"
}
Methods can be chained:
$ cxl eval -e 'emit result = " hello ".trim().upper()'
{
"result": "HELLO"
}
Field references
Bare identifiers reference fields from the input record:
$ cxl eval -e 'emit result = price * qty' \
--field price=10 \
--field qty=3
{
"result": 30
}
Qualified field references use dot notation for multi-source pipelines: source.field.
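A sketch with two hypothetical sources named orders and customers joined upstream:
emit order_id = orders.id              # 'id' exists in both sources,
emit customer_name = customers.name    # so each reference is qualified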
Operator precedence
From highest (binds tightest) to lowest:
| Precedence | Operators | Associativity |
|---|---|---|
| 1 (highest) | . (method calls, field access) | Left |
| 2 | - (unary), not | Prefix |
| 3 | * / % | Left |
| 4 | + - | Left |
| 5 | == != > < >= <= | Left |
| 6 | and | Left |
| 7 | or | Left |
| 8 (lowest) | ?? | Right |
Use parentheses to override precedence:
$ cxl eval -e 'emit result = (2 + 3) * 4'
{
"result": 20
}
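Because ?? binds loosest of all, everything to its right evaluates first; following the table, null ?? 1 + 2 parses as null ?? (1 + 2):
$ cxl eval -e 'emit result = null ?? 1 + 2'
{
"result": 3
}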
Comments
Line comments start with # (when not followed by a digit – digit-prefixed # starts a date literal):
# This is a comment
emit total = price * qty # inline comment
emit deadline = #2024-12-31# # this is a date literal, not a comment
Statements
CXL programs are sequences of statements that execute top-to-bottom against each input record. Statement order matters – later statements can reference values produced by earlier ones.
emit
The emit statement produces an output field. Each emit becomes a column in the output record.
emit name = expression
$ cxl eval -e 'emit greeting = "hello"' -e 'emit doubled = 21 * 2'
{
"greeting": "hello",
"doubled": 42
}
Multiple emit statements build up the output record field by field:
$ cxl eval -e 'emit first = "Alice"' -e 'emit last = "Smith"' \
-e 'emit full = first + " " + last'
{
"first": "Alice",
"last": "Smith",
"full": "Alice Smith"
}
let
The let statement creates a local variable binding. The variable is available to subsequent statements but is NOT included in the output record.
let name = expression
$ cxl eval -e 'let tax_rate = 0.21' -e 'emit tax = 100 * tax_rate'
{
"tax": 21.0
}
Note that tax_rate does not appear in the output – only emit statements produce output fields.
filter
The filter statement excludes records where the condition evaluates to false. When a filter excludes a record, remaining statements do not execute (short-circuit).
filter condition
$ cxl eval -e 'filter amount > 0' -e 'emit result = amount * 2' \
--field amount=5
{
"result": 10
}
When the filter condition is false, the entire record is dropped and no output is produced.
Filters can appear anywhere in the statement sequence. Place them early to skip unnecessary computation:
filter status == "active"
let discount = if tier == "gold" then 0.2 else 0.1
emit final_price = price * (1 - discount)
distinct
The distinct statement deduplicates records. The bare form deduplicates on all emitted fields. The by form deduplicates on a specific field.
distinct
distinct by field_name
In a pipeline, distinct tracks values seen so far and drops records that have already been emitted with the same key.
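For example, keeping one record per customer (a sketch assuming a customer_id field):
emit customer_id = customer_id
emit region = region
distinct by customer_id   # later records repeating a customer_id are dropped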
emit meta
The emit meta statement writes a value to the $meta.* namespace – per-record metadata that is not part of the output columns. Metadata can be read by downstream nodes via $meta.field.
emit meta quality_flag = if amount < 0 then "suspect" else "ok"
Access metadata downstream:
filter $meta.quality_flag == "ok"
trace
The trace statement emits debug logging. It has no effect on the output record. Trace messages are only visible when tracing is enabled at the appropriate level.
trace "processing record"
trace warn "unusual value detected"
trace info if amount > 10000 then "high value transaction"
Trace levels: trace (default), debug, info, warn, error. An optional guard condition (via if) limits when the trace fires.
Statement ordering
Statements execute sequentially. A statement can reference any field or variable defined by a preceding emit or let:
$ cxl eval -e 'let base = 100' -e 'let rate = 0.15' \
-e 'emit subtotal = base * rate' \
-e 'emit total = base + subtotal'
{
"subtotal": 15.0,
"total": 115.0
}
Referencing a name before it is defined is a resolve-time error:
emit total = base + tax # error: 'base' is not defined yet
let base = 100
let tax = base * 0.21
use
The use statement imports a CXL module for reuse. See Modules & use for details.
use shared.dates as d
emit fy = d::fiscal_year(invoice_date)
Conditionals
CXL provides two conditional expression forms: if/then/else and match. Both are expressions – they return values and can be used anywhere an expression is expected.
If / then / else
The basic conditional expression:
if condition then value else alternative
$ cxl eval -e 'emit label = if amount > 100 then "high" else "low"' \
--field amount=250
{
"label": "high"
}
The else branch is optional. When omitted, records where the condition is false produce null:
$ cxl eval -e 'emit bonus = if score > 90 then score * 0.1' \
--field score=80
{
"bonus": null
}
Chained conditionals
Chain multiple conditions with else if:
$ cxl eval -e 'emit tier = if amount > 1000 then "platinum"
else if amount > 500 then "gold"
else if amount > 100 then "silver"
else "bronze"' \
--field amount=750
{
"tier": "gold"
}
Nested usage
Since if/then/else is an expression, it can be used inside other expressions:
$ cxl eval -e 'emit price = base * (if member then 0.8 else 1.0)' \
--field base=100 \
--field member=true
{
"price": 80.0
}
Match
The match expression provides pattern matching. It comes in two forms: value matching (with a subject) and condition matching (without a subject).
Value form (with subject)
Match a subject expression against literal patterns:
match subject {
pattern1 => result1,
pattern2 => result2,
_ => default
}
$ cxl eval -e 'emit label = match status {
"A" => "Active",
"I" => "Inactive",
"P" => "Pending",
_ => "Unknown"
}' \
--field status=A
{
"label": "Active"
}
The wildcard _ is the catch-all arm. It matches any value not covered by preceding arms.
Condition form (without subject)
When no subject is provided, each arm’s pattern is evaluated as a boolean condition. This is CXL’s equivalent of SQL’s CASE WHEN:
match {
condition1 => result1,
condition2 => result2,
_ => default
}
$ cxl eval -e 'emit tier = match {
amount > 1000 => "high",
amount > 100 => "medium",
_ => "low"
}' \
--field amount=500
{
"tier": "medium"
}
Practical examples
Tiered pricing:
emit discount = match {
qty >= 1000 => 0.25,
qty >= 100 => 0.15,
qty >= 10 => 0.05,
_ => 0.0
}
Status code mapping:
emit status_text = match http_code {
200 => "OK",
201 => "Created",
400 => "Bad Request",
404 => "Not Found",
500 => "Internal Server Error",
_ => "HTTP " + http_code.to_string()
}
Region classification:
emit region = match country {
"US" => "North America",
"CA" => "North America",
"MX" => "North America",
"GB" => "Europe",
"DE" => "Europe",
"FR" => "Europe",
_ => "Other"
}
Match arms are evaluated in order
The first matching arm wins. Place more specific conditions before general ones:
# Correct: specific before general
emit category = match {
amount > 10000 => "enterprise",
amount > 1000 => "business",
_ => "personal"
}
# Wrong: first arm always matches
emit category = match {
amount > 0 => "personal", # catches everything positive
amount > 1000 => "business", # never reached
amount > 10000 => "enterprise", # never reached
_ => "unknown"
}
Built-in Methods
CXL provides 66 built-in scalar methods organized into six categories. Methods are called on a receiver value using dot notation: receiver.method(args).
Null propagation
Most methods return null when the receiver is null. This means null values flow through method chains without causing errors. The exceptions are documented in Introspection & Debug.
Method categories
String Methods (24 methods)
Text manipulation: case conversion, trimming, padding, searching, splitting, regex matching.
| Method | Description |
|---|---|
upper, lower | Case conversion |
trim, trim_start, trim_end | Whitespace removal |
starts_with, ends_with, contains | Substring testing |
replace | Find and replace |
substring, left, right | Extraction |
pad_left, pad_right | Padding |
repeat, reverse | Repetition and reversal |
length | Character count |
split, join | Splitting and joining |
matches, find, capture | Regex operations |
format, concat | Formatting and concatenation |
Numeric Methods (8 methods)
Rounding, clamping, and comparison for integers and floats.
| Method | Description |
|---|---|
abs | Absolute value |
ceil, floor | Ceiling and floor |
round, round_to | Rounding to decimal places |
clamp | Constrain to range |
min, max | Pairwise minimum/maximum |
Date & Time Methods (13 methods)
Date component extraction, arithmetic, and formatting.
| Method | Description |
|---|---|
year, month, day | Date component extraction |
hour, minute, second | Time component extraction (DateTime only) |
add_days, add_months, add_years | Date arithmetic |
diff_days, diff_months, diff_years | Date difference |
format_date | Custom date formatting |
Conversion Methods (11 methods)
Type conversion in strict (error on failure) and lenient (null on failure) variants.
| Method | Description |
|---|---|
to_int, to_float, to_string, to_bool | Strict conversion |
to_date, to_datetime | Strict date parsing |
try_int, try_float, try_bool | Lenient conversion |
try_date, try_datetime | Lenient date parsing |
Introspection & Debug (5 methods)
Type inspection, null checking, and debugging. These are the only methods that accept null receivers without propagating null.
| Method | Description |
|---|---|
type_of | Returns the type name as a string |
is_null | Tests for null |
is_empty | Tests for empty string, empty array, or null |
catch | Null fallback (equivalent to ??) |
debug | Passthrough with tracing side effect |
Path Methods (5 methods)
File path component extraction.
| Method | Description |
|---|---|
file_name | Full filename with extension |
file_stem | Filename without extension |
extension | File extension |
parent | Parent directory path |
parent_name | Parent directory name |
String Methods
CXL provides 24 built-in methods for string manipulation. All string methods return null when the receiver is null (null propagation).
Case conversion
upper()
Converts all characters to uppercase.
$ cxl eval -e 'emit result = "hello world".upper()'
{
"result": "HELLO WORLD"
}
lower()
Converts all characters to lowercase.
$ cxl eval -e 'emit result = "Hello World".lower()'
{
"result": "hello world"
}
Whitespace trimming
trim()
Removes leading and trailing whitespace.
$ cxl eval -e 'emit result = " hello ".trim()'
{
"result": "hello"
}
trim_start()
Removes leading whitespace only.
$ cxl eval -e 'emit result = " hello ".trim_start()'
{
"result": "hello "
}
trim_end()
Removes trailing whitespace only.
$ cxl eval -e 'emit result = " hello ".trim_end()'
{
"result": " hello"
}
Substring testing
starts_with(prefix: String) -> Bool
Tests whether the string starts with the given prefix.
$ cxl eval -e 'emit result = "hello world".starts_with("hello")'
{
"result": true
}
ends_with(suffix: String) -> Bool
Tests whether the string ends with the given suffix.
$ cxl eval -e 'emit result = "report.csv".ends_with(".csv")'
{
"result": true
}
contains(substring: String) -> Bool
Tests whether the string contains the given substring.
$ cxl eval -e 'emit result = "hello world".contains("lo wo")'
{
"result": true
}
Find and replace
replace(find: String, replacement: String) -> String
Replaces all occurrences of find with replacement.
$ cxl eval -e 'emit result = "foo-bar-baz".replace("-", "_")'
{
"result": "foo_bar_baz"
}
Extraction
substring(start: Int [, length: Int]) -> String
Extracts a substring starting at start (0-based character index). If length is provided, takes at most that many characters. If omitted, takes all remaining characters.
$ cxl eval -e 'emit result = "hello world".substring(6)'
{
"result": "world"
}
$ cxl eval -e 'emit result = "hello world".substring(0, 5)'
{
"result": "hello"
}
left(n: Int) -> String
Returns the first n characters.
$ cxl eval -e 'emit result = "hello world".left(5)'
{
"result": "hello"
}
right(n: Int) -> String
Returns the last n characters.
$ cxl eval -e 'emit result = "hello world".right(5)'
{
"result": "world"
}
Padding
pad_left(width: Int [, char: String]) -> String
Left-pads the string to the given width. Default pad character is a space.
$ cxl eval -e 'emit result = "42".pad_left(5, "0")'
{
"result": "00042"
}
$ cxl eval -e 'emit result = "hi".pad_left(6)'
{
"result": " hi"
}
pad_right(width: Int [, char: String]) -> String
Right-pads the string to the given width. Default pad character is a space.
$ cxl eval -e 'emit result = "hi".pad_right(6, ".")'
{
"result": "hi...."
}
Repetition and reversal
repeat(n: Int) -> String
Repeats the string n times.
$ cxl eval -e 'emit result = "ab".repeat(3)'
{
"result": "ababab"
}
reverse() -> String
Reverses the characters in the string.
$ cxl eval -e 'emit result = "hello".reverse()'
{
"result": "olleh"
}
Length
length() -> Int
Returns the number of characters in the string. Also works on arrays, returning the number of elements.
$ cxl eval -e 'emit result = "hello".length()'
{
"result": 5
}
Splitting and joining
split(delimiter: String) -> Array
Splits the string by the delimiter, returning an array of strings.
$ cxl eval -e 'emit result = "a,b,c".split(",")'
{
"result": ["a", "b", "c"]
}
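Applied to the result of split, length counts array elements:
$ cxl eval -e 'emit result = "a,b,c".split(",").length()'
{
"result": 3
}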
join(delimiter: String) -> String
Joins an array of values into a string with the given delimiter. The receiver must be an array.
$ cxl eval -e 'emit result = "a,b,c".split(",").join(" - ")'
{
"result": "a - b - c"
}
Regex operations
matches(pattern: String) -> Bool
Tests whether the string fully matches the given regex pattern.
$ cxl eval -e 'emit result = "abc123".matches("^[a-z]+[0-9]+$")'
{
"result": true
}
find(pattern: String) -> Bool
Tests whether the string contains a substring matching the given regex pattern (partial match).
$ cxl eval -e 'emit result = "hello world 42".find("[0-9]+")'
{
"result": true
}
capture(pattern: String [, group: Int]) -> String
Extracts a capture group from the first regex match. Default group is 0 (the full match).
$ cxl eval -e 'emit result = "order-12345".capture("order-([0-9]+)", 1)'
{
"result": "12345"
}
Formatting and concatenation
format(fmt: String) -> String
Formats the receiver value as a string.
$ cxl eval -e 'emit result = 42.format("")'
{
"result": "42"
}
concat(args: String…) -> String
Concatenates the receiver with one or more string arguments. Null arguments are treated as empty strings.
$ cxl eval -e 'emit result = "hello".concat(" ", "world")'
{
"result": "hello world"
}
This is variadic – it accepts any number of string arguments:
$ cxl eval -e 'emit result = "a".concat("b", "c", "d")'
{
"result": "abcd"
}
Numeric Methods
CXL provides 8 built-in methods for numeric operations. These methods work on both Integer and Float values (the Numeric receiver type). All return null when the receiver is null.
abs() -> Numeric
Returns the absolute value. Preserves the original type (Int stays Int, Float stays Float).
$ cxl eval -e 'emit result = (-42).abs()'
{
"result": 42
}
$ cxl eval -e 'emit result = (-3.14).abs()'
{
"result": 3.14
}
ceil() -> Int
Rounds up to the nearest integer. Returns the value unchanged for integers.
$ cxl eval -e 'emit result = 3.2.ceil()'
{
"result": 4
}
$ cxl eval -e 'emit result = (-3.2).ceil()'
{
"result": -3
}
floor() -> Int
Rounds down to the nearest integer. Returns the value unchanged for integers.
$ cxl eval -e 'emit result = 3.8.floor()'
{
"result": 3
}
$ cxl eval -e 'emit result = (-3.2).floor()'
{
"result": -4
}
round([decimals: Int]) -> Float
Rounds to the specified number of decimal places. Default is 0 decimal places.
$ cxl eval -e 'emit result = 3.456.round()'
{
"result": 3.0
}
$ cxl eval -e 'emit result = 3.456.round(2)'
{
"result": 3.46
}
round_to(decimals: Int) -> Float
Rounds to the specified number of decimal places. Unlike round(), the decimals argument is required.
$ cxl eval -e 'emit result = 3.14159.round_to(3)'
{
"result": 3.142
}
Use round_to when you want to be explicit about precision in financial or scientific calculations:
$ cxl eval -e 'emit price = 19.995.round_to(2)'
{
"price": 20.0
}
clamp(min: Numeric, max: Numeric) -> Numeric
Constrains the value to the given range. Returns min if the value is below it, max if above, or the value itself if within range.
$ cxl eval -e 'emit result = 150.clamp(0, 100)'
{
"result": 100
}
$ cxl eval -e 'emit result = (-5).clamp(0, 100)'
{
"result": 0
}
$ cxl eval -e 'emit result = 50.clamp(0, 100)'
{
"result": 50
}
min(other: Numeric) -> Numeric
Returns the smaller of the receiver and the argument.
$ cxl eval -e 'emit result = 10.min(20)'
{
"result": 10
}
$ cxl eval -e 'emit result = 10.min(5)'
{
"result": 5
}
max(other: Numeric) -> Numeric
Returns the larger of the receiver and the argument.
$ cxl eval -e 'emit result = 10.max(20)'
{
"result": 20
}
$ cxl eval -e 'emit result = 10.max(5)'
{
"result": 10
}
Practical examples
Clamp a percentage:
emit pct = (completed / total * 100).clamp(0, 100).round_to(1)
Absolute difference:
emit diff = (actual - expected).abs()
Floor division for batch numbering:
emit batch = (row_number / 1000).floor()
Date & Time Methods
CXL provides 13 built-in methods for date and time manipulation. These methods work on Date and DateTime values. All return null when the receiver is null.
Component extraction
year() -> Int
Returns the year component.
$ cxl eval -e 'emit result = #2024-03-15#.year()'
{
"result": 2024
}
month() -> Int
Returns the month component (1-12).
$ cxl eval -e 'emit result = #2024-03-15#.month()'
{
"result": 3
}
day() -> Int
Returns the day-of-month component (1-31).
$ cxl eval -e 'emit result = #2024-03-15#.day()'
{
"result": 15
}
hour() -> Int
Returns the hour component (0-23). DateTime only – returns null for Date values.
$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime().hour()'
{
"result": 14
}
minute() -> Int
Returns the minute component (0-59). DateTime only – returns null for Date values.
$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime().minute()'
{
"result": 30
}
second() -> Int
Returns the second component (0-59). DateTime only – returns null for Date values.
$ cxl eval -e 'emit result = "2024-03-15T14:30:45".to_datetime().second()'
{
"result": 45
}
Date arithmetic
add_days(n: Int) -> Date
Adds n days to the date. Use negative values to subtract. Works on both Date and DateTime.
$ cxl eval -e 'emit result = #2024-01-15#.add_days(10)'
{
"result": "2024-01-25"
}
$ cxl eval -e 'emit result = #2024-01-15#.add_days(-5)'
{
"result": "2024-01-10"
}
add_months(n: Int) -> Date
Adds n months to the date. Day is clamped to the last day of the target month if necessary.
$ cxl eval -e 'emit result = #2024-01-31#.add_months(1)'
{
"result": "2024-02-29"
}
$ cxl eval -e 'emit result = #2024-03-15#.add_months(-2)'
{
"result": "2024-01-15"
}
add_years(n: Int) -> Date
Adds n years to the date. Leap day (Feb 29) is clamped to Feb 28 in non-leap years.
$ cxl eval -e 'emit result = #2024-02-29#.add_years(1)'
{
"result": "2025-02-28"
}
Date difference
diff_days(other: Date) -> Int
Returns the number of days between the receiver and the argument (receiver - other). Positive when the receiver is later.
$ cxl eval -e 'emit result = #2024-03-15#.diff_days(#2024-03-01#)'
{
"result": 14
}
$ cxl eval -e 'emit result = #2024-01-01#.diff_days(#2024-03-15#)'
{
"result": -74
}
diff_months(other: Date) -> Int
Returns the difference in months between two dates.
Note: This method currently returns null (unimplemented). Use diff_days and divide by 30 as an approximation.
diff_years(other: Date) -> Int
Returns the difference in years between two dates.
Note: This method currently returns null (unimplemented). Use diff_days and divide by 365 as an approximation.
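Until these methods are implemented, an approximate month difference can be built from diff_days. This is a sketch: end_date and start_date are hypothetical fields, and 30.44 approximates the mean month length.
emit months_apart = (end_date.diff_days(start_date).to_float() / 30.44).round()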
Formatting
format_date(format: String) -> String
Formats the date/datetime using a chrono format string. See chrono format syntax.
Common format specifiers:
| Specifier | Description | Example |
|---|---|---|
%Y | 4-digit year | 2024 |
%m | 2-digit month | 03 |
%d | 2-digit day | 15 |
%H | Hour (24h) | 14 |
%M | Minute | 30 |
%S | Second | 00 |
%B | Full month name | March |
%b | Abbreviated month | Mar |
%A | Full weekday | Friday |
$ cxl eval -e 'emit result = #2024-03-15#.format_date("%B %d, %Y")'
{
"result": "March 15, 2024"
}
$ cxl eval -e 'emit result = #2024-03-15#.format_date("%Y/%m/%d")'
{
"result": "2024/03/15"
}
Practical examples
Fiscal year calculation (April start):
let d = invoice_date
emit fiscal_year = if d.month() < 4 then d.year() - 1 else d.year()
Age in days:
emit days_since = now.diff_days(created_date)
Quarter:
emit quarter = match {
invoice_date.month() <= 3 => "Q1",
invoice_date.month() <= 6 => "Q2",
invoice_date.month() <= 9 => "Q3",
_ => "Q4"
}
ISO week format:
emit formatted = order_date.format_date("%Y-W%V")
Conversion Methods
CXL provides two families of conversion methods: strict (6 methods) and lenient (5 methods). Strict conversions raise an error on failure, halting pipeline execution. Lenient conversions return null on failure, allowing graceful handling of dirty data.
All conversion methods accept any receiver type (Any).
Strict conversions
Use strict conversions for required fields where invalid data should halt processing.
to_int() -> Int
Converts the receiver to an integer. Errors on failure.
- Float: truncates toward zero
- String: parses as integer
- Bool: true becomes 1, false becomes 0
$ cxl eval -e 'emit result = "42".to_int()'
{
"result": 42
}
$ cxl eval -e 'emit result = 3.9.to_int()'
{
"result": 3
}
to_float() -> Float
Converts the receiver to a float. Errors on failure.
- Integer: promotes to float
- String: parses as float
$ cxl eval -e 'emit result = "3.14".to_float()'
{
"result": 3.14
}
$ cxl eval -e 'emit result = 42.to_float()'
{
"result": 42.0
}
to_string() -> String
Converts any value to its string representation. Never fails.
$ cxl eval -e 'emit result = 42.to_string()'
{
"result": "42"
}
$ cxl eval -e 'emit result = true.to_string()'
{
"result": "true"
}
to_bool() -> Bool
Converts the receiver to a boolean. Errors on failure.
- String: "true", "1", "yes" become true; "false", "0", "no" become false (case-insensitive)
- Integer: 0 is false, everything else is true
$ cxl eval -e 'emit result = "yes".to_bool()'
{
"result": true
}
$ cxl eval -e 'emit result = 0.to_bool()'
{
"result": false
}
to_date([format: String]) -> Date
Parses a string to a Date. Without a format argument, expects ISO 8601 (YYYY-MM-DD). With a format, uses chrono strftime syntax.
$ cxl eval -e 'emit result = "2024-03-15".to_date()'
{
"result": "2024-03-15"
}
$ cxl eval -e 'emit result = "15/03/2024".to_date("%d/%m/%Y")'
{
"result": "2024-03-15"
}
to_datetime([format: String]) -> DateTime
Parses a string to a DateTime. Without a format argument, expects ISO 8601 (YYYY-MM-DDTHH:MM:SS). With a format, uses chrono strftime syntax.
$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime()'
{
"result": "2024-03-15T14:30:00"
}
Lenient conversions
Use lenient conversions for optional or dirty data fields. They return null instead of raising errors, making them safe to combine with ?? for fallback values.
try_int() -> Int
Attempts to convert to integer. Returns null on failure.
$ cxl eval -e 'emit a = "42".try_int()' -e 'emit b = "abc".try_int()'
{
"a": 42,
"b": null
}
try_float() -> Float
Attempts to convert to float. Returns null on failure.
$ cxl eval -e 'emit a = "3.14".try_float()' -e 'emit b = "N/A".try_float()'
{
"a": 3.14,
"b": null
}
try_bool() -> Bool
Attempts to convert to boolean. Returns null on failure.
$ cxl eval -e 'emit a = "yes".try_bool()' -e 'emit b = "maybe".try_bool()'
{
"a": true,
"b": null
}
try_date([format: String]) -> Date
Attempts to parse a string as a Date. Returns null on failure.
$ cxl eval -e 'emit a = "2024-03-15".try_date()' \
-e 'emit b = "not a date".try_date()'
{
"a": "2024-03-15",
"b": null
}
try_datetime([format: String]) -> DateTime
Attempts to parse a string as a DateTime. Returns null on failure.
$ cxl eval -e 'emit a = "2024-03-15T14:30:00".try_datetime()' \
-e 'emit b = "invalid".try_datetime()'
{
"a": "2024-03-15T14:30:00",
"b": null
}
When to use each
Strict conversions (to_*) for:
- Required fields that must be valid
- Schema-enforced data where bad input should halt the pipeline
- Fields already validated upstream
Lenient conversions (try_*) for:
- Optional fields that may be missing or malformed
- Dirty data with mixed formats
- Fields where a fallback value is acceptable
Practical patterns
Safe numeric parsing with fallback:
emit amount = raw_amount.try_float() ?? 0.0
Parse dates from multiple formats:
emit parsed = raw_date.try_date("%Y-%m-%d")
?? raw_date.try_date("%m/%d/%Y")
?? raw_date.try_date("%d-%b-%Y")
Strict conversion for required fields:
emit employee_id = raw_id.to_int() # halts on bad data -- correct behavior
emit salary = raw_salary.to_float() # must be numeric
Lenient conversion for optional fields:
emit bonus = raw_bonus.try_float() # null if missing or non-numeric
emit total = salary + (bonus ?? 0.0) # safe arithmetic
Introspection & Debug
CXL provides 4 introspection methods and 1 debug method. These are the only methods that accept null receivers without propagating null – they are designed specifically for inspecting and handling null values.
type_of() -> String
Returns the type name of the receiver as a string. Works on any value, including null.
Type name strings: "String", "Int", "Float", "Bool", "Date", "DateTime", "Null", "Array", "Map".
$ cxl eval -e 'emit a = 42.type_of()' -e 'emit b = "hello".type_of()' \
-e 'emit c = null.type_of()'
{
"a": "Int",
"b": "String",
"c": "Null"
}
Useful for branching on dynamic types:
emit formatted = match value.type_of() {
"Int" => value.to_string() + " (integer)",
"Float" => value.round_to(2).to_string() + " (decimal)",
_ => value.to_string()
}
is_null() -> Bool
Returns true if the receiver is null, false otherwise. This is the primary way to test for null values – it is NOT subject to null propagation.
$ cxl eval -e 'emit a = null.is_null()' -e 'emit b = 42.is_null()'
{
"a": true,
"b": false
}
Use in filter statements:
filter not field.is_null()
is_empty() -> Bool
Returns true for empty strings, empty arrays, or null values. Returns false for all other values.
$ cxl eval -e 'emit a = "".is_empty()' -e 'emit b = "hello".is_empty()' \
-e 'emit c = null.is_empty()'
{
"a": true,
"b": false,
"c": true
}
Useful for filtering out blank or missing records:
filter not name.is_empty()
catch(fallback: Any) -> Any
Returns the receiver if it is non-null, otherwise returns the fallback value. This is the method equivalent of the ?? operator.
$ cxl eval -e 'emit a = null.catch("default")' \
-e 'emit b = "present".catch("default")'
{
"a": "default",
"b": "present"
}
catch and ?? are interchangeable:
# These two are equivalent:
emit name = raw_name.catch("Unknown")
emit name = raw_name ?? "Unknown"
debug(label: String) -> Any
Passes the receiver through unchanged while emitting a trace log with the given label. Zero overhead when tracing is disabled. The return value is always the receiver, making it safe to insert into any expression chain.
$ cxl eval -e 'emit result = 42.debug("check value")'
{
"result": 42
}
Insert debug anywhere in a method chain for inspection without affecting the output:
emit total = price.debug("price")
* qty.debug("qty")
When tracing is enabled, this produces log lines like:
TRACE source_row=1 source_file=input.csv: price: Integer(100)
TRACE source_row=1 source_file=input.csv: qty: Integer(5)
Null-safe summary
| Method | Null receiver behavior |
|---|---|
type_of() | Returns "Null" |
is_null() | Returns true |
is_empty() | Returns true |
catch(x) | Returns x |
debug(l) | Passes through null, logs it |
| All other methods | Return null (propagation) |
Path Methods
CXL provides 5 built-in methods for extracting components from file path strings. All path methods take a string receiver and return a string. They return null when the receiver is null or when the requested component does not exist.
file_name() -> String
Returns the full filename (with extension) from the path.
$ cxl eval -e 'emit result = "/data/reports/sales.csv".file_name()'
{
"result": "sales.csv"
}
file_stem() -> String
Returns the filename without the extension.
$ cxl eval -e 'emit result = "/data/reports/sales.csv".file_stem()'
{
"result": "sales"
}
extension() -> String
Returns the file extension (without the leading dot).
$ cxl eval -e 'emit result = "/data/reports/sales.csv".extension()'
{
"result": "csv"
}
Returns null when no extension is present:
$ cxl eval -e 'emit result = "/data/reports/README".extension()'
{
"result": null
}
parent() -> String
Returns the parent directory path.
$ cxl eval -e 'emit result = "/data/reports/sales.csv".parent()'
{
"result": "/data/reports"
}
parent_name() -> String
Returns just the name of the parent directory (not the full path).
$ cxl eval -e 'emit result = "/data/reports/sales.csv".parent_name()'
{
"result": "reports"
}
Practical examples
Organize output by source directory:
emit source_dir = $pipeline.source_file.parent_name()
emit source_type = $pipeline.source_file.extension()
Extract file identifiers:
emit file_id = $pipeline.source_file.file_stem()
emit is_csv = $pipeline.source_file.extension() == "csv"
Route by file type:
let ext = input_path.extension()
emit format = match ext {
"csv" => "delimited",
"json" => "structured",
"xml" => "markup",
_ => "unknown"
}
Window Functions
Window functions allow CXL expressions to access aggregated values across a set of records within an analytic window. Unlike aggregate functions (which collapse groups into single rows), window functions attach computed values to each individual record.
Window functions are accessed via the $window.* namespace and require an analytic_window: configuration on the transform node.
Configuring an analytic window
Window functions are only available in transform nodes that declare an analytic_window: section in YAML:
nodes:
- name: ranked_sales
type: transform
input: raw_sales
analytic_window:
partition_by: [region]
order_by:
- field: amount
direction: desc
frame:
type: rows
start: unbounded_preceding
end: current_row
cxl: |
emit region = region
emit amount = amount
emit running_total = $window.sum(amount)
emit rank_position = $window.count()
Window configuration fields
| Field | Description |
|---|---|
partition_by | List of fields to partition the window by (like SQL PARTITION BY) |
order_by | List of ordering specifications (field + direction) |
frame.type | Frame type: rows or range |
frame.start | Frame start: unbounded_preceding, current_row, or preceding(n) |
frame.end | Frame end: unbounded_following, current_row, or following(n) |
Aggregate window functions
These compute aggregate values over the window frame.
$window.sum(field)
Sum of the field values in the window frame.
emit running_total = $window.sum(amount)
$window.avg(field)
Average of the field values in the window frame. Returns Float.
emit moving_avg = $window.avg(amount)
$window.min(field)
Minimum value in the window frame.
emit window_min = $window.min(amount)
$window.max(field)
Maximum value in the window frame.
emit window_max = $window.max(amount)
$window.count()
Count of records in the window frame. Takes no arguments.
emit window_size = $window.count()
Positional window functions
These access specific records by position within the window frame.
$window.first()
Returns the value of the current field from the first record in the window frame.
emit first_amount = $window.first()
$window.last()
Returns the value of the current field from the last record in the window frame.
emit last_amount = $window.last()
$window.lag(n)
Returns the value from n records before the current record. Returns null if there is no record at that offset.
emit prev_amount = $window.lag(1)
emit two_back = $window.lag(2)
$window.lead(n)
Returns the value from n records after the current record. Returns null if there is no record at that offset.
emit next_amount = $window.lead(1)
Iterable window functions
These evaluate predicates or collect values across the window.
$window.any(predicate)
Returns true if the predicate is true for any record in the window.
emit has_high = $window.any(amount > 1000)
$window.all(predicate)
Returns true if the predicate is true for all records in the window.
emit all_positive = $window.all(amount > 0)
$window.collect(field)
Collects all values of the field in the window into an array.
emit all_amounts = $window.collect(amount)
$window.distinct(field)
Collects distinct values of the field in the window into an array.
emit unique_regions = $window.distinct(region)
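Because collect and distinct both return arrays, their results compose with array methods such as length() — a sketch for deriving counts from the window:

```
emit region_count = $window.distinct(region).length()
emit value_count = $window.collect(amount).length()
```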
Complete example
nodes:
- name: sales_analysis
type: transform
input: daily_sales
analytic_window:
partition_by: [store_id]
order_by:
- field: sale_date
direction: asc
frame:
type: rows
start: preceding(6)
end: current_row
cxl: |
emit store_id = store_id
emit sale_date = sale_date
emit daily_revenue = revenue
emit week_avg = $window.avg(revenue)
emit week_total = $window.sum(revenue)
emit prev_day_revenue = $window.lag(1)
emit day_over_day = revenue - ($window.lag(1) ?? revenue)
This computes a 7-day rolling average and total per store, along with day-over-day revenue change.
Aggregate Functions
Aggregate functions operate across grouped record sets in aggregate nodes, collapsing multiple input records into summary rows. They are distinct from window functions, which attach computed values to each individual record.
Aggregate functions
CXL provides 7 aggregate functions. These are called as free-standing function calls (not method calls) within the CXL block of an aggregate node.
| Function | Signature | Returns | Description |
|---|---|---|---|
sum(expr) | Numeric | Numeric | Sum of values |
count(*) | – | Int | Count of records in the group |
avg(expr) | Numeric | Float | Arithmetic mean |
min(expr) | Any | Any | Minimum value |
max(expr) | Any | Any | Maximum value |
collect(expr) | Any | Array | All values collected into an array |
weighted_avg(value, weight) | Numeric, Numeric | Float | Weighted arithmetic mean |
YAML aggregate node
Aggregate functions are used inside the cxl: block of a node with type: aggregate. The node must declare group_by: fields.
nodes:
- name: dept_summary
type: aggregate
input: employees
group_by: [department]
cxl: |
emit total_salary = sum(salary)
emit headcount = count(*)
emit avg_salary = avg(salary)
emit max_salary = max(salary)
emit min_salary = min(salary)
Group-by fields pass through automatically
Fields listed in group_by: are automatically included in the output. You do NOT need to emit them – they are carried through as group keys.
In the example above, department is automatically present in every output record without an explicit emit department = department statement.
Function details
sum(expr) -> Numeric
Computes the sum of the expression across all records in the group. Null values are skipped.
cxl: |
emit total_revenue = sum(price * quantity)
count(*) -> Int
Counts the number of records in the group. The argument is the wildcard *.
cxl: |
emit num_orders = count(*)
avg(expr) -> Float
Computes the arithmetic mean. Null values are skipped. Returns Float.
cxl: |
emit avg_order_value = avg(order_total)
min(expr) -> Any
Returns the minimum value in the group. Works on numeric, string, and date types.
cxl: |
emit earliest_order = min(order_date)
emit lowest_price = min(unit_price)
max(expr) -> Any
Returns the maximum value in the group. Works on numeric, string, and date types.
cxl: |
emit latest_order = max(order_date)
emit highest_price = max(unit_price)
collect(expr) -> Array
Collects all values of the expression into an array. Useful for building lists of values per group.
cxl: |
emit all_order_ids = collect(order_id)
weighted_avg(value, weight) -> Float
Computes a weighted average: sum(value * weight) / sum(weight). Takes two arguments.
cxl: |
emit weighted_price = weighted_avg(unit_price, quantity)
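As a worked illustration of the formula (the input values are made up): a group containing the records (unit_price=10, quantity=2) and (unit_price=20, quantity=1) yields (10*2 + 20*1) / (2 + 1) = 40 / 3 ≈ 13.33.

```
cxl: |
  # Group contains: {unit_price: 10, quantity: 2}, {unit_price: 20, quantity: 1}
  # weighted_avg = (10*2 + 20*1) / (2 + 1) = 40/3 ≈ 13.33
  emit weighted_price = weighted_avg(unit_price, quantity)
```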
Aggregates vs. windows
| Feature | Aggregate node | Window function |
|---|---|---|
| Record output | One row per group | One row per input record |
| Syntax | sum(field) (free-standing) | $window.sum(field) (namespace) |
| Configuration | type: aggregate + group_by: | type: transform + analytic_window: |
| Use case | Summarize groups | Enrich records with group context |
Combining aggregates with expressions
Aggregate function calls can be mixed with regular CXL expressions in emit statements:
nodes:
- name: category_stats
type: aggregate
input: products
group_by: [category]
cxl: |
emit total_revenue = sum(price * quantity)
emit avg_price = avg(price)
emit margin_pct = (sum(revenue) - sum(cost)) / sum(revenue) * 100
emit product_count = count(*)
emit has_premium = max(price) > 100
Restrictions
- let bindings in aggregate transforms are restricted to row-pure expressions (no aggregate function calls in let).
- filter in aggregate transforms runs pre-aggregation – it filters input records before grouping.
- distinct is not permitted inside aggregate transforms. Place a separate distinct transform upstream.
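The pre-aggregation behavior of filter can be used deliberately to exclude records before any aggregate runs — a sketch reusing the employees example above; the status field is illustrative:

```yaml
- name: active_dept_summary
  type: aggregate
  input: employees
  group_by: [department]
  cxl: |
    filter status == "active"   # runs before grouping, so counts exclude inactive rows
    emit headcount = count(*)
    emit avg_salary = avg(salary)
```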
Complete example
pipeline:
name: sales_summary
nodes:
- name: raw_sales
type: source
format: csv
path: sales.csv
- name: monthly_summary
type: aggregate
input: raw_sales
group_by: [region, month]
cxl: |
emit total_sales = sum(amount)
emit order_count = count(*)
emit avg_order = avg(amount)
emit top_sale = max(amount)
emit all_reps = collect(sales_rep)
- name: output
type: output
input: monthly_summary
format: csv
path: summary.csv
System Variables
CXL provides several system variable namespaces prefixed with $. These give CXL expressions access to pipeline execution context, user-defined variables, per-record metadata, and the current time.
$pipeline.* – Pipeline context
Pipeline variables are accessed via $pipeline.member_name. Some are frozen at pipeline start; others update per record.
Stable (frozen at pipeline start)
| Variable | Type | Description |
|---|---|---|
$pipeline.name | String | Pipeline name from YAML config |
$pipeline.execution_id | String | UUID v7, unique per pipeline run |
$pipeline.batch_id | String | From --batch-id CLI flag, or auto-generated UUID v7 |
$pipeline.start_time | DateTime | Frozen at pipeline start, deterministic within a run |
$ cxl eval -e 'emit name = $pipeline.name' \
-e 'emit exec = $pipeline.execution_id'
{
"name": "cxl-eval",
"exec": "00000000-0000-0000-0000-000000000000"
}
Per-record provenance
| Variable | Type | Description |
|---|---|---|
$pipeline.source_file | String | Path of the source file for the current record |
$pipeline.source_row | Int | Row number within the source file |
These change per record, tracking where each record originated. Useful for diagnostics and auditing.
emit meta audit_source = $pipeline.source_file
emit meta audit_row = $pipeline.source_row
Counters
| Variable | Type | Description |
|---|---|---|
$pipeline.total_count | Int | Total records processed so far |
$pipeline.ok_count | Int | Records that passed successfully |
$pipeline.dlq_count | Int | Records sent to dead-letter queue |
$pipeline.filtered_count | Int | Records excluded by filter statements |
$pipeline.distinct_count | Int | Records excluded by distinct statements |
trace info if $pipeline.total_count % 10000 == 0 then "processed " + $pipeline.total_count.to_string() + " records"
$vars.* – User-defined variables
User-defined variables are declared in the YAML pipeline config under pipeline.vars: and accessed via $vars.name in CXL expressions.
YAML declaration
pipeline:
name: invoice_processing
vars:
high_value_threshold: 10000
tax_rate: 0.21
output_currency: "USD"
fiscal_year_start_month: 4
CXL usage
filter amount > $vars.high_value_threshold
emit tax = amount * $vars.tax_rate
emit currency = $vars.output_currency
Variables provide a clean way to externalize configuration from CXL logic. Combined with channels, different variable sets can parameterize the same pipeline for different environments or clients.
$meta.* – Per-record metadata
Metadata is a per-record key-value store that travels with the record through the pipeline but is not part of the output columns. Write to it with emit meta; read from it with $meta.field.
Writing metadata
emit meta quality = if amount < 0 then "suspect" else "ok"
emit meta source_system = "legacy_erp"
Reading metadata
Downstream nodes can read metadata:
filter $meta.quality == "ok"
emit audit_system = $meta.source_system
Metadata is useful for tagging records with quality flags, routing hints, or audit information that should not appear in the final output unless explicitly emitted.
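A minimal two-node sketch of this write-then-read pattern — node and field names are illustrative:

```yaml
nodes:
  - name: tag_quality
    type: transform
    input: raw_orders
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit meta quality = if amount < 0 then "suspect" else "ok"
  - name: keep_clean
    type: transform
    input: tag_quality
    cxl: |
      filter $meta.quality == "ok"
      emit order_id = order_id
      emit amount = amount
```

The quality tag steers the downstream filter but never appears as an output column.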
now – Current time
The now keyword returns the current wall-clock time as a DateTime value. It is evaluated fresh per record, so each record gets the actual time of its processing.
$ cxl eval -e 'emit timestamp = now'
{
"timestamp": "2026-04-11T15:30:00"
}
now is useful for timestamping records:
emit processed_at = now
emit days_old = now.diff_days(created_date)
Note: now is a keyword, not a function call. Write now, not now().
Complete example
pipeline:
name: order_enrichment
vars:
discount_threshold: 500
tax_rate: 0.08
nodes:
- name: orders
type: source
format: csv
path: orders.csv
- name: enrich
type: transform
input: orders
cxl: |
emit order_id = order_id
emit amount = amount
emit discount = if amount > $vars.discount_threshold then 0.1 else 0.0
emit tax = amount * $vars.tax_rate
emit total = amount * (1 - discount) + tax
emit processed_at = now
emit meta source_row = $pipeline.source_row
emit pipeline_run = $pipeline.execution_id
- name: output
type: output
input: enrich
format: csv
path: enriched_orders.csv
Null Handling
Null values in CXL represent missing or absent data. CXL uses null propagation – most operations on null produce null – with specific tools for detecting and handling nulls.
Null propagation
When a method receives a null receiver, it returns null without executing. This is called null propagation and applies to all methods except a small set of null-aware methods, listed below.
$ cxl eval -e 'emit result = null.upper()'
{
"result": null
}
Propagation flows through method chains:
$ cxl eval -e 'emit result = null.trim().upper().length()'
{
"result": null
}
Null propagation exceptions
Five methods are exempt from null propagation and actively handle null receivers:
| Method | Null behavior |
|---|---|
is_null() | Returns true |
type_of() | Returns "Null" |
is_empty() | Returns true |
catch(x) | Returns x |
debug(l) | Passes through null, logs it |
$ cxl eval -e 'emit a = null.is_null()' -e 'emit b = null.type_of()' \
-e 'emit c = null.catch("fallback")'
{
"a": true,
"b": "Null",
"c": "fallback"
}
Null coalesce operator (??)
The ?? operator returns its left operand if non-null, otherwise its right operand. It is the primary tool for providing default values.
$ cxl eval -e 'emit a = null ?? "default"' \
-e 'emit b = "present" ?? "default"'
{
"a": "default",
"b": "present"
}
Chain multiple ?? operators for fallback chains:
$ cxl eval -e 'emit result = null ?? null ?? "last resort"'
{
"result": "last resort"
}
Three-valued logic
Boolean operations with null follow three-valued logic (like SQL):
and
| Left | Right | Result |
|---|---|---|
true | null | null |
false | null | false |
null | true | null |
null | false | false |
null | null | null |
The key insight: false and null is false because the result is false regardless of the unknown value.
or
| Left | Right | Result |
|---|---|---|
true | null | true |
false | null | null |
null | true | true |
null | false | null |
null | null | null |
The key insight: true or null is true because the result is true regardless of the unknown value.
not
| Operand | Result |
|---|---|
true | false |
false | true |
null | null |
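The truth tables can be verified interactively; each output follows directly from the tables above:

```
$ cxl eval -e 'emit a = false and null' -e 'emit b = true or null' \
  -e 'emit c = not null'
{
  "a": false,
  "b": true,
  "c": null
}
```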
Arithmetic with null
Any arithmetic operation involving null produces null:
$ cxl eval -e 'emit result = 5 + null'
{
"result": null
}
Comparison with null
Comparisons involving null produce null (not false):
$ cxl eval -e 'emit result = null == null'
{
"result": null
}
To test for null, use is_null():
$ cxl eval -e 'emit result = null.is_null()'
{
"result": true
}
Practical patterns
Fallback values with ??
emit name = raw_name ?? "Unknown"
emit amount = raw_amount ?? 0
emit active = is_active ?? false
Safe conversion with try_* and ??
emit price = raw_price.try_float() ?? 0.0
emit qty = raw_qty.try_int() ?? 1
Explicit null testing
filter not amount.is_null()
emit has_email = not email.is_null()
Catch method (equivalent to ??)
emit name = raw_name.catch("Unknown")
Conditional null handling
emit status = if amount.is_null() then "missing"
else if amount < 0 then "invalid"
else "ok"
Filter blank or null
# Filter out records where name is null or empty string
filter not name.is_empty()
Null-safe chaining
When working with fields that may be null, place the null check early or use ??:
# Safe: coalesce first, then transform
emit normalized = (raw_name ?? "").trim().upper()
# Safe: test before use
emit name = if raw_name.is_null() then "N/A" else raw_name.trim()
Modules & use
CXL supports a module system for organizing reusable expressions. Modules contain function declarations and constant bindings that can be imported into CXL programs.
Module files
A module is a .cxl file containing fn declarations and let constants. Module files live in the rules path (default: ./rules/).
Function declarations
Functions are pure, single-expression bodies with named parameters:
fn fiscal_year(d) = if d.month() < 4 then d.year() - 1 else d.year()
fn full_name(first, last) = first.trim() + " " + last.trim()
fn clamp_pct(value) = value.clamp(0, 100).round_to(1)
Functions are pure – they have no side effects and always return a value.
Module constants
Constants are let bindings at the module level:
let tax_rate = 0.21
let max_retries = 3
let default_currency = "USD"
Example module file
File: rules/shared/dates.cxl
fn fiscal_year(d) = if d.month() < 4 then d.year() - 1 else d.year()
fn quarter(d) = match {
d.month() <= 3 => 1,
d.month() <= 6 => 2,
d.month() <= 9 => 3,
_ => 4
}
fn fiscal_quarter(d) = quarter(d.add_months(-3))
let fiscal_start_month = 4
Importing modules
Use the use statement to import a module. Module paths use dot notation (not ::):
use shared.dates as d
This imports the module at rules/shared/dates.cxl and binds it to the alias d.
Import syntax
use module.path
use module.path as alias
The as alias clause is optional. When omitted, the last segment of the path becomes the default name.
use shared.dates # access as dates::fiscal_year(...)
use shared.dates as d # access as d::fiscal_year(...)
Path resolution
Module paths are resolved relative to the rules path:
| Import | File path |
|---|---|
use shared.dates | rules/shared/dates.cxl |
use transforms.normalize | rules/transforms/normalize.cxl |
use utils | rules/utils.cxl |
The rules path defaults to ./rules/ and can be overridden with --rules-path.
Using imported functions and constants
After importing, reference module members with :: (double colon) syntax:
use shared.dates as d
use shared.finance as f
emit fiscal_year = d::fiscal_year(invoice_date)
emit quarter = d::quarter(invoice_date)
emit tax = amount * f::tax_rate
emit net = amount - tax
Functions
Call functions with alias::function_name(args):
use shared.dates as d
emit fy = d::fiscal_year(order_date)
Constants
Access constants with alias::constant_name:
use shared.finance as f
emit tax = amount * f::tax_rate
Restrictions
- No wildcard imports. use shared.* is not supported. Import modules explicitly.
- Dot separator only. Module paths use ., not ::. The :: syntax is reserved for member access after import.
- Single-expression bodies. Functions must be a single expression – no multi-statement bodies.
- Pure functions. Functions cannot use emit, filter, distinct, or other statement forms. They are pure computations.
- No recursion. Functions cannot call themselves (directly or indirectly).
Complete example
File: rules/etl/clean.cxl
fn normalize_name(name) = name.trim().upper()
fn safe_amount(raw) = raw.try_float() ?? 0.0
fn flag_suspicious(amount, threshold) =
if amount > threshold then "review" else "ok"
let max_amount = 999999.99
Pipeline CXL block:
use etl.clean as c
emit customer = c::normalize_name(raw_customer)
emit amount = c::safe_amount(raw_amount)
filter amount <= c::max_amount
emit review_flag = c::flag_suspicious(amount, 10000)
The cxl CLI Tool
The cxl command-line tool validates, evaluates, and formats CXL source files. It is the standalone companion to the Clinker pipeline engine, useful for testing expressions, validating transforms, and debugging CXL logic.
Commands
cxl check
Parse, resolve, and type-check a .cxl file. Reports errors with source locations and fix suggestions.
$ cxl check transform.cxl
ok: transform.cxl is valid
On errors:
error[parse]: expected expression, found '}' (at transform.cxl:12)
help: check for missing operand or extra closing brace
error[resolve]: unknown field 'amoutn' (at transform.cxl:5)
help: did you mean 'amount'?
error[typecheck]: cannot apply '+' to String and Int (at transform.cxl:8)
help: convert one operand — use .to_int() or .to_string()
cxl eval
Evaluate CXL expressions against provided data and print the result as JSON.
Inline expression:
$ cxl eval -e 'emit result = 1 + 2'
{
"result": 3
}
From a file with field values:
$ cxl eval transform.cxl \
--field Price=10.5 \
--field Qty=3
From a file with JSON input:
$ cxl eval transform.cxl --record '{"price": 10.5, "qty": 3}'
Multiple inline expressions:
$ cxl eval -e 'let tax = 0.21' -e 'emit net = price * (1 - tax)' \
--field price=100
{
"net": 79.0
}
cxl fmt
Parse and pretty-print a .cxl file in canonical format with normalized whitespace and consistent styling.
$ cxl fmt transform.cxl
Output is printed to stdout. Redirect to overwrite:
$ cxl fmt transform.cxl > transform.cxl.tmp && mv transform.cxl.tmp transform.cxl
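Because cxl fmt writes to stdout, it can also serve as a formatting check — a sketch relying only on standard diff, which exits non-zero when its inputs differ:

```
# Exits 0 if the file is already in canonical format, 1 otherwise
$ cxl fmt transform.cxl | diff -q - transform.cxl
```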
Input data
--field name=value
Provide individual field values as key-value pairs. Values are automatically type-inferred:
| Input | Inferred type | Example |
|---|---|---|
| Integer pattern | Int | --field count=42 |
| Decimal pattern | Float | --field price=10.5 |
true / false | Bool | --field active=true |
null | Null | --field value=null |
| Anything else | String | --field name=Alice |
$ cxl eval -e 'emit t = amount.type_of()' --field amount=42
{
"t": "Int"
}
$ cxl eval -e 'emit t = name.type_of()' --field name=Alice
{
"t": "String"
}
--record JSON
Provide a full JSON object as input. Mutually exclusive with --field.
$ cxl eval -e 'emit total = price * qty' \
--record '{"price": 10.5, "qty": 3}'
{
"total": 31.5
}
JSON types map directly:
| JSON type | CXL type |
|---|---|
null | Null |
true / false | Bool |
| integer number | Int |
| decimal number | Float |
"string" | String |
[array] | Array |
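Array input can be exercised the same way; this hypothetical session assumes the mapping above plus the array length() method shown elsewhere in this chapter:

```
$ cxl eval -e 'emit n = items.length()' --record '{"items": [1, 2, 3]}'
{
  "n": 3
}
```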
Output format
Output is always JSON. Each emit statement produces a key-value pair:
$ cxl eval -e 'emit a = 1' -e 'emit b = "two"' -e 'emit c = true'
{
"a": 1,
"b": "two",
"c": true
}
Date and DateTime values are serialized as ISO 8601 strings:
$ cxl eval -e 'emit d = #2024-03-15#'
{
"d": "2024-03-15"
}
Exit codes
| Code | Meaning |
|---|---|
| 0 | Success (or warnings only) |
| 1 | Parse, resolve, type-check, or evaluation errors |
| 2 | I/O error (file not found, invalid JSON, etc.) |
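The distinct codes make cxl easy to script against — a sketch using a standard shell case statement:

```
cxl check transform.cxl
case $? in
  0) echo "valid" ;;
  1) echo "CXL error (parse/resolve/typecheck/eval)" >&2; exit 1 ;;
  2) echo "I/O error (missing file or bad JSON)" >&2; exit 2 ;;
esac
```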
Pipeline context in eval mode
When running cxl eval, a minimal pipeline context is provided:
| Variable | Value |
|---|---|
$pipeline.name | "cxl-eval" |
$pipeline.execution_id | Zeroed UUID |
$pipeline.batch_id | Zeroed UUID |
$pipeline.start_time | Current wall-clock time |
$pipeline.source_file | Filename or "<inline>" |
$pipeline.source_row | 1 |
now | Current wall-clock time (live) |
Practical usage
Quick expression testing:
$ cxl eval -e 'emit result = "hello world".upper().split(" ").length()'
{
"result": 2
}
Validate a transform file:
$ cxl check transforms/enrich_orders.cxl && echo "Valid"
Test conditional logic:
$ cxl eval -e 'emit tier = match {
amount > 1000 => "high",
amount > 100 => "med",
_ => "low"
}' \
--field amount=500
{
"tier": "med"
}
Test date operations:
$ cxl eval -e 'emit year = d.year()' -e 'emit month = d.month()' \
-e 'emit next_week = d.add_days(7)' \
--record '{"d": "2024-03-15"}'
Test null handling:
$ cxl eval -e 'emit safe = raw.try_int() ?? 0' --field raw=abc
{
"safe": 0
}
CLI Reference
Clinker ships two command-line tools: clinker (the pipeline runner) and cxl (the expression REPL, covered in the CXL CLI chapter). This page is the complete reference for clinker.
clinker run
Execute a pipeline.
clinker run [OPTIONS] <CONFIG>
Positional arguments
| Argument | Description |
|---|---|
<CONFIG> | Path to the pipeline YAML configuration file (required) |
Options
| Flag | Default | Description |
|---|---|---|
--memory-limit <SIZE> | 256M | Memory budget for the execution. Accepts K, M, G suffixes. When the limit is approached, aggregation operators spill to disk rather than crashing. CLI value overrides any memory_limit set in the YAML. |
--threads <N> | number of CPUs | Size of the thread pool used for parallel node execution. |
--error-threshold <N> | 0 (unlimited) | Maximum number of records routed to the dead-letter queue before the pipeline aborts. 0 means no limit – the pipeline will run to completion regardless of DLQ volume. |
--batch-id <ID> | UUID v7 | Custom execution identifier. Appears in metrics output and log lines. Use a meaningful value (e.g. daily-2026-04-11) for correlation across retries. |
--explain [FORMAT] | text | Print the execution plan and exit without processing data. Accepted formats: text, json, dot. See Explain Plans. |
--dry-run | – | Validate the configuration (YAML structure, CXL syntax, type checking, DAG wiring) without reading any data. |
-n, --dry-run-n <N> | – | Process only the first N records through the full pipeline. Implies --dry-run. |
--dry-run-output <FILE> | stdout | Redirect dry-run output to a file instead of stdout. Only meaningful with -n. |
--rules-path <DIR> | ./rules/ | Search path for CXL module files referenced by use statements. |
--base-dir <DIR> | – | Base directory for resolving relative paths in the YAML config. Defaults to the directory containing the config file. |
--allow-absolute-paths | – | Permit absolute file paths in the pipeline YAML. By default, absolute paths are rejected to encourage portable configs. |
--env <NAME> | – | Set the active environment. Equivalent to setting CLINKER_ENV. Used by when: conditions in channel overrides. |
--quiet | – | Suppress progress output. Errors are still printed to stderr. |
--force | – | Allow output files to be overwritten if they already exist. Without this flag, the pipeline aborts rather than clobbering existing output. |
--log-level <LEVEL> | info | Logging verbosity. One of: error, warn, info, debug, trace. |
--metrics-spool-dir <DIR> | – | Directory for per-execution metrics files. See Metrics & Monitoring. |
Examples
# Basic execution
clinker run pipeline.yaml
# Production run with memory budget and forced overwrite
clinker run pipeline.yaml --memory-limit 512M --force --log-level warn
# Validate without processing
clinker run pipeline.yaml --dry-run
# Preview first 10 records
clinker run pipeline.yaml --dry-run -n 10
# Show execution plan as Graphviz
clinker run pipeline.yaml --explain dot | dot -Tpng -o plan.png
# Run with a custom batch ID for tracing
clinker run pipeline.yaml --batch-id "daily-2026-04-11" --metrics-spool-dir ./metrics/
clinker metrics collect
Sweep per-execution metrics files from a spool directory into a single NDJSON archive.
clinker metrics collect [OPTIONS]
Options
| Flag | Description |
|---|---|
--spool-dir <DIR> | Spool directory to sweep (required). |
--output-file <FILE> | NDJSON archive destination (required). If the file exists, new entries are appended. |
--delete-after-collect | Remove spool files after they have been successfully written to the archive. |
--dry-run | Preview which files would be collected without writing anything. |
Examples
# Collect and archive, then clean up spool
clinker metrics collect \
--spool-dir /var/spool/clinker/ \
--output-file /var/log/clinker/metrics.ndjson \
--delete-after-collect
# Preview what would be collected
clinker metrics collect \
--spool-dir ./metrics/ \
--output-file ./archive.ndjson \
--dry-run
Environment Variables
| Variable | Description |
|---|---|
CLINKER_ENV | Active environment name. Equivalent to --env. Used by when: conditions in channel overrides to select environment-specific configuration. |
CLINKER_METRICS_SPOOL_DIR | Default metrics spool directory. Overridden by --metrics-spool-dir. |
Precedence (highest to lowest): CLI flag, environment variable, YAML config value.
Validation & Dry Run
Clinker provides two levels of pre-flight validation so you can catch problems before committing to a full run.
Config-only validation
clinker run pipeline.yaml --dry-run
This validates everything that can be checked without reading data:
- YAML structure and required fields
- CXL syntax and compile-time type checking
- Schema compatibility between connected nodes
- DAG wiring (no cycles, no dangling inputs, no missing nodes)
- File path resolution (existence checks for inputs)
No records are read. No output files are created. The command exits with code 0 on success or code 1 with a diagnostic message on failure.
Use this after every YAML edit. It runs in milliseconds and catches the majority of configuration mistakes.
Record preview
clinker run pipeline.yaml --dry-run -n 10
This reads the first 10 records from each source and processes them through the full pipeline – transforms, aggregations, routing, and output formatting. Results are printed to stdout.
The record preview exercises the runtime evaluation path, catching issues that config-only validation cannot:
- CXL expressions that are syntactically valid but fail at runtime (e.g., calling a string method on an integer)
- Data format mismatches between the declared schema and actual file contents
- Unexpected null values in required fields
Save preview to file
clinker run pipeline.yaml --dry-run -n 100 --dry-run-output preview.csv
The output format matches what the pipeline’s output node would produce, so preview.csv shows you exactly what the full run will write.
Recommended workflow
Use both validation levels in sequence before every production run:
1. --dry-run – catch configuration and type errors instantly.
2. --dry-run -n 10 – verify output shape and values against real data.
3. Full run – execute with confidence.
This three-step pattern is especially valuable when:
- Editing CXL expressions in transform or aggregate nodes
- Changing source schemas or swapping input files
- Adding or removing nodes from the pipeline DAG
- Modifying route conditions
Combining with explain
You can also inspect the execution plan before running:
clinker run pipeline.yaml --explain
This shows the DAG structure, parallelism strategy, and node ordering without reading any data. See Explain Plans for details.
The typical full pre-flight sequence is:
clinker run pipeline.yaml --explain # inspect the DAG
clinker run pipeline.yaml --dry-run # validate config
clinker run pipeline.yaml --dry-run -n 10 # preview with data
clinker run pipeline.yaml --force # run for real
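The same sequence can be wrapped in a small pre-flight script — a sketch assuming the documented exit codes; set -e aborts at the first failing step:

```
#!/usr/bin/env sh
set -e
clinker run pipeline.yaml --dry-run         # validate config
clinker run pipeline.yaml --dry-run -n 10   # preview with real data
clinker run pipeline.yaml --force           # run for real
```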
Explain Plans
The --explain flag prints the execution plan – the DAG of nodes, their connections, and the parallelism strategy the optimizer has chosen – without reading any data.
Text format
clinker run pipeline.yaml --explain
# or explicitly:
clinker run pipeline.yaml --explain text
The text format shows a human-readable summary of the execution plan:
Execution Plan: customer_etl
============================
Node 0: customers (Source, parallel: file-chunked)
-> transform_1
Node 1: transform_1 (Transform, parallel: record)
-> route_1
Node 2: route_1 (Route, parallel: record)
-> [high] output_high
-> [default] output_standard
Node 3: output_high (Output, parallel: serial)
Node 4: output_standard (Output, parallel: serial)
Key information shown:
- Node index and name – the topological position in the DAG
- Node type – Source, Transform, Aggregate, Route, Merge, Output, Composition
- Parallelism strategy – how the optimizer plans to execute the node
- Connections – downstream nodes, with port labels for route branches
JSON format
clinker run pipeline.yaml --explain json
Produces a machine-readable JSON object for programmatic consumption. Useful for:
- CI pipelines that need to assert plan properties
- Custom dashboards that visualize execution plans
- Diffing plans between config versions
# Compare plans before and after a config change
clinker run old.yaml --explain json > plan_old.json
clinker run new.yaml --explain json > plan_new.json
diff plan_old.json plan_new.json
Graphviz DOT format
clinker run pipeline.yaml --explain dot
Produces a Graphviz DOT graph. Pipe it to dot to render an image:
# PNG
clinker run pipeline.yaml --explain dot | dot -Tpng -o pipeline.png
# SVG (scalable, good for documentation)
clinker run pipeline.yaml --explain dot | dot -Tsvg -o pipeline.svg
# PDF
clinker run pipeline.yaml --explain dot | dot -Tpdf -o pipeline.pdf
This requires the graphviz package to be installed on the system.
The resulting diagram shows:
- Nodes as labeled boxes with type and parallelism annotations
- Edges as arrows with port labels where applicable
- Branch/merge fan-out and fan-in structure
When to use explain
- During development – verify the DAG shape matches your mental model before writing test data.
- After adding route or merge nodes – confirm branch wiring is correct.
- When tuning parallelism – check which strategy the optimizer selected for each node.
- In code review – generate a DOT diagram and include it in the PR for visual confirmation.
Explain runs instantly because it only parses the YAML and builds the plan – no data is touched. Pair it with --dry-run for full config validation:
clinker run pipeline.yaml --explain # inspect plan
clinker run pipeline.yaml --dry-run # validate config
Memory Tuning
Clinker is designed to be a good neighbor on shared servers. Rather than consuming all available memory, it works within a configurable budget and spills to disk when necessary.
Setting the memory limit
CLI flag (highest priority):
clinker run pipeline.yaml --memory-limit 512M
YAML config:
pipeline:
memory_limit: "512M"
The CLI flag overrides the YAML value. Accepted suffixes: K (kilobytes), M (megabytes), G (gigabytes).
Default: 256M
How it works
Clinker uses a custom accounting allocator to track memory usage across all pipeline nodes. When memory pressure rises toward the configured limit, aggregation operations spill intermediate state to temporary files on disk. The pipeline continues with degraded throughput rather than crashing with an out-of-memory error.
This means:
- Pipelines always complete if disk space is available, regardless of input size.
- Performance degrades gracefully under memory pressure – you will see slower execution, not failures.
- The memory limit is a soft ceiling, not a hard wall. Momentary spikes may briefly exceed the limit before spill kicks in.
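The spill behavior can be pictured with a toy sketch (illustrative Python, not Clinker's implementation – the real mechanism is driven by the accounting allocator, and the final merge of spill files is omitted here):

```python
# Conceptual sketch: accumulate per-group state in memory and flush it to a
# temporary file whenever the tracked size crosses the budget. Partial results
# from all spill files would be merged in a final pass (merge step omitted).
import json
import tempfile

def spilling_sum(records, budget_groups=2):
    groups, spill_files = {}, []
    for key, amount in records:
        groups[key] = groups.get(key, 0) + amount
        if len(groups) > budget_groups:          # "memory pressure"
            f = tempfile.NamedTemporaryFile("w", delete=False, suffix=".json")
            json.dump(groups, f)
            f.close()
            spill_files.append(f.name)
            groups = {}                          # free the in-memory state
    return groups, spill_files
```

Throughput degrades (extra disk I/O, extra merge work) but the job completes instead of being killed by the OS.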
Sizing guidelines
| Workload | Recommended limit | Notes |
|---|---|---|
| Small files (<10 MB) | 128M | Minimal memory pressure |
| Medium files (10-50 MB) | 256M (default) | Covers most ETL jobs |
| Large files or complex aggregations | 512M-1G | Multiple group-by keys, large cardinality |
| Multiple large group-by keys | 1G+ | High-cardinality distinct values |
Target workload: Clinker is optimized for 1-5 input files of up to 100 MB each, processing 10K-2M records per run.
Aggregation strategy interaction
Memory consumption depends heavily on the aggregation strategy the optimizer selects:
- Hash aggregation accumulates state in a hash map. Memory usage is proportional to the number of distinct group-by values. With high-cardinality keys, this can consume significant memory before spill triggers.
- Streaming aggregation processes groups in order and emits results as each group completes. Memory usage is minimal (proportional to a single group’s state) but requires the input to be sorted by the group-by keys.
- strategy: auto (the default) lets the optimizer choose based on the declared sort order of the input. If the data arrives sorted by the group-by keys, streaming aggregation is selected automatically.
To influence strategy selection:
- type: aggregate
name: rollup
input: sorted_data
config:
group_by: [department]
strategy: streaming # force streaming (input MUST be sorted)
cxl: |
emit total = sum(amount)
Only force streaming when you are certain the input is sorted by the group-by keys. If the data is not sorted, results will be incorrect. Use auto when in doubt.
Monitoring memory usage
Use the metrics system to track peak_rss_bytes across runs:
clinker run pipeline.yaml --metrics-spool-dir ./metrics/
The metrics file includes peak_rss_bytes, which shows the maximum resident memory during execution. If this consistently approaches your memory limit, consider increasing the budget or restructuring the pipeline to reduce intermediate state.
Shared server considerations
On servers running JVM applications, memory is often at a premium. Recommendations:
- Set --memory-limit explicitly rather than relying on the default. Know your budget.
- Use --threads to limit CPU contention alongside memory limits.
- Monitor peak_rss_bytes in production metrics to right-size the limit over time.
- Schedule large pipelines during off-peak hours when JVM heap pressure is lower.
Metrics & Monitoring
Clinker writes per-execution metrics as JSON files to a spool directory. These files can be collected into an NDJSON archive for ingestion into monitoring systems.
Enabling metrics
There are three ways to enable metrics collection, listed from highest to lowest priority:
CLI flag:
clinker run pipeline.yaml --metrics-spool-dir ./metrics/
Environment variable:
export CLINKER_METRICS_SPOOL_DIR=./metrics/
clinker run pipeline.yaml
YAML config:
pipeline:
metrics:
spool_dir: "./metrics/"
When metrics are enabled, each execution writes one JSON file to the spool directory, named <execution_id>.json.
Metrics schema
Each metrics file follows schema version 1:
{
"execution_id": "01912345-6789-7abc-def0-123456789abc",
"schema_version": 1,
"pipeline_name": "customer_etl",
"config_path": "/opt/clinker/pipelines/daily_etl.yaml",
"hostname": "prod-etl-01",
"started_at": "2026-04-11T10:00:00Z",
"finished_at": "2026-04-11T10:00:05Z",
"duration_ms": 5000,
"exit_code": 0,
"records_total": 50000,
"records_ok": 49950,
"records_dlq": 50,
"execution_mode": "streaming",
"peak_rss_bytes": 134217728,
"thread_count": 4,
"input_files": ["./data/customers.csv"],
"output_files": ["./output/enriched.csv"],
"dlq_path": "./output/errors.csv",
"error": null
}
Field reference
| Field | Type | Description |
|---|---|---|
| execution_id | string | UUID v7 or custom --batch-id value |
| schema_version | integer | Always 1 for this release |
| pipeline_name | string | The name from the pipeline YAML |
| config_path | string | Absolute path to the config file |
| hostname | string | Machine hostname |
| started_at | string | ISO 8601 UTC timestamp |
| finished_at | string | ISO 8601 UTC timestamp |
| duration_ms | integer | Wall-clock duration in milliseconds |
| exit_code | integer | Process exit code (see Exit Codes) |
| records_total | integer | Total records read from all sources |
| records_ok | integer | Records that reached an output node |
| records_dlq | integer | Records routed to the dead-letter queue |
| execution_mode | string | streaming or batch |
| peak_rss_bytes | integer | Maximum resident set size during execution |
| thread_count | integer | Thread pool size used |
| input_files | array | Paths to all source files |
| output_files | array | Paths to all output files written |
| dlq_path | string/null | Path to the DLQ file, or null if none |
| error | string/null | Error message on failure, or null on success |
Collecting metrics
The spool directory accumulates one file per execution. Use clinker metrics collect to sweep them into an NDJSON archive:
clinker metrics collect \
--spool-dir ./metrics/ \
--output-file ./metrics/archive.ndjson \
--delete-after-collect
This appends all spool files to the archive (one JSON object per line) and removes the originals. The NDJSON format is compatible with most log aggregation and monitoring tools.
Preview without writing:
clinker metrics collect \
--spool-dir ./metrics/ \
--output-file ./metrics/archive.ndjson \
--dry-run
Integration with monitoring systems
Grafana / Prometheus
Parse the NDJSON archive with a log shipper (Promtail, Filebeat, Vector) and create dashboards tracking:
- duration_ms – execution time trends
- records_dlq – data quality over time
- peak_rss_bytes – memory utilization
Datadog
Ship NDJSON to Datadog Logs, then create metrics from log attributes:
# Example: tail the archive and ship to Datadog
tail -f ./metrics/archive.ndjson | datadog-agent log-stream
ELK Stack
Filebeat can ingest NDJSON directly:
# filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/clinker/metrics.ndjson
json.keys_under_root: true
Simple alerting with jq
For environments without a full monitoring stack, use jq to query the archive directly:
# Find all runs with DLQ entries in the last 24 hours
jq 'select(.records_dlq > 0)' metrics/archive.ndjson
# Find runs that exceeded 400MB RSS
jq 'select(.peak_rss_bytes > 419430400)' metrics/archive.ndjson
# Average duration by pipeline
jq -s 'group_by(.pipeline_name) | map({
pipeline: .[0].pipeline_name,
avg_ms: (map(.duration_ms) | add / length)
})' metrics/archive.ndjson
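Where Python is more convenient than jq, equivalent queries over the archive are a few lines each. An illustrative sketch using only the field names from the metrics schema above:

```python
# Query the NDJSON metrics archive without jq.
import json

def load_metrics(path):
    """Read one JSON object per line from an NDJSON archive."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

def runs_with_dlq(metrics):
    """All runs that produced dead-letter-queue entries."""
    return [m for m in metrics if m.get("records_dlq", 0) > 0]

def avg_duration_by_pipeline(metrics):
    """Mean duration_ms keyed by pipeline_name."""
    by_name = {}
    for m in metrics:
        by_name.setdefault(m["pipeline_name"], []).append(m["duration_ms"])
    return {name: sum(ds) / len(ds) for name, ds in by_name.items()}
```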
Operational recommendations
- Always enable metrics in production. The overhead is negligible (one small JSON write at the end of each run).
- Run metrics collect --delete-after-collect on a schedule (e.g., hourly) to prevent spool directory growth.
- Use --batch-id with meaningful identifiers to correlate metrics across retries and environments.
- Alert on records_dlq > 0 to catch data quality regressions early.
- Track peak_rss_bytes trends to anticipate when memory limits need adjustment.
Exit Codes & Error Diagnosis
Clinker uses structured exit codes to communicate the outcome of a pipeline run. These codes are designed for integration with schedulers, cron, CI systems, and monitoring tools.
Exit code reference
| Code | Meaning | Description |
|---|---|---|
| 0 | Success | Pipeline completed. All records processed successfully. |
| 1 | Configuration error | Invalid YAML, CXL syntax error, type mismatch, or DAG wiring problem. Fix the pipeline configuration. |
| 2 | Partial success | Pipeline ran to completion, but some records were routed to the dead-letter queue. Check the DLQ file. |
| 3 | Evaluation error | CXL runtime error during record processing (e.g., division by zero, type coercion failure). |
| 4 | I/O error | File not found, permission denied, disk full, or input format mismatch. |
Understanding exit code 2
Exit code 2 is not a crash. It means:
- The pipeline started and ran to completion.
- All viable records were processed and written to output files.
- Some records could not be processed and were diverted to the dead-letter queue.
Your scheduler should treat exit code 2 as a warning, not a failure. The DLQ file contains the problematic records along with the error that caused each one to be rejected.
To control when exit code 2 escalates to a hard failure, use --error-threshold:
# Abort if more than 100 records hit the DLQ
clinker run pipeline.yaml --error-threshold 100
With a threshold set, the pipeline aborts (exit code 3) when the DLQ count exceeds the threshold, rather than continuing to completion.
Diagnosing failures
Exit code 1: Configuration error
The error message includes a span-annotated diagnostic pointing to the exact location of the problem:
Error: CXL type error in node 'transform_1'
--> pipeline.yaml:25:15
|
25 | emit total = amount + name
| ^^^^^^^^^^^^^ cannot add Int and String
Action: Fix the YAML or CXL expression indicated in the diagnostic, then re-run with --dry-run to confirm the fix.
Exit code 2: Partial success (DLQ entries)
Check the DLQ file for details:
# The DLQ path is shown in the run output and in metrics
cat output/errors.csv
Common causes:
- Null values in fields that a CXL expression does not handle
- Data that does not match the declared schema (e.g., non-numeric value in an integer column)
- Coercion failures between types
Action: Review the DLQ records, fix the data or add null handling to CXL expressions, and re-run.
Exit code 3: Evaluation error
A CXL expression failed at runtime. The error message includes the failing expression and the record that triggered it:
Error: division by zero in node 'compute_ratio'
expression: emit ratio = total / count
record: {total: 500, count: 0}
Action: Add guard conditions to the CXL expression:
emit ratio = if count == 0 then 0 else total / count
Exit code 4: I/O error
File system or format errors:
Error: file not found: ./data/customers.csv
--> pipeline.yaml:8:12
Common causes:
- Input file does not exist or path is wrong
- Permission denied on input or output directories
- Output file already exists (use --force to overwrite)
- Disk full during output writing
- Input file format does not match the declared type (e.g., invalid CSV)
Action: Fix file paths, permissions, or disk space, then re-run.
Scheduler integration
Cron script
#!/bin/bash
set -euo pipefail
PIPELINE=/opt/clinker/pipelines/daily_etl.yaml
METRICS_DIR=/var/spool/clinker/
# Capture the exit code without tripping set -e
EXIT=0
clinker run "$PIPELINE" \
--memory-limit 512M \
--log-level warn \
--metrics-spool-dir "$METRICS_DIR" \
--force || EXIT=$?
case $EXIT in
0)
echo "$(date): Success" >> /var/log/clinker/daily_etl.log
;;
2)
echo "$(date): Warning - DLQ entries produced" >> /var/log/clinker/daily_etl.log
mail -s "Clinker ETL Warning: DLQ entries" ops@company.com < /dev/null
;;
*)
echo "$(date): FAILURE (exit code $EXIT)" >> /var/log/clinker/daily_etl.log
mail -s "Clinker ETL FAILURE (exit $EXIT)" ops@company.com < /dev/null
;;
esac
exit $EXIT
CI pipeline (GitHub Actions)
- name: Run ETL pipeline
run: clinker run pipeline.yaml --dry-run
# Exit code 1 fails the build on config errors
- name: Smoke test with real data
run: clinker run pipeline.yaml --dry-run -n 100
# Catches runtime evaluation errors
Systemd
Systemd Type=oneshot services interpret non-zero exit codes as failures. To allow exit code 2 (partial success) without triggering service failure:
[Service]
Type=oneshot
SuccessExitStatus=2
ExecStart=/opt/clinker/bin/clinker run /opt/clinker/pipelines/daily_etl.yaml --force
Production Deployment
Clinker is a single statically-linked binary with no runtime dependencies. Deployment is straightforward: copy the binary to the server.
Installation
# Copy the binary
scp target/release/clinker user@server:/opt/clinker/bin/
# Verify it runs
ssh user@server /opt/clinker/bin/clinker --version
No JVM, no Python, no container runtime required.
Recommended directory structure
/opt/clinker/
bin/
clinker # The binary
pipelines/
daily_etl.yaml # Pipeline configs
weekly_report.yaml
data/ # Input data (or symlinks to data locations)
output/ # Output files
rules/ # CXL module files (for use statements)
metrics/ # Metrics spool directory
Create a dedicated user:
sudo useradd --system --home-dir /opt/clinker --shell /usr/sbin/nologin clinker
sudo chown -R clinker:clinker /opt/clinker
Systemd service
For scheduled one-shot execution:
[Unit]
Description=Clinker ETL - Daily Customer Processing
After=network.target
[Service]
Type=oneshot
ExecStart=/opt/clinker/bin/clinker run /opt/clinker/pipelines/daily_etl.yaml \
--memory-limit 512M \
--log-level warn \
--metrics-spool-dir /var/spool/clinker/ \
--force
WorkingDirectory=/opt/clinker
User=clinker
Group=clinker
SuccessExitStatus=2
# Resource limits
MemoryMax=1G
CPUQuota=200%
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=clinker-daily
[Install]
WantedBy=multi-user.target
Pair with a systemd timer for scheduling:
[Unit]
Description=Run Clinker daily ETL at 2 AM
[Timer]
OnCalendar=*-*-* 02:00:00
Persistent=true
[Install]
WantedBy=timers.target
sudo systemctl enable --now clinker-daily.timer
Note: SuccessExitStatus=2 tells systemd that exit code 2 (partial success with DLQ entries) is not a service failure. See Exit Codes for the full reference.
Cron scheduling
# Run daily at 2 AM, log to syslog
0 2 * * * /opt/clinker/bin/clinker run \
/opt/clinker/pipelines/daily_etl.yaml \
--log-level warn --force \
2>&1 | logger -t clinker
# Collect metrics hourly
0 * * * * /opt/clinker/bin/clinker metrics collect \
--spool-dir /var/spool/clinker/ \
--output-file /var/log/clinker/metrics.ndjson \
--delete-after-collect
Environment-based configuration
Use the CLINKER_ENV variable or --env flag to activate environment-specific overrides:
# Production
CLINKER_ENV=production clinker run pipeline.yaml
# Staging
CLINKER_ENV=staging clinker run pipeline.yaml
Combined with channel overrides in the pipeline YAML, this allows a single pipeline definition to target different file paths, connection strings, or thresholds per environment.
Logging
Log levels for production
| Level | Use case |
|---|---|
| warn | Recommended for production cron jobs. Prints warnings and errors only. |
| info | Default. Includes progress messages. Useful during initial deployment. |
| error | Minimal output. Only prints when something fails. |
| debug | Troubleshooting. Generates significant output. |
| trace | Development only. Extremely verbose. |
Directing logs
To syslog via logger:
clinker run pipeline.yaml --log-level warn 2>&1 | logger -t clinker
To a log file:
clinker run pipeline.yaml --log-level warn 2>> /var/log/clinker/etl.log
Systemd journal captures stdout and stderr automatically when running as a service.
DLQ monitoring
When a pipeline exits with code 2, records that could not be processed are written to the dead-letter queue file. Set up a daily check:
#!/bin/bash
# Check for DLQ files produced today
DLQ_DIR=/opt/clinker/output/
DLQ_FILES=$(find "$DLQ_DIR" -name "*_errors.csv" -mtime 0 -size +0c)
if [ -n "$DLQ_FILES" ]; then
mail -s "Clinker DLQ Alert" ops@company.com <<EOF
The following DLQ files were produced today:
$DLQ_FILES
Review the files and address data quality issues.
EOF
fi
Batch ID for tracing
Use --batch-id with a meaningful, consistent naming scheme:
# Date-based
clinker run pipeline.yaml --batch-id "daily-$(date +%Y-%m-%d)"
# Include environment
clinker run pipeline.yaml --batch-id "prod-daily-$(date +%Y-%m-%d)"
The batch ID appears in metrics output and log lines, making it easy to correlate a specific run across logs, metrics, and DLQ files. On retries, use a different batch ID (e.g., append -retry-1) to distinguish attempts.
Upgrades
To upgrade Clinker:
1. Validate the new version against your pipelines: /opt/clinker/bin/clinker-new run pipeline.yaml --dry-run
2. Replace the binary: cp clinker-new /opt/clinker/bin/clinker
3. Verify: /opt/clinker/bin/clinker --version
There is no configuration migration. Pipeline YAML files are forward-compatible within the same major version.
CSV-to-CSV Transform
This recipe reads employee data from a CSV file, computes salary tiers using CXL expressions, and writes the enriched result to a new CSV file.
Input data
employees.csv:
id,name,department,salary
1,Alice Chen,Engineering,95000
2,Bob Martinez,Marketing,62000
3,Carol Johnson,Engineering,88000
4,Dave Williams,Sales,71000
5,Eva Brown,Marketing,58000
6,Frank Lee,Engineering,102000
Pipeline
salary_tiers.yaml:
pipeline:
name: salary_tiers
nodes:
- type: source
name: employees
config:
name: employees
type: csv
path: "./employees.csv"
schema:
- { name: id, type: int }
- { name: name, type: string }
- { name: department, type: string }
- { name: salary, type: int }
- type: transform
name: classify
input: employees
config:
cxl: |
emit id = id
emit name = name
emit department = department
emit salary = salary
emit level = if salary >= 90000 then "senior" else "junior"
emit salary_band = match {
salary >= 100000 => "100k+",
salary >= 90000 => "90-100k",
salary >= 70000 => "70-90k",
_ => "under 70k"
}
- type: output
name: report
input: classify
config:
name: salary_report
type: csv
path: "./output/salary_report.csv"
error_handling:
strategy: fail_fast
Run it
# Validate first
clinker run salary_tiers.yaml --dry-run
# Preview output
clinker run salary_tiers.yaml --dry-run -n 3
# Full run
clinker run salary_tiers.yaml
Expected output
output/salary_report.csv:
id,name,department,salary,level,salary_band
1,Alice Chen,Engineering,95000,senior,90-100k
2,Bob Martinez,Marketing,62000,junior,under 70k
3,Carol Johnson,Engineering,88000,junior,70-90k
4,Dave Williams,Sales,71000,junior,70-90k
5,Eva Brown,Marketing,58000,junior,under 70k
6,Frank Lee,Engineering,102000,senior,100k+
Key points
Schema declaration. The source node declares the schema explicitly with typed columns. This enables compile-time type checking of CXL expressions – if you write salary + name, the type checker catches the error before any data is read.
Emit statements. Each emit in the transform produces one output column. The output schema is defined entirely by the emit statements – input columns that are not emitted are dropped. This is intentional: explicit output schemas prevent accidental data leakage.
Match expressions. The match block evaluates conditions top to bottom and returns the value of the first matching arm. The _ wildcard is the default case and must appear last.
Error handling. The fail_fast strategy aborts the pipeline on the first record error. For production pipelines processing dirty data, consider dead_letter_queue instead – see Error Handling & DLQ.
Variations
Filtering records
Add a filter statement to exclude records:
- type: transform
name: classify
input: employees
config:
cxl: |
filter salary >= 60000
emit id = id
emit name = name
emit salary = salary
Records where salary < 60000 are dropped silently – they do not appear in the output or the DLQ.
Computed columns with type conversion
cxl: |
emit id = id
emit name = name
emit monthly_salary = (salary.to_float() / 12.0).round(2)
emit salary_display = "$" + salary.to_string()
The .to_float() conversion is required because salary is declared as int, and dividing by the float literal 12.0 requires both operands to have matching types.
Multi-Source Lookup
This recipe enriches order records with product information from a separate catalog file using the lookup: transform configuration.
Input data
orders.csv:
order_id,product_id,quantity,unit_price
ORD-001,PROD-A,5,29.99
ORD-002,PROD-B,2,149.99
ORD-003,PROD-A,1,29.99
ORD-004,PROD-C,10,9.99
ORD-005,PROD-B,3,149.99
products.csv:
product_id,product_name,category
PROD-A,Widget Pro,Hardware
PROD-B,DataSync License,Software
PROD-C,Cable Kit,Hardware
Pipeline
order_enrichment.yaml:
pipeline:
name: order_enrichment
nodes:
- type: source
name: orders
config:
name: orders
type: csv
path: "./orders.csv"
schema:
- { name: order_id, type: string }
- { name: product_id, type: string }
- { name: quantity, type: int }
- { name: unit_price, type: float }
- type: source
name: products
config:
name: products
type: csv
path: "./products.csv"
schema:
- { name: product_id, type: string }
- { name: product_name, type: string }
- { name: category, type: string }
- type: transform
name: enrich
input: orders
config:
lookup:
source: products
where: "product_id == products.product_id"
cxl: |
emit order_id = order_id
emit product_id = product_id
emit product_name = products.product_name
emit category = products.category
emit quantity = quantity
emit unit_price = unit_price
emit line_total = quantity.to_float() * unit_price
- type: output
name: result
input: enrich
config:
name: enriched_orders
type: csv
path: "./output/enriched_orders.csv"
Run it
clinker run order_enrichment.yaml --dry-run
clinker run order_enrichment.yaml --dry-run -n 3
clinker run order_enrichment.yaml
Expected output
output/enriched_orders.csv:
order_id,product_id,product_name,category,quantity,unit_price,line_total
ORD-001,PROD-A,Widget Pro,Hardware,5,29.99,149.95
ORD-002,PROD-B,DataSync License,Software,2,149.99,299.98
ORD-003,PROD-A,Widget Pro,Hardware,1,29.99,29.99
ORD-004,PROD-C,Cable Kit,Hardware,10,9.99,99.90
ORD-005,PROD-B,DataSync License,Software,3,149.99,449.97
How lookup works
The lookup: configuration on a transform node loads a secondary source into memory and matches each input record against it:
- source – the name of another source node in the pipeline (here, products).
- where – a CXL boolean expression evaluated per candidate lookup row. Bare field names reference the primary input; qualified names (products.field) reference the lookup source.
- on_miss (optional) – what to do when no lookup row matches: null_fields (default), skip, or error.
- match (optional) – first (default, 1:1) or all (1:N fan-out).
Lookup fields are accessed using qualified syntax: products.product_name, products.category, etc.
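The matching behavior can be modeled in a few lines of Python – a toy sketch of the semantics described above, not Clinker's implementation (the function name and field-list parameter are hypothetical):

```python
# Toy model of the lookup transform: the reference source is fully loaded,
# and each input record is matched against it with a predicate, honoring
# the on_miss and match settings described above.
def lookup_join(records, reference, predicate, lookup_fields,
                on_miss="null_fields", match="first"):
    out = []
    for rec in records:
        hits = [ref for ref in reference if predicate(rec, ref)]
        if match == "first":
            hits = hits[:1]          # 1:1 – keep only the first match
        if not hits:
            if on_miss == "skip":
                continue             # drop records with no match
            if on_miss == "error":
                raise ValueError(f"no lookup match for {rec!r}")
            hits = [{}]              # null_fields: lookup columns become None
        for ref in hits:             # match: all fans out one row per hit
            out.append({**rec, **{f: ref.get(f) for f in lookup_fields}})
    return out
```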
Unmatched records
By default (on_miss: null_fields), unmatched records emit with all lookup fields set to null. Other options:
lookup:
source: products
where: "product_id == products.product_id"
on_miss: skip # drop records with no match
lookup:
source: products
where: "product_id == products.product_id"
on_miss: error # fail the pipeline on first unmatched record
Range predicates
Lookups support any CXL boolean expression, not just equality. This classifies employees into pay bands:
- type: transform
name: classify
input: employees
config:
lookup:
source: rate_bands
where: |
ee_group == rate_bands.ee_group
and pay >= rate_bands.min_pay
and pay <= rate_bands.max_pay
cxl: |
emit employee_id = employee_id
emit rate_class = rate_bands.rate_class
Fan-out (match: all)
When match: all is set, each matching lookup row produces a separate output record. This is useful for one-to-many enrichments:
- type: transform
name: expand_benefits
input: employees
config:
lookup:
source: benefits
where: "department == benefits.department"
match: all
cxl: |
emit employee_id = employee_id
emit benefit = benefits.benefit_name
An employee in a department with 3 benefits produces 3 output records.
Multiple lookup sources
Each transform can reference one lookup source. Chain transforms to enrich from multiple sources:
- type: transform
name: enrich_product
input: orders
config:
lookup:
source: products
where: "product_id == products.product_id"
cxl: |
emit order_id = order_id
emit product_name = products.product_name
emit customer_id = customer_id
emit amount = amount
- type: transform
name: enrich_customer
input: enrich_product
config:
lookup:
source: customers
where: "customer_id == customers.customer_id"
cxl: |
emit order_id = order_id
emit product_name = product_name
emit customer_name = customers.customer_name
emit amount = amount
Memory considerations
The entire lookup source is loaded into memory. For large reference files, ensure --memory-limit accounts for this. A 10 MB CSV lookup table uses roughly 10-20 MB of heap when loaded.
Routing to Multiple Outputs
This recipe splits a stream of order records into separate output files based on business rules. High-value orders go to one file, standard orders to another.
Input data
orders.csv:
order_id,customer,amount,region
ORD-001,Acme Corp,15000,US
ORD-002,Globex,450,EU
ORD-003,Initech,8500,US
ORD-004,Umbrella,22000,APAC
ORD-005,Stark Ind,950,US
ORD-006,Wayne Ent,3200,EU
Pipeline
order_routing.yaml:
pipeline:
name: order_routing
vars:
high_value_threshold: 5000
nodes:
- type: source
name: orders
config:
name: orders
type: csv
path: "./orders.csv"
schema:
- { name: order_id, type: string }
- { name: customer, type: string }
- { name: amount, type: float }
- { name: region, type: string }
- type: route
name: split_by_value
input: orders
config:
mode: exclusive
conditions:
high: "amount >= $vars.high_value_threshold"
default: standard
- type: output
name: high_value_output
input: split_by_value.high
config:
name: high_value_orders
type: csv
path: "./output/high_value.csv"
- type: output
name: standard_output
input: split_by_value.standard
config:
name: standard_orders
type: csv
path: "./output/standard.csv"
Run it
clinker run order_routing.yaml --dry-run
clinker run order_routing.yaml
Expected output
output/high_value.csv:
order_id,customer,amount,region
ORD-001,Acme Corp,15000,US
ORD-003,Initech,8500,US
ORD-004,Umbrella,22000,APAC
output/standard.csv:
order_id,customer,amount,region
ORD-002,Globex,450,EU
ORD-005,Stark Ind,950,US
ORD-006,Wayne Ent,3200,EU
How routing works
Port syntax
Route nodes produce named output ports. Downstream nodes reference these ports using dot syntax: split_by_value.high and split_by_value.standard.
The port names come from two places:
- Condition names in the
conditionsmap (here,high) - The
defaultfield (here,standard)
Exclusive mode
With mode: exclusive, each record goes to exactly one branch. Conditions are evaluated top to bottom – the first matching condition wins, and the record is sent to that port. Records that match no condition go to the default port.
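The first-match-wins semantics can be sketched in Python (illustrative only, not Clinker's code – conditions are tried in declaration order and unmatched records fall through to the default port):

```python
# Toy model of exclusive routing: each record lands on exactly one port.
def route_exclusive(records, conditions, default):
    # conditions: list of (port_name, predicate) pairs, evaluated in order
    ports = {name: [] for name, _ in conditions}
    ports[default] = []
    for rec in records:
        for name, pred in conditions:
            if pred(rec):
                ports[name].append(rec)  # first matching condition wins
                break
        else:
            ports[default].append(rec)   # no condition matched
    return ports
```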
Pipeline variables
The threshold is defined in pipeline.vars and referenced in the CXL expression as $vars.high_value_threshold. This makes it easy to adjust the threshold without editing the route condition, and channel overrides can change it per environment.
Variations
Multiple branches
Route nodes can have any number of named branches:
- type: route
name: split_by_region
input: orders
config:
mode: exclusive
conditions:
us: "region == \"US\""
eu: "region == \"EU\""
apac: "region == \"APAC\""
default: other
- type: output
name: us_output
input: split_by_region.us
config:
name: us_orders
type: csv
path: "./output/us_orders.csv"
- type: output
name: eu_output
input: split_by_region.eu
config:
name: eu_orders
type: csv
path: "./output/eu_orders.csv"
# ... additional outputs for apac, other
Transform before output
Insert a transform between the route and output to shape the data differently per branch:
- type: transform
name: enrich_high_value
input: split_by_value.high
config:
cxl: |
emit order_id = order_id
emit customer = customer
emit amount = amount
emit priority = "URGENT"
emit review_required = true
- type: output
name: high_value_output
input: enrich_high_value
config:
name: high_value_orders
type: csv
path: "./output/high_value.csv"
Combining routing with aggregation
Route first, then aggregate each branch independently:
- type: aggregate
name: high_value_summary
input: split_by_value.high
config:
group_by: [region]
cxl: |
emit total = sum(amount)
emit count = count(*)
This produces a per-region summary of high-value orders only.
Aggregation & Rollups
This recipe demonstrates grouping records and computing summary statistics. The pipeline filters active sales records, then rolls them up by department.
Input data
sales.csv:
id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank
Pipeline
dept_rollup.yaml:
pipeline:
name: dept_rollup
nodes:
- type: source
name: sales
config:
name: sales
type: csv
path: "./sales.csv"
schema:
- { name: id, type: int }
- { name: department, type: string }
- { name: amount, type: float }
- { name: status, type: string }
- { name: rep, type: string }
- type: transform
name: active_only
input: sales
config:
cxl: |
filter status == "active"
- type: aggregate
name: rollup
input: active_only
config:
group_by: [department]
cxl: |
emit total = sum(amount)
emit count = count(*)
emit average = avg(amount)
emit maximum = max(amount)
emit minimum = min(amount)
- type: output
name: report
input: rollup
config:
name: dept_totals
type: csv
path: "./output/dept_totals.csv"
Run it
clinker run dept_rollup.yaml --dry-run
clinker run dept_rollup.yaml
Expected output
output/dept_totals.csv:
department,total,count,average,maximum,minimum
Engineering,21500,3,7166.67,9500,5000
Marketing,5000,2,2500,3000,2000
Sales,6000,1,6000,6000,6000
One row per department. The inactive records (Dave’s $4000, Hank’s $1500) are excluded by the filter.
How aggregation works
Group-by keys
The group_by field lists the columns that define each group. Records with the same values for all group-by columns are aggregated together. The group-by columns appear automatically in the output – you do not need to emit them.
Aggregate functions
Available aggregate functions in CXL:
| Function | Description |
|---|---|
| sum(expr) | Sum of values |
| count(*) | Number of records |
| avg(expr) | Arithmetic mean |
| min(expr) | Minimum value |
| max(expr) | Maximum value |
| first(expr) | First value encountered |
| last(expr) | Last value encountered |
Strategy selection
Clinker offers two aggregation strategies:
- Hash aggregation (default): Builds an in-memory hash map keyed by the group-by columns. Works with any input order. Memory usage is proportional to the number of distinct groups.
- Streaming aggregation: Processes records in order, emitting each group’s result as soon as the next group starts. Requires input sorted by the group-by keys. Uses minimal memory regardless of the number of groups.
The default strategy (auto) selects streaming when the optimizer can prove the input is sorted by the group-by keys, and hash otherwise. You can force a strategy:
config:
group_by: [department]
strategy: streaming # requires sorted input
See Memory Tuning for details on memory implications.
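Why streaming aggregation needs so little memory is easy to see in a sketch (illustrative Python, not Clinker's implementation): only one group's running state is alive at any time, and each result is emitted the moment the key changes.

```python
# Toy streaming sum over records sorted by group key.
def streaming_sum(sorted_records):
    results, current_key, total = [], None, 0.0
    for key, amount in sorted_records:
        if key != current_key:
            if current_key is not None:
                results.append((current_key, total))  # group complete: emit
            current_key, total = key, 0.0
        total += amount
    if current_key is not None:
        results.append((current_key, total))          # flush the last group
    return results
```

If the input is not actually sorted, the same key appears in multiple runs and produces multiple (wrong) rows – which is why forcing strategy: streaming on unsorted data yields incorrect results.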
Variations
Multiple group-by keys
config:
group_by: [department, region]
cxl: |
emit total = sum(amount)
emit count = count(*)
Produces one row per unique (department, region) combination.
Pre-aggregation transform
Compute derived fields before aggregating:
- type: transform
name: prepare
input: sales
config:
cxl: |
filter status == "active"
emit department = department
emit amount = amount
emit is_large = amount >= 5000
- type: aggregate
name: rollup
input: prepare
config:
group_by: [department]
cxl: |
emit total = sum(amount)
emit large_count = sum(if is_large then 1 else 0)
emit small_count = sum(if not is_large then 1 else 0)
Aggregation followed by routing
Aggregate first, then route the summary rows:
- type: aggregate
name: rollup
input: active_only
config:
group_by: [department]
cxl: |
emit total = sum(amount)
- type: route
name: split_by_total
input: rollup
config:
mode: exclusive
conditions:
large: "total >= 10000"
default: small
This routes departments with over $10,000 in total sales to one output and the rest to another.
No group-by (grand total)
Omit group_by to aggregate all records into a single output row:
config:
cxl: |
emit grand_total = sum(amount)
emit record_count = count(*)
emit average_amount = avg(amount)
File Splitting
This recipe demonstrates splitting large output files into smaller chunks, optionally keeping related records together.
Basic record-count splitting
Split output into files of at most 5,000 records each:
```yaml
pipeline:
  name: monthly_report
  nodes:
    - type: source
      name: transactions
      config:
        name: transactions
        type: csv
        path: "./data/transactions.csv"
        schema:
          - { name: id, type: int }
          - { name: date, type: string }
          - { name: department, type: string }
          - { name: amount, type: float }
          - { name: description, type: string }

    - type: output
      name: split_output
      input: transactions
      config:
        name: monthly_report
        type: csv
        path: "./output/report.csv"
        split:
          max_records: 5000
          naming: "{stem}_{seq:04}.{ext}"
          repeat_header: true
```
Output files
```
output/report_0001.csv   (5,000 records + header)
output/report_0002.csv   (5,000 records + header)
output/report_0003.csv   (remaining records + header)
```
Naming pattern variables
| Variable | Description | Example |
|---|---|---|
| `{stem}` | Base filename without extension | `report` |
| `{ext}` | File extension | `csv` |
| `{seq:04}` | Zero-padded sequence number (width 4) | `0001` |
The `path` field provides the template: with `./output/report.csv`, `{stem}` is `report` and `{ext}` is `csv`.
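The variables can be combined with literal text in the naming template. A sketch (the `_part` infix is a hypothetical choice, not a built-in):

```yaml
split:
  max_records: 5000
  naming: "{stem}_part{seq:04}.{ext}"   # would yield names like report_part0001.csv
  repeat_header: true
```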
Header behavior
When `repeat_header: true`, each output file includes the CSV header row. This is the recommended setting: each file is self-contained and can be processed independently.
Grouped splitting
Keep all records with the same group key value in the same file:
```yaml
split:
  max_records: 5000
  group_key: "department"
  naming: "{stem}_{seq:04}.{ext}"
  repeat_header: true
  oversize_group: warn
```
With `group_key: "department"`, the splitter ensures that all records for a given department land in the same output file. A new file starts only at a group boundary (when the department value changes), even if the current file has not yet reached `max_records`.
Oversize group policy
If a single group contains more records than `max_records`, the `oversize_group` setting controls what happens:
| Policy | Behavior |
|---|---|
| `warn` (default) | Log a warning and write all of the group's records into one file, exceeding the limit |
| `error` | Stop the pipeline with an error |
| `allow` | Silently allow the oversized file |
For example, if `max_records` is 5,000 but the Engineering department has 7,000 records, the `warn` policy produces a single 7,000-record file and logs a warning.
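If exceeding the limit is unacceptable, say, because a downstream upload API enforces a hard cap, a stricter variant of the grouped-splitting config above uses the `error` policy:

```yaml
split:
  max_records: 5000
  group_key: "department"
  naming: "{stem}_{seq:04}.{ext}"
  repeat_header: true
  oversize_group: error   # abort the pipeline instead of writing an oversized chunk
```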
Byte-based splitting
Split by file size instead of record count:
```yaml
split:
  max_bytes: 10485760   # 10 MB per file
  naming: "{stem}_{seq:04}.{ext}"
  repeat_header: true
```
The splitter estimates the current file size and starts a new file when the limit is approached. The actual file size may slightly exceed the limit because the current record is always completed before splitting.
Combined limits
Use `max_records` and `max_bytes` together; whichever limit is reached first starts a new file:
```yaml
split:
  max_records: 10000
  max_bytes: 5242880    # 5 MB per file
  naming: "{stem}_{seq:04}.{ext}"
  repeat_header: true
```
This is useful when record sizes vary widely: short records may hit the 10,000-record cap while the file is still small, and long records may hit the byte limit well before 10,000 records.
Full pipeline example
A complete pipeline that reads a large transaction file, filters it, and splits the output:
```yaml
pipeline:
  name: split_transactions
  nodes:
    - type: source
      name: transactions
      config:
        name: transactions
        type: csv
        path: "./data/all_transactions.csv"
        schema:
          - { name: id, type: int }
          - { name: date, type: string }
          - { name: department, type: string }
          - { name: category, type: string }
          - { name: amount, type: float }

    - type: transform
      name: current_year
      input: transactions
      config:
        cxl: |
          filter date.starts_with("2026")

    - type: output
      name: chunked
      input: current_year
      config:
        name: transactions_2026
        type: csv
        path: "./output/transactions_2026.csv"
        split:
          max_records: 5000
          group_key: "department"
          naming: "{stem}_{seq:04}.{ext}"
          repeat_header: true
          oversize_group: warn
```
Run it, overwriting any existing output chunks:

```shell
clinker run split_transactions.yaml --force
```
Practical considerations
- **Downstream consumers.** Splitting is useful when the receiving system enforces file size limits (e.g., an upload API that accepts files up to 10 MB) or when you want to process chunks in parallel.
- **Record ordering.** Records within each output file keep their original order from the pipeline. Across files, the sequence number (`{seq}`) indicates the order.
- **Group key sorting.** For `group_key` to work correctly, the input should ideally be sorted by the group key. If the input is not sorted, records for the same group may appear in multiple files. Pre-sort with a transform if needed, or accept the split-group behavior.
- **Overwrite behavior.** Use `--force` when re-running a pipeline with splitting enabled. Without it, the pipeline aborts if any of the output chunk files already exist.