# Concepts

## Steps
A Step is the fundamental unit of pipeline work. Every transform — whether a parameterized builtin or a custom function — is a Step instance.
Steps have a name (inferred from the Python variable name), a list of files they operate on, and optional dependency declarations:
```python
from continuous_gtfs import Step

class MyBuiltin(Step):
    files = ["routes.txt"]
    description = "Does something to routes"

    def apply(self, ctx):
        df = ctx.datasets["routes.txt"]
        # ... modify df ...
        ctx.datasets["routes.txt"] = df
```
Most of the time you'll use builtins (pre-built `Step` subclasses) or the `@step` decorator rather than subclassing `Step` directly.
## Builtins vs custom code
Builtins are parameterized Step subclasses for common operations:
```python
from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition

remove_inactive = RemoveRows(
    "calendar.txt",
    [MatchCondition("monday", value="0"), MatchCondition("tuesday", value="0"), ...],
)
```
Custom code uses the `@step` decorator for anything builtins don't cover:
```python
import polars as pl

from continuous_gtfs import step

@step(files=["stops.txt"])
def normalize_stop_names(ctx):
    df = ctx.datasets["stops.txt"]
    ctx.datasets["stops.txt"] = df.with_columns(
        pl.col("stop_name").str.strip_chars()
    )
```
Both produce Step instances. The framework treats them identically — they're indistinguishable in the DAG.
## The DAG
Steps declare dependencies using `after` and `before` lists, which reference other Step objects directly:

```python
update_feed = UpdateFeedInfo(publisher_name="My Agency")
remove_stops = RemoveRows("stops.txt", [...], after=[update_feed])
```
The framework resolves these into a directed acyclic graph (DAG) and executes steps in topological order. Within a dependency level, steps are sorted by (priority, name) for determinism.
If you don't declare any dependencies, the step runs in alphabetical order relative to other independent steps. This is usually fine — most transforms don't depend on each other.
Circular dependencies are detected at scan time and raise an error.
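The ordering rules above — topological order, `(priority, name)` tie-break within a level, cycle detection — can be sketched with a small Kahn's-algorithm pass. This is a stand-alone illustration, not the framework's implementation; `Node` is a hypothetical stand-in for a Step with just enough fields to order:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class Node:
    # Hypothetical stand-in for a Step: name, priority, and dependencies.
    name: str
    priority: int = 0
    after: tuple = ()  # nodes that must run before this one

def execution_order(nodes):
    """Kahn's algorithm; ties within a level break on (priority, name)."""
    indegree = {n: len(n.after) for n in nodes}
    dependents = defaultdict(list)
    for n in nodes:
        for dep in n.after:
            dependents[dep].append(n)
    ready = sorted((n for n in nodes if indegree[n] == 0),
                   key=lambda n: (n.priority, n.name))
    order = []
    while ready:
        node = ready.pop(0)
        order.append(node)
        for m in dependents[node]:
            indegree[m] -= 1
            if indegree[m] == 0:
                ready.append(m)
        ready.sort(key=lambda n: (n.priority, n.name))
    if len(order) != len(nodes):
        raise ValueError("circular dependency detected")
    return [n.name for n in order]

a = Node("update_feed")
b = Node("remove_stops", after=(a,))
c = Node("add_shapes")
print(execution_order([a, b, c]))  # ['add_shapes', 'update_feed', 'remove_stops']
```

Note how `add_shapes` and `update_feed` have no dependency between them, so they fall back to alphabetical order, while `remove_stops` waits for `update_feed`.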
## PipelineContext
Every step receives a PipelineContext with:
| Field | Type | Description |
|---|---|---|
| `datasets` | `dict[str, Any]` | Data keyed by name. Schedule: `{"stops.txt": DataFrame, ...}`. RT: `{"feed": FeedMessage}`. |
| `environment` | `str` | Current environment (e.g. `"production"`) |
| `config` | `dict` | Arbitrary pipeline configuration |
| `metadata` | `dict` | Mutable dict for steps to share non-data state |
| `id_mappings` | `dict` | Cross-file ID mappings for cascade operations (see ID Mappings) |
For schedule transforms, datasets are Polars DataFrames with all string columns (GTFS is CSV). For realtime transforms, datasets are protobuf FeedMessage objects.
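The contract is the same regardless of pipeline type: read from `ctx.datasets`, write back under the same key, and use `ctx.metadata` for anything later steps need that isn't feed data. A minimal sketch of that contract, using a hypothetical `SimpleNamespace` stand-in for the context and plain dicts in place of DataFrames:

```python
from types import SimpleNamespace

# Hypothetical stand-in with the same fields as PipelineContext.
ctx = SimpleNamespace(
    datasets={"stops.txt": [{"stop_name": "  Main St  "}]},  # toy rows, not a DataFrame
    environment="production",
    config={},
    metadata={},
    id_mappings={},
)

def normalize_stop_names(ctx):
    # Read, transform, then write back under the same key.
    rows = ctx.datasets["stops.txt"]
    ctx.datasets["stops.txt"] = [
        {**row, "stop_name": row["stop_name"].strip()} for row in rows
    ]
    ctx.metadata["stop_names_normalized"] = True  # share non-data state downstream

normalize_stop_names(ctx)
print(ctx.datasets["stops.txt"][0]["stop_name"])  # Main St
```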
## Pipeline types

### Schedule
Processes GTFS schedule feeds (zip of CSV files):
```
ingest → validate input → transform (DAG) → validate output → package
```

Transforms operate on Polars DataFrames via `ctx.datasets["filename.txt"]`.
### Realtime
Processes GTFS-RT protobuf feeds:
```
decode → transform (DAG) → encode PB → encode JSON
```

Transforms mutate the FeedMessage object in place via `ctx.datasets["feed"]`.
## Error modes
The executor supports two modes:
- Fail-fast (default for realtime): Stop on first step failure. A partial RT result is worse than no result.
- Continue (default for schedule): Log the error, skip the failed step, continue remaining steps. Collects all errors for review.
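The two modes reduce to a single branch in the executor loop. A hedged sketch (function names and the return shape are illustrative, not the framework's API):

```python
def run_steps(steps, ctx, fail_fast):
    """Run steps in order; either stop at the first error or collect them all."""
    errors = []
    for step in steps:
        try:
            step(ctx)
        except Exception as exc:
            if fail_fast:
                raise  # realtime default: a partial result is worse than none
            # schedule default: record the failure, keep going
            errors.append((getattr(step, "__name__", str(step)), exc))
    return errors

def ok(ctx): ctx["ran"] = ctx.get("ran", 0) + 1
def bad(ctx): raise ValueError("boom")

ctx = {}
errs = run_steps([ok, bad, ok], ctx, fail_fast=False)
print(ctx["ran"], len(errs))  # 2 1 → later steps still ran, the error was collected
```

In fail-fast mode the same input would raise on `bad` and the final `ok` would never run.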