
Clinker

Clinker is a pure-Rust, memory-bounded CLI ETL engine for streaming transformation of CSV, JSON, XML, and fixed-width data. It ships as a single static binary with no interpreter, no runtime, and no install dependencies.

Pipelines are declared in YAML. Data transformation logic is written in CXL, a custom expression language purpose-built for ETL. Together they replace legacy tools like Informatica, SSIS, Talend, and NiFi with something deterministic, lightweight, and easy to reason about.

Why Clinker?

Single binary, zero dependencies. Download it, run it. No JVM, no Python, no package manager. Works on any Linux server out of the box.

Good neighbor on busy servers. Clinker enforces a strict memory ceiling (default 256 MB) so it can run alongside JVM applications, databases, and other services without competing for RAM. Aggregation spills to disk when memory pressure rises.

Reproducible output. Given the same input and pipeline, Clinker produces byte-identical output across runs. No nondeterminism from thread scheduling, hash randomization, or floating-point reordering.

Operability-first design. Per-stage metrics, dead-letter queues for error records, explain plans for understanding execution, and structured exit codes for scripting. Built for production from day one.

Two binaries:

  • clinker – run pipelines against real data
  • cxl – check, evaluate, and format CXL expressions interactively

A taste of Clinker

Here is a complete pipeline that reads a customer CSV, filters to active customers, classifies them into tiers, and writes the result:

pipeline:
  name: customer_etl

nodes:
  - type: source
    name: customers
    config:
      name: customers
      type: csv
      path: "./data/customers.csv"
      schema:
        - { name: customer_id, type: int }
        - { name: first_name, type: string }
        - { name: last_name, type: string }
        - { name: status, type: string }
        - { name: lifetime_value, type: float }

  - type: transform
    name: enrich
    input: customers
    config:
      cxl: |
        filter status == "active"
        emit customer_id = customer_id
        emit full_name = first_name + " " + last_name
        emit tier = if lifetime_value >= 10000 then "gold" else "standard"

  - type: output
    name: result
    input: enrich
    config:
      name: enriched
      type: csv
      path: "./output/enriched_customers.csv"

Run it:

clinker run customer_etl.yaml

That is the entire workflow: one YAML file, one command. No project scaffolding, no extra configuration, no compile step.

Installation

Clinker is a single static binary with no runtime dependencies. Download it, put it on your PATH, and you are ready to go.

Binaries

Clinker ships two binaries:

  • clinker – the pipeline executor. This is the main tool you use to validate and run pipelines against data.
  • cxl – the CXL expression checker, evaluator, and formatter. Use it during development to test expressions interactively, check types, and format CXL blocks.

Verify installation

After placing the binaries on your PATH, confirm they work:

clinker --version
clinker 0.1.0
cxl --version
cxl 0.1.0

Both commands should print a version string and exit. If you see command not found, check that the directory containing the binaries is in your PATH.
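If the directory is missing from PATH, you can add it for the current shell session. The directory below is illustrative – use wherever you actually placed the binaries:

```shell
# Add the directory containing the clinker binaries to PATH
# for the current shell session (directory is illustrative)
export PATH="$HOME/.local/bin:$PATH"
```

To make the change permanent, add the same line to your shell profile (e.g. ~/.bashrc or ~/.zshrc).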

Building from source

Clinker requires Rust 1.91+ (edition 2024). If you have a Rust toolchain installed, build and install both binaries directly from the repository:

# Clone the repository
git clone https://github.com/rustpunk/clinker.git
cd clinker

# Install the pipeline executor
cargo install --path crates/clinker

# Install the CXL expression tool
cargo install --path crates/cxl-cli

This compiles release-optimized binaries and places them in ~/.cargo/bin/, which is typically already on your PATH.

To verify the build:

cargo test --workspace

This runs the full test suite (approximately 1100 tests) and confirms everything is working correctly on your system.

Rust toolchain

The repository includes a rust-toolchain.toml that pins the exact Rust version. If you use rustup, it will automatically download the correct toolchain when you build.
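A pinning file along these lines would select the toolchain; this is a sketch of the standard rustup format, not necessarily the repository's exact contents:

```toml
# rust-toolchain.toml – rustup reads this and fetches the pinned toolchain
[toolchain]
channel = "1.91"
```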

  • Rust edition: 2024
  • Minimum Rust version: 1.91
  • C dependencies: none

Your First Pipeline

This walkthrough builds a pipeline from scratch, runs it, and explores the tools Clinker provides for validating and understanding pipelines before they touch real data.

1. Create sample data

Save the following as employees.csv:

id,name,department,salary
1,Alice Chen,Engineering,95000
2,Bob Martinez,Marketing,62000
3,Carol Johnson,Engineering,88000
4,Dave Williams,Sales,71000

2. Write the pipeline

Save the following as my_first_pipeline.yaml:

pipeline:
  name: salary_report

nodes:
  - type: source
    name: employees
    config:
      name: employees
      type: csv
      path: "./employees.csv"
      schema:
        - { name: id, type: int }
        - { name: name, type: string }
        - { name: department, type: string }
        - { name: salary, type: int }

  - type: transform
    name: classify
    input: employees
    config:
      cxl: |
        emit id = id
        emit name = name
        emit department = department
        emit salary = salary
        emit level = if salary >= 90000 then "senior" else "junior"

  - type: output
    name: report
    input: classify
    config:
      name: salary_report
      type: csv
      path: "./salary_report.csv"

This pipeline has three nodes:

  1. employees (source) – reads the CSV file and declares the schema.
  2. classify (transform) – passes all fields through and adds a level field based on salary.
  3. report (output) – writes the result to a new CSV file.

The input: field on each consumer node wires the DAG together. Data flows from employees through classify to report.

3. Validate before running

Before processing any data, check that the pipeline is well-formed:

clinker run my_first_pipeline.yaml --dry-run

Dry-run parses the YAML, resolves the DAG, and type-checks all CXL expressions against the declared schemas. If there are errors – a typo in a field name, a type mismatch, a missing input: reference – Clinker reports them with source-location diagnostics and stops. No data is read.

4. Preview records

To see what the output will look like without writing files, preview a few records:

clinker run my_first_pipeline.yaml --dry-run -n 2

With -n, the dry run additionally reads the first 2 records from the source, runs them through the pipeline, and prints the results to the terminal. Useful for sanity-checking transformations before committing to a full run.

5. Understand the execution plan

To see how Clinker will execute the pipeline:

clinker run my_first_pipeline.yaml --explain

The explain plan shows the DAG topology, the order nodes will execute, per-node parallelism strategy, and schema propagation through the pipeline. This is valuable for understanding complex pipelines with routes, merges, and aggregations.

6. Run it

clinker run my_first_pipeline.yaml

Clinker reads employees.csv, applies the transform, and writes salary_report.csv. The output:

id,name,department,salary,level
1,Alice Chen,Engineering,95000,senior
2,Bob Martinez,Marketing,62000,junior
3,Carol Johnson,Engineering,88000,junior
4,Dave Williams,Sales,71000,junior

Alice’s salary of 95,000 meets the threshold, so she is classified as senior. Everyone else is junior.

What just happened

The pipeline executed as a streaming process:

  1. The source node read employees.csv one record at a time.
  2. Each record flowed through the classify transform, which evaluated the CXL block to produce the output fields.
  3. The output node wrote each transformed record to salary_report.csv.

At no point was the entire dataset loaded into memory. This is how Clinker processes files of any size under its memory ceiling.

Key Concepts

This page covers the mental model behind Clinker pipelines. If you have experience with other ETL tools, most of this will feel familiar – but pay attention to where Clinker diverges, especially around CXL and streaming execution.

Pipelines are DAGs

A pipeline is a directed acyclic graph of nodes. Data flows from sources, through processing nodes, to outputs. There are no cycles – a node cannot consume its own output, directly or indirectly.

You define the graph by setting input: on each consumer node, naming the upstream node it reads from. Clinker resolves these references, validates that the graph is acyclic, and determines execution order automatically.

The nodes: list

Every pipeline has a single flat list of nodes. Each node has a type: discriminator that determines its behavior. The seven node types are:

  • source – read data from a file (CSV, JSON, XML, fixed-width)
  • transform – apply CXL logic to reshape, filter, or enrich records
  • aggregate – group records and compute summary values (sum, count, etc.)
  • route – split a stream into named ports based on conditions
  • merge – combine multiple streams into one
  • output – write data to a file
  • composition – embed a reusable sub-pipeline

You can have as many nodes of each type as your pipeline requires. The only constraint is that the resulting graph must be a valid DAG.

CXL is not SQL

CXL is a per-record expression language. Each record flows through a CXL block independently – there is no table-level context, no SELECT, no FROM, no JOIN. Think of it as a programmable row mapper.

The core statements:

  • emit name = expr – produce a field in the output record. Only emitted fields appear downstream. If you want to pass a field through unchanged, you must emit it explicitly: emit id = id.
  • let name = expr – bind a local variable for use in later expressions. Local variables do not appear in the output.
  • filter condition – discard the record if the condition is false. A filtered record produces no output and is not counted as an error.
  • distinct / distinct by field – deduplicate records. distinct deduplicates on all output fields; distinct by field deduplicates on a specific field.

CXL uses and, or, and not for boolean logic – not && or ||. String concatenation uses +. Conditional expressions use if ... then ... else ... syntax.

System namespaces use a $ prefix: $pipeline.*, $window.*, $meta.*. These provide access to pipeline metadata, window function state, and record metadata respectively.
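Putting these pieces together, a transform block might look like the following sketch. The field names are illustrative, and it assumes a pipeline variable named threshold has been declared under vars::

```cxl
filter status == "active" and not (balance < $vars.threshold)
let display = first_name + " " + last_name
emit customer_id = customer_id
emit display_name = display
emit tier = if balance >= 10000 then "gold" else "standard"
```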

Streaming execution

Records flow through the pipeline one at a time. Clinker does not load an entire file into memory before processing it. A source reads one record, pushes it through the downstream nodes, and then reads the next.

This design keeps memory usage bounded regardless of file size. A 100 GB CSV is processed with the same memory footprint as a 100 KB CSV.

The exception is aggregation. Aggregate nodes must accumulate state across records (to compute sums, counts, averages, etc.). Clinker uses hash aggregation by default and spills to disk when memory pressure exceeds configured limits. When the input is already sorted by the group key, Clinker can use streaming aggregation, which requires only constant memory.

Input wiring

Consumer nodes reference their upstream via the input: field:

- type: transform
  name: enrich
  input: customers    # reads from the node named "customers"

Route nodes produce named output ports. Downstream nodes reference a specific port using dot notation:

- type: route
  name: split_by_region
  input: customers
  config:
    routes:
      us: region == "US"
      eu: region == "EU"
    default: other

- type: output
  name: us_output
  input: split_by_region.us    # reads from the "us" port

Merge nodes accept multiple inputs using inputs: (plural):

- type: merge
  name: combined
  inputs:
    - us_transform
    - eu_transform

Schema declaration

Source nodes require an explicit schema: that declares every column’s name and type:

config:
  schema:
    - { name: customer_id, type: int }
    - { name: email, type: string }
    - { name: balance, type: float }
    - { name: created_at, type: date }

Clinker uses these declarations to type-check CXL expressions at compile time, before any data is read. If a CXL block references a field that does not exist in the upstream schema, or applies an operation to an incompatible type, the error is caught during validation – not at row 5 million of a production run.
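For example, with the schema above, a CXL block that references a misspelled field is rejected at validation time:

```cxl
emit contact = emial    # typo: no field "emial" in the upstream schema –
                        # caught during validation, before any data is read
```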

Supported types include int, float, string, bool, date, and date_time.

Error handling

Each node can specify an error handling strategy:

  • fail_fast – stop the pipeline on the first error (default)
  • continue – route error records to a dead-letter queue file and continue
  • best_effort – log errors and continue without writing error records

When using continue, Clinker writes rejected records to a DLQ file alongside the output. Each DLQ entry includes the original record, the error category, the error message, and the node that rejected it. This makes diagnosing production issues straightforward: check the DLQ, fix the data or the pipeline, and rerun.
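For example, selecting continue as the pipeline-wide default uses the top-level error_handling block, which accepts the same strategy names:

```yaml
error_handling:
  strategy: continue
```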

Pipeline YAML Structure

A Clinker pipeline is a single YAML file with three top-level sections: pipeline (metadata), nodes (the processing graph), and optionally error_handling.

Top-level shape

pipeline:
  name: my_pipeline            # Required — pipeline identifier
  memory_limit: "256M"         # Optional (K/M/G suffixes)
  vars:                        # Optional key-value pairs
    threshold: 500
    label: "Monthly Report"
  date_formats: ["%Y-%m-%d"]   # Optional — custom date parsing formats
  rules_path: "./rules/"       # Optional — CXL module search path
  concurrency:                 # Optional
    threads: 4
    chunk_size: 1000
  metrics:                     # Optional
    spool_dir: "./metrics/"

nodes:                         # Required — flat list of pipeline nodes
  - type: source
    name: raw_data
    config:
      name: raw_data
      type: csv
      path: "./data/input.csv"
      schema:
        - { name: id, type: int }
        - { name: value, type: string }

  - type: transform
    name: clean
    input: raw_data
    config:
      cxl: |
        emit id = id
        emit value = value.trim()

  - type: output
    name: result
    input: clean
    config:
      name: result
      type: csv
      path: "./output/result.csv"

error_handling:                # Optional
  strategy: fail_fast

Pipeline metadata

The pipeline: block carries global settings that apply to the entire run.

Only name is required; the rest are optional.

  • name – pipeline identifier, used in logs and metrics.
  • memory_limit – soft RSS budget. Accepts K, M, G suffixes (e.g. "512M").
  • vars – scalar constants accessible in CXL via $vars.*.
  • date_formats – list of strftime-style patterns for date parsing.
  • rules_path – directory for CXL use module resolution.
  • concurrency – threads and chunk_size for parallel chunk processing.
  • metrics – spool_dir for per-run JSON metric files.
  • date_locale – locale for date formatting.
  • include_provenance – attach provenance metadata to records.

The nodes list

Every pipeline has a flat nodes: list. Each entry is a node with a type: discriminator that determines its kind:

  • source – reads data from a file
  • transform – applies CXL expressions to each record
  • aggregate – groups and summarizes records
  • route – splits records into named branches by condition
  • merge – combines multiple upstream branches
  • output – writes records to a file
  • composition – imports a reusable transform fragment

Node naming

Every node must have a name: field. Names must be unique within the pipeline and must not contain dots – the dot character is reserved for port syntax (see below). Names are used for wiring, logging, and diagnostics.

Wiring: input and inputs

Nodes connect to each other through input: (singular) and inputs: (plural) fields that live at the node’s top level, alongside name: and type:.

Single upstream – used by transform, aggregate, route, and output nodes:

- type: transform
  name: clean
  input: raw_data       # References the source node named "raw_data"
  config: ...

Port syntax – for consuming a specific branch from a route node, use node.port:

- type: output
  name: high_value_out
  input: split.high     # Consumes the "high" branch of route node "split"
  config: ...

Multiple upstreams – merge nodes use inputs: (plural) instead of input::

- type: merge
  name: combined
  inputs:
    - east_processed
    - west_processed
  config: {}

Source nodes have no input field. They are entry points – adding an input: field to a source is a parse error.

Using inputs: on a non-merge node (or input: on a merge node) is caught at parse time by deny_unknown_fields.

Optional fields on all nodes

Every node type supports these optional fields:

  • description: – human-readable text for documentation. Ignored by the engine.
  • _notes: – arbitrary metadata (JSON object). Ignored by the engine, used by the Kiln IDE for visual annotations and inspector panels.

For example:

- type: transform
  name: enrich
  description: "Add customer tier based on lifetime value"
  _notes:
    color: "#4a9eff"
    position: { x: 300, y: 200 }
  input: customers
  config:
    cxl: |
      emit tier = if lifetime_value >= 10000 then "gold" else "standard"

Strict parsing

All config structs use deny_unknown_fields. If you misspell a field name – for example, writing inputt: instead of input: or stratgy: instead of strategy: – the YAML parser rejects it immediately with a diagnostic pointing to the typo. This catches configuration errors before any data processing begins.
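For instance, a node with a misspelled wiring field is rejected before any data is touched:

```yaml
- type: transform
  name: clean
  inputt: raw_data     # typo – rejected at parse time: unknown field "inputt"
  config:
    cxl: |
      emit id = id
```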

Environment variable: CLINKER_ENV

The CLINKER_ENV environment variable can be used for conditional logic outside of pipelines (e.g., selecting channel directories or controlling CLI behavior). It is not directly referenced within pipeline YAML but is available to the channel and workspace systems.

Source Nodes

Source nodes read data from files and are the entry points of every pipeline. They have no input: field – they produce records, they do not consume them.

Basic structure

- type: source
  name: customers
  config:
    name: customers
    type: csv
    path: "./data/customers.csv"
    schema:
      - { name: customer_id, type: int }
      - { name: name, type: string }
      - { name: email, type: string }
      - { name: status, type: string }
      - { name: amount, type: float }

Schema declaration

The schema: field is required on every source node. Clinker does not infer types from data – you must declare each column’s name and CXL type explicitly. This schema drives compile-time type checking across the entire pipeline.

Each entry is a { name, type } pair:

schema:
  - { name: employee_id, type: string }
  - { name: salary, type: int }
  - { name: hired_at, type: date_time }
  - { name: is_active, type: bool }
  - { name: notes, type: nullable(string) }

Available types

  • string – UTF-8 text
  • int – 64-bit signed integer
  • float – 64-bit IEEE 754 floating point
  • bool – boolean (true / false)
  • date – calendar date
  • date_time – date with time component
  • array – ordered sequence of values
  • numeric – union of int and float, resolved during type unification
  • any – unknown type, for fields used in type-agnostic contexts
  • nullable(T) – nullable wrapper around any inner type (e.g. nullable(int))

Format types

The type: field inside config: selects the file format. Supported values: csv, json, xml, fixed_width.

CSV

- type: source
  name: orders
  config:
    name: orders
    type: csv
    path: "./data/orders.csv"
    schema:
      - { name: order_id, type: int }
      - { name: customer_id, type: int }
      - { name: amount, type: float }
      - { name: order_date, type: date }
    options:
      delimiter: ","         # Default: ","
      quote_char: "\""       # Default: "\""
      has_header: true        # Default: true
      encoding: "utf-8"      # Default: "utf-8"

All CSV options are optional. With no options: block, Clinker uses standard RFC 4180 defaults.

JSON

- type: source
  name: events
  config:
    name: events
    type: json
    path: "./data/events.json"
    schema:
      - { name: event_id, type: string }
      - { name: timestamp, type: date_time }
      - { name: payload, type: string }
    options:
      format: ndjson          # array | ndjson | object (auto-detect if omitted)
      record_path: "$.data"   # JSONPath to records array

  • array – the file is a single JSON array of objects.
  • ndjson – one JSON object per line (newline-delimited JSON).
  • object – single top-level object; use record_path to locate the records array within it.

If format is omitted, Clinker auto-detects based on file content.

XML

- type: source
  name: catalog
  config:
    name: catalog
    type: xml
    path: "./data/catalog.xml"
    schema:
      - { name: product_id, type: int }
      - { name: name, type: string }
      - { name: price, type: float }
    options:
      record_path: "//product"          # XPath to record elements
      attribute_prefix: "@"             # Prefix for XML attribute fields
      namespace_handling: strip         # strip | qualify

  • strip (default) – removes namespace prefixes from element and attribute names.
  • qualify – preserves namespace-qualified names.

Fixed-width

- type: source
  name: legacy_data
  config:
    name: legacy_data
    type: fixed_width
    path: "./data/mainframe.dat"
    schema:
      - { name: account_id, type: string }
      - { name: balance, type: float }
      - { name: status_code, type: string }
    options:
      line_separator: crlf    # Line ending style

Fixed-width sources require a separate format schema (.schema.yaml file) that defines field positions, widths, and padding. The schema: on the source body declares CXL types for compile-time checking; the format schema defines the physical layout.
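The format schema syntax is not documented on this page. As a purely hypothetical sketch of a positional layout file – the key names here are illustrative, not the actual syntax:

```yaml
# Hypothetical .schema.yaml contents – keys are illustrative
fields:
  - { name: account_id,  start: 0,  width: 10 }
  - { name: balance,     start: 10, width: 12 }
  - { name: status_code, start: 22, width: 2 }
```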

Sort order

If your source data is pre-sorted, declare the sort order so the optimizer can use streaming aggregation instead of hash aggregation:

- type: source
  name: sorted_transactions
  config:
    name: sorted_transactions
    type: csv
    path: "./data/transactions_sorted.csv"
    schema:
      - { name: account_id, type: string }
      - { name: txn_date, type: date }
      - { name: amount, type: float }
    sort_order:
      - { field: "account_id", order: asc }
      - { field: "txn_date", order: asc }

Sort order declarations are trusted – Clinker does not verify that the data is actually sorted. If the data violates the declared order, downstream streaming aggregation may produce incorrect results.

The shorthand form is also accepted – a bare string defaults to ascending:

    sort_order:
      - "account_id"
      - { field: "txn_date", order: desc }

Array paths

For nested data (JSON/XML sources with embedded arrays), array_paths controls how nested arrays are handled:

- type: source
  name: invoices
  config:
    name: invoices
    type: json
    path: "./data/invoices.json"
    schema:
      - { name: invoice_id, type: int }
      - { name: customer, type: string }
      - { name: line_item, type: string }
      - { name: line_amount, type: float }
    array_paths:
      - path: "$.line_items"
        mode: explode         # One output record per array element
      - path: "$.tags"
        mode: join            # Concatenate array elements into a string
        separator: ","

  • explode (default) – produces one output record per array element, with parent fields repeated.
  • join – concatenates array elements into a single string using the specified separator.

Transform Nodes

Transform nodes apply CXL expressions to each record, producing new fields, filtering records, or both. They process one record at a time in streaming fashion with constant memory overhead.

Basic structure

- type: transform
  name: enrich
  input: customers
  config:
    cxl: |
      emit full_name = first_name + " " + last_name
      emit tier = if lifetime_value >= 10000 then "gold" else "standard"
      filter status == "active"

The cxl: field is required and contains a CXL program. The three core CXL statements for transforms are:

  • emit – produces an output field. Only emitted fields appear in downstream nodes.
  • filter – drops records that do not match the boolean condition.
  • let – binds a local variable for use in subsequent expressions (not emitted).

For example:

    cxl: |
      let margin = revenue - cost
      emit product_id = product_id
      emit margin = margin
      emit margin_pct = if revenue > 0 then margin / revenue * 100 else 0
      filter margin > 0

Analytic window

The analytic_window field enables cross-source lookups by joining a secondary dataset into the transform. The secondary source is loaded into memory and indexed by the join key.

- type: transform
  name: enrich_orders
  input: orders
  config:
    analytic_window:
      source: products
      on: product_id
      group_by: [product_id]
    cxl: |
      emit order_id = order_id
      emit product_name = $window.first()
      emit quantity = quantity
      emit line_total = quantity * price

The $window.* namespace provides access to the windowed data. Functions like $window.first(), $window.last(), and $window.count() operate over the matched group.

Validations

Declarative validation checks can be attached to a transform. They run against each record and either route failures to the DLQ (severity error) or log a warning and continue (severity warn).

- type: transform
  name: validate_orders
  input: raw_orders
  config:
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit email = email
    validations:
      - field: email
        check: "not_empty"
        severity: error
        message: "Email is required"
      - check: "amount > 0"
        severity: warn
        message: "Non-positive amount"
      - field: order_id
        check: "not_empty"
        severity: error

Validation fields

Only check is required; the rest are optional.

  • field – restrict the check to a single field
  • check – validation name (e.g. "not_empty") or a CXL boolean expression
  • severity – error (default) routes to the DLQ; warn logs and continues
  • message – custom error message for DLQ entries
  • name – validation name for DLQ reporting; auto-derived from field + check if omitted
  • args – additional arguments as key-value pairs

Log directives

Log directives control diagnostic output during transform execution:

- type: transform
  name: process
  input: validated
  config:
    cxl: |
      emit id = id
      emit result = compute(value)
    log:
      - level: info
        when: per_record
        every: 1000
        message: "Processed record"
      - level: warn
        when: on_error
        message: "Record failed processing"
      - level: debug
        when: before_transform
        message: "Starting transform"

Log directive fields

The level, when, and message fields are required; the rest are optional.

  • level – trace, debug, info, warn, or error
  • when – before_transform, after_transform, per_record, or on_error
  • message – log message text
  • every – only log every N records (for per_record timing)
  • condition – CXL boolean expression; only log when true
  • fields – list of field names to include in the log output
  • log_rule – reference to an external log rule definition

Complete example

- type: source
  name: employees
  config:
    name: employees
    type: csv
    path: "./data/employees.csv"
    schema:
      - { name: employee_id, type: string }
      - { name: first_name, type: string }
      - { name: last_name, type: string }
      - { name: department, type: string }
      - { name: salary, type: int }
      - { name: hire_date, type: date }

- type: transform
  name: enrich_employees
  description: "Compute display name and tenure"
  input: employees
  config:
    cxl: |
      emit employee_id = employee_id
      emit display_name = last_name + ", " + first_name
      emit department = department.upper()
      emit salary = salary
      emit annual_bonus = if salary >= 80000 then salary * 0.15
        else salary * 0.10
    validations:
      - field: employee_id
        check: "not_empty"
        severity: error
        message: "Employee ID is required"
      - check: "salary > 0"
        severity: warn
        message: "Salary should be positive"
    log:
      - level: info
        when: per_record
        every: 5000
        message: "Processing employees"

Aggregate Nodes

Aggregate nodes group records by one or more fields and compute summary values using CXL aggregate functions. They consume all input records in a group before emitting a single summary record per group.

Basic structure

- type: aggregate
  name: dept_totals
  input: employees
  config:
    group_by: [department]
    cxl: |
      emit total_salary = sum(salary)
      emit headcount = count(*)
      emit avg_salary = avg(salary)

Group-by fields pass through automatically – you do not need to emit them. In this example, the output records contain department, total_salary, headcount, and avg_salary.

Group-by fields

The group_by: field is a list of field names from the input schema. Records sharing the same values for all group-by fields are placed in the same group.

    group_by: [region, department]
    cxl: |
      emit total_salary = sum(salary)
      emit max_salary = max(salary)

This produces one output record per unique (region, department) combination.
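With hypothetical input rows, the grouping behaves like this:

```text
# input rows                      # output (one record per unique pair)
region,department,salary          region,department,total_salary,max_salary
east,Engineering,100              east,Engineering,250,150
east,Engineering,150              east,Sales,80,80
east,Sales,80                     west,Engineering,120,120
west,Engineering,120
```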

Global aggregation

An empty group_by list treats the entire input as a single group, producing exactly one output record:

- type: aggregate
  name: grand_totals
  input: orders
  config:
    group_by: []
    cxl: |
      emit grand_total = sum(amount)
      emit record_count = count(*)
      emit avg_order = avg(amount)

Aggregate functions

The following aggregate functions are available in CXL:

  • sum(field) – sum of all values in the group
  • count(*) – number of records in the group
  • avg(field) – arithmetic mean
  • min(field) – minimum value
  • max(field) – maximum value
  • collect(field) – collect all values into an array
  • weighted_avg(value, weight) – weighted average

Strategy hint

The strategy: field controls how aggregation is executed:

- type: aggregate
  name: totals
  input: sorted_data
  config:
    group_by: [account_id]
    strategy: streaming
    cxl: |
      emit total = sum(amount)

  • auto – default. The optimizer chooses based on whether the input is provably sorted for the group-by keys.
  • hash – force hash aggregation. Works on any input ordering. Holds all groups in memory (with disk spill if the memory budget is exceeded).
  • streaming – require streaming aggregation. Processes one group at a time with O(1) memory per group. Compile-time error if the input is not provably sorted for the group-by keys.

When to use streaming

If your source declares a sort_order: that covers the group-by fields, the optimizer will automatically choose streaming aggregation. Use strategy: streaming as an explicit assertion – it turns a silent fallback to hash aggregation into a compile error, which is useful for catching sort-order regressions.

When to use hash

Hash aggregation works on unsorted input and is the safe default. It uses more memory but handles any data ordering. Memory-aware disk spill kicks in when RSS approaches the pipeline’s memory_limit.

Complete example

- type: source
  name: transactions
  config:
    name: transactions
    type: csv
    path: "./data/transactions.csv"
    schema:
      - { name: account_id, type: string }
      - { name: txn_date, type: date }
      - { name: amount, type: float }
      - { name: category, type: string }
    sort_order:
      - { field: "account_id", order: asc }

- type: aggregate
  name: account_summary
  input: transactions
  config:
    group_by: [account_id]
    strategy: streaming
    cxl: |
      emit total_amount = sum(amount)
      emit txn_count = count(*)
      emit avg_amount = avg(amount)
      emit max_amount = max(amount)
      emit categories = collect(category)

- type: output
  name: summary_output
  input: account_summary
  config:
    name: summary_output
    type: csv
    path: "./output/account_summary.csv"

Route Nodes

Route nodes split a stream of records into named branches based on CXL boolean conditions. Each branch becomes an independent output port that downstream nodes can wire to using port syntax.

Basic structure

- type: route
  name: split_by_value
  input: orders
  config:
    mode: exclusive
    conditions:
      high: "amount.to_int() > 1000"
      medium: "amount.to_int() > 100"
    default: low

This creates three output ports: split_by_value.high, split_by_value.medium, and split_by_value.low.

Conditions

The conditions: field is an ordered map of branch names to CXL boolean expressions. Each expression is evaluated against the incoming record.

    conditions:
      priority: "urgency == \"high\" and amount > 500"
      standard: "urgency == \"medium\""
      bulk: "quantity > 100"
    default: other

Condition keys become the port names used in downstream input: wiring.

Default branch

The default: field is required. Records that match no condition are routed to the default branch. The default branch name must not collide with any condition key.

Routing modes

Exclusive (default)

In exclusive mode, conditions are evaluated in declaration order and the first matching condition wins. A record appears in exactly one branch. Order matters – put more specific conditions first.

    mode: exclusive
    conditions:
      vip: "lifetime_value > 100000"
      high: "lifetime_value > 10000"
      medium: "lifetime_value > 1000"
    default: standard

A customer with lifetime_value = 50000 does not match vip (50000 is not > 100000), but matches both high and medium. Because exclusive mode stops at the first matching condition and high is declared before medium, the record is routed to high only.

Inclusive

In inclusive mode, all matching conditions route the record. A single record can appear in multiple branches simultaneously.

    mode: inclusive
    conditions:
      needs_review: "amount > 10000"
      flagged: "status == \"flagged\""
      international: "country != \"US\""
    default: standard

A flagged international order over 10000 would appear in needs_review, flagged, and international – three copies routed to three branches.

Downstream wiring

Downstream nodes reference route branches using port syntax: route_name.branch_name.

- type: route
  name: classify
  input: transactions
  config:
    mode: exclusive
    conditions:
      high: "amount > 1000"
      medium: "amount > 100"
    default: low

- type: transform
  name: high_value_processing
  input: classify.high
  config:
    cxl: |
      emit txn_id = txn_id
      emit amount = amount
      emit review_flag = true

- type: transform
  name: standard_processing
  input: classify.medium
  config:
    cxl: |
      emit txn_id = txn_id
      emit amount = amount

- type: output
  name: low_value_out
  input: classify.low
  config:
    name: low_value_out
    type: csv
    path: "./output/low_value.csv"

Constraints

  • At least one condition is required.
  • Maximum 256 branches (conditions + default).
  • Branch names must be unique.
  • The default name must not collide with any condition key.

Complete example

pipeline:
  name: order_routing

nodes:
  - type: source
    name: orders
    config:
      name: orders
      type: csv
      path: "./data/orders.csv"
      schema:
        - { name: order_id, type: int }
        - { name: region, type: string }
        - { name: amount, type: float }
        - { name: priority, type: string }

  - type: route
    name: by_region
    input: orders
    config:
      mode: exclusive
      conditions:
        domestic: "region == \"US\" or region == \"CA\""
        emea: "region == \"UK\" or region == \"DE\" or region == \"FR\""
        apac: "region == \"JP\" or region == \"AU\" or region == \"SG\""
      default: other

  - type: output
    name: domestic_orders
    input: by_region.domestic
    config:
      name: domestic_orders
      type: csv
      path: "./output/domestic.csv"

  - type: output
    name: emea_orders
    input: by_region.emea
    config:
      name: emea_orders
      type: csv
      path: "./output/emea.csv"

  - type: output
    name: apac_orders
    input: by_region.apac
    config:
      name: apac_orders
      type: csv
      path: "./output/apac.csv"

  - type: output
    name: other_orders
    input: by_region.other
    config:
      name: other_orders
      type: csv
      path: "./output/other_regions.csv"

Merge Nodes

Merge nodes combine multiple upstream branches into a single stream. They are the counterpart to route nodes – where a route splits one stream into many, a merge joins many streams back into one.

Basic structure

- type: merge
  name: combined
  inputs:
    - east_data
    - west_data
  config: {}

Note the key differences from other node types:

  • Uses inputs: (plural), not input: (singular).
  • The config: block is empty – all wiring is on the node header.
  • Using input: (singular) on a merge node is a parse error.

Wiring

The inputs: field is a list of upstream node references. These can be bare node names or port references from route nodes:

- type: merge
  name: rejoin
  inputs:
    - process_high
    - process_medium
    - classify.low           # Port syntax for a route branch
  config: {}

Downstream nodes wire to the merge as a normal single-input reference:

- type: output
  name: final_output
  input: rejoin
  config:
    name: final_output
    type: csv
    path: "./output/combined.csv"

Record ordering

Records arrive in the order they are produced by upstream nodes. There is no guaranteed interleaving order between upstream branches. If you need sorted output, use a sort_order on the downstream output node.
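For deterministic ordering after a merge, sort at the output stage. A sketch using the sort_order option documented for output nodes (node names are illustrative):

```yaml
- type: merge
  name: rejoined
  inputs:
    - process_high
    - process_standard
  config: {}

- type: output
  name: sorted_out
  input: rejoined
  config:
    name: sorted_out
    type: csv
    path: "./output/sorted.csv"
    sort_order:
      - { field: "order_id", order: asc }   # restore a stable order across branches
```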

Use cases

Reuniting route branches

The most common pattern is routing records through different processing paths and then merging them back together:

- type: route
  name: classify
  input: orders
  config:
    mode: exclusive
    conditions:
      high: "amount > 1000"
    default: standard

- type: transform
  name: process_high
  input: classify.high
  config:
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit surcharge = amount * 0.02
      emit tier = "premium"

- type: transform
  name: process_standard
  input: classify.standard
  config:
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit surcharge = 0
      emit tier = "standard"

- type: merge
  name: all_orders
  inputs:
    - process_high
    - process_standard
  config: {}

- type: output
  name: result
  input: all_orders
  config:
    name: result
    type: csv
    path: "./output/all_orders.csv"

Unioning multiple sources

Merge nodes can combine records from multiple source files that share the same schema:

- type: source
  name: jan_sales
  config:
    name: jan_sales
    type: csv
    path: "./data/sales_jan.csv"
    schema:
      - { name: sale_id, type: int }
      - { name: amount, type: float }
      - { name: region, type: string }

- type: source
  name: feb_sales
  config:
    name: feb_sales
    type: csv
    path: "./data/sales_feb.csv"
    schema:
      - { name: sale_id, type: int }
      - { name: amount, type: float }
      - { name: region, type: string }

- type: merge
  name: all_sales
  inputs:
    - jan_sales
    - feb_sales
  config: {}

- type: aggregate
  name: totals
  input: all_sales
  config:
    group_by: [region]
    cxl: |
      emit total = sum(amount)
      emit count = count(*)

Output Nodes

Output nodes write processed records to files. They are the terminal nodes of a pipeline – every pipeline path must end at an output (or records are silently dropped).

Basic structure

- type: output
  name: result
  input: transform_node
  config:
    name: result
    type: csv
    path: "./output/result.csv"

The type: field selects the output format: csv, json, xml, or fixed_width.

Field control

By default, output nodes write only the fields explicitly emitted by upstream transforms. Several options control which fields appear and how they are named.

Include unmapped fields

    include_unmapped: true    # Default: false

When true, fields that were not explicitly emitted by transforms but exist on the record are included in the output. Useful for pass-through pipelines where you want all original fields plus a few computed ones.
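A pass-through sketch: the transform emits a single computed field, and include_unmapped carries the original fields along (node and field names are illustrative):

```yaml
- type: transform
  name: add_flag
  input: customers
  config:
    cxl: |
      emit is_active = status == "active"   # only this field is emitted explicitly

- type: output
  name: full_output
  input: add_flag
  config:
    name: full_output
    type: csv
    path: "./output/customers_flagged.csv"
    include_unmapped: true    # original customer fields appear alongside is_active
```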

Field mapping

Rename fields at output time without changing upstream CXL:

    mapping:
      "Customer Name": "full_name"
      "Order Total": "amount"

Keys are output column names; values are the source field names from upstream.

Excluding fields

Remove specific fields from output:

    exclude: [internal_id, _debug_flag, temp_calc]

Header control (CSV)

    include_header: true      # Default: true

Set to false to omit the CSV header row.

Null handling

    preserve_nulls: false     # Default: false

When false, null values are written as empty strings. When true, nulls are preserved in the output format’s native null representation (e.g., null in JSON).
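With JSON output the difference is visible in the serialized records. A sketch, assuming a nullable middle_name field:

```yaml
- type: output
  name: json_out
  input: processed
  config:
    name: json_out
    type: json
    path: "./output/people.json"
    preserve_nulls: true     # writes {"middle_name": null} rather than {"middle_name": ""}
```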

Metadata inclusion

Control whether per-record $meta.* metadata fields appear in output:

    include_metadata: all       # Include all metadata fields
    include_metadata: none      # Default -- strip all metadata
    include_metadata:
      - source_file             # Include only listed metadata keys
      - source_row

Metadata fields are prefixed with meta. in the output.

Output format options

CSV

- type: output
  name: csv_out
  input: processed
  config:
    name: csv_out
    type: csv
    path: "./output/result.csv"
    options:
      delimiter: "|"

JSON

- type: output
  name: json_out
  input: processed
  config:
    name: json_out
    type: json
    path: "./output/result.json"
    options:
      format: ndjson           # array | ndjson
      pretty: true             # Pretty-print JSON

  • array (default) – writes a single JSON array containing all records.
  • ndjson – writes one JSON object per line.

XML

- type: output
  name: xml_out
  input: processed
  config:
    name: xml_out
    type: xml
    path: "./output/result.xml"
    options:
      root_element: "data"
      record_element: "row"

Fixed-width

- type: output
  name: fw_out
  input: processed
  config:
    name: fw_out
    type: fixed_width
    path: "./output/result.dat"
    schema: "./schemas/output.schema.yaml"
    options:
      line_separator: crlf

Fixed-width output requires a format schema defining field positions and widths.

Sort order

Sort records before writing:

    sort_order:
      - { field: "name", order: asc }
      - { field: "amount", order: desc, null_order: last }

Sort options:

  • order – asc or desc. Default: asc.
  • null_order – first, last, or drop. Default: last.
  • first – nulls sort before all non-null values.
  • last – nulls sort after all non-null values.
  • drop – records with null sort keys are excluded from output.

Shorthand: a bare string defaults to ascending with nulls last:

    sort_order:
      - "name"
      - { field: "amount", order: desc }

File splitting

Split output into multiple files based on record count, byte size, or group boundaries:

- type: output
  name: split_output
  input: processed
  config:
    name: split_output
    type: csv
    path: "./output/result.csv"
    split:
      max_records: 10000
      max_bytes: 10485760           # 10 MB
      group_key: "department"       # Never split mid-group
      naming: "{stem}_{seq:04}.{ext}"
      repeat_header: true           # Repeat CSV header in each file
      oversize_group: warn          # warn | error | allow

Split configuration fields

  • max_records – optional. Soft record count limit per file.
  • max_bytes – optional. Soft byte size limit per file.
  • group_key – optional. Field name; never split within a group sharing this key value.
  • naming – optional; default "{stem}_{seq:04}.{ext}". File naming pattern: {stem} is the base name, {seq:04} is a zero-padded sequence number, {ext} is the file extension.
  • repeat_header – optional; default true. Repeat the CSV header row in each split file.
  • oversize_group – optional; default warn. What to do when a single key group exceeds the file limits.

At least one of max_records or max_bytes should be specified for splitting to have any effect.

Oversize group policies

  • warn (default) – log a warning and allow the oversized file.
  • error – stop the pipeline.
  • allow – silently allow the oversized file.

When group_key is set, the split point is the first group boundary after the threshold is reached (greedy). Without group_key, files are split at the exact limit.

Complete example

- type: output
  name: department_reports
  input: enriched_employees
  config:
    name: department_reports
    type: csv
    path: "./output/employees.csv"
    mapping:
      "Employee ID": "employee_id"
      "Full Name": "display_name"
      "Department": "department"
      "Annual Salary": "salary"
    exclude: [internal_flags]
    include_header: true
    sort_order:
      - { field: "department", order: asc }
      - { field: "display_name", order: asc }
    split:
      max_records: 5000
      group_key: "department"
      naming: "employees_{seq:03}.csv"
      repeat_header: true

Error Handling & DLQ

Clinker provides structured error handling with a dead-letter queue (DLQ) for records that fail processing. The error_handling: block at the top level of the pipeline YAML controls the behavior.

Configuration

error_handling:
  strategy: continue
  dlq:
    path: "./output/errors.csv"
    include_reason: true
    include_source_row: true

Strategies

The strategy: field controls what happens when a record fails:

  • fail_fast – Default. Stop the pipeline on the first error.
  • continue – Route bad records to the DLQ and keep processing good records.
  • best_effort – Continue processing with partial results, even if some stages produce incomplete output.

fail_fast

The safest strategy. Any record-level error (type coercion failure, validation error, missing required field) halts the pipeline immediately. Use this when data quality is critical and you prefer to fix issues before reprocessing.

continue

The production workhorse. Bad records are written to the DLQ file with diagnostic metadata, and the pipeline continues processing remaining records. After the run completes, inspect the DLQ to understand and correct failures.

A pipeline that completes with DLQ entries exits with code 2 – this signals “pipeline completed successfully but some records were rejected.” It is not a crash or internal error.
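In a wrapper script, this lets you treat DLQ output as a warning rather than a failure. A sketch (paths are illustrative):

```
clinker run orders.yaml
case $? in
  0) echo "clean run" ;;
  2) echo "completed with rejects; review ./output/errors.csv" ;;
  *) echo "pipeline failed" >&2; exit 1 ;;
esac
```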

best_effort

The most lenient strategy. Processing continues even with partial results. Use this for exploratory data analysis where completeness is less important than progress.

DLQ configuration

The DLQ is always written as CSV, regardless of the pipeline’s input/output formats.

  dlq:
    path: "./output/errors.csv"
    include_reason: true
    include_source_row: true

  • path – optional. File path for DLQ output. If omitted, DLQ records are logged but not written to a file.
  • include_reason – optional. Include the _cxl_dlq_error_category and _cxl_dlq_error_detail columns.
  • include_source_row – optional. Include original source fields alongside DLQ metadata.

DLQ columns

Every DLQ record includes these metadata columns:

  • _cxl_dlq_id – UUID v7 (time-ordered unique identifier)
  • _cxl_dlq_timestamp – RFC 3339 timestamp of when the error occurred
  • _cxl_dlq_source_file – Input filename that produced the failing record
  • _cxl_dlq_source_row – 1-based row number in the source file
  • _cxl_dlq_stage – Name of the transform or aggregate node where the error occurred
  • _cxl_dlq_route – Route branch name (if the error occurred after routing)
  • _cxl_dlq_trigger – Validation rule name that triggered the rejection

When include_reason: true is set, two additional columns appear:

  • _cxl_dlq_error_category – Machine-readable error classification
  • _cxl_dlq_error_detail – Human-readable error description

Error categories

The _cxl_dlq_error_category column contains one of these values:

  • missing_required_field – A required field is absent from the record
  • type_coercion_failure – A value could not be converted to the expected type
  • required_field_conversion_failure – A required field exists but its value cannot be converted
  • nan_in_output_field – A computation produced NaN
  • aggregate_type_error – An aggregate function received an incompatible type
  • validation_failure – A declarative validation check failed
  • aggregate_finalize – An aggregate function failed during finalization

Advanced options

Type error threshold

Abort the pipeline if the fraction of failing records exceeds a threshold:

  type_error_threshold: 0.05    # Abort if >5% of records fail

This acts as a circuit breaker – if your input data is unexpectedly corrupt, the pipeline stops early rather than filling the DLQ with millions of entries.

Correlation key

Group DLQ rejections by a key field. When any record in a correlation group fails, all records in that group are routed to the DLQ:

  correlation_key: order_id

For compound keys:

  correlation_key: [order_id, customer_id]

This is useful for transactional data where partial processing of a group is worse than rejecting the entire group. For example, if one line item in an order fails validation, you may want to reject the entire order.

Max group buffer

Limit the number of records buffered per correlation group:

  max_group_buffer: 100000     # Default: 100,000

Groups exceeding this limit are DLQ’d entirely with a group_size_exceeded summary entry.

Exit codes

  • 0 – Pipeline completed successfully, no errors
  • 1 – Pipeline failed (internal error, config error, or fail_fast triggered)
  • 2 – Pipeline completed, but DLQ entries were produced

Exit code 2 is not a failure – it means the pipeline ran to completion and handled errors according to the configured strategy. Check the DLQ file for details.

Complete example

pipeline:
  name: order_processing
  memory_limit: "512M"

nodes:
  - type: source
    name: orders
    config:
      name: orders
      type: csv
      path: "./data/orders.csv"
      schema:
        - { name: order_id, type: int }
        - { name: customer_id, type: int }
        - { name: amount, type: float }
        - { name: email, type: string }

  - type: transform
    name: validate_orders
    input: orders
    config:
      cxl: |
        emit order_id = order_id
        emit customer_id = customer_id
        emit amount = amount
        emit email = email
      validations:
        - field: email
          check: "not_empty"
          severity: error
          message: "Customer email is required"
        - check: "amount > 0"
          severity: error
          message: "Order amount must be positive"

  - type: output
    name: valid_orders
    input: validate_orders
    config:
      name: valid_orders
      type: csv
      path: "./output/valid_orders.csv"

error_handling:
  strategy: continue
  dlq:
    path: "./output/rejected_orders.csv"
    include_reason: true
    include_source_row: true
  type_error_threshold: 0.10
  correlation_key: order_id

Pipeline Variables

Pipeline variables are constants defined in the YAML pipeline: header and accessible in CXL expressions. They provide a way to parameterize pipeline behavior without modifying CXL logic.

Defining variables

Variables are declared in the vars: block of the pipeline: section:

pipeline:
  name: order_processing
  vars:
    high_value_threshold: 500
    express_surcharge: 5.99
    report_title: "Monthly Orders"
    apply_discount: true
    tax_rate: 0.085

Variables are scalar values only: strings, numbers (integers and floats), and booleans. Complex types (arrays, objects) are not supported.

Accessing variables in CXL

Use the $vars.* namespace to reference variables in CXL expressions:

- type: transform
  name: classify_orders
  input: orders
  config:
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit is_high_value = amount > $vars.high_value_threshold
      emit surcharge = if shipping == "express" then $vars.express_surcharge else 0
      emit total = amount + surcharge

The $pipeline.* namespace also provides access to pipeline-level metadata.

Variable semantics

Variables are frozen at pipeline start and constant across all records. They cannot be modified during execution. This makes them suitable for:

  • Thresholds and cutoff values
  • Fixed surcharges, tax rates, and multipliers
  • Labels and titles for report headers
  • Feature flags that control conditional logic
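The apply_discount flag from the vars: example above, say, can gate a computation directly in CXL (a sketch; the discount factor is illustrative):

```
emit price = if $vars.apply_discount then amount * 0.9 else amount
```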

Using variables with channels

Variables defined in pipeline.vars serve as defaults. The channel system can override these values per client or environment without modifying the pipeline YAML:

# Pipeline: pipeline.yaml
pipeline:
  name: invoice_processing
  vars:
    express_surcharge: 5.99
    late_fee: 25.00

# Channel: channels/acme-corp/channel.yaml
_channel:
  id: acme-corp
  name: "Acme Corp"
  active: true
variables:
  EXPRESS_SURCHARGE: "8.99"

When run with --channel acme-corp, the channel variable overrides the pipeline default.

Complete example

pipeline:
  name: sales_report
  vars:
    min_amount: 100
    commission_rate: 0.12
    region_label: "North America"
    include_pending: false

nodes:
  - type: source
    name: sales
    config:
      name: sales
      type: csv
      path: "./data/sales.csv"
      schema:
        - { name: sale_id, type: int }
        - { name: rep_name, type: string }
        - { name: amount, type: float }
        - { name: status, type: string }

  - type: transform
    name: compute
    input: sales
    config:
      cxl: |
        filter amount >= $vars.min_amount
        filter if $vars.include_pending then true else status == "closed"
        emit sale_id = sale_id
        emit rep_name = rep_name
        emit amount = amount
        emit commission = amount * $vars.commission_rate
        emit region = $vars.region_label

  - type: aggregate
    name: rep_totals
    input: compute
    config:
      group_by: [rep_name]
      cxl: |
        emit total_sales = sum(amount)
        emit total_commission = sum(commission)
        emit deal_count = count(*)

  - type: output
    name: report
    input: rep_totals
    config:
      name: report
      type: csv
      path: "./output/sales_report.csv"
      sort_order:
        - { field: "total_sales", order: desc }

Channels

Channels enable multi-tenant pipeline customization. A single pipeline definition can be run with different configurations per client, environment, or business unit – without duplicating or modifying the base YAML.

Channel structure

Each channel lives in its own directory under a channels/ folder, with a channel.yaml manifest:

project/
  pipeline.yaml
  channels/
    acme-corp/
      channel.yaml
    globex/
      channel.yaml

Channel manifest

# channels/acme-corp/channel.yaml
_channel:
  id: acme-corp
  name: "Acme Corporation"
  description: "Custom config for Acme Corp"
  contact: "ops@acme.example.com"
  active: true
  tags: [enterprise, priority]
  tier: gold

variables:
  EXPRESS_SURCHARGE: "8.99"
  TAX_RATE: "0.095"
  OUTPUT_DIR: "/data/acme/output"

Channel metadata fields

  • id – required. Unique channel identifier (used in CLI and logs).
  • name – required. Human-readable display name.
  • description – optional. Channel purpose and notes.
  • contact – optional. Responsible team or person.
  • active – optional. Whether the channel is enabled (default: true).
  • tags – optional. Arbitrary tags for filtering and grouping.
  • tier – optional. Service tier classification.

Running with a channel

Use the --channel flag to run a pipeline with a specific channel’s configuration:

clinker run pipeline.yaml --channel acme-corp

Clinker looks for the channel manifest in the workspace’s channels/ directory, loads the variable overrides, and applies them before pipeline execution begins.

Variable overrides

The variables: section in a channel manifest provides values that override pipeline-level vars: defaults. This lets each tenant customize thresholds, paths, labels, and other parameters:

# Pipeline defaults
pipeline:
  name: invoice_processing
  vars:
    late_fee: 25.00
    output_path: "./output/"

# Channel override
variables:
  LATE_FEE: "50.00"
  OUTPUT_PATH: "/data/acme/invoices/"

Channel variables support ${ENV_VAR} syntax for referencing system environment variables – these are resolved at channel load time.
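For example, a channel can derive a tenant path from the host environment. A sketch, assuming a DATA_ROOT environment variable is set on the system:

```yaml
# channels/acme-corp/channel.yaml
variables:
  OUTPUT_DIR: "${DATA_ROOT}/acme/output"   # ${DATA_ROOT} is resolved from the environment at channel load time
  TAX_RATE: "0.095"
```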

Workspace discovery

Channels are part of the broader workspace system. Clinker discovers workspaces via clinker.toml files, which can define the channel directory layout and other workspace-level settings.

Current status

Note: The channel system is being rebuilt in Phase 16c. The current implementation supports variable overrides. Full channel documentation – including path overrides, schema overrides, and channel inheritance – will be expanded when the rebuild is complete.

Complete example

# channels/globex/channel.yaml
_channel:
  id: globex
  name: "Globex Industries"
  active: true
  tier: standard

variables:
  COMMISSION_RATE: "0.15"
  MIN_ORDER: "250"
  REPORT_LABEL: "Globex Monthly"
# Run the pipeline with Globex's configuration
clinker run sales_pipeline.yaml --channel globex

Compositions

Compositions are reusable pipeline fragments that can be imported into multiple pipelines. They encapsulate common transform patterns – date derivations, address normalization, currency conversion – into self-contained, testable units.

Using a composition

A composition node in your pipeline references an external .comp.yaml file:

- type: composition
  name: fiscal_dates
  input: invoices
  use: "./compositions/fiscal_date.comp.yaml"
  config:
    start_month: 4

The use: field points to the composition definition file. The config: block passes parameters that customize the composition’s behavior for this specific invocation.

Composition definition file

A .comp.yaml file declares the composition’s interface – what fields it requires from upstream and what fields it produces:

# compositions/fiscal_date.comp.yaml
composition:
  name: fiscal_date
  description: "Derive fiscal year, quarter, and period from a date field"

  requires:
    - { name: invoice_date, type: date }

  produces:
    - { name: fiscal_year, type: int }
    - { name: fiscal_quarter, type: string }
    - { name: fiscal_period, type: int }

  params:
    - name: start_month
      type: int
      default: 1
      description: "First month of the fiscal year (1-12)"

Composition fields

  • name – required. Composition identifier.
  • description – optional. Human-readable purpose.
  • requires – required. Input fields the composition needs from upstream (name + type).
  • produces – required. Output fields the composition adds to the record (name + type).
  • params – optional. Configurable parameters with optional defaults.

Advanced wiring

For compositions with multiple input or output ports, the node supports explicit port bindings:

- type: composition
  name: enrich_address
  input: customers
  use: "./compositions/address_normalize.comp.yaml"
  inputs:
    primary: customers
    reference: zip_lookup
  outputs:
    normalized: next_stage
  config:
    country_code: "US"
  resources:
    zip_database: "./data/zipcodes.csv"

Port and resource fields

  • inputs – optional. Map of composition input ports to upstream node references.
  • outputs – optional. Map of composition output ports to downstream node references.
  • config – optional. Parameter overrides (key-value pairs).
  • resources – optional. External resource bindings (file paths, connection strings).
  • alias – optional. Namespace prefix for expanded node names (avoids collisions).

Complete example

pipeline:
  name: invoice_pipeline

nodes:
  - type: source
    name: invoices
    config:
      name: invoices
      type: csv
      path: "./data/invoices.csv"
      schema:
        - { name: invoice_id, type: int }
        - { name: customer_id, type: int }
        - { name: invoice_date, type: date }
        - { name: amount, type: float }

  - type: composition
    name: fiscal_dates
    input: invoices
    use: "./compositions/fiscal_date.comp.yaml"
    config:
      start_month: 4

  - type: transform
    name: final_enrich
    input: fiscal_dates
    config:
      cxl: |
        emit invoice_id = invoice_id
        emit customer_id = customer_id
        emit amount = amount
        emit fiscal_year = fiscal_year
        emit fiscal_quarter = fiscal_quarter

  - type: output
    name: result
    input: final_enrich
    config:
      name: result
      type: csv
      path: "./output/invoices_enriched.csv"

Current status

Note: Composition support is being built in Phase 16c. The YAML shape parses and validates, but compilation currently returns a diagnostic (E100) per composition node. The documentation above reflects the intended design. Full compilation and expansion will land when Phase 16c is complete.

CXL Overview

CXL (Clinker Expression Language) is a per-record expression language designed for ETL transformations. Every CXL program operates on one record at a time, producing output fields, filtering records, or computing derived values.

CXL is not SQL. There are no SELECT, FROM, or WHERE keywords. CXL programs are sequences of statements – emit, let, filter, distinct – that execute top to bottom against the current record.

Key differences from SQL

  • SELECT col AS alias → emit alias = col
  • WHERE condition → filter condition
  • AND / OR / NOT → and / or / not (keywords)
  • && / || / ! → not supported; use the keywords
  • COALESCE(a, b) → a ?? b
  • CASE WHEN ... THEN ... END → if ... then ... else ... or match { }

Boolean operators are keywords

CXL uses English keywords for boolean logic, not symbols:

$ cxl eval -e 'emit result = true and false' --field dummy=1
{
  "result": false
}

The operators &&, ||, and ! are syntax errors in CXL. Always use and, or, and not.

System namespaces use $ prefix

CXL provides built-in namespaces for accessing pipeline state, metadata, and window functions. All system namespaces are prefixed with $:

  • $pipeline.* – pipeline execution context (name, counters, provenance)
  • $meta.* – per-record metadata
  • $window.* – window function calls
  • $vars.* – user-defined pipeline variables

$ cxl eval -e 'emit name = $pipeline.name'
{
  "name": "cxl-eval"
}

Compile-time type checking

CXL catches type errors before data processing begins. The compilation pipeline runs four phases:

  1. Parse – tokenize and build an AST from CXL source text
  2. Resolve – bind field references, validate method names, check arity
  3. Typecheck – infer types, validate operator compatibility, check method receiver types
  4. Eval – execute the typed program against each record

Errors at any phase produce rich diagnostics with source locations and fix suggestions via miette.

$ cxl check transform.cxl
ok: transform.cxl is valid

If there are type errors, the checker reports them with spans:

error[typecheck]: cannot apply '+' to String and Int (at transform.cxl:12)
  help: convert one operand — use .to_int() or .to_string()

A minimal CXL program

emit greeting = "hello"
emit doubled = amount * 2
filter amount > 0

This program:

  1. Emits a constant string field greeting
  2. Emits doubled as twice the input amount
  3. Filters out records where amount is not positive

Try it:

$ cxl eval -e 'emit greeting = "hello"' -e 'emit doubled = amount * 2' \
    --field amount=5
{
  "greeting": "hello",
  "doubled": 10
}

Statement order matters

CXL statements execute sequentially. Later statements can reference fields produced by earlier emit or let statements:

$ cxl eval -e 'let tax_rate = 0.21' -e 'emit tax = price * tax_rate' \
    --field price=100
{
  "tax": 21.0
}

A filter statement short-circuits execution – if the condition is false, remaining statements do not run and the record is excluded from output.
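As a sketch (field names illustrative), once the filter rejects a record, the later statements never execute for it:

```
filter status == "active"     # inactive records stop here and are excluded
emit id = id                  # runs only for records that passed the filter
emit label = "active user"
```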

Types & Literals

CXL has 9 value types. Every field value, literal, and expression result is one of these types.

Value types

Each type, with its Rust backing in parentheses:

  • Null (Value::Null) – Missing or absent value
  • Bool (bool) – true or false
  • Integer (i64) – 64-bit signed integer
  • Float (f64) – 64-bit double-precision float
  • String (Box<str>) – UTF-8 text
  • Date (NaiveDate) – Calendar date without timezone
  • DateTime (NaiveDateTime) – Date and time without timezone
  • Array (Vec<Value>) – Ordered collection of values
  • Map (IndexMap<Box<str>, Value>) – Key-value pairs

Literal syntax

Integers

Standard decimal notation. Negative values use the unary minus operator.

$ cxl eval -e 'emit a = 42' -e 'emit b = -5' -e 'emit c = 0'
{
  "a": 42,
  "b": -5,
  "c": 0
}

Floats

Decimal notation with a dot. Must have digits on both sides of the decimal point.

$ cxl eval -e 'emit a = 3.14' -e 'emit b = -0.5'
{
  "a": 3.14,
  "b": -0.5
}

Strings

Double-quoted or single-quoted. Supports escape sequences: \\, \", \', \n, \t, \r.

$ cxl eval -e 'emit greeting = "hello world"'
{
  "greeting": "hello world"
}

Booleans

The keywords true and false.

$ cxl eval -e 'emit flag = true' -e 'emit neg = not flag'
{
  "flag": true,
  "neg": false
}

Dates

Hash-delimited ISO 8601 format: #YYYY-MM-DD#.

$ cxl eval -e 'emit d = #2024-01-15#'
{
  "d": "2024-01-15"
}

Null

The keyword null.

$ cxl eval -e 'emit nothing = null'
{
  "nothing": null
}

Schema types

When declaring column types in YAML pipeline schemas, use these type names:

| Schema type | CXL type | Description |
|---|---|---|
| string | String | Text values |
| int | Integer | 64-bit integers |
| float | Float | 64-bit floats |
| bool | Bool | Boolean values |
| date | Date | Calendar dates |
| date_time | DateTime | Date and time |
| array | Array | Ordered collections |
| numeric | Int or Float | Union type – accepts either |
| any | Any | Unknown type – no type constraints |
| nullable(T) | Nullable(T) | Wrapper – value may be null |

Example YAML schema declaration:

schema:
  employee_id: int
  name: string
  salary: nullable(float)
  start_date: date

Type promotion

CXL automatically promotes types in mixed expressions:

Int + Float promotes to Float:

$ cxl eval -e 'emit result = 2 + 3.5'
{
  "result": 5.5
}

Null + T produces Nullable(T): Any operation involving null produces a nullable result.

$ cxl eval -e 'emit result = null + 5'
{
  "result": null
}

Nullable(A) + B unifies to Nullable(unified): When a nullable value meets a non-nullable value, the result type wraps the unified inner type in Nullable.

Type unification rules

The type checker follows these rules when two types meet in an expression:

  1. Same types unify to themselves: Int + Int produces Int
  2. Any unifies with anything: Any + T produces T
  3. Numeric resolves to the concrete type: Numeric + Int produces Int, Numeric + Float produces Float
  4. Int promotes to Float: Int + Float produces Float
  5. Null wraps: Null + T produces Nullable(T)
  6. Nullable propagates: Nullable(A) + B produces Nullable(unified(A, B))
  7. Incompatible types fail: String + Int is a type error
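
A few of these rules in combination, written as CXL (field names are illustrative; assume qty is declared int and bonus is declared nullable(float) in the schema):

```
emit mixed = qty + 0.5                 # rule 4: Int + Float -> Float
emit raw_total = salary + bonus        # rule 6: Float meets Nullable(Float) -> Nullable(Float)
emit safe_total = salary + (bonus ?? 0.0)  # coalescing first strips the Nullable wrapper
```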

Operators & Expressions

CXL provides arithmetic, comparison, boolean, null coalescing, and string operators. Boolean logic uses keywords (and, or, not), not symbols.

Arithmetic operators

| Operator | Description | Example |
|---|---|---|
| + | Addition (or string concatenation) | 2 + 3 |
| - | Subtraction | 10 - 4 |
| * | Multiplication | 3 * 5 |
| / | Division | 10 / 3 |
| % | Modulo (remainder) | 10 % 3 |

$ cxl eval -e 'emit result = 2 + 3 * 4'
{
  "result": 14
}

Multiplication binds tighter than addition, so 2 + 3 * 4 is 2 + (3 * 4) = 14, not (2 + 3) * 4 = 20.

$ cxl eval -e 'emit result = 10 % 3'
{
  "result": 1
}

Comparison operators

| Operator | Description | Example |
|---|---|---|
| == | Equal | x == 0 |
| != | Not equal | x != 0 |
| > | Greater than | x > 10 |
| < | Less than | x < 10 |
| >= | Greater than or equal | x >= 10 |
| <= | Less than or equal | x <= 10 |

$ cxl eval -e 'emit result = 5 > 3' --field dummy=1
{
  "result": true
}

Boolean operators

CXL uses keywords for boolean logic. The symbols &&, ||, and ! are not valid CXL syntax.

| Operator | Description | Example |
|---|---|---|
| and | Logical AND | a and b |
| or | Logical OR | a or b |
| not | Logical NOT (unary) | not a |

$ cxl eval -e 'emit result = true and not false'
{
  "result": true
}
$ cxl eval -e 'emit result = 5 > 3 or 10 < 2'
{
  "result": true
}

Null coalesce operator

The ?? operator returns its left operand if non-null, otherwise its right operand.

$ cxl eval -e 'emit result = null ?? "default"'
{
  "result": "default"
}
$ cxl eval -e 'emit result = "present" ?? "default"'
{
  "result": "present"
}

String concatenation

The + operator concatenates strings when both operands are strings.

$ cxl eval -e 'emit result = "hello" + " " + "world"'
{
  "result": "hello world"
}

Unary operators

| Operator | Description | Example |
|---|---|---|
| - | Numeric negation | -x |
| not | Boolean negation | not done |

$ cxl eval -e 'emit result = -42'
{
  "result": -42
}

Method calls

Methods are called on a receiver using dot notation:

$ cxl eval -e 'emit result = "hello".upper()'
{
  "result": "HELLO"
}

Methods can be chained:

$ cxl eval -e 'emit result = "  hello  ".trim().upper()'
{
  "result": "HELLO"
}

Field references

Bare identifiers reference fields from the input record:

$ cxl eval -e 'emit result = price * qty' \
    --field price=10 \
    --field qty=3
{
  "result": 30
}

Qualified field references use dot notation for multi-source pipelines: source.field.
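
For instance, a transform fed by two joined sources might disambiguate same-named fields like this (the source names orders and customers are illustrative):

```
emit order_total = orders.amount
emit customer_name = customers.name
```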

Operator precedence

From highest (binds tightest) to lowest:

| Precedence | Operators | Associativity |
|---|---|---|
| 1 (highest) | . (method calls, field access) | Left |
| 2 | - (unary), not | Prefix |
| 3 | * / % | Left |
| 4 | + - | Left |
| 5 | == != > < >= <= | Left |
| 6 | and | Left |
| 7 | or | Left |
| 8 (lowest) | ?? | Right |

Use parentheses to override precedence:

$ cxl eval -e 'emit result = (2 + 3) * 4'
{
  "result": 20
}

Comments

Line comments start with #, provided the # is not immediately followed by a digit – a # followed by a digit begins a date literal:

# This is a comment
emit total = price * qty  # inline comment
emit deadline = #2024-12-31#  # this is a date literal, not a comment

Statements

CXL programs are sequences of statements that execute top-to-bottom against each input record. Statement order matters – later statements can reference values produced by earlier ones.

emit

The emit statement produces an output field. Each emit becomes a column in the output record.

emit name = expression
$ cxl eval -e 'emit greeting = "hello"' -e 'emit doubled = 21 * 2'
{
  "greeting": "hello",
  "doubled": 42
}

Multiple emit statements build up the output record field by field:

$ cxl eval -e 'emit first = "Alice"' -e 'emit last = "Smith"' \
    -e 'emit full = first + " " + last'
{
  "first": "Alice",
  "last": "Smith",
  "full": "Alice Smith"
}

let

The let statement creates a local variable binding. The variable is available to subsequent statements but is NOT included in the output record.

let name = expression
$ cxl eval -e 'let tax_rate = 0.21' -e 'emit tax = 100 * tax_rate'
{
  "tax": 21.0
}

Note that tax_rate does not appear in the output – only emit statements produce output fields.

filter

The filter statement excludes records where the condition evaluates to false. When a filter excludes a record, remaining statements do not execute (short-circuit).

filter condition
$ cxl eval -e 'filter amount > 0' -e 'emit result = amount * 2' \
    --field amount=5
{
  "result": 10
}

When the filter condition is false, the entire record is dropped and no output is produced.

Filters can appear anywhere in the statement sequence. Place them early to skip unnecessary computation:

filter status == "active"
let discount = if tier == "gold" then 0.2 else 0.1
emit final_price = price * (1 - discount)

distinct

The distinct statement deduplicates records. The bare form deduplicates on all emitted fields. The by form deduplicates on a specific field.

distinct
distinct by field_name

In a pipeline, distinct tracks values seen so far and drops records that have already been emitted with the same key.
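
A short sketch of the bare form (field names illustrative):

```
# Deduplicate on all emitted fields: a record is dropped if an
# identical (customer_id, region) pair was already emitted
emit customer_id = customer_id
emit region = region
distinct
```

With `distinct by customer_id` instead, only the first record seen per customer_id survives, regardless of the other emitted fields.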

emit meta

The emit meta statement writes a value to the $meta.* namespace – per-record metadata that is not part of the output columns. Metadata can be read by downstream nodes via $meta.field.

emit meta quality_flag = if amount < 0 then "suspect" else "ok"

Access metadata downstream:

filter $meta.quality_flag == "ok"

trace

The trace statement emits debug logging. It has no effect on the output record. Trace messages are only visible when tracing is enabled at the appropriate level.

trace "processing record"
trace warn "unusual value detected"
trace info if amount > 10000 then "high value transaction"

Trace levels: trace (default), debug, info, warn, error. An optional guard condition (via if) limits when the trace fires.

Statement ordering

Statements execute sequentially. A statement can reference any field or variable defined by a preceding emit or let:

$ cxl eval -e 'let base = 100' -e 'let rate = 0.15' \
    -e 'emit subtotal = base * rate' \
    -e 'emit total = base + subtotal'
{
  "subtotal": 15.0,
  "total": 115.0
}

Referencing a name before it is defined is a resolve-time error:

emit total = base + tax    # error: 'base' is not defined yet
let base = 100
let tax = base * 0.21

use

The use statement imports a CXL module for reuse. See Modules & use for details.

use shared.dates as d
emit fy = d::fiscal_year(invoice_date)

Conditionals

CXL provides two conditional expression forms: if/then/else and match. Both are expressions – they return values and can be used anywhere an expression is expected.

If / then / else

The basic conditional expression:

if condition then value else alternative
$ cxl eval -e 'emit label = if amount > 100 then "high" else "low"' \
    --field amount=250
{
  "label": "high"
}

The else branch is optional. When omitted, the expression evaluates to null whenever the condition is false:

$ cxl eval -e 'emit bonus = if score > 90 then score * 0.1' \
    --field score=80
{
  "bonus": null
}

Chained conditionals

Chain multiple conditions with else if:

$ cxl eval -e 'emit tier = if amount > 1000 then "platinum"
    else if amount > 500 then "gold"
    else if amount > 100 then "silver"
    else "bronze"' \
    --field amount=750
{
  "tier": "gold"
}

Nested usage

Since if/then/else is an expression, it can be used inside other expressions:

$ cxl eval -e 'emit price = base * (if member then 0.8 else 1.0)' \
    --field base=100 \
    --field member=true
{
  "price": 80.0
}

Match

The match expression provides pattern matching. It comes in two forms: value matching (with a subject) and condition matching (without a subject).

Value form (with subject)

Match a subject expression against literal patterns:

match subject {
  pattern1 => result1,
  pattern2 => result2,
  _ => default
}
$ cxl eval -e 'emit label = match status {
    "A" => "Active",
    "I" => "Inactive",
    "P" => "Pending",
    _ => "Unknown"
  }' \
    --field status=A
{
  "label": "Active"
}

The wildcard _ is the catch-all arm. It matches any value not covered by preceding arms.

Condition form (without subject)

When no subject is provided, each arm’s pattern is evaluated as a boolean condition. This is CXL’s equivalent of SQL’s CASE WHEN:

match {
  condition1 => result1,
  condition2 => result2,
  _ => default
}
$ cxl eval -e 'emit tier = match {
    amount > 1000 => "high",
    amount > 100 => "medium",
    _ => "low"
  }' \
    --field amount=500
{
  "tier": "medium"
}

Practical examples

Tiered pricing:

emit discount = match {
  qty >= 1000 => 0.25,
  qty >= 100  => 0.15,
  qty >= 10   => 0.05,
  _           => 0.0
}

Status code mapping:

emit status_text = match http_code {
  200 => "OK",
  201 => "Created",
  400 => "Bad Request",
  404 => "Not Found",
  500 => "Internal Server Error",
  _   => "HTTP " + http_code.to_string()
}

Region classification:

emit region = match country {
  "US" => "North America",
  "CA" => "North America",
  "MX" => "North America",
  "GB" => "Europe",
  "DE" => "Europe",
  "FR" => "Europe",
  _    => "Other"
}

Match arms are evaluated in order

The first matching arm wins. Place more specific conditions before general ones:

# Correct: specific before general
emit category = match {
  amount > 10000 => "enterprise",
  amount > 1000  => "business",
  _              => "personal"
}

# Wrong: the most general arm comes first
emit category = match {
  amount > 0     => "personal",    # catches everything positive
  amount > 1000  => "business",    # never reached
  amount > 10000 => "enterprise",  # never reached
  _              => "unknown"
}

Built-in Methods

CXL provides 68 built-in scalar methods organized into 8 categories. Methods are called on a receiver value using dot notation: receiver.method(args).

Null propagation

Most methods return null when the receiver is null. This means null values flow through method chains without causing errors. The exceptions are documented in Introspection & Debug.
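
For example, a possibly-null field flows through an entire chain without erroring, so a single coalesce at the end is enough (raw_name is an illustrative, possibly-null field):

```
emit cleaned = raw_name.trim().upper()            # null if raw_name is null
emit display = raw_name.trim().upper() ?? "N/A"   # coalesce once, at the end
```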

Method categories

String Methods (24 methods)

Text manipulation: case conversion, trimming, padding, searching, splitting, regex matching.

| Method | Description |
|---|---|
| upper, lower | Case conversion |
| trim, trim_start, trim_end | Whitespace removal |
| starts_with, ends_with, contains | Substring testing |
| replace | Find and replace |
| substring, left, right | Extraction |
| pad_left, pad_right | Padding |
| repeat, reverse | Repetition and reversal |
| length | Character count |
| split, join | Splitting and joining |
| matches, find, capture | Regex operations |
| format, concat | Formatting and concatenation |

Numeric Methods (8 methods)

Rounding, clamping, and comparison for integers and floats.

| Method | Description |
|---|---|
| abs | Absolute value |
| ceil, floor | Ceiling and floor |
| round, round_to | Rounding to decimal places |
| clamp | Constrain to range |
| min, max | Pairwise minimum/maximum |

Date & Time Methods (13 methods)

Date component extraction, arithmetic, and formatting.

| Method | Description |
|---|---|
| year, month, day | Date component extraction |
| hour, minute, second | Time component extraction (DateTime only) |
| add_days, add_months, add_years | Date arithmetic |
| diff_days, diff_months, diff_years | Date difference |
| format_date | Custom date formatting |

Conversion Methods (11 methods)

Type conversion in strict (error on failure) and lenient (null on failure) variants.

| Method | Description |
|---|---|
| to_int, to_float, to_string, to_bool | Strict conversion |
| to_date, to_datetime | Strict date parsing |
| try_int, try_float, try_bool | Lenient conversion |
| try_date, try_datetime | Lenient date parsing |

Introspection & Debug (5 methods)

Type inspection, null checking, and debugging. These are the only methods that accept null receivers without propagating null.

| Method | Description |
|---|---|
| type_of | Returns the type name as a string |
| is_null | Tests for null |
| is_empty | Tests for empty string, empty array, or null |
| catch | Null fallback (equivalent to ??) |
| debug | Passthrough with tracing side effect |

Path Methods (5 methods)

File path component extraction.

| Method | Description |
|---|---|
| file_name | Full filename with extension |
| file_stem | Filename without extension |
| extension | File extension |
| parent | Parent directory path |
| parent_name | Parent directory name |

String Methods

CXL provides 24 built-in methods for string manipulation. All string methods return null when the receiver is null (null propagation).

Case conversion

upper()

Converts all characters to uppercase.

$ cxl eval -e 'emit result = "hello world".upper()'
{
  "result": "HELLO WORLD"
}

lower()

Converts all characters to lowercase.

$ cxl eval -e 'emit result = "Hello World".lower()'
{
  "result": "hello world"
}

Whitespace trimming

trim()

Removes leading and trailing whitespace.

$ cxl eval -e 'emit result = "  hello  ".trim()'
{
  "result": "hello"
}

trim_start()

Removes leading whitespace only.

$ cxl eval -e 'emit result = "  hello  ".trim_start()'
{
  "result": "hello  "
}

trim_end()

Removes trailing whitespace only.

$ cxl eval -e 'emit result = "  hello  ".trim_end()'
{
  "result": "  hello"
}

Substring testing

starts_with(prefix: String) -> Bool

Tests whether the string starts with the given prefix.

$ cxl eval -e 'emit result = "hello world".starts_with("hello")'
{
  "result": true
}

ends_with(suffix: String) -> Bool

Tests whether the string ends with the given suffix.

$ cxl eval -e 'emit result = "report.csv".ends_with(".csv")'
{
  "result": true
}

contains(substring: String) -> Bool

Tests whether the string contains the given substring.

$ cxl eval -e 'emit result = "hello world".contains("lo wo")'
{
  "result": true
}

Find and replace

replace(find: String, replacement: String) -> String

Replaces all occurrences of find with replacement.

$ cxl eval -e 'emit result = "foo-bar-baz".replace("-", "_")'
{
  "result": "foo_bar_baz"
}

Extraction

substring(start: Int [, length: Int]) -> String

Extracts a substring starting at start (0-based character index). If length is provided, takes at most that many characters. If omitted, takes all remaining characters.

$ cxl eval -e 'emit result = "hello world".substring(6)'
{
  "result": "world"
}
$ cxl eval -e 'emit result = "hello world".substring(0, 5)'
{
  "result": "hello"
}

left(n: Int) -> String

Returns the first n characters.

$ cxl eval -e 'emit result = "hello world".left(5)'
{
  "result": "hello"
}

right(n: Int) -> String

Returns the last n characters.

$ cxl eval -e 'emit result = "hello world".right(5)'
{
  "result": "world"
}

Padding

pad_left(width: Int [, char: String]) -> String

Left-pads the string to the given width. Default pad character is a space.

$ cxl eval -e 'emit result = "42".pad_left(5, "0")'
{
  "result": "00042"
}
$ cxl eval -e 'emit result = "hi".pad_left(6)'
{
  "result": "    hi"
}

pad_right(width: Int [, char: String]) -> String

Right-pads the string to the given width. Default pad character is a space.

$ cxl eval -e 'emit result = "hi".pad_right(6, ".")'
{
  "result": "hi...."
}

Repetition and reversal

repeat(n: Int) -> String

Repeats the string n times.

$ cxl eval -e 'emit result = "ab".repeat(3)'
{
  "result": "ababab"
}

reverse() -> String

Reverses the characters in the string.

$ cxl eval -e 'emit result = "hello".reverse()'
{
  "result": "olleh"
}

Length

length() -> Int

Returns the number of characters in the string. Also works on arrays, returning the number of elements.

$ cxl eval -e 'emit result = "hello".length()'
{
  "result": 5
}

Splitting and joining

split(delimiter: String) -> Array

Splits the string by the delimiter, returning an array of strings.

$ cxl eval -e 'emit result = "a,b,c".split(",")'
{
  "result": ["a", "b", "c"]
}

join(delimiter: String) -> String

Joins an array of values into a string with the given delimiter. The receiver must be an array.

$ cxl eval -e 'emit result = "a,b,c".split(",").join(" - ")'
{
  "result": "a - b - c"
}

Regex operations

matches(pattern: String) -> Bool

Tests whether the string fully matches the given regex pattern.

$ cxl eval -e 'emit result = "abc123".matches("^[a-z]+[0-9]+$")'
{
  "result": true
}

find(pattern: String) -> Bool

Tests whether the string contains a substring matching the given regex pattern (partial match).

$ cxl eval -e 'emit result = "hello world 42".find("[0-9]+")'
{
  "result": true
}

capture(pattern: String [, group: Int]) -> String

Extracts a capture group from the first regex match. Default group is 0 (the full match).

$ cxl eval -e 'emit result = "order-12345".capture("order-([0-9]+)", 1)'
{
  "result": "12345"
}

Formatting and concatenation

format(fmt: String) -> String

Formats the receiver value as a string.

$ cxl eval -e 'emit result = 42.format("")'
{
  "result": "42"
}

concat(args: String…) -> String

Concatenates the receiver with one or more string arguments. Null arguments are treated as empty strings.

$ cxl eval -e 'emit result = "hello".concat(" ", "world")'
{
  "result": "hello world"
}

This is variadic – it accepts any number of string arguments:

$ cxl eval -e 'emit result = "a".concat("b", "c", "d")'
{
  "result": "abcd"
}

Numeric Methods

CXL provides 8 built-in methods for numeric operations. These methods work on both Integer and Float values (the Numeric receiver type). All return null when the receiver is null.

abs() -> Numeric

Returns the absolute value. Preserves the original type (Int stays Int, Float stays Float).

$ cxl eval -e 'emit result = (-42).abs()'
{
  "result": 42
}
$ cxl eval -e 'emit result = (-3.14).abs()'
{
  "result": 3.14
}

ceil() -> Int

Rounds up to the nearest integer. Returns the value unchanged for integers.

$ cxl eval -e 'emit result = 3.2.ceil()'
{
  "result": 4
}
$ cxl eval -e 'emit result = (-3.2).ceil()'
{
  "result": -3
}

floor() -> Int

Rounds down to the nearest integer. Returns the value unchanged for integers.

$ cxl eval -e 'emit result = 3.8.floor()'
{
  "result": 3
}
$ cxl eval -e 'emit result = (-3.2).floor()'
{
  "result": -4
}

round([decimals: Int]) -> Float

Rounds to the specified number of decimal places. Default is 0 decimal places.

$ cxl eval -e 'emit result = 3.456.round()'
{
  "result": 3.0
}
$ cxl eval -e 'emit result = 3.456.round(2)'
{
  "result": 3.46
}

round_to(decimals: Int) -> Float

Rounds to the specified number of decimal places. Unlike round(), the decimals argument is required.

$ cxl eval -e 'emit result = 3.14159.round_to(3)'
{
  "result": 3.142
}

Use round_to when you want to be explicit about precision in financial or scientific calculations:

$ cxl eval -e 'emit price = 19.995.round_to(2)'
{
  "price": 20.0
}

clamp(min: Numeric, max: Numeric) -> Numeric

Constrains the value to the given range. Returns min if the value is below it, max if above, or the value itself if within range.

$ cxl eval -e 'emit result = 150.clamp(0, 100)'
{
  "result": 100
}
$ cxl eval -e 'emit result = (-5).clamp(0, 100)'
{
  "result": 0
}
$ cxl eval -e 'emit result = 50.clamp(0, 100)'
{
  "result": 50
}

min(other: Numeric) -> Numeric

Returns the smaller of the receiver and the argument.

$ cxl eval -e 'emit result = 10.min(20)'
{
  "result": 10
}
$ cxl eval -e 'emit result = 10.min(5)'
{
  "result": 5
}

max(other: Numeric) -> Numeric

Returns the larger of the receiver and the argument.

$ cxl eval -e 'emit result = 10.max(20)'
{
  "result": 20
}
$ cxl eval -e 'emit result = 10.max(5)'
{
  "result": 10
}

Practical examples

Clamp a percentage:

emit pct = (completed / total * 100).clamp(0, 100).round_to(1)

Absolute difference:

emit diff = (actual - expected).abs()

Floor division for batch numbering:

emit batch = (row_number / 1000).floor()

Date & Time Methods

CXL provides 13 built-in methods for date and time manipulation. These methods work on Date and DateTime values. All return null when the receiver is null.

Component extraction

year() -> Int

Returns the year component.

$ cxl eval -e 'emit result = #2024-03-15#.year()'
{
  "result": 2024
}

month() -> Int

Returns the month component (1-12).

$ cxl eval -e 'emit result = #2024-03-15#.month()'
{
  "result": 3
}

day() -> Int

Returns the day-of-month component (1-31).

$ cxl eval -e 'emit result = #2024-03-15#.day()'
{
  "result": 15
}

hour() -> Int

Returns the hour component (0-23). DateTime only – returns null for Date values.

$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime().hour()'
{
  "result": 14
}

minute() -> Int

Returns the minute component (0-59). DateTime only – returns null for Date values.

$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime().minute()'
{
  "result": 30
}

second() -> Int

Returns the second component (0-59). DateTime only – returns null for Date values.

$ cxl eval -e 'emit result = "2024-03-15T14:30:45".to_datetime().second()'
{
  "result": 45
}

Date arithmetic

add_days(n: Int) -> Date

Adds n days to the date. Use negative values to subtract. Works on both Date and DateTime.

$ cxl eval -e 'emit result = #2024-01-15#.add_days(10)'
{
  "result": "2024-01-25"
}
$ cxl eval -e 'emit result = #2024-01-15#.add_days(-5)'
{
  "result": "2024-01-10"
}

add_months(n: Int) -> Date

Adds n months to the date. Day is clamped to the last day of the target month if necessary.

$ cxl eval -e 'emit result = #2024-01-31#.add_months(1)'
{
  "result": "2024-02-29"
}
$ cxl eval -e 'emit result = #2024-03-15#.add_months(-2)'
{
  "result": "2024-01-15"
}

add_years(n: Int) -> Date

Adds n years to the date. Leap day (Feb 29) is clamped to Feb 28 in non-leap years.

$ cxl eval -e 'emit result = #2024-02-29#.add_years(1)'
{
  "result": "2025-02-28"
}

Date difference

diff_days(other: Date) -> Int

Returns the number of days between the receiver and the argument (receiver - other). Positive when the receiver is later.

$ cxl eval -e 'emit result = #2024-03-15#.diff_days(#2024-03-01#)'
{
  "result": 14
}
$ cxl eval -e 'emit result = #2024-01-01#.diff_days(#2024-03-15#)'
{
  "result": -74
}

diff_months(other: Date) -> Int

Returns the difference in months between two dates.

Note: This method currently returns null (unimplemented). Use diff_days and divide by 30 as an approximation.

diff_years(other: Date) -> Int

Returns the difference in years between two dates.

Note: This method currently returns null (unimplemented). Use diff_days and divide by 365 as an approximation.
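
The suggested workaround for both methods, spelled out (field names illustrative; integer division makes these rough approximations only):

```
emit approx_months = end_date.diff_days(start_date) / 30
emit approx_years  = end_date.diff_days(start_date) / 365
```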

Formatting

format_date(format: String) -> String

Formats the date/datetime using a chrono format string. See chrono format syntax.

Common format specifiers:

| Specifier | Description | Example |
|---|---|---|
| %Y | 4-digit year | 2024 |
| %m | 2-digit month | 03 |
| %d | 2-digit day | 15 |
| %H | Hour (24h) | 14 |
| %M | Minute | 30 |
| %S | Second | 00 |
| %B | Full month name | March |
| %b | Abbreviated month | Mar |
| %A | Full weekday | Friday |

$ cxl eval -e 'emit result = #2024-03-15#.format_date("%B %d, %Y")'
{
  "result": "March 15, 2024"
}
$ cxl eval -e 'emit result = #2024-03-15#.format_date("%Y/%m/%d")'
{
  "result": "2024/03/15"
}

Practical examples

Fiscal year calculation (April start):

let d = invoice_date
emit fiscal_year = if d.month() < 4 then d.year() - 1 else d.year()

Age in days:

emit days_since = now.diff_days(created_date)

Quarter:

emit quarter = match {
  invoice_date.month() <= 3  => "Q1",
  invoice_date.month() <= 6  => "Q2",
  invoice_date.month() <= 9  => "Q3",
  _                          => "Q4"
}

ISO week format:

emit formatted = order_date.format_date("%Y-W%V")

Conversion Methods

CXL provides two families of conversion methods: strict (6 methods) and lenient (5 methods). Strict conversions raise an error on failure, halting pipeline execution. Lenient conversions return null on failure, allowing graceful handling of dirty data.

All conversion methods accept any receiver type (Any).

Strict conversions

Use strict conversions for required fields where invalid data should halt processing.

to_int() -> Int

Converts the receiver to an integer. Errors on failure.

  • Float: truncates toward zero
  • String: parses as integer
  • Bool: true becomes 1, false becomes 0
$ cxl eval -e 'emit result = "42".to_int()'
{
  "result": 42
}
$ cxl eval -e 'emit result = 3.9.to_int()'
{
  "result": 3
}

to_float() -> Float

Converts the receiver to a float. Errors on failure.

  • Integer: promotes to float
  • String: parses as float
$ cxl eval -e 'emit result = "3.14".to_float()'
{
  "result": 3.14
}
$ cxl eval -e 'emit result = 42.to_float()'
{
  "result": 42.0
}

to_string() -> String

Converts any value to its string representation. Never fails.

$ cxl eval -e 'emit result = 42.to_string()'
{
  "result": "42"
}
$ cxl eval -e 'emit result = true.to_string()'
{
  "result": "true"
}

to_bool() -> Bool

Converts the receiver to a boolean. Errors on failure.

  • String: "true", "1", "yes" become true; "false", "0", "no" become false (case-insensitive)
  • Integer: 0 is false, everything else is true
$ cxl eval -e 'emit result = "yes".to_bool()'
{
  "result": true
}
$ cxl eval -e 'emit result = 0.to_bool()'
{
  "result": false
}

to_date([format: String]) -> Date

Parses a string to a Date. Without a format argument, expects ISO 8601 (YYYY-MM-DD). With a format, uses chrono strftime syntax.

$ cxl eval -e 'emit result = "2024-03-15".to_date()'
{
  "result": "2024-03-15"
}
$ cxl eval -e 'emit result = "15/03/2024".to_date("%d/%m/%Y")'
{
  "result": "2024-03-15"
}

to_datetime([format: String]) -> DateTime

Parses a string to a DateTime. Without a format argument, expects ISO 8601 (YYYY-MM-DDTHH:MM:SS). With a format, uses chrono strftime syntax.

$ cxl eval -e 'emit result = "2024-03-15T14:30:00".to_datetime()'
{
  "result": "2024-03-15T14:30:00"
}

Lenient conversions

Use lenient conversions for optional or dirty data fields. They return null instead of raising errors, making them safe to combine with ?? for fallback values.

try_int() -> Int

Attempts to convert to integer. Returns null on failure.

$ cxl eval -e 'emit a = "42".try_int()' -e 'emit b = "abc".try_int()'
{
  "a": 42,
  "b": null
}

try_float() -> Float

Attempts to convert to float. Returns null on failure.

$ cxl eval -e 'emit a = "3.14".try_float()' -e 'emit b = "N/A".try_float()'
{
  "a": 3.14,
  "b": null
}

try_bool() -> Bool

Attempts to convert to boolean. Returns null on failure.

$ cxl eval -e 'emit a = "yes".try_bool()' -e 'emit b = "maybe".try_bool()'
{
  "a": true,
  "b": null
}

try_date([format: String]) -> Date

Attempts to parse a string as a Date. Returns null on failure.

$ cxl eval -e 'emit a = "2024-03-15".try_date()' \
    -e 'emit b = "not a date".try_date()'
{
  "a": "2024-03-15",
  "b": null
}

try_datetime([format: String]) -> DateTime

Attempts to parse a string as a DateTime. Returns null on failure.

$ cxl eval -e 'emit a = "2024-03-15T14:30:00".try_datetime()' \
    -e 'emit b = "invalid".try_datetime()'
{
  "a": "2024-03-15T14:30:00",
  "b": null
}

When to use each

Strict conversions (to_*) for:

  • Required fields that must be valid
  • Schema-enforced data where bad input should halt the pipeline
  • Fields already validated upstream

Lenient conversions (try_*) for:

  • Optional fields that may be missing or malformed
  • Dirty data with mixed formats
  • Fields where a fallback value is acceptable

Practical patterns

Safe numeric parsing with fallback:

emit amount = raw_amount.try_float() ?? 0.0

Parse dates from multiple formats:

emit parsed = raw_date.try_date("%Y-%m-%d")
    ?? raw_date.try_date("%m/%d/%Y")
    ?? raw_date.try_date("%d-%b-%Y")

Strict conversion for required fields:

emit employee_id = raw_id.to_int()    # halts on bad data -- correct behavior
emit salary = raw_salary.to_float()   # must be numeric

Lenient conversion for optional fields:

emit bonus = raw_bonus.try_float()    # null if missing or non-numeric
emit total = salary + (bonus ?? 0.0)  # safe arithmetic

Introspection & Debug

CXL provides 4 introspection methods and 1 debug method. These are the only methods that accept null receivers without propagating null – they are designed specifically for inspecting and handling null values.

type_of() -> String

Returns the type name of the receiver as a string. Works on any value, including null.

Type name strings: "String", "Int", "Float", "Bool", "Date", "DateTime", "Null", "Array", "Map".

$ cxl eval -e 'emit a = 42.type_of()' -e 'emit b = "hello".type_of()' \
    -e 'emit c = null.type_of()'
{
  "a": "Int",
  "b": "String",
  "c": "Null"
}

Useful for branching on dynamic types:

emit formatted = match value.type_of() {
  "Int"   => value.to_string() + " (integer)",
  "Float" => value.round_to(2).to_string() + " (decimal)",
  _       => value.to_string()
}

is_null() -> Bool

Returns true if the receiver is null, false otherwise. This is the primary way to test for null values – it is NOT subject to null propagation.

$ cxl eval -e 'emit a = null.is_null()' -e 'emit b = 42.is_null()'
{
  "a": true,
  "b": false
}

Use in filter statements:

filter not field.is_null()

is_empty() -> Bool

Returns true for empty strings, empty arrays, or null values. Returns false for all other values.

$ cxl eval -e 'emit a = "".is_empty()' -e 'emit b = "hello".is_empty()' \
    -e 'emit c = null.is_empty()'
{
  "a": true,
  "b": false,
  "c": true
}

Useful for filtering out blank or missing records:

filter not name.is_empty()

catch(fallback: Any) -> Any

Returns the receiver if it is non-null, otherwise returns the fallback value. This is the method equivalent of the ?? operator.

$ cxl eval -e 'emit a = null.catch("default")' \
    -e 'emit b = "present".catch("default")'
{
  "a": "default",
  "b": "present"
}

catch and ?? are interchangeable:

# These two are equivalent:
emit name = raw_name.catch("Unknown")
emit name = raw_name ?? "Unknown"

debug(label: String) -> Any

Passes the receiver through unchanged while emitting a trace log with the given label. Zero overhead when tracing is disabled. The return value is always the receiver, making it safe to insert into any expression chain.

$ cxl eval -e 'emit result = 42.debug("check value")'
{
  "result": 42
}

Insert debug anywhere in a method chain for inspection without affecting the output:

emit total = price.debug("price")
    * qty.debug("qty")

When tracing is enabled, this produces log lines like:

TRACE source_row=1 source_file=input.csv: price: Integer(100)
TRACE source_row=1 source_file=input.csv: qty: Integer(5)

Null-safe summary

Method             Null receiver behavior
type_of()          Returns "Null"
is_null()          Returns true
is_empty()         Returns true
catch(x)           Returns x
debug(l)           Passes through null, logs it
All other methods  Return null (propagation)

Path Methods

CXL provides 5 built-in methods for extracting components from file path strings. All path methods take a string receiver and return a string. They return null when the receiver is null or when the requested component does not exist.

file_name() -> String

Returns the full filename (with extension) from the path.

$ cxl eval -e 'emit result = "/data/reports/sales.csv".file_name()'
{
  "result": "sales.csv"
}

file_stem() -> String

Returns the filename without the extension.

$ cxl eval -e 'emit result = "/data/reports/sales.csv".file_stem()'
{
  "result": "sales"
}

extension() -> String

Returns the file extension (without the leading dot).

$ cxl eval -e 'emit result = "/data/reports/sales.csv".extension()'
{
  "result": "csv"
}

Returns null when no extension is present:

$ cxl eval -e 'emit result = "/data/reports/README".extension()'
{
  "result": null
}

parent() -> String

Returns the parent directory path.

$ cxl eval -e 'emit result = "/data/reports/sales.csv".parent()'
{
  "result": "/data/reports"
}

parent_name() -> String

Returns just the name of the parent directory (not the full path).

$ cxl eval -e 'emit result = "/data/reports/sales.csv".parent_name()'
{
  "result": "reports"
}
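
For comparison, the five path methods map closely onto Python's pathlib. The sketch below models the documented behavior; PurePosixPath and the helper names are illustrative, not part of Clinker:

```python
from pathlib import PurePosixPath

def file_name(p):   return PurePosixPath(p).name        # full filename with extension
def file_stem(p):   return PurePosixPath(p).stem        # filename without extension
def extension(p):
    # CXL returns null (None here) when no extension is present
    s = PurePosixPath(p).suffix
    return s[1:] if s else None                         # drop the leading dot
def parent(p):      return str(PurePosixPath(p).parent) # parent directory path
def parent_name(p): return PurePosixPath(p).parent.name # just the parent's name

print(file_name("/data/reports/sales.csv"))   # sales.csv
print(extension("/data/reports/README"))      # None
```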

Practical examples

Organize output by source directory:

emit source_dir = $pipeline.source_file.parent_name()
emit source_type = $pipeline.source_file.extension()

Extract file identifiers:

emit file_id = $pipeline.source_file.file_stem()
emit is_csv = $pipeline.source_file.extension() == "csv"

Route by file type:

let ext = input_path.extension()
emit format = match ext {
  "csv"  => "delimited",
  "json" => "structured",
  "xml"  => "markup",
  _      => "unknown"
}

Window Functions

Window functions allow CXL expressions to access aggregated values across a set of records within an analytic window. Unlike aggregate functions (which collapse groups into single rows), window functions attach computed values to each individual record.

Window functions are accessed via the $window.* namespace and require an analytic_window: configuration on the transform node.

Configuring an analytic window

Window functions are only available in transform nodes that declare an analytic_window: section in YAML:

nodes:
  - name: ranked_sales
    type: transform
    input: raw_sales
    analytic_window:
      partition_by: [region]
      order_by:
        - field: amount
          direction: desc
      frame:
        type: rows
        start: unbounded_preceding
        end: current_row
    cxl: |
      emit region = region
      emit amount = amount
      emit running_total = $window.sum(amount)
      emit rank_position = $window.count()

Window configuration fields

Field         Description
partition_by  List of fields to partition the window by (like SQL PARTITION BY)
order_by      List of ordering specifications (field + direction)
frame.type    Frame type: rows or range
frame.start   Frame start: unbounded_preceding, current_row, or preceding(n)
frame.end     Frame end: unbounded_following, current_row, or following(n)

Aggregate window functions

These compute aggregate values over the window frame.

$window.sum(field)

Sum of the field values in the window frame.

emit running_total = $window.sum(amount)

$window.avg(field)

Average of the field values in the window frame. Returns Float.

emit moving_avg = $window.avg(amount)

$window.min(field)

Minimum value in the window frame.

emit window_min = $window.min(amount)

$window.max(field)

Maximum value in the window frame.

emit window_max = $window.max(amount)

$window.count()

Count of records in the window frame. Takes no arguments.

emit window_size = $window.count()

Positional window functions

These access specific records by position within the window frame.

$window.first()

Returns the value of the current field from the first record in the window frame.

emit first_amount = $window.first()

$window.last()

Returns the value of the current field from the last record in the window frame.

emit last_amount = $window.last()

$window.lag(n)

Returns the value from n records before the current record. Returns null if there is no record at that offset.

emit prev_amount = $window.lag(1)
emit two_back = $window.lag(2)

$window.lead(n)

Returns the value from n records after the current record. Returns null if there is no record at that offset.

emit next_amount = $window.lead(1)
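
The offset semantics of lag and lead are simple to picture: index into the ordered partition relative to the current row, and return null past either edge. A minimal Python model (the function names are illustrative, not Clinker internals):

```python
def lag(rows, i, n):
    """Value n records before rows[i], or None past the partition start."""
    return rows[i - n] if i - n >= 0 else None

def lead(rows, i, n):
    """Value n records after rows[i], or None past the partition end."""
    return rows[i + n] if i + n < len(rows) else None

amounts = [100, 250, 175]
print(lag(amounts, 2, 1))   # 250
print(lead(amounts, 2, 1))  # None (no record after the last one)
```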

Iterable window functions

These evaluate predicates or collect values across the window.

$window.any(predicate)

Returns true if the predicate is true for any record in the window.

emit has_high = $window.any(amount > 1000)

$window.all(predicate)

Returns true if the predicate is true for all records in the window.

emit all_positive = $window.all(amount > 0)

$window.collect(field)

Collects all values of the field in the window into an array.

emit all_amounts = $window.collect(amount)

$window.distinct(field)

Collects distinct values of the field in the window into an array.

emit unique_regions = $window.distinct(region)

Complete example

nodes:
  - name: sales_analysis
    type: transform
    input: daily_sales
    analytic_window:
      partition_by: [store_id]
      order_by:
        - field: sale_date
          direction: asc
      frame:
        type: rows
        start: preceding(6)
        end: current_row
    cxl: |
      emit store_id = store_id
      emit sale_date = sale_date
      emit daily_revenue = revenue
      emit week_avg = $window.avg(revenue)
      emit week_total = $window.sum(revenue)
      emit prev_day_revenue = $window.lag(1)
      emit day_over_day = revenue - ($window.lag(1) ?? revenue)

This computes a 7-day rolling average and total per store, along with day-over-day revenue change.
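
The rolling frame above can be modeled in a few lines of Python. This is a sketch of the frame semantics only (rows between 6 preceding and the current row, within one already-ordered partition), not Clinker's implementation:

```python
def rolling(values, preceding):
    """For each index i, aggregate over values[max(0, i - preceding) .. i]."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append((sum(frame), sum(frame) / len(frame)))  # (total, avg)
    return out

revenue = [100, 200, 300]   # one store_id partition, ordered by sale_date
print(rolling(revenue, 6))
# [(100, 100.0), (300, 150.0), (600, 200.0)]
```

Note how early rows use a shorter frame: until 7 rows have accumulated, the "7-day" average is taken over however many rows exist so far.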

Aggregate Functions

Aggregate functions operate across grouped record sets in aggregate nodes, collapsing multiple input records into summary rows. They are distinct from window functions, which attach computed values to each individual record.

Aggregate functions

CXL provides 7 aggregate functions. These are called as free-standing function calls (not method calls) within the CXL block of an aggregate node.

Function                     Signature         Returns  Description
sum(expr)                    Numeric           Numeric  Sum of values
count(*)                     *                 Int      Count of records in the group
avg(expr)                    Numeric           Float    Arithmetic mean
min(expr)                    Any               Any      Minimum value
max(expr)                    Any               Any      Maximum value
collect(expr)                Any               Array    All values collected into an array
weighted_avg(value, weight)  Numeric, Numeric  Float    Weighted arithmetic mean

YAML aggregate node

Aggregate functions are used inside the cxl: block of a node with type: aggregate. The node must declare group_by: fields.

nodes:
  - name: dept_summary
    type: aggregate
    input: employees
    group_by: [department]
    cxl: |
      emit total_salary = sum(salary)
      emit headcount = count(*)
      emit avg_salary = avg(salary)
      emit max_salary = max(salary)
      emit min_salary = min(salary)

Group-by fields pass through automatically

Fields listed in group_by: are automatically included in the output. You do NOT need to emit them – they are carried through as group keys.

In the example above, department is automatically present in every output record without an explicit emit department = department statement.
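
The group-then-aggregate behavior, with group keys carried through automatically, can be sketched in Python. Records are modeled as plain dicts; the helper name and output shape are illustrative, not Clinker's internals:

```python
from collections import defaultdict

def aggregate(records, group_by, field):
    groups = defaultdict(list)
    for r in records:
        key = tuple(r[k] for k in group_by)
        groups[key].append(r[field])
    # Group keys appear in every output row without an explicit emit.
    return [
        dict(zip(group_by, key)) | {"total": sum(v), "headcount": len(v)}
        for key, v in groups.items()
    ]

rows = [
    {"department": "eng", "salary": 100},
    {"department": "eng", "salary": 120},
    {"department": "ops", "salary": 90},
]
print(aggregate(rows, ["department"], "salary"))
# [{'department': 'eng', 'total': 220, 'headcount': 2},
#  {'department': 'ops', 'total': 90, 'headcount': 1}]
```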

Function details

sum(expr) -> Numeric

Computes the sum of the expression across all records in the group. Null values are skipped.

cxl: |
  emit total_revenue = sum(price * quantity)

count(*) -> Int

Counts the number of records in the group. The argument is the wildcard *.

cxl: |
  emit num_orders = count(*)

avg(expr) -> Float

Computes the arithmetic mean. Null values are skipped. Returns Float.

cxl: |
  emit avg_order_value = avg(order_total)

min(expr) -> Any

Returns the minimum value in the group. Works on numeric, string, and date types.

cxl: |
  emit earliest_order = min(order_date)
  emit lowest_price = min(unit_price)

max(expr) -> Any

Returns the maximum value in the group. Works on numeric, string, and date types.

cxl: |
  emit latest_order = max(order_date)
  emit highest_price = max(unit_price)

collect(expr) -> Array

Collects all values of the expression into an array. Useful for building lists of values per group.

cxl: |
  emit all_order_ids = collect(order_id)

weighted_avg(value, weight) -> Float

Computes a weighted average: sum(value * weight) / sum(weight). Takes two arguments.

cxl: |
  emit weighted_price = weighted_avg(unit_price, quantity)
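
The formula sum(value * weight) / sum(weight) is easy to check by hand. A short Python sketch of the same computation, using weighting a price by quantity as in the example above:

```python
def weighted_avg(values, weights):
    """sum(value * weight) / sum(weight)."""
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)

prices     = [10.0, 20.0]
quantities = [3, 1]
# (10*3 + 20*1) / (3 + 1) = 50 / 4
print(weighted_avg(prices, quantities))  # 12.5
```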

Aggregates vs. windows

Feature        Aggregate node               Window function
Record output  One row per group            One row per input record
Syntax         sum(field) (free-standing)   $window.sum(field) (namespace)
Configuration  type: aggregate + group_by:  type: transform + analytic_window:
Use case       Summarize groups             Enrich records with group context

Combining aggregates with expressions

Aggregate function calls can be mixed with regular CXL expressions in emit statements:

nodes:
  - name: category_stats
    type: aggregate
    input: products
    group_by: [category]
    cxl: |
      emit total_revenue = sum(price * quantity)
      emit avg_price = avg(price)
      emit margin_pct = (sum(revenue) - sum(cost)) / sum(revenue) * 100
      emit product_count = count(*)
      emit has_premium = max(price) > 100

Restrictions

  • let bindings in aggregate transforms are restricted to row-pure expressions (no aggregate function calls in let).
  • filter in aggregate transforms runs pre-aggregation – it filters input records before grouping.
  • distinct is not permitted inside aggregate transforms. Place a separate distinct transform upstream.

Complete example

pipeline:
  name: sales_summary
  nodes:
    - name: raw_sales
      type: source
      format: csv
      path: sales.csv

    - name: monthly_summary
      type: aggregate
      input: raw_sales
      group_by: [region, month]
      cxl: |
        emit total_sales = sum(amount)
        emit order_count = count(*)
        emit avg_order = avg(amount)
        emit top_sale = max(amount)
        emit all_reps = collect(sales_rep)

    - name: output
      type: output
      input: monthly_summary
      format: csv
      path: summary.csv

System Variables

CXL provides several system variable namespaces prefixed with $. These give CXL expressions access to pipeline execution context, user-defined variables, per-record metadata, and the current time.

$pipeline.* – Pipeline context

Pipeline variables are accessed via $pipeline.member_name. Some are frozen at pipeline start; others update per record.

Stable (frozen at pipeline start)

Variable                Type      Description
$pipeline.name          String    Pipeline name from YAML config
$pipeline.execution_id  String    UUID v7, unique per pipeline run
$pipeline.batch_id      String    From --batch-id CLI flag, or auto-generated UUID v7
$pipeline.start_time    DateTime  Frozen at pipeline start, deterministic within a run

$ cxl eval -e 'emit name = $pipeline.name' \
    -e 'emit exec = $pipeline.execution_id'
{
  "name": "cxl-eval",
  "exec": "00000000-0000-0000-0000-000000000000"
}

Per-record provenance

Variable               Type    Description
$pipeline.source_file  String  Path of the source file for the current record
$pipeline.source_row   Int     Row number within the source file

These change per record, tracking where each record originated. Useful for diagnostics and auditing.

emit meta audit_source = $pipeline.source_file
emit meta audit_row = $pipeline.source_row

Counters

Variable                  Type  Description
$pipeline.total_count     Int   Total records processed so far
$pipeline.ok_count        Int   Records that passed successfully
$pipeline.dlq_count       Int   Records sent to dead-letter queue
$pipeline.filtered_count  Int   Records excluded by filter statements
$pipeline.distinct_count  Int   Records excluded by distinct statements

trace info if $pipeline.total_count % 10000 == 0 then "processed " + $pipeline.total_count.to_string() + " records"

$vars.* – User-defined variables

User-defined variables are declared in the YAML pipeline config under pipeline.vars: and accessed via $vars.name in CXL expressions.

YAML declaration

pipeline:
  name: invoice_processing
  vars:
    high_value_threshold: 10000
    tax_rate: 0.21
    output_currency: "USD"
    fiscal_year_start_month: 4

CXL usage

filter amount > $vars.high_value_threshold
emit tax = amount * $vars.tax_rate
emit currency = $vars.output_currency

Variables provide a clean way to externalize configuration from CXL logic. Combined with channels, different variable sets can parameterize the same pipeline for different environments or clients.

$meta.* – Per-record metadata

Metadata is a per-record key-value store that travels with the record through the pipeline but is not part of the output columns. Write to it with emit meta; read from it with $meta.field.

Writing metadata

emit meta quality = if amount < 0 then "suspect" else "ok"
emit meta source_system = "legacy_erp"

Reading metadata

Downstream nodes can read metadata:

filter $meta.quality == "ok"
emit audit_system = $meta.source_system

Metadata is useful for tagging records with quality flags, routing hints, or audit information that should not appear in the final output unless explicitly emitted.

now – Current time

The now keyword returns the current wall-clock time as a DateTime value. It is evaluated fresh per record, so each record gets the actual time of its processing.

$ cxl eval -e 'emit timestamp = now'
{
  "timestamp": "2026-04-11T15:30:00"
}

now is useful for timestamping records:

emit processed_at = now
emit days_old = now.diff_days(created_date)

Note: now is a keyword, not a function call. Write now, not now().

Complete example

pipeline:
  name: order_enrichment
  vars:
    discount_threshold: 500
    tax_rate: 0.08

  nodes:
    - name: orders
      type: source
      format: csv
      path: orders.csv

    - name: enrich
      type: transform
      input: orders
      cxl: |
        emit order_id = order_id
        emit amount = amount
        emit discount = if amount > $vars.discount_threshold then 0.1 else 0.0
        emit tax = amount * $vars.tax_rate
        emit total = amount * (1 - discount) + tax
        emit processed_at = now
        emit meta source_row = $pipeline.source_row
        emit pipeline_run = $pipeline.execution_id

    - name: output
      type: output
      input: enrich
      format: csv
      path: enriched_orders.csv

Null Handling

Null values in CXL represent missing or absent data. CXL uses null propagation – most operations on null produce null – with specific tools for detecting and handling nulls.

Null propagation

When a method receives a null receiver, it returns null without executing. This is called null propagation and applies to all methods except the introspection methods.

$ cxl eval -e 'emit result = null.upper()'
{
  "result": null
}

Propagation flows through method chains:

$ cxl eval -e 'emit result = null.trim().upper().length()'
{
  "result": null
}
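
One way to picture propagation: before any ordinary method runs, a null receiver short-circuits to null; only the five exempt methods actually execute. A Python model with None standing in for null (the dispatcher shown is illustrative, not Clinker's implementation):

```python
EXEMPT = {"is_null", "type_of", "is_empty", "catch", "debug"}

def call(receiver, method, fn):
    """Apply fn to receiver, propagating None for non-exempt methods."""
    if receiver is None and method not in EXEMPT:
        return None            # method body never runs
    return fn(receiver)

print(call(None, "upper", str.upper))              # None  (propagated)
print(call("hi", "upper", str.upper))              # HI
print(call(None, "is_null", lambda r: r is None))  # True  (exempt)
```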

Null propagation exceptions

Five methods are exempt from null propagation and actively handle null receivers:

Method      Null behavior
is_null()   Returns true
type_of()   Returns "Null"
is_empty()  Returns true
catch(x)    Returns x
debug(l)    Passes through null, logs it

$ cxl eval -e 'emit a = null.is_null()' -e 'emit b = null.type_of()' \
    -e 'emit c = null.catch("fallback")'
{
  "a": true,
  "b": "Null",
  "c": "fallback"
}

Null coalesce operator (??)

The ?? operator returns its left operand if non-null, otherwise its right operand. It is the primary tool for providing default values.

$ cxl eval -e 'emit a = null ?? "default"' \
    -e 'emit b = "present" ?? "default"'
{
  "a": "default",
  "b": "present"
}

Chain multiple ?? operators for fallback chains:

$ cxl eval -e 'emit result = null ?? null ?? "last resort"'
{
  "result": "last resort"
}

Three-valued logic

Boolean operations with null follow three-valued logic (like SQL):

and

Left   Right  Result
true   null   null
false  null   false
null   true   null
null   false  false
null   null   null

The key insight: false and null is false because the result is false regardless of the unknown value.

or

Left   Right  Result
true   null   true
false  null   null
null   true   true
null   false  null
null   null   null

The key insight: true or null is true because the result is true regardless of the unknown value.

not

Operand  Result
true     false
false    true
null     null
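
The three truth tables can be expressed compactly with None standing in for null. This is a model of the semantics, not CXL itself:

```python
def and3(a, b):
    if a is False or b is False:
        return False        # false dominates: the result is known
    if a is None or b is None:
        return None         # otherwise unknown stays unknown
    return True

def or3(a, b):
    if a is True or b is True:
        return True         # true dominates
    if a is None or b is None:
        return None
    return False

def not3(a):
    return None if a is None else not a

print(and3(False, None))  # False
print(or3(True, None))    # True
print(not3(None))         # None
```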

Arithmetic with null

Any arithmetic operation involving null produces null:

$ cxl eval -e 'emit result = 5 + null'
{
  "result": null
}

Comparison with null

Comparisons involving null produce null (not false):

$ cxl eval -e 'emit result = null == null'
{
  "result": null
}

To test for null, use is_null():

$ cxl eval -e 'emit result = null.is_null()'
{
  "result": true
}

Practical patterns

Fallback values with ??

emit name = raw_name ?? "Unknown"
emit amount = raw_amount ?? 0
emit active = is_active ?? false

Safe conversion with try_* and ??

emit price = raw_price.try_float() ?? 0.0
emit qty = raw_qty.try_int() ?? 1

Explicit null testing

filter not amount.is_null()
emit has_email = not email.is_null()

Catch method (equivalent to ??)

emit name = raw_name.catch("Unknown")

Conditional null handling

emit status = if amount.is_null() then "missing"
    else if amount < 0 then "invalid"
    else "ok"

Filter blank or null

# Filter out records where name is null or empty string
filter not name.is_empty()

Null-safe chaining

When working with fields that may be null, place the null check early or use ??:

# Safe: coalesce first, then transform
emit normalized = (raw_name ?? "").trim().upper()

# Safe: test before use
emit name = if raw_name.is_null() then "N/A" else raw_name.trim()

Modules & use

CXL supports a module system for organizing reusable expressions. Modules contain function declarations and constant bindings that can be imported into CXL programs.

Module files

A module is a .cxl file containing fn declarations and let constants. Module files live in the rules path (default: ./rules/).

Function declarations

Functions are pure, single-expression bodies with named parameters:

fn fiscal_year(d) = if d.month() < 4 then d.year() - 1 else d.year()

fn full_name(first, last) = first.trim() + " " + last.trim()

fn clamp_pct(value) = value.clamp(0, 100).round_to(1)

Functions are pure – they have no side effects and always return a value.

Module constants

Constants are let bindings at the module level:

let tax_rate = 0.21
let max_retries = 3
let default_currency = "USD"

Example module file

File: rules/shared/dates.cxl

fn fiscal_year(d) = if d.month() < 4 then d.year() - 1 else d.year()

fn quarter(d) = match {
  d.month() <= 3  => 1,
  d.month() <= 6  => 2,
  d.month() <= 9  => 3,
  _               => 4
}

fn fiscal_quarter(d) = quarter(d.add_months(-3))

let fiscal_start_month = 4
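
The fiscal logic in this module is easy to sanity-check outside CXL. A Python transcription of fiscal_year and quarter (months numbered 1-12, fiscal year starting in April, matching the module above):

```python
def fiscal_year(year, month):
    # Before April, the date still belongs to the previous fiscal year.
    return year - 1 if month < 4 else year

def quarter(month):
    if month <= 3: return 1
    if month <= 6: return 2
    if month <= 9: return 3
    return 4

print(fiscal_year(2024, 3))  # 2023
print(fiscal_year(2024, 4))  # 2024
print(quarter(7))            # 3
```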

Importing modules

Use the use statement to import a module. Module paths use dot notation (not ::):

use shared.dates as d

This imports the module at rules/shared/dates.cxl and binds it to the alias d.

Import syntax

use module.path
use module.path as alias

The as alias clause is optional. When omitted, the last segment of the path becomes the default name.

use shared.dates          # access as dates::fiscal_year(...)
use shared.dates as d     # access as d::fiscal_year(...)

Path resolution

Module paths are resolved relative to the rules path:

Import                    File path
use shared.dates          rules/shared/dates.cxl
use transforms.normalize  rules/transforms/normalize.cxl
use utils                 rules/utils.cxl

The rules path defaults to ./rules/ and can be overridden with --rules-path.
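
Resolution is mechanical: replace the dots with directory separators and append .cxl under the rules path. A Python sketch of that mapping (the helper name is illustrative):

```python
def resolve_module(module_path, rules_path="./rules"):
    """Map a dotted module path to its file under the rules path."""
    return f"{rules_path}/" + module_path.replace(".", "/") + ".cxl"

print(resolve_module("shared.dates"))  # ./rules/shared/dates.cxl
print(resolve_module("utils"))         # ./rules/utils.cxl
```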

Using imported functions and constants

After importing, reference module members with :: (double colon) syntax:

use shared.dates as d
use shared.finance as f

emit fiscal_year = d::fiscal_year(invoice_date)
emit quarter = d::quarter(invoice_date)
emit tax = amount * f::tax_rate
emit net = amount - tax

Functions

Call functions with alias::function_name(args):

use shared.dates as d
emit fy = d::fiscal_year(order_date)

Constants

Access constants with alias::constant_name:

use shared.finance as f
emit tax = amount * f::tax_rate

Restrictions

  • No wildcard imports. use shared.* is not supported. Import modules explicitly.
  • Dot separator only. Module paths use ., not ::. The :: syntax is reserved for member access after import.
  • Single expression bodies. Functions must be a single expression – no multi-statement bodies.
  • Pure functions. Functions cannot use emit, filter, distinct, or other statement forms. They are pure computations.
  • No recursion. Functions cannot call themselves (directly or indirectly).

Complete example

File: rules/etl/clean.cxl

fn normalize_name(name) = name.trim().upper()

fn safe_amount(raw) = raw.try_float() ?? 0.0

fn flag_suspicious(amount, threshold) =
  if amount > threshold then "review" else "ok"

let max_amount = 999999.99

Pipeline CXL block:

use etl.clean as c

emit customer = c::normalize_name(raw_customer)
emit amount = c::safe_amount(raw_amount)
filter amount <= c::max_amount
emit review_flag = c::flag_suspicious(amount, 10000)

The cxl CLI Tool

The cxl command-line tool validates, evaluates, and formats CXL source files. It is the standalone companion to the Clinker pipeline engine, useful for testing expressions, validating transforms, and debugging CXL logic.

Commands

cxl check

Parse, resolve, and type-check a .cxl file. Reports errors with source locations and fix suggestions.

$ cxl check transform.cxl
ok: transform.cxl is valid

On errors:

error[parse]: expected expression, found '}' (at transform.cxl:12)
  help: check for missing operand or extra closing brace
error[resolve]: unknown field 'amoutn' (at transform.cxl:5)
  help: did you mean 'amount'?
error[typecheck]: cannot apply '+' to String and Int (at transform.cxl:8)
  help: convert one operand — use .to_int() or .to_string()

cxl eval

Evaluate CXL expressions against provided data and print the result as JSON.

Inline expression:

$ cxl eval -e 'emit result = 1 + 2'
{
  "result": 3
}

From a file with field values:

$ cxl eval transform.cxl \
    --field Price=10.5 \
    --field Qty=3

From a file with JSON input:

$ cxl eval transform.cxl --record '{"price": 10.5, "qty": 3}'

Multiple inline expressions:

$ cxl eval -e 'let tax = 0.21' -e 'emit net = price * (1 - tax)' \
    --field price=100
{
  "net": 79.0
}

cxl fmt

Parse and pretty-print a .cxl file in canonical format with normalized whitespace and consistent styling.

$ cxl fmt transform.cxl

Output is printed to stdout. Redirect to overwrite:

$ cxl fmt transform.cxl > transform.cxl.tmp && mv transform.cxl.tmp transform.cxl

Input data

--field name=value

Provide individual field values as key-value pairs. Values are automatically type-inferred:

Input            Inferred type  Example
Integer pattern  Int            --field count=42
Decimal pattern  Float          --field price=10.5
true / false     Bool           --field active=true
null             Null           --field value=null
Anything else    String         --field name=Alice

$ cxl eval -e 'emit t = amount.type_of()' --field amount=42
{
  "t": "Int"
}
$ cxl eval -e 'emit t = name.type_of()' --field name=Alice
{
  "t": "String"
}
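
The inference order matters: the literals true/false/null are checked first, then the integer pattern, then the decimal pattern, with String as the fallback. A Python sketch of one plausible implementation (not Clinker's actual code):

```python
def infer(raw):
    """Infer a typed value from a --field string, per the table above."""
    if raw == "null":
        return None
    if raw in ("true", "false"):
        return raw == "true"
    try:
        return int(raw)      # integer pattern first
    except ValueError:
        pass
    try:
        return float(raw)    # then decimal pattern
    except ValueError:
        return raw           # anything else is a String

print(infer("42"), infer("10.5"), infer("true"), infer("Alice"))
# 42 10.5 True Alice
```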

--record JSON

Provide a full JSON object as input. Mutually exclusive with --field.

$ cxl eval -e 'emit total = price * qty' \
    --record '{"price": 10.5, "qty": 3}'
{
  "total": 31.5
}

JSON types map directly:

JSON type       CXL type
null            Null
true / false    Bool
integer number  Int
decimal number  Float
"string"        String
[array]         Array

Output format

Output is always JSON. Each emit statement produces a key-value pair:

$ cxl eval -e 'emit a = 1' -e 'emit b = "two"' -e 'emit c = true'
{
  "a": 1,
  "b": "two",
  "c": true
}

Date and DateTime values are serialized as ISO 8601 strings:

$ cxl eval -e 'emit d = #2024-03-15#'
{
  "d": "2024-03-15"
}

Exit codes

Code  Meaning
0     Success (or warnings only)
1     Parse, resolve, type-check, or evaluation errors
2     I/O error (file not found, invalid JSON, etc.)

Pipeline context in eval mode

When running cxl eval, a minimal pipeline context is provided:

Variable                Value
$pipeline.name          "cxl-eval"
$pipeline.execution_id  Zeroed UUID
$pipeline.batch_id      Zeroed UUID
$pipeline.start_time    Current wall-clock time
$pipeline.source_file   Filename or "<inline>"
$pipeline.source_row    1
now                     Current wall-clock time (live)

Practical usage

Quick expression testing:

$ cxl eval -e 'emit result = "hello world".upper().split(" ").length()'
{
  "result": 2
}

Validate a transform file:

$ cxl check transforms/enrich_orders.cxl && echo "Valid"

Test conditional logic:

$ cxl eval -e 'emit tier = match {
    amount > 1000 => "high",
    amount > 100 => "med",
    _ => "low"
  }' \
    --field amount=500
{
  "tier": "med"
}

Test date operations:

$ cxl eval -e 'emit year = d.year()' -e 'emit month = d.month()' \
    -e 'emit next_week = d.add_days(7)' \
    --record '{"d": "2024-03-15"}'

Test null handling:

$ cxl eval -e 'emit safe = raw.try_int() ?? 0' --field raw=abc
{
  "safe": 0
}

CLI Reference

Clinker ships two command-line tools: clinker (the pipeline runner) and cxl (the expression REPL, covered in the CXL CLI chapter). This page is the complete reference for clinker.

clinker run

Execute a pipeline.

clinker run [OPTIONS] <CONFIG>

Positional arguments

Argument  Description
<CONFIG>  Path to the pipeline YAML configuration file (required)

Options

--memory-limit <SIZE> (default: 256M)
    Memory budget for the execution. Accepts K, M, G suffixes. When the limit is approached, aggregation operators spill to disk rather than crashing. CLI value overrides any memory_limit set in the YAML.

--threads <N> (default: number of CPUs)
    Size of the thread pool used for parallel node execution.

--error-threshold <N> (default: 0, unlimited)
    Maximum number of records routed to the dead-letter queue before the pipeline aborts. 0 means no limit – the pipeline will run to completion regardless of DLQ volume.

--batch-id <ID> (default: UUID v7)
    Custom execution identifier. Appears in metrics output and log lines. Use a meaningful value (e.g. daily-2026-04-11) for correlation across retries.

--explain [FORMAT] (default: text)
    Print the execution plan and exit without processing data. Accepted formats: text, json, dot. See Explain Plans.

--dry-run
    Validate the configuration (YAML structure, CXL syntax, type checking, DAG wiring) without reading any data.

-n, --dry-run-n <N>
    Process only the first N records through the full pipeline. Implies --dry-run.

--dry-run-output <FILE> (default: stdout)
    Redirect dry-run output to a file instead of stdout. Only meaningful with -n.

--rules-path <DIR> (default: ./rules/)
    Search path for CXL module files referenced by use statements.

--base-dir <DIR>
    Base directory for resolving relative paths in the YAML config. Defaults to the directory containing the config file.

--allow-absolute-paths
    Permit absolute file paths in the pipeline YAML. By default, absolute paths are rejected to encourage portable configs.

--env <NAME>
    Set the active environment. Equivalent to setting CLINKER_ENV. Used by when: conditions in channel overrides.

--quiet
    Suppress progress output. Errors are still printed to stderr.

--force
    Allow output files to be overwritten if they already exist. Without this flag, the pipeline aborts rather than clobbering existing output.

--log-level <LEVEL> (default: info)
    Logging verbosity. One of: error, warn, info, debug, trace.

--metrics-spool-dir <DIR>
    Directory for per-execution metrics files. See Metrics & Monitoring.

Examples

# Basic execution
clinker run pipeline.yaml

# Production run with memory budget and forced overwrite
clinker run pipeline.yaml --memory-limit 512M --force --log-level warn

# Validate without processing
clinker run pipeline.yaml --dry-run

# Preview first 10 records
clinker run pipeline.yaml --dry-run -n 10

# Show execution plan as Graphviz
clinker run pipeline.yaml --explain dot | dot -Tpng -o plan.png

# Run with a custom batch ID for tracing
clinker run pipeline.yaml --batch-id "daily-2026-04-11" --metrics-spool-dir ./metrics/

clinker metrics collect

Sweep per-execution metrics files from a spool directory into a single NDJSON archive.

clinker metrics collect [OPTIONS]

Options

Flag                    Description
--spool-dir <DIR>       Spool directory to sweep (required).
--output-file <FILE>    NDJSON archive destination (required). If the file exists, new entries are appended.
--delete-after-collect  Remove spool files after they have been successfully written to the archive.
--dry-run               Preview which files would be collected without writing anything.

Examples

# Collect and archive, then clean up spool
clinker metrics collect \
  --spool-dir /var/spool/clinker/ \
  --output-file /var/log/clinker/metrics.ndjson \
  --delete-after-collect

# Preview what would be collected
clinker metrics collect \
  --spool-dir ./metrics/ \
  --output-file ./archive.ndjson \
  --dry-run

Environment Variables

Variable                   Description
CLINKER_ENV                Active environment name. Equivalent to --env. Used by when: conditions in channel overrides to select environment-specific configuration.
CLINKER_METRICS_SPOOL_DIR  Default metrics spool directory. Overridden by --metrics-spool-dir.

Precedence (highest to lowest): CLI flag, environment variable, YAML config value.

Validation & Dry Run

Clinker provides two levels of pre-flight validation so you can catch problems before committing to a full run.

Config-only validation

clinker run pipeline.yaml --dry-run

This validates everything that can be checked without reading data:

  • YAML structure and required fields
  • CXL syntax and compile-time type checking
  • Schema compatibility between connected nodes
  • DAG wiring (no cycles, no dangling inputs, no missing nodes)
  • File path resolution (existence checks for inputs)

No records are read. No output files are created. The command exits with code 0 on success or code 1 with a diagnostic message on failure.

Use this after every YAML edit. It runs in milliseconds and catches the majority of configuration mistakes.

Record preview

clinker run pipeline.yaml --dry-run -n 10

This reads the first 10 records from each source and processes them through the full pipeline – transforms, aggregations, routing, and output formatting. Results are printed to stdout.

The record preview exercises the runtime evaluation path, catching issues that config-only validation cannot:

  • CXL expressions that are syntactically valid but fail at runtime (e.g., calling a string method on an integer)
  • Data format mismatches between the declared schema and actual file contents
  • Unexpected null values in required fields

Save preview to file

clinker run pipeline.yaml --dry-run -n 100 --dry-run-output preview.csv

The output format matches what the pipeline’s output node would produce, so preview.csv shows you exactly what the full run will write.

Use both validation levels in sequence before every production run:

  1. --dry-run – catch configuration and type errors instantly.
  2. --dry-run -n 10 – verify output shape and values against real data.
  3. Full run – execute with confidence.

This three-step pattern is especially valuable when:

  • Editing CXL expressions in transform or aggregate nodes
  • Changing source schemas or swapping input files
  • Adding or removing nodes from the pipeline DAG
  • Modifying route conditions

Combining with explain

You can also inspect the execution plan before running:

clinker run pipeline.yaml --explain

This shows the DAG structure, parallelism strategy, and node ordering without reading any data. See Explain Plans for details.

The typical full pre-flight sequence is:

clinker run pipeline.yaml --explain          # inspect the DAG
clinker run pipeline.yaml --dry-run          # validate config
clinker run pipeline.yaml --dry-run -n 10    # preview with data
clinker run pipeline.yaml --force            # run for real

Explain Plans

The --explain flag prints the execution plan – the DAG of nodes, their connections, and the parallelism strategy the optimizer has chosen – without reading any data.

Text format

clinker run pipeline.yaml --explain
# or explicitly:
clinker run pipeline.yaml --explain text

The text format shows a human-readable summary of the execution plan:

Execution Plan: customer_etl
============================

Node 0: customers (Source, parallel: file-chunked)
  -> transform_1

Node 1: transform_1 (Transform, parallel: record)
  -> route_1

Node 2: route_1 (Route, parallel: record)
  -> [high] output_high
  -> [default] output_standard

Node 3: output_high (Output, parallel: serial)

Node 4: output_standard (Output, parallel: serial)

Key information shown:

  • Node index and name – the topological position in the DAG
  • Node type – Source, Transform, Aggregate, Route, Merge, Output, Composition
  • Parallelism strategy – how the optimizer plans to execute the node
  • Connections – downstream nodes, with port labels for route branches

JSON format

clinker run pipeline.yaml --explain json

Produces a machine-readable JSON object for programmatic consumption. Useful for:

  • CI pipelines that need to assert plan properties
  • Custom dashboards that visualize execution plans
  • Diffing plans between config versions

# Compare plans before and after a config change
clinker run old.yaml --explain json > plan_old.json
clinker run new.yaml --explain json > plan_new.json
diff plan_old.json plan_new.json

Graphviz DOT format

clinker run pipeline.yaml --explain dot

Produces a Graphviz DOT graph. Pipe it to dot to render an image:

# PNG
clinker run pipeline.yaml --explain dot | dot -Tpng -o pipeline.png

# SVG (scalable, good for documentation)
clinker run pipeline.yaml --explain dot | dot -Tsvg -o pipeline.svg

# PDF
clinker run pipeline.yaml --explain dot | dot -Tpdf -o pipeline.pdf

This requires the graphviz package to be installed on the system.

The resulting diagram shows:

  • Nodes as labeled boxes with type and parallelism annotations
  • Edges as arrows with port labels where applicable
  • Branch/merge fan-out and fan-in structure

When to use explain

  • During development – verify the DAG shape matches your mental model before writing test data.
  • After adding route or merge nodes – confirm branch wiring is correct.
  • When tuning parallelism – check which strategy the optimizer selected for each node.
  • In code review – generate a DOT diagram and include it in the PR for visual confirmation.

Explain runs instantly because it only parses the YAML and builds the plan – no data is touched. Pair it with --dry-run for full config validation:

clinker run pipeline.yaml --explain       # inspect plan
clinker run pipeline.yaml --dry-run       # validate config

Memory Tuning

Clinker is designed to be a good neighbor on shared servers. Rather than consuming all available memory, it works within a configurable budget and spills to disk when necessary.

Setting the memory limit

CLI flag (highest priority):

clinker run pipeline.yaml --memory-limit 512M

YAML config:

pipeline:
  memory_limit: "512M"

The CLI flag overrides the YAML value. Accepted suffixes: K (kilobytes), M (megabytes), G (gigabytes).

Default: 256M
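Parsing the size suffixes is mechanical. A minimal Python sketch of such a parser (illustrative only, not Clinker's actual implementation; it assumes binary multiples, i.e. K = 1024 bytes):

```python
# Illustrative sketch of parsing memory-limit strings like "512M".
# Assumes binary multiples; Clinker's real parser may differ.

def parse_memory_limit(s: str) -> int:
    """Convert a limit string such as '256M' or '1G' to a byte count."""
    multipliers = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}
    s = s.strip().upper()
    if s and s[-1] in multipliers:
        return int(s[:-1]) * multipliers[s[-1]]
    return int(s)  # bare number: treat as bytes

print(parse_memory_limit("512M"))  # 536870912
```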

How it works

Clinker uses a custom accounting allocator to track memory usage across all pipeline nodes. When memory pressure rises toward the configured limit, aggregation operations spill intermediate state to temporary files on disk. The pipeline continues with degraded throughput rather than crashing with an out-of-memory error.

This means:

  • Pipelines always complete if disk space is available, regardless of input size.
  • Performance degrades gracefully under memory pressure – you will see slower execution, not failures.
  • The memory limit is a soft ceiling, not a hard wall. Momentary spikes may exceed the limit before spill kicks in.

Sizing guidelines

Workload                               Recommended limit   Notes
Small files (<10 MB)                   128M                Minimal memory pressure
Medium files (10-50 MB)                256M (default)      Covers most ETL jobs
Large files or complex aggregations    512M-1G             Multiple group-by keys, large cardinality
Multiple large group-by keys           1G+                 High-cardinality distinct values

Target workload: Clinker is optimized for 1-5 input files of up to 100 MB each, processing 10K-2M records per run.

Aggregation strategy interaction

Memory consumption depends heavily on the aggregation strategy the optimizer selects:

  • Hash aggregation accumulates state in a hash map. Memory usage is proportional to the number of distinct group-by values. With high-cardinality keys, this can consume significant memory before spill triggers.

  • Streaming aggregation processes groups in order and emits results as each group completes. Memory usage is minimal (proportional to a single group’s state) but requires the input to be sorted by the group-by keys.

  • strategy: auto (the default) lets the optimizer choose based on the declared sort order of the input. If the data arrives sorted by the group-by keys, streaming aggregation is selected automatically.
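The memory difference between the two strategies is easy to see in a small sketch (hypothetical Python, not Clinker internals): hash aggregation holds state for every group at once, while streaming aggregation holds only the current group's state.

```python
from itertools import groupby

# Hypothetical sketch of the two aggregation strategies (not Clinker code).

def hash_aggregate(records):
    """Accumulate every group in a dict: memory grows with distinct keys."""
    totals = {}
    for rec in records:
        totals[rec["department"]] = totals.get(rec["department"], 0) + rec["amount"]
    return totals

def streaming_aggregate(sorted_records):
    """Input MUST be sorted by the group key; only one group's state is held."""
    for dept, group in groupby(sorted_records, key=lambda r: r["department"]):
        yield dept, sum(r["amount"] for r in group)

rows = [
    {"department": "Eng", "amount": 5000},
    {"department": "Eng", "amount": 7000},
    {"department": "Sales", "amount": 4000},
]
print(hash_aggregate(rows))             # {'Eng': 12000, 'Sales': 4000}
print(dict(streaming_aggregate(rows)))  # same result, one group's state at a time
```

Both produce the same totals here only because `rows` happens to be sorted by department; on unsorted input the streaming version would emit a department more than once, which is exactly why forcing `strategy: streaming` on unsorted data gives incorrect results.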

To influence strategy selection:

  - type: aggregate
    name: rollup
    input: sorted_data
    config:
      group_by: [department]
      strategy: streaming    # force streaming (input MUST be sorted)
      cxl: |
        emit total = sum(amount)

Only force streaming when you are certain the input is sorted by the group-by keys. If the data is not sorted, results will be incorrect. Use auto when in doubt.

Monitoring memory usage

Use the metrics system to track peak_rss_bytes across runs:

clinker run pipeline.yaml --metrics-spool-dir ./metrics/

The metrics file includes peak_rss_bytes, which shows the maximum resident memory during execution. If this consistently approaches your memory limit, consider increasing the budget or restructuring the pipeline to reduce intermediate state.

Shared server considerations

On servers running JVM applications, memory is often at a premium. Recommendations:

  • Set --memory-limit explicitly rather than relying on the default. Know your budget.
  • Use --threads to limit CPU contention alongside memory limits.
  • Monitor peak_rss_bytes in production metrics to right-size the limit over time.
  • Schedule large pipelines during off-peak hours when JVM heap pressure is lower.

Metrics & Monitoring

Clinker writes per-execution metrics as JSON files to a spool directory. These files can be collected into an NDJSON archive for ingestion into monitoring systems.

Enabling metrics

There are three ways to enable metrics collection, listed from highest to lowest priority:

CLI flag:

clinker run pipeline.yaml --metrics-spool-dir ./metrics/

Environment variable:

export CLINKER_METRICS_SPOOL_DIR=./metrics/
clinker run pipeline.yaml

YAML config:

pipeline:
  metrics:
    spool_dir: "./metrics/"

When metrics are enabled, each execution writes one JSON file to the spool directory, named <execution_id>.json.

Metrics schema

Each metrics file follows schema version 1:

{
  "execution_id": "01912345-6789-7abc-def0-123456789abc",
  "schema_version": 1,
  "pipeline_name": "customer_etl",
  "config_path": "/opt/clinker/pipelines/daily_etl.yaml",
  "hostname": "prod-etl-01",
  "started_at": "2026-04-11T10:00:00Z",
  "finished_at": "2026-04-11T10:00:05Z",
  "duration_ms": 5000,
  "exit_code": 0,
  "records_total": 50000,
  "records_ok": 49950,
  "records_dlq": 50,
  "execution_mode": "streaming",
  "peak_rss_bytes": 134217728,
  "thread_count": 4,
  "input_files": ["./data/customers.csv"],
  "output_files": ["./output/enriched.csv"],
  "dlq_path": "./output/errors.csv",
  "error": null
}

Field reference

Field            Type          Description
execution_id     string        UUID v7 or custom --batch-id value
schema_version   integer       Always 1 for this release
pipeline_name    string        The name from the pipeline YAML
config_path      string        Absolute path to the config file
hostname         string        Machine hostname
started_at       string        ISO 8601 UTC timestamp
finished_at      string        ISO 8601 UTC timestamp
duration_ms      integer       Wall-clock duration in milliseconds
exit_code        integer       Process exit code (see Exit Codes)
records_total    integer       Total records read from all sources
records_ok       integer       Records that reached an output node
records_dlq      integer       Records routed to the dead-letter queue
execution_mode   string        streaming or batch
peak_rss_bytes   integer       Maximum resident set size during execution
thread_count     integer       Thread pool size used
input_files      array         Paths to all source files
output_files     array         Paths to all output files written
dlq_path         string/null   Path to the DLQ file, or null if none
error            string/null   Error message on failure, or null on success

Collecting metrics

The spool directory accumulates one file per execution. Use clinker metrics collect to sweep them into an NDJSON archive:

clinker metrics collect \
  --spool-dir ./metrics/ \
  --output-file ./metrics/archive.ndjson \
  --delete-after-collect

This appends all spool files to the archive (one JSON object per line) and removes the originals. The NDJSON format is compatible with most log aggregation and monitoring tools.
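The collect step is conceptually simple: append each spool file to the archive as one NDJSON line, then delete it. A rough Python equivalent for illustration (use the real `clinker metrics collect` command in practice):

```python
import json
from pathlib import Path

# Illustrative sketch of what `metrics collect --delete-after-collect` does.
def collect_metrics(spool_dir: str, archive_path: str) -> int:
    """Append each spool JSON file to the NDJSON archive, then remove it."""
    collected = 0
    with Path(archive_path).open("a") as out:
        for spool_file in sorted(Path(spool_dir).glob("*.json")):
            record = json.loads(spool_file.read_text())
            out.write(json.dumps(record) + "\n")  # one object per line
            spool_file.unlink()                   # delete after collect
            collected += 1
    return collected
```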

Preview without writing:

clinker metrics collect \
  --spool-dir ./metrics/ \
  --output-file ./metrics/archive.ndjson \
  --dry-run

Integration with monitoring systems

Grafana / Prometheus

Parse the NDJSON archive with a log shipper (Promtail, Filebeat, Vector) and create dashboards tracking:

  • duration_ms – execution time trends
  • records_dlq – data quality over time
  • peak_rss_bytes – memory utilization

Datadog

Ship NDJSON to Datadog Logs, then create metrics from log attributes:

# Example: tail the archive and ship to Datadog
tail -f ./metrics/archive.ndjson | datadog-agent log-stream

ELK Stack

Filebeat can ingest NDJSON directly:

# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/clinker/metrics.ndjson
    json.keys_under_root: true

Simple alerting with jq

For environments without a full monitoring stack, use jq to query the archive directly:

# Find all runs with DLQ entries in the last 24 hours
jq 'select(.records_dlq > 0)' metrics/archive.ndjson

# Find runs that exceeded 400MB RSS
jq 'select(.peak_rss_bytes > 419430400)' metrics/archive.ndjson

# Average duration by pipeline
jq -s 'group_by(.pipeline_name) | map({
  pipeline: .[0].pipeline_name,
  avg_ms: (map(.duration_ms) | add / length)
})' metrics/archive.ndjson

Operational recommendations

  • Always enable metrics in production. The overhead is negligible (one small JSON write at the end of each run).
  • Run metrics collect --delete-after-collect on a schedule (e.g., hourly) to prevent spool directory growth.
  • Use --batch-id with meaningful identifiers to correlate metrics across retries and environments.
  • Alert on records_dlq > 0 to catch data quality regressions early.
  • Track peak_rss_bytes trends to anticipate when memory limits need adjustment.

Exit Codes & Error Diagnosis

Clinker uses structured exit codes to communicate the outcome of a pipeline run. These codes are designed for integration with schedulers, cron, CI systems, and monitoring tools.

Exit code reference

Code   Meaning               Description
0      Success               Pipeline completed. All records processed successfully.
1      Configuration error   Invalid YAML, CXL syntax error, type mismatch, or DAG wiring problem. Fix the pipeline configuration.
2      Partial success       Pipeline ran to completion, but some records were routed to the dead-letter queue. Check the DLQ file.
3      Evaluation error      CXL runtime error during record processing (e.g., division by zero, type coercion failure).
4      I/O error             File not found, permission denied, disk full, or input format mismatch.

Understanding exit code 2

Exit code 2 is not a crash. It means:

  • The pipeline started and ran to completion.
  • All viable records were processed and written to output files.
  • Some records could not be processed and were diverted to the dead-letter queue.

Your scheduler should treat exit code 2 as a warning, not a failure. The DLQ file contains the problematic records along with the error that caused each one to be rejected.
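In a wrapper script, the warning-versus-failure distinction might look like this (a hypothetical Python helper; the exit codes are Clinker's, the function is not):

```python
def classify_exit(code: int) -> str:
    """Map a Clinker exit code to a scheduler-level severity."""
    if code == 0:
        return "success"
    if code == 2:
        return "warning"   # pipeline completed; review the DLQ file
    return "failure"       # 1, 3, 4: config, evaluation, or I/O error

for code in (0, 1, 2, 3, 4):
    print(code, classify_exit(code))
```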

To control when exit code 2 escalates to a hard failure, use --error-threshold:

# Abort if more than 100 records hit the DLQ
clinker run pipeline.yaml --error-threshold 100

With a threshold set, the pipeline aborts (exit code 3) when the DLQ count exceeds the threshold, rather than continuing to completion.

Diagnosing failures

Exit code 1: Configuration error

The error message includes a span-annotated diagnostic pointing to the exact location of the problem:

Error: CXL type error in node 'transform_1'
  --> pipeline.yaml:25:15
   |
25 |   emit total = amount + name
   |                ^^^^^^^^^^^^^ cannot add Int and String

Action: Fix the YAML or CXL expression indicated in the diagnostic, then re-run with --dry-run to confirm the fix.

Exit code 2: Partial success (DLQ entries)

Check the DLQ file for details:

# The DLQ path is shown in the run output and in metrics
cat output/errors.csv

Common causes:

  • Null values in fields that a CXL expression does not handle
  • Data that does not match the declared schema (e.g., non-numeric value in an integer column)
  • Coercion failures between types

Action: Review the DLQ records, fix the data or add null handling to CXL expressions, and re-run.

Exit code 3: Evaluation error

A CXL expression failed at runtime. The error message includes the failing expression and the record that triggered it:

Error: division by zero in node 'compute_ratio'
  expression: emit ratio = total / count
  record: {total: 500, count: 0}

Action: Add guard conditions to the CXL expression:

emit ratio = if count == 0 then 0 else total / count

Exit code 4: I/O error

File system or format errors:

Error: file not found: ./data/customers.csv
  --> pipeline.yaml:8:12

Common causes:

  • Input file does not exist or path is wrong
  • Permission denied on input or output directories
  • Output file already exists (use --force to overwrite)
  • Disk full during output writing
  • Input file format does not match the declared type (e.g., invalid CSV)

Action: Fix file paths, permissions, or disk space, then re-run.

Scheduler integration

Cron script

#!/bin/bash
set -euo pipefail

PIPELINE=/opt/clinker/pipelines/daily_etl.yaml
METRICS_DIR=/var/spool/clinker/

# Capture the exit code without tripping `set -e` on a non-zero status
EXIT=0
clinker run "$PIPELINE" \
  --memory-limit 512M \
  --log-level warn \
  --metrics-spool-dir "$METRICS_DIR" \
  --force || EXIT=$?

case $EXIT in
  0)
    echo "$(date): Success" >> /var/log/clinker/daily_etl.log
    ;;
  2)
    echo "$(date): Warning - DLQ entries produced" >> /var/log/clinker/daily_etl.log
    mail -s "Clinker ETL Warning: DLQ entries" ops@company.com < /dev/null
    ;;
  *)
    echo "$(date): FAILURE (exit code $EXIT)" >> /var/log/clinker/daily_etl.log
    mail -s "Clinker ETL FAILURE (exit $EXIT)" ops@company.com < /dev/null
    ;;
esac

exit $EXIT

CI pipeline (GitHub Actions)

- name: Run ETL pipeline
  run: clinker run pipeline.yaml --dry-run
  # Exit code 1 fails the build on config errors

- name: Smoke test with real data
  run: clinker run pipeline.yaml --dry-run -n 100
  # Catches runtime evaluation errors

Systemd

Systemd Type=oneshot services interpret non-zero exit codes as failures. To allow exit code 2 (partial success) without triggering service failure:

[Service]
Type=oneshot
SuccessExitStatus=2
ExecStart=/opt/clinker/bin/clinker run /opt/clinker/pipelines/daily_etl.yaml --force

Production Deployment

Clinker is a single statically-linked binary with no runtime dependencies. Deployment is straightforward: copy the binary to the server.

Installation

# Copy the binary
scp target/release/clinker user@server:/opt/clinker/bin/

# Verify it runs
ssh user@server /opt/clinker/bin/clinker --version

No JVM, no Python, no container runtime required.

Recommended directory layout:

/opt/clinker/
  bin/
    clinker                    # The binary
  pipelines/
    daily_etl.yaml             # Pipeline configs
    weekly_report.yaml
  data/                        # Input data (or symlinks to data locations)
  output/                      # Output files
  rules/                       # CXL module files (for use statements)
  metrics/                     # Metrics spool directory

Create a dedicated user:

sudo useradd --system --home-dir /opt/clinker --shell /usr/sbin/nologin clinker
sudo chown -R clinker:clinker /opt/clinker

Systemd service

For scheduled one-shot execution:

[Unit]
Description=Clinker ETL - Daily Customer Processing
After=network.target

[Service]
Type=oneshot
ExecStart=/opt/clinker/bin/clinker run /opt/clinker/pipelines/daily_etl.yaml \
  --memory-limit 512M \
  --log-level warn \
  --metrics-spool-dir /var/spool/clinker/ \
  --force
WorkingDirectory=/opt/clinker
User=clinker
Group=clinker
SuccessExitStatus=2

# Resource limits
MemoryMax=1G
CPUQuota=200%

# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=clinker-daily

[Install]
WantedBy=multi-user.target

Pair with a systemd timer for scheduling:

[Unit]
Description=Run Clinker daily ETL at 2 AM

[Timer]
OnCalendar=*-*-* 02:00:00
Persistent=true

[Install]
WantedBy=timers.target

sudo systemctl enable --now clinker-daily.timer

Note: SuccessExitStatus=2 tells systemd that exit code 2 (partial success with DLQ entries) is not a service failure. See Exit Codes for the full reference.

Cron scheduling

# Run daily at 2 AM, log to syslog
0 2 * * * /opt/clinker/bin/clinker run \
  /opt/clinker/pipelines/daily_etl.yaml \
  --log-level warn --force \
  2>&1 | logger -t clinker

# Collect metrics hourly
0 * * * * /opt/clinker/bin/clinker metrics collect \
  --spool-dir /var/spool/clinker/ \
  --output-file /var/log/clinker/metrics.ndjson \
  --delete-after-collect

Environment-based configuration

Use the CLINKER_ENV variable or --env flag to activate environment-specific overrides:

# Production
CLINKER_ENV=production clinker run pipeline.yaml

# Staging
CLINKER_ENV=staging clinker run pipeline.yaml

Combined with channel overrides in the pipeline YAML, this allows a single pipeline definition to target different file paths, connection strings, or thresholds per environment.

Logging

Log levels for production

Level   Use case
warn    Recommended for production cron jobs. Prints warnings and errors only.
info    Default. Includes progress messages. Useful during initial deployment.
error   Minimal output. Only prints when something fails.
debug   Troubleshooting. Generates significant output.
trace   Development only. Extremely verbose.

Directing logs

To syslog via logger:

clinker run pipeline.yaml --log-level warn 2>&1 | logger -t clinker

To a log file:

clinker run pipeline.yaml --log-level warn 2>> /var/log/clinker/etl.log

Systemd journal captures stdout and stderr automatically when running as a service.

DLQ monitoring

When a pipeline exits with code 2, records that could not be processed are written to the dead-letter queue file. Set up a daily check:

#!/bin/bash
# Check for DLQ files produced today
DLQ_DIR=/opt/clinker/output/
DLQ_FILES=$(find "$DLQ_DIR" -name "*_errors.csv" -mtime 0 -size +0c)

if [ -n "$DLQ_FILES" ]; then
    mail -s "Clinker DLQ Alert" ops@company.com <<EOF
The following DLQ files were produced today:

$DLQ_FILES

Review the files and address data quality issues.
EOF
fi

Batch ID for tracing

Use --batch-id with a meaningful, consistent naming scheme:

# Date-based
clinker run pipeline.yaml --batch-id "daily-$(date +%Y-%m-%d)"

# Include environment
clinker run pipeline.yaml --batch-id "prod-daily-$(date +%Y-%m-%d)"

The batch ID appears in metrics output and log lines, making it easy to correlate a specific run across logs, metrics, and DLQ files. On retries, use a different batch ID (e.g., append -retry-1) to distinguish attempts.
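Because a custom --batch-id becomes the execution_id in metrics, correlating runs is a simple filter over the NDJSON archive. A Python sketch (the field names come from the metrics schema; the helper itself is hypothetical):

```python
import json

def runs_for_batch(archive_lines, prefix):
    """Yield metrics records whose execution_id starts with the given prefix."""
    for line in archive_lines:
        record = json.loads(line)
        if record["execution_id"].startswith(prefix):
            yield record

# Sample archive lines; a retry shares the batch-id prefix.
archive = [
    '{"execution_id": "prod-daily-2026-04-11", "records_dlq": 0}',
    '{"execution_id": "prod-daily-2026-04-11-retry-1", "records_dlq": 3}',
    '{"execution_id": "staging-daily-2026-04-11", "records_dlq": 0}',
]
matches = list(runs_for_batch(archive, "prod-daily-2026-04-11"))
print(len(matches))  # 2
```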

Upgrades

To upgrade Clinker:

  1. Validate the new version against your pipelines:
    /opt/clinker/bin/clinker-new run pipeline.yaml --dry-run
    
  2. Replace the binary:
    cp clinker-new /opt/clinker/bin/clinker
    
  3. Verify:
    /opt/clinker/bin/clinker --version
    

There is no configuration migration. Pipeline YAML files are forward-compatible within the same major version.

CSV-to-CSV Transform

This recipe reads employee data from a CSV file, computes salary tiers using CXL expressions, and writes the enriched result to a new CSV file.

Input data

employees.csv:

id,name,department,salary
1,Alice Chen,Engineering,95000
2,Bob Martinez,Marketing,62000
3,Carol Johnson,Engineering,88000
4,Dave Williams,Sales,71000
5,Eva Brown,Marketing,58000
6,Frank Lee,Engineering,102000

Pipeline

salary_tiers.yaml:

pipeline:
  name: salary_tiers

nodes:
  - type: source
    name: employees
    config:
      name: employees
      type: csv
      path: "./employees.csv"
      schema:
        - { name: id, type: int }
        - { name: name, type: string }
        - { name: department, type: string }
        - { name: salary, type: int }

  - type: transform
    name: classify
    input: employees
    config:
      cxl: |
        emit id = id
        emit name = name
        emit department = department
        emit salary = salary
        emit level = if salary >= 90000 then "senior" else "junior"
        emit salary_band = match {
          salary >= 100000 => "100k+",
          salary >= 90000 => "90-100k",
          salary >= 70000 => "70-90k",
          _ => "under 70k"
        }

  - type: output
    name: report
    input: classify
    config:
      name: salary_report
      type: csv
      path: "./output/salary_report.csv"

error_handling:
  strategy: fail_fast

Run it

# Validate first
clinker run salary_tiers.yaml --dry-run

# Preview output
clinker run salary_tiers.yaml --dry-run -n 3

# Full run
clinker run salary_tiers.yaml

Expected output

output/salary_report.csv:

id,name,department,salary,level,salary_band
1,Alice Chen,Engineering,95000,senior,90-100k
2,Bob Martinez,Marketing,62000,junior,under 70k
3,Carol Johnson,Engineering,88000,junior,70-90k
4,Dave Williams,Sales,71000,junior,70-90k
5,Eva Brown,Marketing,58000,junior,under 70k
6,Frank Lee,Engineering,102000,senior,100k+

Key points

Schema declaration. The source node declares the schema explicitly with typed columns. This enables compile-time type checking of CXL expressions – if you write salary + name, the type checker catches the error before any data is read.

Emit statements. Each emit in the transform produces one output column. The output schema is defined entirely by the emit statements – input columns that are not emitted are dropped. This is intentional: explicit output schemas prevent accidental data leakage.

Match expressions. The match block evaluates conditions top to bottom and returns the value of the first matching arm. The _ wildcard is the default case and must appear last.
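The first-arm-wins evaluation order matters: a salary of 102000 satisfies all three numeric arms, but only the first match is returned. A Python rendering of the same logic (illustrative, not CXL):

```python
def salary_band(salary: int) -> str:
    """Mirror of the CXL match block: arms are checked top to bottom."""
    if salary >= 100000:
        return "100k+"
    if salary >= 90000:
        return "90-100k"
    if salary >= 70000:
        return "70-90k"
    return "under 70k"   # the _ wildcard arm

print(salary_band(102000))  # 100k+
print(salary_band(88000))   # 70-90k
```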

Error handling. The fail_fast strategy aborts the pipeline on the first record error. For production pipelines processing dirty data, consider dead_letter_queue instead – see Error Handling & DLQ.

Variations

Filtering records

Add a filter statement to exclude records:

  - type: transform
    name: classify
    input: employees
    config:
      cxl: |
        filter salary >= 60000
        emit id = id
        emit name = name
        emit salary = salary

Records where salary < 60000 are dropped silently – they do not appear in the output or the DLQ.

Computed columns with type conversion

      cxl: |
        emit id = id
        emit name = name
        emit monthly_salary = (salary.to_float() / 12.0).round(2)
        emit salary_display = "$" + salary.to_string()

The .to_float() conversion is required because salary is declared as int and division by a float literal requires matching types.

Multi-Source Lookup

This recipe enriches order records with product information from a separate catalog file using the lookup: transform configuration.

Input data

orders.csv:

order_id,product_id,quantity,unit_price
ORD-001,PROD-A,5,29.99
ORD-002,PROD-B,2,149.99
ORD-003,PROD-A,1,29.99
ORD-004,PROD-C,10,9.99
ORD-005,PROD-B,3,149.99

products.csv:

product_id,product_name,category
PROD-A,Widget Pro,Hardware
PROD-B,DataSync License,Software
PROD-C,Cable Kit,Hardware

Pipeline

order_enrichment.yaml:

pipeline:
  name: order_enrichment

nodes:
  - type: source
    name: orders
    config:
      name: orders
      type: csv
      path: "./orders.csv"
      schema:
        - { name: order_id, type: string }
        - { name: product_id, type: string }
        - { name: quantity, type: int }
        - { name: unit_price, type: float }

  - type: source
    name: products
    config:
      name: products
      type: csv
      path: "./products.csv"
      schema:
        - { name: product_id, type: string }
        - { name: product_name, type: string }
        - { name: category, type: string }

  - type: transform
    name: enrich
    input: orders
    config:
      lookup:
        source: products
        where: "product_id == products.product_id"
      cxl: |
        emit order_id = order_id
        emit product_id = product_id
        emit product_name = products.product_name
        emit category = products.category
        emit quantity = quantity
        emit unit_price = unit_price
        emit line_total = quantity.to_float() * unit_price

  - type: output
    name: result
    input: enrich
    config:
      name: enriched_orders
      type: csv
      path: "./output/enriched_orders.csv"

Run it

clinker run order_enrichment.yaml --dry-run
clinker run order_enrichment.yaml --dry-run -n 3
clinker run order_enrichment.yaml

Expected output

output/enriched_orders.csv:

order_id,product_id,product_name,category,quantity,unit_price,line_total
ORD-001,PROD-A,Widget Pro,Hardware,5,29.99,149.95
ORD-002,PROD-B,DataSync License,Software,2,149.99,299.98
ORD-003,PROD-A,Widget Pro,Hardware,1,29.99,29.99
ORD-004,PROD-C,Cable Kit,Hardware,10,9.99,99.90
ORD-005,PROD-B,DataSync License,Software,3,149.99,449.97

How lookup works

The lookup: configuration on a transform node loads a secondary source into memory and matches each input record against it:

  • source – the name of another source node in the pipeline (here, products).
  • where – a CXL boolean expression evaluated per candidate lookup row. Bare field names reference the primary input; qualified names (products.field) reference the lookup source.
  • on_miss (optional) – what to do when no lookup row matches: null_fields (default), skip, or error.
  • match (optional) – first (default, 1:1) or all (1:N fan-out).

Lookup fields are accessed using qualified syntax: products.product_name, products.category, etc.
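The matching semantics can be sketched in a few lines of Python (hypothetical, not Clinker's implementation): the lookup table sits in memory, the where predicate runs per candidate row, and on_miss decides what happens when nothing matches.

```python
# Hypothetical sketch of lookup matching (match: first, on_miss: null_fields).
def lookup_first(record, table, where, null_fields):
    """Return the record merged with the first matching lookup row."""
    for row in table:             # candidate rows checked in order
        if where(record, row):
            return {**record, **row}
    # on_miss: null_fields — emit with all lookup fields set to None
    return {**record, **{f: None for f in null_fields}}

products = [
    {"pid": "PROD-A", "product_name": "Widget Pro", "category": "Hardware"},
    {"pid": "PROD-C", "product_name": "Cable Kit", "category": "Hardware"},
]
order = {"order_id": "ORD-001", "product_id": "PROD-A"}
enriched = lookup_first(
    order, products,
    where=lambda rec, row: rec["product_id"] == row["pid"],
    null_fields=["pid", "product_name", "category"],
)
print(enriched["product_name"])  # Widget Pro
```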

Unmatched records

By default (on_miss: null_fields), unmatched records emit with all lookup fields set to null. Other options:

lookup:
  source: products
  where: "product_id == products.product_id"
  on_miss: skip        # drop records with no match

lookup:
  source: products
  where: "product_id == products.product_id"
  on_miss: error       # fail the pipeline on first unmatched record

Range predicates

Lookups support any CXL boolean expression, not just equality. This classifies employees into pay bands:

- type: transform
  name: classify
  input: employees
  config:
    lookup:
      source: rate_bands
      where: |
        ee_group == rate_bands.ee_group
        and pay >= rate_bands.min_pay
        and pay <= rate_bands.max_pay
    cxl: |
      emit employee_id = employee_id
      emit rate_class = rate_bands.rate_class

Fan-out (match: all)

When match: all is set, each matching lookup row produces a separate output record. This is useful for one-to-many enrichments:

- type: transform
  name: expand_benefits
  input: employees
  config:
    lookup:
      source: benefits
      where: "department == benefits.department"
      match: all
    cxl: |
      emit employee_id = employee_id
      emit benefit = benefits.benefit_name

An employee in a department with 3 benefits produces 3 output records.

Multiple lookup sources

Each transform can reference one lookup source. Chain transforms to enrich from multiple sources:

- type: transform
  name: enrich_product
  input: orders
  config:
    lookup:
      source: products
      where: "product_id == products.product_id"
    cxl: |
      emit order_id = order_id
      emit product_name = products.product_name
      emit customer_id = customer_id
      emit amount = amount

- type: transform
  name: enrich_customer
  input: enrich_product
  config:
    lookup:
      source: customers
      where: "customer_id == customers.customer_id"
    cxl: |
      emit order_id = order_id
      emit product_name = product_name
      emit customer_name = customers.customer_name
      emit amount = amount

Memory considerations

The entire lookup source is loaded into memory. For large reference files, ensure --memory-limit accounts for this. A 10 MB CSV lookup table uses roughly 10-20 MB of heap when loaded.

Routing to Multiple Outputs

This recipe splits a stream of order records into separate output files based on business rules. High-value orders go to one file, standard orders to another.

Input data

orders.csv:

order_id,customer,amount,region
ORD-001,Acme Corp,15000,US
ORD-002,Globex,450,EU
ORD-003,Initech,8500,US
ORD-004,Umbrella,22000,APAC
ORD-005,Stark Ind,950,US
ORD-006,Wayne Ent,3200,EU

Pipeline

order_routing.yaml:

pipeline:
  name: order_routing
  vars:
    high_value_threshold: 5000

nodes:
  - type: source
    name: orders
    config:
      name: orders
      type: csv
      path: "./orders.csv"
      schema:
        - { name: order_id, type: string }
        - { name: customer, type: string }
        - { name: amount, type: float }
        - { name: region, type: string }

  - type: route
    name: split_by_value
    input: orders
    config:
      mode: exclusive
      conditions:
        high: "amount >= $vars.high_value_threshold"
      default: standard

  - type: output
    name: high_value_output
    input: split_by_value.high
    config:
      name: high_value_orders
      type: csv
      path: "./output/high_value.csv"

  - type: output
    name: standard_output
    input: split_by_value.standard
    config:
      name: standard_orders
      type: csv
      path: "./output/standard.csv"

Run it

clinker run order_routing.yaml --dry-run
clinker run order_routing.yaml

Expected output

output/high_value.csv:

order_id,customer,amount,region
ORD-001,Acme Corp,15000,US
ORD-003,Initech,8500,US
ORD-004,Umbrella,22000,APAC

output/standard.csv:

order_id,customer,amount,region
ORD-002,Globex,450,EU
ORD-005,Stark Ind,950,US
ORD-006,Wayne Ent,3200,EU

How routing works

Port syntax

Route nodes produce named output ports. Downstream nodes reference these ports using dot syntax: split_by_value.high and split_by_value.standard.

The port names come from two places:

  • Condition names in the conditions map (here, high)
  • The default field (here, standard)

Exclusive mode

With mode: exclusive, each record goes to exactly one branch. Conditions are evaluated top to bottom – the first matching condition wins, and the record is sent to that port. Records that match no condition go to the default port.
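Exclusive routing is first-match-wins dispatch. A minimal Python sketch of the semantics (illustrative only):

```python
def route_exclusive(record, conditions, default):
    """Return the port name of the first matching condition, else the default."""
    for port, predicate in conditions:   # evaluated top to bottom
        if predicate(record):
            return port                  # first match wins; no further checks
    return default

conditions = [("high", lambda r: r["amount"] >= 5000)]
print(route_exclusive({"amount": 15000}, conditions, "standard"))  # high
print(route_exclusive({"amount": 450}, conditions, "standard"))    # standard
```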

Pipeline variables

The threshold is defined in pipeline.vars and referenced in the CXL expression as $vars.high_value_threshold. This makes it easy to adjust the threshold without editing the route condition, and channel overrides can change it per environment.

Variations

Multiple branches

Route nodes can have any number of named branches:

  - type: route
    name: split_by_region
    input: orders
    config:
      mode: exclusive
      conditions:
        us: "region == \"US\""
        eu: "region == \"EU\""
        apac: "region == \"APAC\""
      default: other

  - type: output
    name: us_output
    input: split_by_region.us
    config:
      name: us_orders
      type: csv
      path: "./output/us_orders.csv"

  - type: output
    name: eu_output
    input: split_by_region.eu
    config:
      name: eu_orders
      type: csv
      path: "./output/eu_orders.csv"

  # ... additional outputs for apac, other

Transform before output

Insert a transform between the route and output to shape the data differently per branch:

  - type: transform
    name: enrich_high_value
    input: split_by_value.high
    config:
      cxl: |
        emit order_id = order_id
        emit customer = customer
        emit amount = amount
        emit priority = "URGENT"
        emit review_required = true

  - type: output
    name: high_value_output
    input: enrich_high_value
    config:
      name: high_value_orders
      type: csv
      path: "./output/high_value.csv"

Combining routing with aggregation

Route first, then aggregate each branch independently:

  - type: aggregate
    name: high_value_summary
    input: split_by_value.high
    config:
      group_by: [region]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)

This produces a per-region summary of high-value orders only.
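
To complete the snippet, the summary rows can be written out like any other node's result. A sketch with an illustrative output path:

```yaml
  - type: output
    name: high_value_summary_output
    input: high_value_summary
    config:
      name: high_value_summary
      type: csv
      path: "./output/high_value_summary.csv"
```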

Aggregation & Rollups

This recipe demonstrates grouping records and computing summary statistics. The pipeline filters active sales records, then rolls them up by department.

Input data

sales.csv:

id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank

Pipeline

dept_rollup.yaml:

pipeline:
  name: dept_rollup

nodes:
  - type: source
    name: sales
    config:
      name: sales
      type: csv
      path: "./sales.csv"
      schema:
        - { name: id, type: int }
        - { name: department, type: string }
        - { name: amount, type: float }
        - { name: status, type: string }
        - { name: rep, type: string }

  - type: transform
    name: active_only
    input: sales
    config:
      cxl: |
        filter status == "active"

  - type: aggregate
    name: rollup
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)
        emit average = avg(amount)
        emit maximum = max(amount)
        emit minimum = min(amount)

  - type: output
    name: report
    input: rollup
    config:
      name: dept_totals
      type: csv
      path: "./output/dept_totals.csv"

Run it

clinker run dept_rollup.yaml --dry-run
clinker run dept_rollup.yaml

Expected output

output/dept_totals.csv:

department,total,count,average,maximum,minimum
Engineering,21500,3,7166.67,9500,5000
Marketing,5000,2,2500,3000,2000
Sales,6000,1,6000,6000,6000

One row per department. The inactive records (Dave’s $4000, Hank’s $1500) are excluded by the filter.

How aggregation works

Group-by keys

The group_by field lists the columns that define each group. Records with the same values for all group-by columns are aggregated together. The group-by columns appear automatically in the output – you do not need to emit them.

Aggregate functions

Available aggregate functions in CXL:

Function       Description
sum(expr)      Sum of values
count(*)       Number of records
avg(expr)      Arithmetic mean
min(expr)      Minimum value
max(expr)      Maximum value
first(expr)    First value encountered
last(expr)     Last value encountered
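
first and last are useful for carrying a representative non-grouped column into each summary row. A sketch against the sales schema above (note that first is order-dependent, so the value depends on input order):

```yaml
  - type: aggregate
    name: rollup_with_rep
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit sample_rep = first(rep)
```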

Strategy selection

Clinker offers two aggregation strategies:

  • Hash aggregation: Builds an in-memory hash map keyed by the group-by columns. Works with any input order. Memory usage is proportional to the number of distinct groups.

  • Streaming aggregation: Processes records in order, emitting each group’s result as soon as the next group starts. Requires input sorted by the group-by keys. Uses minimal memory regardless of the number of groups.

The default strategy (auto) selects streaming when the optimizer can prove the input is sorted by the group-by keys, and hash otherwise. You can force a strategy:

    config:
      group_by: [department]
      strategy: streaming   # requires sorted input

See Memory Tuning for details on memory implications.

Variations

Multiple group-by keys

    config:
      group_by: [department, region]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)

Produces one row per unique (department, region) combination.

Pre-aggregation transform

Compute derived fields before aggregating:

  - type: transform
    name: prepare
    input: sales
    config:
      cxl: |
        filter status == "active"
        emit department = department
        emit amount = amount
        emit is_large = amount >= 5000

  - type: aggregate
    name: rollup
    input: prepare
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit large_count = sum(if is_large then 1 else 0)
        emit small_count = sum(if not is_large then 1 else 0)

Aggregation followed by routing

Aggregate first, then route the summary rows:

  - type: aggregate
    name: rollup
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)

  - type: route
    name: split_by_total
    input: rollup
    config:
      mode: exclusive
      conditions:
        large: "total >= 10000"
      default: small

This routes departments with $10,000 or more in total sales to one output and the rest to another.

No group-by (grand total)

Omit group_by to aggregate all records into a single output row:

    config:
      cxl: |
        emit grand_total = sum(amount)
        emit record_count = count(*)
        emit average_amount = avg(amount)

File Splitting

This recipe demonstrates splitting large output files into smaller chunks, optionally keeping related records together.

Basic record-count splitting

Split output into files of at most 5,000 records each:

pipeline:
  name: monthly_report

nodes:
  - type: source
    name: transactions
    config:
      name: transactions
      type: csv
      path: "./data/transactions.csv"
      schema:
        - { name: id, type: int }
        - { name: date, type: string }
        - { name: department, type: string }
        - { name: amount, type: float }
        - { name: description, type: string }

  - type: output
    name: split_output
    input: transactions
    config:
      name: monthly_report
      type: csv
      path: "./output/report.csv"
      split:
        max_records: 5000
        naming: "{stem}_{seq:04}.{ext}"
        repeat_header: true

Output files

output/report_0001.csv  (5000 records + header)
output/report_0002.csv  (5000 records + header)
output/report_0003.csv  (remaining records + header)

Naming pattern variables

Variable    Description                             Example
{stem}      Base filename without extension         report
{ext}       File extension                          csv
{seq:04}    Zero-padded sequence number (width 4)   0001

The path field provides the template: ./output/report.csv means stem is report and ext is csv.
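
Literal text in the pattern presumably passes through verbatim, so the pattern can be adapted to a downstream system's naming scheme. An illustrative sketch (assuming the {seq:N} width syntax shown above is the only formatting control):

```yaml
      split:
        max_records: 5000
        naming: "{stem}-part-{seq:02}.{ext}"   # report-part-01.csv, report-part-02.csv, ...
        repeat_header: true
```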

Header behavior

When repeat_header: true, each output file includes the CSV header row. This is the recommended setting – each file is self-contained and can be processed independently.

Grouped splitting

Keep all records with the same group key value in the same file:

      split:
        max_records: 5000
        group_key: "department"
        naming: "{stem}_{seq:04}.{ext}"
        repeat_header: true
        oversize_group: warn

With group_key: "department", the splitter ensures that all records for a given department land in the same output file. A new file starts only at a group boundary (when the department value changes), even if the current file has not reached max_records yet.

Oversize group policy

If a single group contains more records than max_records, the oversize_group setting controls behavior:

Policy           Behavior
warn (default)   Log a warning and write all records for the group into one file, exceeding the limit
error            Stop the pipeline with an error
allow            Silently allow the oversized file

For example, if max_records is 5,000 but the Engineering department has 7,000 records, the warn policy produces a file with 7,000 records and logs a warning.

Byte-based splitting

Split by file size instead of record count:

      split:
        max_bytes: 10485760  # 10 MB per file
        naming: "{stem}_{seq:04}.{ext}"
        repeat_header: true

The splitter estimates the current file size and starts a new file when the limit is approached. The actual file size may slightly exceed the limit because the current record is always completed before splitting.

Combined limits

Use both max_records and max_bytes together – whichever limit is reached first triggers a new file:

      split:
        max_records: 10000
        max_bytes: 5242880   # 5 MB
        naming: "{stem}_{seq:04}.{ext}"
        repeat_header: true

This is useful when record sizes vary widely. With short records the record limit triggers first, producing files well under 5 MB; with long records the byte limit is hit well before 10,000 records.

Full pipeline example

A complete pipeline that reads a large transaction file, filters it, and splits the output:

pipeline:
  name: split_transactions

nodes:
  - type: source
    name: transactions
    config:
      name: transactions
      type: csv
      path: "./data/all_transactions.csv"
      schema:
        - { name: id, type: int }
        - { name: date, type: string }
        - { name: department, type: string }
        - { name: category, type: string }
        - { name: amount, type: float }

  - type: transform
    name: current_year
    input: transactions
    config:
      cxl: |
        filter date.starts_with("2026")

  - type: output
    name: chunked
    input: current_year
    config:
      name: transactions_2026
      type: csv
      path: "./output/transactions_2026.csv"
      split:
        max_records: 5000
        group_key: "department"
        naming: "{stem}_{seq:04}.{ext}"
        repeat_header: true
        oversize_group: warn

Run it

clinker run split_transactions.yaml --force

Practical considerations

  • Downstream consumers. Splitting is useful when the receiving system has file size limits (e.g., an upload API that accepts files up to 10 MB) or when parallel processing of chunks is desired.

  • Record ordering. Records within each output file maintain their original order from the pipeline. Across files, the sequence number ({seq}) indicates the order.

  • Group key sorting. For group_key to work correctly, the input should ideally be sorted by the group key. If the input is not sorted, records for the same group may appear in multiple files. Pre-sort with a transform if needed, or accept the split-group behavior.

  • Overwrite behavior. Use --force when re-running a pipeline with splitting enabled. Without it, the pipeline aborts if any of the output chunk files already exist.