Writing Transforms

This guide walks through building a schedule pipeline from scratch. The same patterns apply to realtime pipelines; see the RT builtins reference for the protobuf-specific API.

Create a pipeline folder

A pipeline is a folder of .py files. No config file, no registry — the folder IS the pipeline definition.

sound-transit/
  __init__.py          # INPUTS manifest (required)
  calendar_cleanup.py
  route_metadata.py
  station_renames.py

Files are scanned alphabetically. The framework discovers all Step instances defined at module level.
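
To make module-level discovery concrete, here is a minimal sketch that visits modules in alphabetical order and collects every instance of a Step class. The Step class and the discover_steps helper below are hypothetical stand-ins written for illustration; they are not the framework's own scanner.

```python
from dataclasses import dataclass
from types import ModuleType


@dataclass
class Step:
    """Hypothetical stand-in for the framework's Step class."""
    description: str = ""


def discover_steps(modules: dict[str, ModuleType]) -> dict[str, Step]:
    """Collect module-level Step instances, visiting modules alphabetically."""
    found: dict[str, Step] = {}
    for module_name in sorted(modules):
        for attr_name, value in vars(modules[module_name]).items():
            if isinstance(value, Step):
                found[attr_name] = value
    return found


# Build two fake modules in memory instead of reading .py files from disk.
calendar_cleanup = ModuleType("calendar_cleanup")
calendar_cleanup.remove_inactive = Step("drop inactive calendars")
calendar_cleanup.HELPER_CONSTANT = 42  # not a Step, so it is ignored

route_metadata = ModuleType("route_metadata")
route_metadata.update_1line = Step("update 1 Line metadata")

steps = discover_steps(
    {"route_metadata": route_metadata, "calendar_cleanup": calendar_cleanup}
)
print(list(steps))
```

Only names bound to Step instances are picked up, so helper constants and functions can live in the same file without becoming part of the DAG.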

Declare the pipeline's inputs

Every pipeline must declare its named inputs in __init__.py — the manifest is the pipeline's input contract:

# sound-transit/__init__.py
INPUTS = {
    "schedule": "gtfs_schedule_zip",
    "stop_overrides": "csv_table",
}

Each value in INPUTS is a content_kind, which determines the shape transforms see when they read that input:

content_kind         ctx.inputs[name] shape
gtfs_schedule_zip    dict[str, polars.DataFrame] keyed by GTFS filename
csv_table            polars.DataFrame (all string columns)
gtfs_rt_protobuf     gtfs_realtime_pb2.FeedMessage
opaque_bytes         raw bytes

The worker validates every dispatch against this manifest, and the CLI uses it to resolve --input flags in local runs. Pipelines without an INPUTS manifest fail at scan time.
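
The kind of check a manifest enables can be sketched as follows. The check_inputs function is hypothetical, and rejecting inputs that were supplied but never declared is an assumption of this sketch; the framework's actual validation rules may differ.

```python
KNOWN_CONTENT_KINDS = {
    "gtfs_schedule_zip",
    "csv_table",
    "gtfs_rt_protobuf",
    "opaque_bytes",
}

INPUTS = {
    "schedule": "gtfs_schedule_zip",
    "stop_overrides": "csv_table",
}


def check_inputs(manifest: dict[str, str], supplied: set[str]) -> list[str]:
    """Return a list of problems; an empty list means the run may proceed."""
    problems = []
    for name, kind in manifest.items():
        if kind not in KNOWN_CONTENT_KINDS:
            problems.append(f"{name}: unknown content_kind {kind!r}")
    for name in sorted(manifest.keys() - supplied):
        problems.append(f"{name}: declared in INPUTS but not supplied")
    # Assumption: undeclared extras are treated as errors in this sketch.
    for name in sorted(supplied - manifest.keys()):
        problems.append(f"{name}: supplied but not declared in INPUTS")
    return problems


print(check_inputs(INPUTS, {"schedule", "stop_overrides"}))
print(check_inputs(INPUTS, {"schedule"}))
```

The first call reports no problems; the second reports that stop_overrides was declared but not supplied.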

Tip

Organize files however makes sense for your team: by concern, by GTFS file, by who owns the rules. Apart from the required __init__.py, the framework doesn't care about file names or structure within the folder.

Using builtins

Most transforms are parameterized builtins. Import and instantiate them as module-level variables:

# calendar_cleanup.py
from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition

# Remove calendar entries with no active service days
remove_inactive = RemoveRows(
    "calendar.txt",
    [
        MatchCondition("monday", value="0"),
        MatchCondition("tuesday", value="0"),
        MatchCondition("wednesday", value="0"),
        MatchCondition("thursday", value="0"),
        MatchCondition("friday", value="0"),
        MatchCondition("saturday", value="0"),
        MatchCondition("sunday", value="0"),
    ],
    description="Remove calendar records with no active service days",
)

# Remove LLR service IDs (regex match)
remove_llr = RemoveRows(
    "calendar.txt",
    [MatchCondition("service_id", regex=r"^LLR.*")],
    after=[remove_inactive],
)

The variable name (remove_inactive, remove_llr) becomes the step name in the DAG.
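
The two steps above can be read as plain row filters. The sketch below reproduces that reading with ordinary Python lists and dicts, assuming that multiple conditions are combined with AND (the "no active service days" example suggests this, but the text does not state it). The remove_rows function is a hypothetical illustration, not the builtin's implementation.

```python
import re

# Rows as they might appear in calendar.txt; every value is a string.
calendar = [
    {"service_id": "WKD", "monday": "1", "sunday": "0"},
    {"service_id": "NONE", "monday": "0", "sunday": "0"},
    {"service_id": "LLR_WKD", "monday": "1", "sunday": "0"},
]


def remove_rows(rows, conditions):
    """Drop rows for which every (column, predicate) condition holds."""
    return [
        row for row in rows
        if not all(pred(row[col]) for col, pred in conditions)
    ]


# Counterpart of remove_inactive, shortened to two day columns.
step1 = remove_rows(calendar, [
    ("monday", lambda v: v == "0"),
    ("sunday", lambda v: v == "0"),
])

# Counterpart of remove_llr: regex match on service_id.
llr = re.compile(r"^LLR.*")
step2 = remove_rows(step1, [("service_id", lambda v: llr.match(v) is not None)])

print([row["service_id"] for row in step2])
```

Note that the values are compared as strings ("0", not 0), which matches the all-string columns that transforms see.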

Writing custom transforms

When builtins don't cover your use case, use the @step decorator:

# station_renames.py
import polars as pl
from continuous_gtfs import step
from .route_metadata import update_1line  # cross-file import for DAG ordering

@step(files=["stops.txt"], after=[update_1line])
def normalize_stop_names(ctx):
    """Strip whitespace and normalize capitalization."""
    df = ctx.output["stops.txt"]
    ctx.output["stops.txt"] = df.with_columns(
        pl.col("stop_name").str.strip_chars().str.to_titlecase()
    )

The @step decorator returns a Step instance (not a function). The framework treats it identically to builtins.
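
A decorator that returns an object instead of a function can look like the sketch below. Both the Step dataclass and this step decorator are hypothetical stand-ins meant to show the pattern; the real continuous_gtfs classes may carry different fields.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Step:
    """Hypothetical stand-in for the framework's Step class."""
    func: Callable
    files: list[str]
    after: list["Step"] = field(default_factory=list)


def step(files, after=None):
    """Decorator factory: wraps the function in a Step and returns the Step."""
    def wrap(func):
        return Step(func=func, files=list(files), after=list(after or []))
    return wrap


@step(files=["stops.txt"])
def normalize_stop_names(ctx):
    ctx["stops.txt"] = [name.strip().title() for name in ctx["stops.txt"]]


# The module-level name is now bound to a Step, not a plain function.
print(type(normalize_stop_names).__name__)

# The wrapped function is still reachable through the Step.
ctx = {"stops.txt": ["  westlake station "]}
normalize_stop_names.func(ctx)
print(ctx["stops.txt"])
```

Because the name ends up bound to a Step, it can be passed directly in another step's after list, exactly like a builtin instance.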

What your function receives

Your function gets a PipelineContext with two separate data handles:

  • ctx.inputs: read-only, populated by the framework before the first step runs. Each entry is the already-parsed value for a named input declared in INPUTS. For a schedule pipeline with INPUTS["schedule"] = "gtfs_schedule_zip", ctx.inputs["schedule"] is a dict[str, DataFrame].
  • ctx.output: the mutable working set that transforms populate and modify. The framework seeds it automatically. For schedule pipelines it is seeded from the first gtfs_schedule_zip input; for RT pipelines it mirrors each FeedMessage input by name. A pipeline that needs custom seeding can add an explicit init step that replaces ctx.output before the rest of the DAG runs.
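
The split between the two handles can be pictured with a small stand-in class. SketchContext is hypothetical, and using MappingProxyType to enforce read-only access is an assumption of this sketch; the text does not say how PipelineContext enforces it.

```python
from types import MappingProxyType


class SketchContext:
    """Hypothetical stand-in for PipelineContext, showing the two handles."""

    def __init__(self, inputs: dict):
        # Read-only view: item assignment on it raises TypeError.
        self.inputs = MappingProxyType(dict(inputs))
        # Mutable working set, seeded from the schedule input (shallow copy).
        self.output = dict(inputs["schedule"])
        self.metadata = {}


ctx = SketchContext({"schedule": {"stops.txt": ["A", "B"]}})

# Transforms replace entries in ctx.output...
ctx.output["stops.txt"] = ["A"]

# ...while ctx.inputs still holds the original parsed value.
print(ctx.inputs["schedule"]["stops.txt"])

try:
    ctx.inputs["schedule"] = {}
    blocked = False
except TypeError:
    blocked = True
print(blocked)
```

Keeping the original inputs intact means a later step can still compare its working data against what the pipeline started with.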

Typical access patterns for a schedule transform:

  • Read GTFS data: df = ctx.output["stops.txt"]
  • Write GTFS data: ctx.output["stops.txt"] = modified_df
  • Read reference inputs: overrides = ctx.inputs["stop_overrides"] (already parsed as a DataFrame)
  • Share state across steps: ctx.metadata["my_key"] = value
  • Record ID changes: ctx.add_id_mapping("routes.txt", "route_id", "OLD", "NEW")

All DataFrames are Polars DataFrames in which every column is a string. Use Polars operations for filtering and transformation.

Declaring dependencies

Dependencies use Python object references, not string names:

a = RemoveRows("routes.txt", [...])
b = UpdateFields("routes.txt", [...], after=[a])  # b runs after a

You can import steps from other files in the same pipeline folder using relative imports:

# station_renames.py
from .calendar_cleanup import remove_inactive

@step(files=["stops.txt"], after=[remove_inactive])
def my_step(ctx):
    ...

The before parameter works in the opposite direction — "run me before these steps":

setup = Step(before=[main_transform])  # setup runs before main_transform

Note

Dependencies on steps outside the current pipeline folder are silently ignored. This means you can safely reference steps that may or may not be present.
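
One way to picture how after and before combine into a run order is a topological sort. The sketch below uses string names and the standard-library graphlib purely for brevity (the framework itself takes object references), and the execution_order helper is hypothetical. It treats before as a reversed after edge and drops references to steps that are not part of the pipeline.

```python
from graphlib import TopologicalSorter

# Hypothetical step declarations: name -> after/before lists.
declared = {
    "remove_route": {"after": [], "before": []},
    "update_meta": {"after": ["remove_route"], "before": []},
    "setup": {"after": [], "before": ["remove_route"]},
    "cleanup": {"after": ["update_meta", "step_from_elsewhere"], "before": []},
}


def execution_order(steps: dict) -> list[str]:
    """Turn after/before declarations into one valid run order."""
    predecessors = {name: set() for name in steps}
    for name, deps in steps.items():
        for dep in deps["after"]:
            if dep in steps:  # references outside the pipeline are dropped
                predecessors[name].add(dep)
        for target in deps["before"]:
            if target in steps:
                predecessors[target].add(name)  # "before" is a reversed "after"
    return list(TopologicalSorter(predecessors).static_order())


print(execution_order(declared))
```

Here setup lands ahead of remove_route even though remove_route never mentions it, and the unknown step_from_elsewhere has no effect on the order.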

Reference-data inputs

Some transforms need reference data that isn't part of the GTFS feed — a CSV of stop description overrides exported from a spreadsheet, route metadata, etc. Declare them in the INPUTS manifest alongside the schedule, and they'll arrive parsed as ctx.inputs[name] for transforms to consume.

Declare in __init__.py:

INPUTS = {
    "schedule": "gtfs_schedule_zip",
    "stop_overrides": "csv_table",
}

Supply on the command line:

continuous-gtfs schedule pipelines/schedule/ \
  --input schedule=feed.zip \
  --input stop_overrides=data/stop_overrides.csv \
  -o output.zip

Access from your transform:

import polars as pl
from continuous_gtfs import step

@step(files=["stops.txt"])
def apply_stop_overrides(ctx):
    # Already parsed as a DataFrame because INPUTS["stop_overrides"] = "csv_table"
    overrides = ctx.inputs["stop_overrides"]

    stops = ctx.output["stops.txt"]
    ctx.output["stops.txt"] = (
        stops
        .join(overrides, on="stop_id", how="left", suffix="_override")
        .with_columns([
            pl.coalesce(["stop_desc_override", "stop_desc"]).alias("stop_desc"),
        ])
        .drop("stop_desc_override")
    )

Since every declared input is validated at dispatch time (and at scan time when running locally), a missing stop_overrides fails the run before the DAG starts — no per-step guard needed.

Testing locally

# Show the DAG
continuous-gtfs dag my-pipeline/

# Run against real data
continuous-gtfs schedule my-pipeline/ --input schedule=data/sound-transit/schedule.zip -o output.zip

# Check the output
unzip -l output.zip

For programmatic testing:

from pathlib import Path

from continuous_gtfs import scan_pipeline, resolve_dag
from continuous_gtfs.pipelines.schedule import run_schedule_pipeline, extract_zip

# Load the same feed used in the CLI example above
zip_bytes = Path("data/sound-transit/schedule.zip").read_bytes()

steps = resolve_dag(scan_pipeline("my-pipeline/"))
inputs = {"schedule": extract_zip(zip_bytes)}
result = run_schedule_pipeline(inputs, steps)
assert result.execution_result.success
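
For tests that should not depend on a real feed on disk, zip_bytes can be built in memory with the standard library. The make_feed_zip helper below is hypothetical, and passing its result to extract_zip assumes that function accepts any valid zip archive as bytes.

```python
import io
import zipfile


def make_feed_zip(files: dict[str, str]) -> bytes:
    """Build an in-memory zip from {filename: csv_text}; hypothetical helper."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    return buffer.getvalue()


zip_bytes = make_feed_zip({
    "stops.txt": "stop_id,stop_name\n1,  westlake station \n",
    "calendar.txt": "service_id,monday,sunday\nNONE,0,0\n",
})

# Sanity check: the archive round-trips with the expected member names.
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
    names = sorted(archive.namelist())
print(names)
```

A fixture this small keeps each test focused on one rule, such as confirming that a calendar row with no active days is removed.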

Mixing builtins and custom code

A pipeline folder commonly has a mix — builtins for the standard operations, custom @step functions for anything agency-specific:

# transforms.py
import polars as pl  # needed for pl.col in the custom step below
from continuous_gtfs import step
from continuous_gtfs.builtins.schedule import RemoveRows, UpdateFields, MatchCondition

# Builtin: remove specific routes
remove_route = RemoveRows("routes.txt", [MatchCondition("route_id", value="OLD_ROUTE")])

# Builtin: update metadata
update_meta = UpdateFields(
    "routes.txt",
    [MatchCondition("route_id", value="NEW_ROUTE")],
    {"route_color": "FF0000"},
    after=[remove_route],
)

# Custom: complex logic that builtins can't express
@step(files=["stops.txt", "stop_times.txt"], after=[update_meta])
def remove_orphaned_stops(ctx):
    """Remove stops not referenced by any stop_time."""
    stops = ctx.output["stops.txt"]
    stop_times = ctx.output["stop_times.txt"]
    referenced = set(stop_times["stop_id"].unique().to_list())
    ctx.output["stops.txt"] = stops.filter(
        pl.col("stop_id").is_in(referenced)
    )