Writing Transforms
This walks through building a schedule pipeline from scratch. The same patterns apply to realtime — see the RT builtins reference for the protobuf-specific API.
Create a pipeline folder
A pipeline is a folder of .py files. No config file, no registry — the folder IS the pipeline definition.
sound-transit/
calendar_cleanup.py
route_metadata.py
station_renames.py
Files are scanned alphabetically. The framework discovers all Step instances defined at module level.
Tip
Organize files however makes sense for your team — by concern, by GTFS file, by who owns the rules. The framework doesn't care about file names or structure within the folder.
Using builtins
Most transforms are parameterized builtins. Import and instantiate them as module-level variables:
# calendar_cleanup.py
from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition
# Remove calendar entries with no active service days
remove_inactive = RemoveRows(
"calendar.txt",
[
MatchCondition("monday", value="0"),
MatchCondition("tuesday", value="0"),
MatchCondition("wednesday", value="0"),
MatchCondition("thursday", value="0"),
MatchCondition("friday", value="0"),
MatchCondition("saturday", value="0"),
MatchCondition("sunday", value="0"),
],
description="Remove calendar records with no active service days",
)
# Remove LLR service IDs (regex match)
remove_llr = RemoveRows(
"calendar.txt",
[MatchCondition("service_id", regex=r"^LLR.*")],
after=[remove_inactive],
)
The variable name (remove_inactive, remove_llr) becomes the step name in the DAG.
Writing custom transforms
When builtins don't cover your use case, use the @step decorator:
# station_renames.py
import polars as pl
from continuous_gtfs import step
from .route_metadata import update_1line # cross-file import for DAG ordering
@step(files=["stops.txt"], after=[update_1line])
def normalize_stop_names(ctx):
"""Strip whitespace and normalize capitalization."""
df = ctx.datasets["stops.txt"]
ctx.datasets["stops.txt"] = df.with_columns(
pl.col("stop_name").str.strip_chars().str.to_titlecase()
)
The @step decorator returns a Step instance (not a function). The framework treats it identically to builtins.
What your function receives
Your function gets a PipelineContext. For schedule transforms:
- Read data:
df = ctx.datasets["stops.txt"] - Write data:
ctx.datasets["stops.txt"] = modified_df - Share state:
ctx.metadata["my_key"] = value - Record ID changes:
ctx.add_id_mapping("routes.txt", "route_id", "OLD", "NEW")
DataFrames are Polars with all string columns. Use Polars operations for filtering and transformation.
Declaring dependencies
Dependencies use Python object references, not string names:
a = RemoveRows("routes.txt", [...])
b = UpdateFields("routes.txt", [...], after=[a]) # b runs after a
You can import steps from other files in the same pipeline folder using relative imports:
# station_renames.py
from .calendar_cleanup import remove_inactive
@step(files=["stops.txt"], after=[remove_inactive])
def my_step(ctx):
...
The before parameter works in the opposite direction — "run me before these steps":
setup = Step(before=[main_transform]) # setup runs before main_transform
Note
Dependencies on steps outside the current pipeline folder are silently ignored. This means you can safely reference steps that may or may not be present.
Testing locally
# Show the DAG
continuous-gtfs dag my-pipeline/
# Run against real data
continuous-gtfs schedule data/sound-transit/schedule.zip my-pipeline/ -o output.zip
# Check the output
unzip -l output.zip
For programmatic testing:
from continuous_gtfs import scan_pipeline, resolve_dag
from continuous_gtfs.pipelines.schedule import run_schedule_pipeline
steps = resolve_dag(scan_pipeline("my-pipeline/"))
result = run_schedule_pipeline(zip_bytes, steps)
assert result.execution_result.success
Mixing builtins and custom code
A pipeline folder commonly has a mix — builtins for the standard operations, custom @step functions for anything agency-specific:
# transforms.py
from continuous_gtfs import step
from continuous_gtfs.builtins.schedule import RemoveRows, UpdateFields, MatchCondition
# Builtin: remove specific routes
remove_route = RemoveRows("routes.txt", [MatchCondition("route_id", value="OLD_ROUTE")])
# Builtin: update metadata
update_meta = UpdateFields(
"routes.txt",
[MatchCondition("route_id", value="NEW_ROUTE")],
{"route_color": "FF0000"},
after=[remove_route],
)
# Custom: complex logic that builtins can't express
@step(files=["stops.txt", "stop_times.txt"], after=[update_meta])
def remove_orphaned_stops(ctx):
"""Remove stops not referenced by any stop_time."""
stops = ctx.datasets["stops.txt"]
stop_times = ctx.datasets["stop_times.txt"]
referenced = set(stop_times["stop_id"].unique().to_list())
ctx.datasets["stops.txt"] = stops.filter(
pl.col("stop_id").is_in(referenced)
)