Concepts

Steps

A Step is the fundamental unit of pipeline work. Every transform — whether a parameterized builtin or a custom function — is a Step instance.

Steps have a name (inferred from the Python variable name), a list of files they operate on, and optional dependency declarations:

from continuous_gtfs import Step

class MyBuiltin(Step):
    files = ["routes.txt"]
    description = "Does something to routes"

    def apply(self, ctx):
        df = ctx.output["routes.txt"]
        # ... modify df ...
        ctx.output["routes.txt"] = df

Most of the time you'll use builtins (pre-built Step subclasses) or the @step decorator rather than subclassing Step directly.

Builtins vs custom code

Builtins are parameterized Step subclasses for common operations:

from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition

remove_inactive = RemoveRows(
    "calendar.txt",
    [MatchCondition("monday", value="0"), MatchCondition("tuesday", value="0"), ...],
)

Custom code uses the @step decorator for anything builtins don't cover:

import polars as pl

from continuous_gtfs import step

@step(files=["stops.txt"])
def normalize_stop_names(ctx):
    df = ctx.output["stops.txt"]
    # Strip leading and trailing whitespace from every stop name.
    ctx.output["stops.txt"] = df.with_columns(
        pl.col("stop_name").str.strip_chars()
    )

Both produce Step instances. The framework treats them identically — they're indistinguishable in the DAG.
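To illustrate why the two forms end up indistinguishable, here is a minimal sketch of how a decorator can turn a function into a Step-like object. It is not the framework's implementation: the Step and FunctionStep classes below are hypothetical stand-ins, the name is assumed to come from the function name, and a plain dict stands in for the context.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for the framework's Step base class."""

    def __init__(self, name: str, files: list[str]):
        self.name = name
        self.files = files

    def apply(self, ctx: Any) -> None:
        raise NotImplementedError


class FunctionStep(Step):
    """Wraps a plain function so it behaves like any other Step (assumed design)."""

    def __init__(self, func: Callable[[Any], None], files: list[str]):
        super().__init__(name=func.__name__, files=files)
        self._func = func

    def apply(self, ctx: Any) -> None:
        self._func(ctx)


def step(files: list[str]) -> Callable[[Callable[[Any], None]], Step]:
    """Decorator factory: replaces the decorated function with a Step instance."""

    def decorator(func: Callable[[Any], None]) -> Step:
        return FunctionStep(func, files)

    return decorator


@step(files=["stops.txt"])
def uppercase_stop_names(ctx):
    # A plain dict of lists stands in for the real context and DataFrames.
    ctx["stops.txt"] = [name.upper() for name in ctx["stops.txt"]]


working_set = {"stops.txt": ["main st", "oak ave"]}
uppercase_stop_names.apply(working_set)
```

After decoration, uppercase_stop_names is an object with name, files, and apply, so a scheduler that only looks at those attributes cannot tell it apart from a subclass instance.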

The DAG

Steps declare dependencies using after and before lists, which reference other Step objects directly:

update_feed = UpdateFeedInfo(publisher_name="My Agency")

remove_stops = RemoveRows("stops.txt", [...], after=[update_feed])

The framework resolves these into a directed acyclic graph (DAG) and executes steps in topological order. Within a dependency level, steps are sorted by (priority, name) for determinism.

If you don't declare any dependencies, the step is ordered only by that (priority, name) sort, so independent steps with equal priority run in alphabetical order by name. This is usually fine: most transforms don't depend on each other.

Circular dependencies are detected at scan time and raise an error.
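The ordering rules above can be sketched as a level-by-level topological sort. This is an illustration under assumptions, not the framework's resolver: Node is a hypothetical stand-in for a Step, lower priority values are assumed to sort first, and every node referenced in after or before is assumed to be in the list passed in.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a Step: a name, a priority, and edges."""

    name: str
    priority: int = 0
    after: list["Node"] = field(default_factory=list)
    before: list["Node"] = field(default_factory=list)


def resolve_order(nodes: list[Node]) -> list[str]:
    # For each node, collect the names of nodes that must run first.
    deps: dict[str, set[str]] = {n.name: set() for n in nodes}
    for n in nodes:
        for dep in n.after:        # n runs after dep
            deps[n.name].add(dep.name)
        for target in n.before:    # n runs before target
            deps[target.name].add(n.name)

    by_name = {n.name: n for n in nodes}
    order: list[str] = []
    done: set[str] = set()
    while len(done) < len(nodes):
        # One dependency level: remaining nodes whose prerequisites are all done.
        level = [name for name in deps if name not in done and deps[name] <= done]
        if not level:
            remaining = sorted(set(deps) - done)
            raise ValueError(f"circular dependency among: {remaining}")
        # Deterministic order within the level.
        level.sort(key=lambda name: (by_name[name].priority, name))
        order.extend(level)
        done.update(level)
    return order


update_feed = Node("update_feed")
remove_stops = Node("remove_stops", after=[update_feed])
add_routes = Node("add_routes")
print(resolve_order([remove_stops, update_feed, add_routes]))
# ['add_routes', 'update_feed', 'remove_stops']
```

The two independent nodes land in the first level and are sorted by name; remove_stops waits for the level after update_feed. If no node is ready while some remain, the leftover nodes must form a cycle, which is reported as an error.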

PipelineContext

Every step receives a PipelineContext with:

| Field | Type | Description |
|---|---|---|
| inputs | dict[str, Any] | Read-only, already-parsed named inputs keyed by the pipeline's INPUTS manifest. Shape depends on each input's content_kind: gtfs_schedule_zip → dict[str, DataFrame], csv_table → DataFrame, gtfs_rt_protobuf → FeedMessage, opaque_bytes → raw bytes. |
| output | dict[str, Any] | Mutable working set that transforms populate and mutate. The framework seeds it automatically (schedule: from the first gtfs_schedule_zip input; RT: mirrors each FeedMessage input by name); pipelines with custom needs replace it in an explicit init step at the top of the DAG. The final state of ctx.output is what the worker serializes as the run's artifact(s). |
| environment | str | Current environment (e.g. "production") |
| config | dict | Arbitrary pipeline configuration |
| metadata | dict | Mutable dict for steps to share non-data state |
| id_mappings | dict | Cross-file ID mappings for cascade operations (see ID Mappings) |

Transforms read inputs and mutate output — never the other way around. Reference-data inputs (CSVs, secondary schedules, etc.) stay in ctx.inputs[name]; the GTFS files a schedule pipeline is editing live in ctx.output["stops.txt"] etc.

The context also provides helpers:

  • ctx.add_id_mapping(file, field, old, new) / ctx.get_id_mappings(file, field) — see ID Mappings
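As a rough mental model of the read-only inputs, mutable output, and ID-mapping helpers, consider the sketch below. SketchContext is a hypothetical stand-in, not the real PipelineContext, and the internal layout of id_mappings (a dict keyed by (file, field)) is an assumption made for illustration.

```python
import dataclasses
from types import MappingProxyType
from typing import Any, Mapping


@dataclasses.dataclass
class SketchContext:
    """Hypothetical stand-in for PipelineContext; not the framework's class."""

    inputs: Mapping[str, Any]
    output: dict[str, Any] = dataclasses.field(default_factory=dict)
    environment: str = "production"
    config: dict = dataclasses.field(default_factory=dict)
    metadata: dict = dataclasses.field(default_factory=dict)
    # Assumed layout: {(file, field): {old_id: new_id}}
    id_mappings: dict = dataclasses.field(default_factory=dict)

    def __post_init__(self) -> None:
        # Wrap inputs in a read-only view so accidental writes raise TypeError.
        self.inputs = MappingProxyType(dict(self.inputs))

    def add_id_mapping(self, file: str, field: str, old: str, new: str) -> None:
        self.id_mappings.setdefault((file, field), {})[old] = new

    def get_id_mappings(self, file: str, field: str) -> dict:
        # Return a copy so callers cannot mutate the stored mapping.
        return dict(self.id_mappings.get((file, field), {}))


ctx = SketchContext(inputs={"stop_renames": {"A1": "A001"}})
# Read reference data from inputs, record the rename for later cascade steps.
for old, new in ctx.inputs["stop_renames"].items():
    ctx.add_id_mapping("stops.txt", "stop_id", old, new)
```

A later step could then call ctx.get_id_mappings("stops.txt", "stop_id") to rewrite references in other files, while any attempt to assign into ctx.inputs fails immediately.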

Pipeline types

Schedule

Processes GTFS schedule feeds (zip of CSV files):

ingest → validate input → transform (DAG) → validate output → package

Transforms operate on Polars DataFrames via ctx.output["filename.txt"], seeded from the pipeline's gtfs_schedule_zip input.

Realtime

Processes GTFS-RT protobuf feeds:

parse inputs → seed outputs → transform (DAG) → encode <name>.pb + <name>.json per output

Each declared RT input is its own FeedMessage in ctx.output (by default, mirrored by name). Transforms mutate the per-feed FeedMessages in place. The framework emits one .pb + .json pair per entry in ctx.output — there is no combined feed.
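The one-pair-per-entry rule can be sketched as follows. This is an illustration only: encode_outputs is a hypothetical helper, and the serializers are passed in as parameters so the example runs without the protobuf package. With real FeedMessage objects, to_bytes would typically be message.SerializeToString() and to_dict could be google.protobuf.json_format.MessageToDict.

```python
import json
from typing import Any, Callable


def encode_outputs(
    output: dict[str, Any],
    to_bytes: Callable[[Any], bytes],
    to_dict: Callable[[Any], dict],
) -> dict[str, bytes]:
    """Return {filename: payload}: one .pb and one .json per output entry."""
    artifacts: dict[str, bytes] = {}
    for name, message in output.items():
        artifacts[f"{name}.pb"] = to_bytes(message)
        artifacts[f"{name}.json"] = json.dumps(to_dict(message)).encode("utf-8")
    return artifacts


# Plain dicts stand in for FeedMessage objects in this sketch.
feeds = {"vehicle_positions": {"entity": []}, "trip_updates": {"entity": []}}
artifacts = encode_outputs(
    feeds,
    to_bytes=lambda m: repr(m).encode("utf-8"),  # placeholder, not real protobuf
    to_dict=lambda m: m,
)
print(sorted(artifacts))
# ['trip_updates.json', 'trip_updates.pb', 'vehicle_positions.json', 'vehicle_positions.pb']
```

Two output entries yield four artifacts, and nothing merges the feeds together.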

Error modes

The executor supports two modes:

  • Fail-fast (default for realtime): Stop on first step failure. A partial RT result is worse than no result.
  • Continue (default for schedule): Log the error, skip the failed step, continue remaining steps. Collects all errors for review.
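The difference between the two modes can be sketched with a small loop. The run_steps function and its fail_fast flag are hypothetical names chosen for this illustration, not the executor's actual API, and a plain list stands in for the context.

```python
from typing import Any, Callable

StepFn = Callable[[Any], None]


def run_steps(
    steps: list[tuple[str, StepFn]], ctx: Any, fail_fast: bool
) -> list[tuple[str, Exception]]:
    """Run steps in order; return the (step name, error) pairs that were skipped."""
    errors: list[tuple[str, Exception]] = []
    for name, fn in steps:
        try:
            fn(ctx)
        except Exception as exc:
            if fail_fast:
                raise  # fail-fast: abort the whole run on the first failure
            errors.append((name, exc))  # continue: record and move on
    return errors


def ok(ctx):
    ctx.append("ok")


def boom(ctx):
    raise RuntimeError("bad input")


log: list[str] = []
errors = run_steps([("a", ok), ("b", boom), ("c", ok)], log, fail_fast=False)
print(log, [name for name, _ in errors])
# ['ok', 'ok'] ['b']
```

In continue mode, step c still runs after b fails and the error is collected for review. With fail_fast=True the same call would raise the RuntimeError from b, and c would never run.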