Pipeline Structure

The folder is the pipeline

A pipeline is a directory of .py files. The framework scans the folder, imports each file, and discovers every module-level Step instance. No config file or registry is needed.

my-agency/
  calendar.py        # calendar cleanup rules
  routes.py          # route metadata updates
  stops.py           # station renames, stop removals
  trips.py           # trip field cleanup

What gets scanned

  • All .py files in the folder (non-recursive, sorted alphabetically)
  • Files whose names start with _ are not scanned for steps (__init__.py, _helpers.py, etc.)
  • Module-level attributes whose names start with _ are skipped
  • Only module-level objects for which isinstance(obj, Step) is true are discovered
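The rules above can be sketched in a few lines of Python. This is an illustration only, not the framework's scanner: the Step class is a stand-in, discover_steps is a hypothetical name, and Step is injected into each file with runpy so the demo files need no import line.

```python
import runpy
import tempfile
from pathlib import Path


class Step:
    """Hypothetical stand-in for the framework's Step class."""

    def __init__(self, description):
        self.description = description


def discover_steps(folder):
    """Apply the four scanning rules and return {variable_name: Step}."""
    found = {}
    # Rule 1: only .py files directly in the folder, in alphabetical order.
    for path in sorted(Path(folder).glob("*.py")):
        # Rule 2: skip files whose names start with an underscore.
        if path.name.startswith("_"):
            continue
        # Assumption: Step is injected here; a real scanner would import the file.
        module_globals = runpy.run_path(str(path), init_globals={"Step": Step})
        for name, obj in module_globals.items():
            # Rule 3: skip private attributes. Rule 4: keep only Step instances.
            if not name.startswith("_") and isinstance(obj, Step):
                found[name] = obj
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "routes.py").write_text("update_1line = Step('routes')\n_draft = Step('x')\n")
    (root / "calendar.py").write_text("remove_inactive_calendars = Step('cal')\nLIMIT = 5\n")
    (root / "_helpers.py").write_text("hidden = Step('helper')\n")
    (root / "helpers").mkdir()
    (root / "helpers" / "extra.py").write_text("nested = Step('nested')\n")
    discovered = discover_steps(root)

step_names = list(discovered)
print(step_names)
```

Only the two public Step variables in calendar.py and routes.py are returned; the underscore-prefixed file, the underscore-prefixed attribute, the plain constant, and the file in the subdirectory are all ignored.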

Names

The variable name becomes the step name. Choose clear, descriptive names — they appear in the DAG, CLI output, and Control Console:

# Good — the variable name IS the step identity
remove_inactive_calendars = RemoveRows(...)
update_2line_metadata = UpdateFields(...)

# Avoid — generic names make the DAG hard to read
step1 = RemoveRows(...)
x = UpdateFields(...)

Cross-file imports

Files in the same pipeline folder can import from each other using relative imports:

# stops.py
from .routes import update_1line  # imported so it can be named as a DAG dependency

@step(files=["stops.txt"], after=[update_1line])
def rename_stations(ctx):
    ...

The scanner handles this automatically — it registers the folder as a Python package so relative imports work.
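One way such registration could work is sketched below. It is an assumption about the mechanism, not the framework's code: register_folder_as_package and the package name demo_pipeline are hypothetical, plain dicts stand in for steps, and __init__.py handling is left out.

```python
import importlib
import sys
import tempfile
import types
from pathlib import Path


def register_folder_as_package(folder, package_name):
    """Expose `folder` as an importable package without needing __init__.py."""
    package = types.ModuleType(package_name)
    # A module that has __path__ is treated as a package by the import system.
    package.__path__ = [str(folder)]
    sys.modules[package_name] = package
    return package


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "routes.py").write_text("update_1line = {'file': 'routes.txt'}\n")
    (root / "stops.py").write_text(
        "from .routes import update_1line\n"
        "rename_stations = {'file': 'stops.txt', 'after': [update_1line]}\n"
    )
    register_folder_as_package(root, "demo_pipeline")
    importlib.invalidate_caches()
    # Importing stops triggers the relative import of routes.
    stops = importlib.import_module("demo_pipeline.stops")
    routes = importlib.import_module("demo_pipeline.routes")

# The object referenced in stops.py is the very object defined in routes.py.
same_object = stops.rename_stations["after"][0] is routes.update_1line
print(same_object)
```

Because both files are loaded under the same package name, the relative import resolves to the module already in sys.modules rather than creating a second copy.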

Warning

Imported Step instances are deduplicated by object identity. If stops.py imports update_1line from routes.py, it appears once in the DAG (not twice).
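Deduplication by object identity might look like the following sketch. The Step class and collect_unique_steps are hypothetical stand-ins, and each module is modeled as a plain dict of its attributes.

```python
class Step:
    """Hypothetical stand-in for the framework's Step class."""

    def __init__(self, label):
        self.label = label


def collect_unique_steps(modules):
    """Return (name, step) pairs, keeping each Step object only once."""
    seen_ids = set()
    unique = []
    for attributes in modules.values():
        for name, obj in attributes.items():
            # Identity, not equality: the same object seen twice is one step.
            if isinstance(obj, Step) and id(obj) not in seen_ids:
                seen_ids.add(id(obj))
                unique.append((name, obj))
    return unique


update_1line = Step("update 1 Line metadata")
rename_stations = Step("rename stations")

# stops.py imported update_1line, so both modules expose the same object.
modules = {
    "routes": {"update_1line": update_1line},
    "stops": {"update_1line": update_1line, "rename_stations": rename_stations},
}

unique_names = [name for name, _ in collect_unique_steps(modules)]
print(unique_names)
```

The imported step shows up under the first module that exposes it and is skipped when it is encountered again.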

__init__.py

Optional. If present, it is executed when the folder is registered as a package, so you can use it to set up shared imports.

Helper files

Prefix helper files with _ to exclude them from scanning:

my-agency/
  _constants.py       # shared constants, not scanned
  _utils.py           # helper functions, not scanned
  calendar.py         # scanned — defines steps
  routes.py           # scanned — defines steps

Or put helpers in a subdirectory — the scanner doesn't recurse:

my-agency/
  helpers/
    constants.py      # not scanned (in subdirectory)
  calendar.py         # scanned

Organizing a large pipeline

There's no prescribed structure. Some options:

By GTFS file (what Sound Transit uses):

sound-transit/
  calendar_cleanup.py
  route_metadata.py
  station_renames.py
  trip_cleanup.py

By concern:

my-agency/
  data_quality.py      # validation-oriented transforms
  branding.py          # route names, colors, URLs
  expansion.py         # pre-opening station adjustments

By team:

my-agency/
  rail_team.py
  bus_team.py
  operations.py

The folder structure also maps naturally to CODEOWNERS — different teams can own different transform files with PR review enforced by path.