Concepts

Steps

A Step is the fundamental unit of pipeline work. Every transform — whether a parameterized builtin or a custom function — is a Step instance.

Steps have a name (inferred from the Python variable name), a list of files they operate on, and optional dependency declarations:

from continuous_gtfs import Step

class MyBuiltin(Step):
    files = ["routes.txt"]
    description = "Does something to routes"

    def apply(self, ctx):
        df = ctx.datasets["routes.txt"]
        # ... modify df ...
        ctx.datasets["routes.txt"] = df

Most of the time you'll use builtins (pre-built Step subclasses) or the @step decorator rather than subclassing Step directly.
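To see why the decorator and the subclass are interchangeable, it helps to picture what a decorator like this has to do: wrap the function in a Step-like object carrying the same attributes. The following is a minimal sketch, not continuous_gtfs internals — `FuncStep` is a hypothetical name, and the context is simplified to a plain dict of datasets:

```python
# Sketch of how a decorator can turn a function into a step-like object.
# FuncStep is a hypothetical stand-in, not the library's actual class.
class FuncStep:
    def __init__(self, fn, files, after=None, before=None):
        self.name = fn.__name__          # name inferred from the function
        self.files = files
        self.after = after or []
        self.before = before or []
        self._fn = fn

    def apply(self, ctx):
        return self._fn(ctx)

def step(files, after=None, before=None):
    def wrap(fn):
        return FuncStep(fn, files, after=after, before=before)
    return wrap

@step(files=["stops.txt"])
def strip_names(ctx):
    # ctx simplified to a plain dict of datasets for this sketch
    ctx["stops.txt"] = [s.strip() for s in ctx["stops.txt"]]
```

After decoration, `strip_names` is an object with `name`, `files`, and `apply` — the same surface a Step subclass exposes, which is why the framework can treat both identically.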

Builtins vs custom code

Builtins are parameterized Step subclasses for common operations:

from continuous_gtfs.builtins.schedule import RemoveRows, MatchCondition

remove_inactive = RemoveRows(
    "calendar.txt",
    [MatchCondition("monday", value="0"), MatchCondition("tuesday", value="0"), ...],
)
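Read as pseudocode, this drops every row that satisfies all of its conditions. The semantics can be sketched in plain Python — rows as dicts purely for illustration; the real builtin operates on Polars DataFrames and this is not its implementation:

```python
# Illustration of RemoveRows/MatchCondition semantics, not the real code:
# a row is removed only when every (column, value) condition matches.
def remove_rows(rows, conditions):
    def matches(row):
        return all(row.get(col) == value for col, value in conditions)
    return [row for row in rows if not matches(row)]

calendar = [
    {"service_id": "WK", "monday": "1", "tuesday": "1"},
    {"service_id": "NONE", "monday": "0", "tuesday": "0"},
]
active = remove_rows(calendar, [("monday", "0"), ("tuesday", "0")])
```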

Custom code uses the @step decorator for anything builtins don't cover:

from continuous_gtfs import step
import polars as pl

@step(files=["stops.txt"])
def normalize_stop_names(ctx):
    df = ctx.datasets["stops.txt"]
    ctx.datasets["stops.txt"] = df.with_columns(
        pl.col("stop_name").str.strip_chars()
    )

Both produce Step instances. The framework treats them identically — they're indistinguishable in the DAG.

The DAG

Steps declare dependencies using after and before lists, which reference other Step objects directly:

update_feed = UpdateFeedInfo(publisher_name="My Agency")

remove_stops = RemoveRows("stops.txt", [...], after=[update_feed])

The framework resolves these into a directed acyclic graph (DAG) and executes steps in topological order. Within a dependency level, steps are sorted by (priority, name) for determinism.

If you don't declare any dependencies, the step is independent and is ordered by the same (priority, name) sort relative to other independent steps. This is usually fine — most transforms don't depend on each other.

Circular dependencies are detected at scan time and raise an error.
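The ordering contract above — topological order, ties broken by (priority, name), cycles rejected — can be sketched with a heap-based variant of Kahn's algorithm. This is an illustration of the contract, not the framework's actual executor:

```python
import heapq

# Sketch of deterministic topological ordering: steps whose dependencies
# are all satisfied sit in a heap keyed by (priority, name), so ties
# within a dependency level resolve the same way on every run.
def topo_order(steps):
    # steps: dict of name -> (priority, set of names this step runs after)
    indegree = {n: len(deps) for n, (_, deps) in steps.items()}
    children = {n: [] for n in steps}
    for n, (_, deps) in steps.items():
        for d in deps:
            children[d].append(n)
    ready = [(p, n) for n, (p, deps) in steps.items() if not deps]
    heapq.heapify(ready)
    order = []
    while ready:
        _, n = heapq.heappop(ready)
        order.append(n)
        for c in children[n]:
            indegree[c] -= 1
            if indegree[c] == 0:
                heapq.heappush(ready, (steps[c][0], c))
    if len(order) != len(steps):
        raise ValueError("circular dependency detected")
    return order
```

If the heap drains before every step is emitted, some steps never reached indegree zero — which is exactly a cycle, so the error falls out of the same loop.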

PipelineContext

Every step receives a PipelineContext with:

  • datasets (dict[str, Any]): data keyed by name. Schedule: {"stops.txt": DataFrame, ...}. RT: {"feed": FeedMessage}.
  • environment (str): current environment (e.g. "production").
  • config (dict): arbitrary pipeline configuration.
  • metadata (dict): mutable dict for steps to share non-data state.
  • id_mappings (dict): cross-file ID mappings for cascade operations (see ID Mappings).

For schedule transforms, datasets are Polars DataFrames with all string columns (GTFS is CSV). For realtime transforms, datasets are protobuf FeedMessage objects.
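Structurally, the context amounts to a small record. A dataclass sketch of the fields listed above — field names come from the list, everything else (defaults, behavior) is assumed:

```python
from dataclasses import dataclass, field
from typing import Any

# Sketch of the PipelineContext shape described above; the real class
# may carry additional behavior beyond these fields.
@dataclass
class PipelineContext:
    datasets: dict[str, Any]
    environment: str = "production"
    config: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    id_mappings: dict = field(default_factory=dict)

ctx = PipelineContext(datasets={"stops.txt": object()})
ctx.metadata["touched"] = True   # steps can share non-data state here
```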

Pipeline types

Schedule

Processes GTFS schedule feeds (zip of CSV files):

ingest → validate input → transform (DAG) → validate output → package

Transforms operate on Polars DataFrames via ctx.datasets["filename.txt"].

Realtime

Processes GTFS-RT protobuf feeds:

decode → transform (DAG) → encode PB → encode JSON

Transforms mutate the FeedMessage object in place via ctx.datasets["feed"].

Error modes

The executor supports two modes:

  • Fail-fast (default for realtime): Stop on first step failure. A partial RT result is worse than no result.
  • Continue (default for schedule): Log the error, skip the failed step, continue remaining steps. Collects all errors for review.
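The two modes can be sketched as a flag on the executor loop. This is illustrative only — `run_steps` and its signature are assumptions, not the library's API:

```python
# Sketch of the two error modes: fail_fast re-raises on the first
# failure, while continue-mode collects errors and keeps going.
def run_steps(steps, ctx, fail_fast):
    errors = []
    for step in steps:
        try:
            step(ctx)
        except Exception as exc:
            if fail_fast:
                raise          # realtime default: stop on first failure
            errors.append((getattr(step, "__name__", str(step)), exc))
    return errors              # schedule default: all errors for review

def ok(ctx): ctx.append("ok")
def boom(ctx): raise RuntimeError("bad step")
```

In continue-mode, `run_steps([ok, boom, ok], ctx, fail_fast=False)` still runs the final step and returns the one collected error; with `fail_fast=True` the RuntimeError propagates immediately.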