Realtime Builtins
All realtime builtins operate on protobuf FeedMessage objects in ctx.output, keyed by feed name (e.g. ctx.output["vehicle_positions"], ctx.output["trip_updates"]). They mutate the FeedMessage in place.
RT pipelines produce one output per feed type, with no combined feed. The pipeline's init step (typically @step(before="*")) seeds ctx.output from the inputs declared in the INPUTS manifest. Each entity type within a FeedMessage (TripUpdate, VehiclePosition, Alert) is inspected via protobuf's HasField() — the tagged-union discriminator — so a single transform handles any mix of entity types.
PassThrough
No-op transform for baseline measurement and equivalence testing.
from continuous_gtfs.builtins.realtime import PassThrough
passthrough = PassThrough()
FilterStopsByID
Remove stop_time_updates and vehicle stop references for blocklisted stop IDs.
from continuous_gtfs.builtins.realtime import FilterStopsByID
filter_stops = FilterStopsByID(
blocked_stop_ids={"E01", "E07"},
)
What it does:
- TripUpdate entities: Removes
stop_time_updateentries wherestop_idis in the blocklist. - VehiclePosition entities: Clears
stop_idandcurrent_stop_sequence, sets status toIN_TRANSIT_TO. - Alert entities: Not affected (alerts don't reference stops directly).
Use case: Remove non-revenue stations from feeds during simulated service prior to an expansion opening.
CombineFeeds
Merge entities from additional FeedMessage sources into the primary feed. Deduplicates by entity ID — primary feed wins.
from continuous_gtfs.builtins.realtime import CombineFeeds
combine = CombineFeeds(
additional_feed_keys=["vendor_b_feed", "vendor_c_feed"],
)
The primary feed is whichever output key the pipeline's init step seeded (e.g. ctx.output["vehicle_positions"]). Additional feeds are read from ctx.inputs by the specified slot keys — these are populated by the orchestrator from the asset registry's bound slots.
Use case: Combine inputs from multiple backend systems into a single output feed.
RenameVehicles
Prefix vehicle IDs and labels with a configurable string.
from continuous_gtfs.builtins.realtime import RenameVehicles
rename = RenameVehicles(id_prefix="ST-")
Only applies to entities with a vehicle.vehicle descriptor. Idempotent — won't double-prefix if the ID already starts with the prefix.
Use case: Namespace vehicle IDs when combining feeds from multiple vendors.
UpdateFeedHeader
Set the GTFS-RT header version.
from continuous_gtfs.builtins.realtime import UpdateFeedHeader
header = UpdateFeedHeader(version="2.0")
TransformTripId
Regex substitution on trip_id across all entity types (TripUpdate, VehiclePosition, and Alert informed_entity references).
from continuous_gtfs.builtins.realtime import TransformTripId
transform_trip = TransformTripId(
pattern=r"^LONG_PREFIX_(\d+)$",
replacement=r"T-\1",
)
Use case: Simplify or consolidate trip IDs in the output feed.
Common parameters
All realtime builtins accept these keyword arguments:
| Parameter | Type | Default | Description |
|---|---|---|---|
after |
list[Step] |
[] |
Steps that must run before this one |
before |
list[Step] |
[] |
Steps that must run after this one |
priority |
int |
100 |
Tiebreak for steps at the same DAG level |
enabled |
bool |
True |
Set to False to skip this step |
tags |
list[str] |
[] |
Arbitrary labels |
data_owner |
str |
None |
Email of responsible staff member |