# Pipeline Structure

## The folder is the pipeline
A pipeline is a directory of `.py` files. The framework scans the folder, imports each file, and discovers all module-level `Step` instances. No config file or registry needed.
```
my-agency/
    calendar.py   # calendar cleanup rules
    routes.py     # route metadata updates
    stops.py      # station renames, stop removals
    trips.py      # trip field cleanup
```
## What gets scanned
- All `.py` files in the folder (non-recursive, sorted alphabetically)
- Files starting with `_` are skipped (`__init__.py`, `_helpers.py`, etc.)
- Attributes starting with `_` are skipped
- Only module-level objects that are `isinstance(obj, Step)` are discovered
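These rules can be sketched in a few lines. This is an illustrative reimplementation, not the framework's actual code; the `Step` class here is a stand-in, and `scan_pipeline` is a hypothetical name:

```python
from pathlib import Path
import importlib.util


class Step:
    """Stand-in for the framework's Step base class (illustrative only)."""


def scan_pipeline(folder):
    """Sketch of the scanning rules described above."""
    steps = {}
    # Non-recursive and alphabetical: glob (not rglob) plus sorted().
    for path in sorted(Path(folder).glob("*.py")):
        if path.name.startswith("_"):  # _helpers.py, __init__.py, ...
            continue
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for name, obj in vars(module).items():
            if name.startswith("_"):  # private attributes are skipped
                continue
            if isinstance(obj, Step):  # only Step instances are discovered
                steps[name] = obj
    return steps
```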
## Names
The variable name becomes the step name. Choose clear, descriptive names — they appear in the DAG, CLI output, and Control Console:
```python
# Good — the variable name IS the step identity
remove_inactive_calendars = RemoveRows(...)
update_2line_metadata = UpdateFields(...)

# Avoid — generic names make the DAG hard to read
step1 = RemoveRows(...)
x = UpdateFields(...)
```
## Cross-file imports
Files in the same pipeline folder can import from each other using relative imports:
```python
# stops.py
from .routes import update_1line  # import for DAG dependency

@step(files=["stops.txt"], after=[update_1line])
def rename_stations(ctx):
    ...
```
The scanner handles this automatically — it registers the folder as a Python package so relative imports work.
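One way such registration can work, sketched with `importlib` — the function name `register_as_package` is hypothetical and this may differ from the framework's internals:

```python
import importlib.util
import sys
from pathlib import Path


def register_as_package(folder, package_name):
    """Register `folder` in sys.modules as a package so relative imports work."""
    folder = Path(folder)
    init_py = folder / "__init__.py"
    spec = importlib.util.spec_from_file_location(
        package_name,
        init_py,
        submodule_search_locations=[str(folder)],  # marks the module as a package
    )
    package = importlib.util.module_from_spec(spec)
    sys.modules[package_name] = package  # must be registered before submodule imports
    if init_py.exists():
        spec.loader.exec_module(package)  # run __init__.py only if it exists
    return package
```

Once the package is registered, a file like `stops.py` can be imported as a submodule, and its `from .routes import ...` resolves against the folder.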
> **Warning:** Imported `Step` instances are deduplicated by object identity. If `stops.py` imports `update_1line` from `routes.py`, it appears once in the DAG (not twice).
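Identity-based deduplication amounts to tracking `id(obj)` across module namespaces. A minimal sketch (the helper `dedupe_steps` is hypothetical):

```python
def dedupe_steps(namespaces):
    """Collect objects from several module namespaces, keeping each object once.

    The same object imported into a second module is skipped, because its
    id() has already been seen — dedup is by identity, not by name.
    """
    seen_ids = set()
    unique = []
    for namespace in namespaces:
        for name, obj in namespace.items():
            if id(obj) not in seen_ids:
                seen_ids.add(id(obj))
                unique.append((name, obj))
    return unique
```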
## `__init__.py`
Optional. If present, it's executed when the folder is registered as a package. You can use it to set up shared imports, but it's not required.
## Helper files
Prefix helper files with `_` to exclude them from scanning:
```
my-agency/
    _constants.py   # shared constants, not scanned
    _utils.py       # helper functions, not scanned
    calendar.py     # scanned — defines steps
    routes.py       # scanned — defines steps
```
Or put helpers in a subdirectory — the scanner doesn't recurse:
```
my-agency/
    helpers/
        constants.py   # not scanned (in subdirectory)
    calendar.py        # scanned
```
## Organizing a large pipeline
There's no prescribed structure. Some options:
**By GTFS file** (what Sound Transit uses):

```
sound-transit/
    calendar_cleanup.py
    route_metadata.py
    station_renames.py
    trip_cleanup.py
```
**By concern:**

```
my-agency/
    data_quality.py   # validation-oriented transforms
    branding.py       # route names, colors, URLs
    expansion.py      # pre-opening station adjustments
```
**By team:**

```
my-agency/
    rail_team.py
    bus_team.py
    operations.py
```
The folder structure also maps naturally to `CODEOWNERS` — different teams can own different transform files, with PR review enforced by path.
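For instance, a `CODEOWNERS` file for the by-team layout might look like the following — the repository path and team handles are purely illustrative:

```
# Each team reviews changes to its own transform file.
/pipelines/my-agency/rail_team.py   @my-org/rail-team
/pipelines/my-agency/bus_team.py    @my-org/bus-team
/pipelines/my-agency/operations.py  @my-org/operations
```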