Concepts
Steps
A Step is the fundamental unit of pipeline work. Every transform — whether a parameterized builtin or a custom function — is a Step instance.
Steps have a name (inferred from the Python variable name), a list of files they operate on, and optional dependency declarations:
    from continuous_gtfs import Step

    class MyBuiltin(Step):
        files = ["routes.txt"]
        description = "Does something to routes"

        def apply(self, ctx):
            df = ctx.output["routes.txt"]
            # ... modify df ...
            ctx.output["routes.txt"] = df
Most of the time you'll use builtins (pre-built Step subclasses) or the @step decorator rather than subclassing Step directly.
Builtins vs custom code
Builtins are parameterized Step subclasses for common operations:
    from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition

    remove_inactive = RemoveRows(
        "calendar.txt",
        [MatchCondition("monday", value="0"), MatchCondition("tuesday", value="0"), ...],
    )
Custom code uses the @step decorator for anything builtins don't cover:
    import polars as pl

    from continuous_gtfs import step

    @step(files=["stops.txt"])
    def normalize_stop_names(ctx):
        df = ctx.output["stops.txt"]
        # Trim leading and trailing whitespace from every stop name.
        ctx.output["stops.txt"] = df.with_columns(
            pl.col("stop_name").str.strip_chars()
        )
Both produce Step instances. The framework treats them identically — they're indistinguishable in the DAG.
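To make that equivalence concrete, here is a minimal, self-contained sketch of how a decorator could wrap a plain function in a Step-like object. The Step, FunctionStep, and step definitions below are simplified stand-ins written for this example, not the continuous_gtfs implementation, and plain Python lists stand in for DataFrames.

```python
from types import SimpleNamespace


class Step:
    """Simplified stand-in for the framework's Step (assumption)."""

    files = []

    def __init__(self, name=None):
        self.name = name or type(self).__name__

    def apply(self, ctx):
        raise NotImplementedError


class FunctionStep(Step):
    """Hypothetical wrapper that turns a plain function into a Step."""

    def __init__(self, func, files):
        super().__init__(name=func.__name__)
        self.files = list(files)
        self._func = func

    def apply(self, ctx):
        self._func(ctx)


def step(files):
    """Hypothetical decorator: returns a Step instance, not a function."""
    def decorate(func):
        return FunctionStep(func, files)
    return decorate


class UppercaseRouteNames(Step):
    """Class-based step, analogous to a builtin."""

    files = ["routes.txt"]

    def apply(self, ctx):
        ctx.output["routes.txt"] = [r.upper() for r in ctx.output["routes.txt"]]


@step(files=["stops.txt"])
def normalize_stop_names(ctx):
    ctx.output["stops.txt"] = [s.strip() for s in ctx.output["stops.txt"]]


ctx = SimpleNamespace(output={"routes.txt": ["red line"], "stops.txt": ["  Main St "]})
for s in (UppercaseRouteNames(), normalize_stop_names):
    assert isinstance(s, Step)  # both forms are Step instances
    s.apply(ctx)
```

After decoration, normalize_stop_names is no longer a bare function but an object with the same apply(ctx) interface as the class-based step, which is why a scheduler never needs to tell the two apart.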
The DAG
Steps declare dependencies using after and before lists, which reference other Step objects directly:
    update_feed = UpdateFeedInfo(publisher_name="My Agency")
    remove_stops = RemoveRows("stops.txt", [...], after=[update_feed])
The framework resolves these into a directed acyclic graph (DAG) and executes steps in topological order. Within a dependency level, steps are sorted by (priority, name) for determinism.
Steps with no declared dependencies all land in the same dependency level, so they run in (priority, name) order relative to one another, which is plain alphabetical order when their priorities are equal. This is usually fine, since most transforms don't depend on each other.
Circular dependencies are detected at scan time and raise an error.
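The ordering rules above can be sketched in a few lines. This is an illustrative, level-by-level resolver, not the framework's scheduler: the resolve_order name, the dict-based step description, the default priority of 0, and the convention that lower priority values run first are all assumptions made for the example.

```python
def resolve_order(steps):
    """Return step names in execution order.

    `steps` maps name -> {"priority": int, "after": [...], "before": [...]}.
    This dict shape is an assumption made for the example.
    """
    # Normalize both declaration styles into "name must run after these" sets.
    deps = {name: set(spec.get("after", [])) for name, spec in steps.items()}
    for name, spec in steps.items():
        for later in spec.get("before", []):
            deps[later].add(name)

    order = []
    done = set()
    pending = set(steps)
    while pending:
        # A dependency level: every pending step whose prerequisites are done.
        level = [n for n in pending if deps[n] <= done]
        if not level:
            # Nothing can make progress, so the remaining steps form a cycle.
            raise ValueError(f"circular dependency among: {sorted(pending)}")
        # Deterministic order within the level: (priority, name).
        level.sort(key=lambda n: (steps[n].get("priority", 0), n))
        order.extend(level)
        done.update(level)
        pending.difference_update(level)
    return order


steps = {
    "update_feed": {},
    "remove_stops": {"after": ["update_feed"]},
    "normalize_names": {},
    "add_shapes": {"priority": -1},
}
order = resolve_order(steps)
# order == ["add_shapes", "normalize_names", "update_feed", "remove_stops"]
```

The three independent steps share the first level and are ordered by priority and then name; remove_stops waits for the next level because it declares after=[update_feed]. A pair of steps that each list the other in after would raise ValueError.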
PipelineContext
Every step receives a PipelineContext with:
| Field | Type | Description |
|---|---|---|
| inputs | dict[str, Any] | Read-only, already-parsed named inputs keyed by the pipeline's INPUTS manifest. Shape depends on each input's content_kind: gtfs_schedule_zip → dict[str, DataFrame], csv_table → DataFrame, gtfs_rt_protobuf → FeedMessage, opaque_bytes → raw bytes. |
| output | dict[str, Any] | Mutable working set transforms populate and mutate. The framework seeds it automatically (schedule: from the first gtfs_schedule_zip input; RT: mirrors each FeedMessage input by name); pipelines with custom needs replace it in an explicit init step at the top of the DAG. The final state of ctx.output is what the worker serializes as the run's artifact(s). |
| environment | str | Current environment (e.g. "production") |
| config | dict | Arbitrary pipeline configuration |
| metadata | dict | Mutable dict for steps to share non-data state |
| id_mappings | dict | Cross-file ID mappings for cascade operations (see ID Mappings) |
Transforms read inputs and mutate output — never the other way around. Reference-data inputs (CSVs, secondary schedules, etc.) stay in ctx.inputs[name]; the GTFS files a schedule pipeline is editing live in ctx.output["stops.txt"] etc.
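A small sketch of that read-inputs, mutate-output discipline follows. The PipelineContext dataclass here is a simplified stand-in that only borrows the field names from the table, the "stop_overrides" input name is hypothetical, lists of dicts stand in for DataFrames, and wrapping inputs in MappingProxyType is just one possible way to get read-only behavior, not necessarily what the framework does.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Mapping


@dataclass
class PipelineContext:
    """Simplified stand-in; field names follow the table above."""

    inputs: Mapping[str, Any]
    output: dict = field(default_factory=dict)
    environment: str = "production"
    config: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


def apply_stop_name_overrides(ctx):
    # Read reference data from ctx.inputs (never written to) ...
    overrides = {
        row["stop_id"]: row["stop_name"] for row in ctx.inputs["stop_overrides"]
    }
    # ... and write the edited GTFS table back to ctx.output.
    ctx.output["stops.txt"] = [
        {**stop, "stop_name": overrides.get(stop["stop_id"], stop["stop_name"])}
        for stop in ctx.output["stops.txt"]
    ]
    # Non-data state shared with later steps goes in ctx.metadata.
    ctx.metadata["override_count"] = len(overrides)


ctx = PipelineContext(
    inputs=MappingProxyType({
        "stop_overrides": [{"stop_id": "S1", "stop_name": "Central Station"}],
    }),
    output={"stops.txt": [
        {"stop_id": "S1", "stop_name": "Central"},
        {"stop_id": "S2", "stop_name": "Harbor"},
    ]},
)
apply_stop_name_overrides(ctx)
```

In this sketch an accidental ctx.inputs["stop_overrides"] = ... raises TypeError, because MappingProxyType rejects item assignment; note that it only protects the top-level mapping, not the objects stored inside it.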
The context also provides helpers:
- ctx.add_id_mapping(file, field, old, new) / ctx.get_id_mappings(file, field): see ID Mappings
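As a rough illustration of how these two helpers could support a cascade, the sketch below records a renamed stop_id and then applies it to a dependent table. Only the two method signatures come from the text above; the IdMappingContext class, the (file, field) storage layout, and the assumption that get_id_mappings returns an old-to-new dict are guesses made for this example, so consult the ID Mappings page for the actual behavior.

```python
class IdMappingContext:
    """Hypothetical stand-in exposing only the two ID-mapping helpers."""

    def __init__(self):
        # Assumed layout: {(file, field): {old_id: new_id}}
        self.id_mappings = {}

    def add_id_mapping(self, file, field, old, new):
        self.id_mappings.setdefault((file, field), {})[old] = new

    def get_id_mappings(self, file, field):
        # Return a copy so callers cannot mutate the recorded mappings.
        return dict(self.id_mappings.get((file, field), {}))


ctx = IdMappingContext()

# An earlier step renames a stop and records the change.
ctx.add_id_mapping("stops.txt", "stop_id", "1001", "STOP_1001")

# A later step cascades the rename into a table that references stops.
stop_times = [
    {"trip_id": "T1", "stop_id": "1001"},
    {"trip_id": "T1", "stop_id": "1002"},
]
mapping = ctx.get_id_mappings("stops.txt", "stop_id")
stop_times = [
    {**row, "stop_id": mapping.get(row["stop_id"], row["stop_id"])}
    for row in stop_times
]
```

Rows whose stop_id has no recorded mapping pass through unchanged.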
Pipeline types
Schedule
Processes GTFS schedule feeds (zip of CSV files):
ingest → validate input → transform (DAG) → validate output → package
Transforms operate on Polars DataFrames via ctx.output["filename.txt"], seeded from the pipeline's gtfs_schedule_zip input.
Realtime
Processes GTFS-RT protobuf feeds:
parse inputs → seed outputs → transform (DAG) → encode <name>.pb + <name>.json per output
Each declared RT input is its own FeedMessage in ctx.output (by default, mirrored by name). Transforms mutate the per-feed FeedMessages in place. The framework emits one .pb + .json pair per entry in ctx.output — there is no combined feed.
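The per-output encoding can be pictured as a loop over ctx.output. The sketch below is an assumption-laden illustration: encode_outputs is a hypothetical helper, the serializers are passed in as callables so the example runs without protobuf installed, and plain dicts stand in for FeedMessage objects. With real FeedMessages one would likely pass the message's SerializeToString method and google.protobuf.json_format.MessageToJson instead of the placeholders used here.

```python
import json


def encode_outputs(output, to_bytes, to_json):
    """Produce one <name>.pb and one <name>.json artifact per output entry."""
    artifacts = {}
    for name, feed in output.items():
        artifacts[f"{name}.pb"] = to_bytes(feed)
        artifacts[f"{name}.json"] = to_json(feed).encode("utf-8")
    return artifacts


# Plain dicts stand in for FeedMessage objects (assumption for the example).
output = {
    "trip_updates": {"entity": ["t1", "t2"]},
    "vehicle_positions": {"entity": ["v1"]},
}

artifacts = encode_outputs(
    output,
    to_bytes=lambda feed: repr(feed).encode("utf-8"),  # placeholder, not protobuf
    to_json=lambda feed: json.dumps(feed, sort_keys=True),
)
```

Two entries in the output dict yield four artifacts, and nothing merges the feeds together, which matches the "no combined feed" rule.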
Error modes
The executor supports two modes:
- Fail-fast (default for realtime): Stop on first step failure. A partial RT result is worse than no result.
- Continue (default for schedule): Log the error, skip the failed step, continue remaining steps. Collects all errors for review.
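The difference between the two modes comes down to what happens in the exception handler. The following is a minimal sketch, not the framework's executor: run_steps, the (name, function) step tuples, and the dict used as a context are all hypothetical, and the sketch does not roll back partial changes made by a failed step or skip its dependents.

```python
def run_steps(steps, ctx, fail_fast):
    """Run (name, func) pairs in order and return the collected errors."""
    errors = []
    for name, func in steps:
        try:
            func(ctx)
        except Exception as exc:
            if fail_fast:
                # Fail-fast: abort the whole run on the first failure.
                raise
            # Continue: record the error and move on to the next step.
            errors.append((name, exc))
    return errors


def add_feed_info(ctx):
    ctx["log"].append("add_feed_info")


def broken_step(ctx):
    raise RuntimeError("boom")


def normalize_names(ctx):
    ctx["log"].append("normalize_names")


steps = [
    ("add_feed_info", add_feed_info),
    ("broken_step", broken_step),
    ("normalize_names", normalize_names),
]

# Continue mode: the failure is collected and later steps still run.
continue_ctx = {"log": []}
errors = run_steps(steps, continue_ctx, fail_fast=False)

# Fail-fast mode: the first failure propagates and later steps never run.
fail_fast_ctx = {"log": []}
try:
    run_steps(steps, fail_fast_ctx, fail_fast=True)
    aborted = False
except RuntimeError:
    aborted = True
```

In continue mode the caller gets the full list of failures to review after the run; in fail-fast mode the exception reaches the caller immediately, so no artifact is produced from a half-transformed feed.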