ID Mappings
GTFS files are heavily cross-referenced — trips.route_id references routes.route_id, stop_times.trip_id references trips.trip_id, and so on. When a transform removes or renames an ID in one file, dependent files need cascading updates.
The PipelineContext provides an id_mappings mechanism for this: one step records the change, and a later step reads it and applies the cascade.
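To make "dependent files" concrete, the sketch below models a small, hand-picked subset of GTFS references as a dependency map and walks it breadth-first to list which (file, field) pairs a change can reach. The DEPENDENTS table and the cascade_targets helper are hypothetical illustrations, not part of the PipelineContext API.

```python
from __future__ import annotations

from collections import deque

# Hypothetical, partial map of GTFS references, keyed by the file that owns an ID.
# Each entry lists (dependent file, field in that file that references the ID).
DEPENDENTS = {
    "routes.txt": [("trips.txt", "route_id")],
    "trips.txt": [("stop_times.txt", "trip_id")],
    "stops.txt": [("stop_times.txt", "stop_id")],
}


def cascade_targets(changed_file: str) -> list[tuple[str, str]]:
    """Return every (file, field) a change in changed_file can reach, breadth-first."""
    seen = {changed_file}
    order = []
    queue = deque([changed_file])
    while queue:
        current = queue.popleft()
        for dependent, field in DEPENDENTS.get(current, []):
            order.append((dependent, field))
            # Removing rows from a dependent file can cascade further
            if dependent not in seen:
                seen.add(dependent)
                queue.append(dependent)
    return order


print(cascade_targets("routes.txt"))
```

Removing a route reaches trips.txt directly and stop_times.txt through the trips it removes, which is why the cascade is usually split into one step per hop.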
When to use this
Use ID mappings when:
- You remove a route and need to remove its trips and stop_times
- You rename an ID and need all references updated
- You consolidate multiple IDs into one
If your transform only changes data within a single file and doesn't remove or rename any ID that other files reference, you don't need this: just modify the DataFrame directly.
Writing mappings
The step that removes or renames the ID records the change:
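```python
import polars as pl


@step(files=["routes.txt"])
def consolidate_routes(ctx):
    df = ctx.datasets["routes.txt"]
    # Merge route OLD into route NEW: record the rename so dependent steps
    # can repoint their references, then drop OLD from routes.txt
    ctx.add_id_mapping("routes.txt", "route_id", "OLD", "NEW")
    ctx.datasets["routes.txt"] = df.filter(pl.col("route_id") != "OLD")
```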
For removals, use None as the new ID:
```python
ctx.add_id_mapping("routes.txt", "route_id", "DEPRECATED", None)
```
Reading mappings
A dependent step reads the mappings and applies cascading changes:
```python
import polars as pl


@step(files=["trips.txt"], after=[consolidate_routes])
def cascade_to_trips(ctx):
    mappings = ctx.get_id_mappings("routes.txt", "route_id")
    if not mappings:
        return  # nothing upstream was renamed or removed

    df = ctx.datasets["trips.txt"]
    for old_id, new_id in mappings.items():
        if new_id is None:
            # Cascade delete: drop trips that belonged to the removed route
            df = df.filter(pl.col("route_id") != old_id)
        else:
            # Cascade rename: repoint trips to the new route ID
            df = df.with_columns(
                pl.when(pl.col("route_id") == old_id)
                .then(pl.lit(new_id))
                .otherwise(pl.col("route_id"))
                .alias("route_id")
            )
    ctx.datasets["trips.txt"] = df
```
API
ctx.add_id_mapping(file, field, old_id, new_id)
| Parameter | Type | Description |
|---|---|---|
| file | str | GTFS filename (e.g. "routes.txt") |
| field | str | Field name (e.g. "route_id") |
| old_id | str | The original ID value |
| new_id | str \| None | New value, or None if removed |
Mappings are additive — multiple steps can write to the same file/field combination.
ctx.get_id_mappings(file, field) -> dict[str, str | None]
Returns all mappings for the given file and field as a dict from old ID to new ID (None for removed IDs). Returns an empty dict if none exist.
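The following is a minimal sketch of storage that would satisfy the contract above: entries are keyed by (file, field), additive across callers, and an unknown pair yields an empty dict. IdMappingStore is a hypothetical illustration, not the framework's implementation; in particular, overwriting on a repeated old_id and returning a copy are assumptions this documentation doesn't specify.

```python
from __future__ import annotations

from collections import defaultdict


class IdMappingStore:
    """Hypothetical sketch of the storage behind add_id_mapping/get_id_mappings."""

    def __init__(self) -> None:
        # (file, field) -> {old_id: new_id, or None for removals}
        self._mappings: defaultdict[tuple[str, str], dict[str, str | None]] = defaultdict(dict)

    def add_id_mapping(self, file: str, field: str, old_id: str, new_id: str | None) -> None:
        # Additive: each call adds one entry. Re-recording the same old_id
        # overwrites it here (an assumption, not documented behavior).
        self._mappings[(file, field)][old_id] = new_id

    def get_id_mappings(self, file: str, field: str) -> dict[str, str | None]:
        # .get() avoids creating an empty entry; return a copy so readers
        # cannot mutate shared state (also an assumption)
        return dict(self._mappings.get((file, field), {}))


store = IdMappingStore()
# Two independent steps write to the same file/field combination
store.add_id_mapping("routes.txt", "route_id", "OLD", "NEW")
store.add_id_mapping("routes.txt", "route_id", "DEPRECATED", None)
print(store.get_id_mappings("routes.txt", "route_id"))  # {'OLD': 'NEW', 'DEPRECATED': None}
print(store.get_id_mappings("trips.txt", "trip_id"))  # {}
```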
Key properties
- Loose coupling: The writing step doesn't need to know which steps will read the mappings.
- DAG enforcement: Reading steps declare after=[writing_step], and the DAG guarantees they run after the step that writes the mappings.
- Additive: Multiple steps can contribute mappings to the same file/field.
- Convention, not magic: The framework doesn't automatically cascade anything. You write the cascade logic in your steps. The mappings dict is just a communication channel.