CLI Reference
The continuous-gtfs CLI runs pipelines locally for development and testing.
continuous-gtfs dag
Show the resolved DAG for a pipeline folder.
continuous-gtfs dag <pipeline_dir>
continuous-gtfs dag <pipeline_dir> --json
continuous-gtfs dag <pipeline_dir> --mermaid
| Option | Description |
|---|---|
| --json | Output ReactFlow-compatible JSON (nodes + edges) for visualization |
| --mermaid | Output a Mermaid flowchart, which renders natively in GitHub and MkDocs |
Example (text):
$ continuous-gtfs dag sound-transit/
DAG: 16 steps
1. clear_1line_short_name [builtin] ['trips.txt']
2. clear_2line_short_name [builtin] ['trips.txt']
3. remove_inactive_calendars [builtin] ['calendar.txt']
4. remove_llr_calendar [builtin] ['calendar.txt'] (after: remove_inactive_calendars)
...
Render as SVG with the Mermaid CLI:
continuous-gtfs dag --mermaid sound-transit/ \
| npx --yes @mermaid-js/mermaid-cli -i - -o dag.svg
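To make the link between the text listing and a Mermaid flowchart concrete, here is a minimal Python sketch that turns step names and their "after" dependencies into flowchart text. The `to_mermaid` helper and the node/edge layout are assumptions for illustration; the CLI's actual --mermaid output may be shaped differently.

```python
def to_mermaid(steps: dict[str, list[str]]) -> str:
    """Render {step: [steps it runs after]} as Mermaid flowchart text."""
    lines = ["flowchart TD"]
    # Declare every node first so steps without edges still appear.
    for name in steps:
        lines.append(f"    {name}")
    # One arrow per dependency, pointing from prerequisite to dependent.
    for name, after in steps.items():
        for dep in after:
            lines.append(f"    {dep} --> {name}")
    return "\n".join(lines)


# Two steps taken from the text example above.
example = {
    "remove_inactive_calendars": [],
    "remove_llr_calendar": ["remove_inactive_calendars"],
}
print(to_mermaid(example))
```

The printed text can be pasted into any Mermaid renderer to check how a small DAG will look.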
continuous-gtfs schedule
Run the schedule pipeline, with its input files supplied via --input flags.
continuous-gtfs schedule <pipeline_dir> --input NAME[:KIND]=PATH [options]
| Option | Description |
|---|---|
| --input NAME[:KIND]=PATH | Supply a named input to the pipeline. NAME must be declared in the pipeline's INPUTS manifest; content_kind resolves from the manifest by default, or is overridden by an optional :KIND suffix. Repeatable. See Inputs. |
| -o, --output FILE | Write output GTFS zip to file |
| -v, --verbose | Stream per-step progress as the pipeline runs |
| --env NAME | Environment name (default: production) |
| --diff-against BASELINE | After running, diff the output zip against this baseline using gtfs-digester |
| --diff-detail | Show per-row content changes in the diff output (requires --diff-against) |
| --diff-limit N | Max rows shown per category per file in diff detail. Default 50. 0 = unlimited. |
| --json-events PATH | Write structured run events (stages, steps, validation, diff) to a JSON file |
| --select STEP | Run only the named step plus its transitive dependencies. Useful for isolating the diff impact of a single step while iterating. |
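The "named step plus its transitive dependencies" rule for --select can be pictured as a graph walk. The following Python sketch is an illustration under assumptions: the `select_steps` function and the dependency dictionary are hypothetical, not the CLI's internal code.

```python
def select_steps(deps: dict[str, list[str]], target: str) -> set[str]:
    """Return target plus every step it transitively depends on."""
    if target not in deps:
        raise KeyError(f"unknown step: {target}")
    selected: set[str] = set()
    stack = [target]
    while stack:
        step = stack.pop()
        if step in selected:
            continue  # already visited via another path
        selected.add(step)
        stack.extend(deps.get(step, []))
    return selected


# Hypothetical DAG: each key lists the steps it must run after.
deps = {
    "remove_inactive_calendars": [],
    "remove_llr_calendar": ["remove_inactive_calendars"],
    "rename_symphony_455": [],
}
print(sorted(select_steps(deps, "remove_llr_calendar")))
# ['remove_inactive_calendars', 'remove_llr_calendar']
```

Steps outside the selected set (here `rename_symphony_455`) are skipped, which is why the resulting diff reflects only the selected step and its prerequisites.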
Example:
$ continuous-gtfs schedule sound-transit/ --input schedule=data/schedule.zip -o output.zip
Schedule pipeline: 592.0ms
Input: 23 files, 148,406 rows
Output: 23 files, 148,385 rows
Delta: -21 rows
Ingest: 92.6ms [23 files, 148,406 rows]
Files: agency.txt, calendar.txt, ...
Validate input: 22.1ms [pass]
Transform: 33.8ms [16 steps]
1. remove_llr_calendar [RemoveRows] calendar.txt: 3.1ms [ok] (-5 rows) — Remove LLR service IDs
2. rename_symphony_455 [UpdateFields] stops.txt: 1.3ms [ok] — Rename University Street to Symphony
...
Validate output: 10.0ms [pass]
Package: 433.3ms [1273 KB]
Written to output.zip
Each transform step shows its ordinal, name, builtin class (or @step), target files, duration, status, row delta, and description.
Pipeline stages
| Stage | Description |
|---|---|
| ingest | Extract zip to Polars DataFrames (all string columns) |
| validate_input | Check required files, required fields, referential integrity |
| transform | Execute the step DAG |
| validate_output | Re-validate after transforms |
| package | Re-zip DataFrames to GTFS zip |
Verbose mode
With -v, step events stream live as each step executes, which makes progress visible on long pipelines:
▸ remove_llr_calendar [RemoveRows] calendar.txt — Remove LLR service IDs
1.7ms [ok] (-5 rows)
▸ rename_symphony_455 [UpdateFields] stops.txt — Rename University Street to Symphony
1.5ms [ok]
Exit codes
| Code | Meaning |
|---|---|
| 0 | All steps succeeded, diff (if any) was identical |
| 1 | One or more steps errored, OR diff found changes |
| 2 | CLI argument or input-loading error |
On step failure, the CLI prints a separate banner to stderr at the end of output, listing the failed steps and their error messages.
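A wrapper script may want to branch on these exit codes. The Python sketch below shows one way to do that with `subprocess`; the helper names `describe_exit` and `run_schedule` are hypothetical, and the labels are paraphrased from the exit-code table.

```python
import subprocess

# Meanings paraphrased from the exit-code table above.
EXIT_MEANINGS = {
    0: "success: all steps succeeded and any diff was identical",
    1: "failure: a step errored or the diff found changes",
    2: "usage: CLI argument or input-loading error",
}


def describe_exit(code: int) -> str:
    """Map a continuous-gtfs exit code to a short label."""
    return EXIT_MEANINGS.get(code, f"unexpected exit code {code}")


def run_schedule(args: list[str]) -> str:
    """Run `continuous-gtfs schedule` with args and describe how it exited."""
    # stdout and stderr are inherited, so the failure banner stays visible.
    result = subprocess.run(["continuous-gtfs", "schedule", *args])
    return describe_exit(result.returncode)
```

For example, `run_schedule(["sound-transit/", "--input", "schedule=data/schedule.zip", "-o", "output.zip"])` would return the label for whichever code the run produced.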
Inputs
The --input NAME=PATH flag supplies a declared input to the pipeline. Every NAME must appear in the pipeline's INPUTS manifest (__init__.py); unknown names fail before the run starts. Repeat --input once for each named input the pipeline consumes:
continuous-gtfs schedule pipelines/schedule/ \
--input schedule=data/schedule.zip \
--input stop_overrides=data/stop_overrides.csv \
-o output.zip
Each input's bytes are parsed by the framework's parser registry according to the content_kind the pipeline declared for that name — no extension sniffing. Inputs are exposed to transforms as already-parsed values via ctx.inputs[NAME]:
| content_kind | Value shape |
|---|---|
| gtfs_schedule_zip | dict[str, polars.DataFrame] keyed by GTFS filename |
| csv_table | polars.DataFrame (all string columns) |
| gtfs_rt_protobuf | gtfs_realtime_pb2.FeedMessage |
| opaque_bytes | raw bytes (escape hatch) |
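The idea of dispatching on a declared content_kind rather than on the file extension can be sketched as a small registry. This Python example is a stand-in only: it uses the standard library instead of Polars or protobuf, covers just two kinds, and the names `PARSERS` and `parse_input` are hypothetical rather than the framework's API.

```python
import csv
import io
from typing import Any, Callable


# Stand-in parsers built on the standard library. The real framework
# returns Polars DataFrames and protobuf messages instead.
def parse_csv_table(data: bytes) -> list[dict[str, str]]:
    """Parse CSV bytes into rows of string values."""
    return list(csv.DictReader(io.StringIO(data.decode("utf-8"))))


def parse_opaque_bytes(data: bytes) -> bytes:
    """Escape hatch: hand the bytes back untouched."""
    return data


PARSERS: dict[str, Callable[[bytes], Any]] = {
    "csv_table": parse_csv_table,
    "opaque_bytes": parse_opaque_bytes,
}


def parse_input(content_kind: str, data: bytes) -> Any:
    """Dispatch on the declared kind; the file name is never inspected."""
    try:
        parser = PARSERS[content_kind]
    except KeyError:
        raise ValueError(f"no parser for content_kind {content_kind!r}") from None
    return parser(data)


rows = parse_input("csv_table", b"stop_id,stop_name\n455,Symphony\n")
print(rows)  # [{'stop_id': '455', 'stop_name': 'Symphony'}]
```

Because the kind is chosen by name, the same bytes can be parsed differently just by declaring a different kind, which is what the :KIND override relies on.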
The optional :KIND suffix (--input NAME:KIND=PATH) overrides the manifest's declared kind for that flag — a dev escape hatch for feeding a declared input something other than its declared shape (e.g. a parser-behavior fixture). The CLI logs a warning so the override is visible.
Missing files, unknown names, or malformed args exit with code 2. See Writing Transforms: Reference-data inputs for how to access inputs in transforms.
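The NAME[:KIND]=PATH syntax can be split mechanically. The Python sketch below shows one plausible way to do it; `InputSpec` and `parse_input_flag` are hypothetical names, and the CLI's real parser may validate differently.

```python
from typing import NamedTuple, Optional


class InputSpec(NamedTuple):
    name: str
    kind: Optional[str]  # None means "use the kind declared in the manifest"
    path: str


def parse_input_flag(value: str) -> InputSpec:
    """Split 'NAME[:KIND]=PATH' into its parts."""
    # Split on the first '=' only, so the path may itself contain '='.
    left, sep, path = value.partition("=")
    if not sep or not left or not path:
        raise ValueError(f"expected NAME[:KIND]=PATH, got {value!r}")
    name, colon, kind = left.partition(":")
    # Reject an empty name, or a ':' that is not followed by a kind.
    if not name or (colon and not kind):
        raise ValueError(f"expected NAME[:KIND]=PATH, got {value!r}")
    return InputSpec(name, kind or None, path)


print(parse_input_flag("schedule=data/schedule.zip"))
print(parse_input_flag("schedule:opaque_bytes=fixtures/bad.zip"))
```

A malformed value raises `ValueError` in this sketch, which a CLI would typically translate into the exit code 2 described above.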
Diff integration
With --diff-against BASELINE, after the pipeline completes the output zip is compared to the baseline using gtfs-digester canonical diffing. This requires the [dev] extra:
uv pip install 'continuous-gtfs[dev]'
Default diff output is a per-file summary. Add --diff-detail (optionally with --diff-limit N) to expand modified files to per-row content changes with colorized field diffs — the same format produced by the standalone diff --detail subcommand:
continuous-gtfs schedule pipelines/schedule/ \
--input schedule=feed.zip \
-o out.zip \
--diff-against baseline.zip \
--diff-detail --diff-limit 20
The exit code is 1 when the diff finds changes, which is useful for CI that should fail when pipeline output unexpectedly changes.
continuous-gtfs diff
Compare two GTFS zip archives using gtfs-digester canonical diffing. Requires the [dev] extra.
continuous-gtfs diff <baseline.zip> <candidate.zip> [options]
| Option | Description |
|---|---|
| --json | Output structured JSON instead of text (mutually exclusive with --detail) |
| --detail | Expand modified files to show per-row content changes with colorized field diffs (mutually exclusive with --json) |
| --limit N | Max rows shown per category (added / removed / modified) per file in detail mode. Default 50. 0 = unlimited. |
Default (summary) output:
$ continuous-gtfs diff baseline.zip candidate.zip
Diff: baseline.zip vs candidate.zip
Identical: no
Modified files (4):
~ calendar.txt: +0 added, -8 removed, ~0 modified
~ routes.txt: +0 added, -0 removed, ~2 modified
~ stops.txt: +0 added, -2 removed, ~5 modified
Detail output (--detail) expands modified files to show exactly what changed — full rows for added/removed, field-level old → new diffs for modified rows, skipping unchanged fields:
$ continuous-gtfs diff baseline.zip candidate.zip --detail
Diff: baseline.zip vs candidate.zip
Identical: no
~ routes.txt (+0 / -0 / ~2)
primary key: route_id
Modified (2):
~ route_id=100479
route_long_name: "Lynnwood - Federal Way" → "Lynnwood - Angle Lake"
~ route_id=2LINE
route_long_name: "Lynnwood - Downtown Redmond" → "South Bellevue - Downtown Redmond"
~ stops.txt (+0 / -2 / ~5)
primary key: stop_id
Removed (2):
- stop_id=E01 stop_name="Judkins Park" stop_desc="Judkins Park Station..." ...
- stop_id=E07 stop_name="Mercer Island" stop_desc="Mercer Island Station..." ...
Modified (5):
~ stop_id=455
stop_desc: "Symphony to Federal Way" → "Symphony to Angle Lake"
...
ANSI colorization auto-enables when stdout is a TTY and auto-disables when piped or redirected. No --color flag needed.
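TTY-based color detection of this kind usually comes down to a single `isatty()` check. A minimal Python sketch, assuming a hypothetical `colorize` helper (the CLI's own implementation is not shown in this document):

```python
import sys

GREEN = "\033[32m"
RESET = "\033[0m"


def colorize(text: str, code: str, stream=sys.stdout) -> str:
    """Wrap text in an ANSI color only when the stream is a terminal."""
    if stream.isatty():
        return f"{code}{text}{RESET}"
    # Piped or redirected output stays free of escape sequences.
    return text
```

Passing a non-terminal stream, such as an `io.StringIO`, returns the text unchanged, which mirrors what happens when output is piped to a file.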
Use --limit N to cap per-category output per file. Truncated output ends with ... and M more (use --limit 0 to see all).
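The capping rule, including the "0 = unlimited" case, can be expressed in a few lines. This Python sketch uses a hypothetical `truncate_rows` helper and reuses the trailer wording quoted above; it is not the CLI's own code.

```python
def truncate_rows(rows: list[str], limit: int = 50) -> list[str]:
    """Apply a --limit style cap to one category; 0 means unlimited."""
    if limit == 0 or len(rows) <= limit:
        return list(rows)
    hidden = len(rows) - limit
    return rows[:limit] + [f"... and {hidden} more (use --limit 0 to see all)"]


print(truncate_rows(["row1", "row2", "row3"], limit=2))
# ['row1', 'row2', '... and 1 more (use --limit 0 to see all)']
```

The cap applies per category per file, so a file with many added and many removed rows would be truncated twice, once for each category.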
Exit code 0 if identical, 1 if different, 2 if arguments are invalid or gtfs-digester is not installed.
continuous-gtfs realtime
Run the realtime pipeline, with its input feeds supplied via --input flags. RT pipelines emit one artifact pair (.pb + .json) per declared output feed — there is no combined feed.
continuous-gtfs realtime <pipeline_dir> --input NAME[:KIND]=PATH [options]
| Option | Description |
|---|---|
| --input NAME[:KIND]=PATH | Supply a named RT input. Same resolution rules as schedule --input. Repeatable. |
| -o, --output-dir DIR | Output directory; each ctx.output[name] is written as <name>.pb and <name>.json |
| --env NAME | Environment name (default: production) |
Example:
$ continuous-gtfs realtime sound-transit-rt/ \
--input vehicle_positions=data/rt/vehicle_positions.pb \
--input trip_updates=data/rt/trip_updates.pb \
-o out/
RT pipeline: 5.5ms
Input entities: 32
Output entities: 32
vehicle_positions: 16 entities (4705B pb / 23620B json)
trip_updates: 16 entities (6502B pb / 42712B json)
Written out/vehicle_positions.pb + out/vehicle_positions.json
Written out/trip_updates.pb + out/trip_updates.json
Pipeline stages
| Stage | Description |
|---|---|
| parse | Framework parses each input's bytes per its declared content_kind |
| seed | ctx.output is mirrored from each FeedMessage input (pipelines can override in an init step) |
| transform | Execute the step DAG; transforms mutate the per-feed FeedMessages in ctx.output |
| encode | Serialize each ctx.output[name] to both <name>.pb and <name>.json |
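The naming rule for the encode stage, one .pb and one .json file per output feed, can be illustrated without any protobuf code. In this Python sketch the `artifact_paths` helper is hypothetical and only computes file names; it does not serialize anything.

```python
from pathlib import Path


def artifact_paths(output_dir: str, feed_names: list[str]) -> dict[str, tuple[Path, Path]]:
    """Map each output feed name to its (.pb, .json) file pair."""
    base = Path(output_dir)
    return {name: (base / f"{name}.pb", base / f"{name}.json") for name in feed_names}


paths = artifact_paths("out", ["vehicle_positions", "trip_updates"])
for name, (pb_path, json_path) in paths.items():
    print(f"{name}: {pb_path} + {json_path}")
```

A script that consumes the output directory can use the same rule to locate both encodings of a feed from its name alone.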
continuous-gtfs worker
Run as a gRPC pipeline worker that connects to the orchestrator, receives dispatch requests, executes pipelines, and streams results back. This is the command used by deployed Cloud Run worker containers — not typically run directly during development.
continuous-gtfs worker <pipeline_dir> [options]
| Option | Description |
|---|---|
| --orchestrator URL | Orchestrator gRPC address (default: localhost:50051) |
| --version VERSION | Pipeline version identifier (default: dev) |
On startup the worker scans <pipeline_dir> to resolve the step DAG, then opens a bidirectional gRPC stream to the orchestrator. It registers itself with its version and capabilities (schedule, realtime), then waits for Dispatch messages.
For each dispatch the worker:
- Parses each slot's bytes through the content_kind parser registry, populating ctx.inputs on a fresh PipelineContext
- Executes the step DAG, streaming StepStarted/StepCompleted events back via the gRPC stream
- Streams output artifacts: schedule pipelines emit one schedule.zip; RT pipelines emit <name>.pb + <name>.json per ctx.output entry
- Sends a RunCompleted event with aggregate timing and status
Connection security is automatic — insecure channels for localhost, TLS with GCE metadata-server ID tokens for Cloud Run URLs.
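The choice between an insecure and a TLS channel depends only on the orchestrator address. The Python sketch below shows one way such a check could look. The `uses_insecure_channel` helper is hypothetical, and treating loopback IPs like localhost is an assumption that goes beyond what this page states.

```python
from urllib.parse import urlsplit

# "localhost" comes from the text above; the loopback IPs are an assumption.
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def uses_insecure_channel(address: str) -> bool:
    """Return True when the address points at the local machine."""
    # urlsplit only fills in hostname when a '//' authority marker is present.
    target = address if "://" in address else f"//{address}"
    return urlsplit(target).hostname in LOCAL_HOSTS


print(uses_insecure_channel("localhost:50051"))  # True
print(uses_insecure_channel("https://orchestrator.example.com"))  # False
```

Any address that fails the local check would take the TLS path, where the worker attaches an ID token as described above.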