Realtime Builtins

All realtime builtins operate on protobuf FeedMessage objects in ctx.output, keyed by feed name (e.g. ctx.output["vehicle_positions"], ctx.output["trip_updates"]). They mutate the FeedMessage in place.

RT pipelines produce one output per feed type; there is no combined feed. The pipeline's init step (typically @step(before="*")) seeds ctx.output from the inputs declared in the INPUTS manifest. Within a FeedMessage, each builtin checks which payload an entity carries (TripUpdate, VehiclePosition, or Alert) with protobuf's HasField(), which acts as the tagged-union discriminator, so a single transform handles any mix of entity types.
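The HasField() dispatch can be illustrated without the protobuf bindings. This is a minimal sketch, not the library's code: FakeEntity is a hypothetical stand-in that mimics only the HasField() method of a real FeedEntity, and entity_kind is an illustrative helper that is not part of continuous_gtfs.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical stand-in for a GTFS-RT FeedEntity. The real protobuf message
# exposes HasField() for its optional trip_update / vehicle / alert fields.
@dataclass
class FakeEntity:
    id: str
    trip_update: Optional[dict] = None
    vehicle: Optional[dict] = None
    alert: Optional[dict] = None

    def HasField(self, name: str) -> bool:
        return getattr(self, name) is not None


def entity_kind(entity) -> str:
    """Return the name of the payload field the entity carries."""
    for field_name in ("trip_update", "vehicle", "alert"):
        if entity.HasField(field_name):
            return field_name
    return "empty"


entities = [
    FakeEntity("1", trip_update={"trip_id": "T1"}),
    FakeEntity("2", vehicle={"stop_id": "E01"}),
    FakeEntity("3", alert={"header": "Delay"}),
]
kinds = [entity_kind(e) for e in entities]
print(kinds)  # ['trip_update', 'vehicle', 'alert']
```

A transform written this way can loop over a mixed feed once and branch per entity, rather than needing a separate pass per feed type.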

PassThrough

No-op transform for baseline measurement and equivalence testing.

from continuous_gtfs.builtins.realtime import PassThrough

passthrough = PassThrough()

FilterStopsByID

Remove stop_time_updates and vehicle stop references for blocklisted stop IDs.

from continuous_gtfs.builtins.realtime import FilterStopsByID

filter_stops = FilterStopsByID(
    blocked_stop_ids={"E01", "E07"},
)

What it does:

  • TripUpdate entities: Removes stop_time_update entries where stop_id is in the blocklist.
  • VehiclePosition entities: Clears stop_id and current_stop_sequence when the vehicle references a blocked stop, and sets current_status to IN_TRANSIT_TO.
  • Alert entities: Not affected. GTFS-RT alerts can reference stops through informed_entity.stop_id, but this transform leaves those selectors unchanged.

Use case: Remove non-revenue stations from feeds during simulated service prior to an expansion opening.
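The per-entity behaviour listed under "What it does" can be sketched as follows. The dataclasses are hypothetical stand-ins for the protobuf messages, modelling only the fields the filter touches, and the two helper functions are illustrative rather than the builtin's implementation. The sketch assumes a vehicle's stop reference is cleared only when it points at a blocked stop.

```python
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical stand-ins for the GTFS-RT messages (not the real bindings).
@dataclass
class StopTimeUpdate:
    stop_id: str


@dataclass
class TripUpdate:
    stop_time_update: list = field(default_factory=list)


@dataclass
class VehiclePosition:
    stop_id: Optional[str] = None
    current_stop_sequence: Optional[int] = None
    current_status: str = "STOPPED_AT"


def filter_trip_update(tu: TripUpdate, blocked: set) -> None:
    # Delete in reverse index order so earlier indices stay valid.
    for i in reversed(range(len(tu.stop_time_update))):
        if tu.stop_time_update[i].stop_id in blocked:
            del tu.stop_time_update[i]


def filter_vehicle(vp: VehiclePosition, blocked: set) -> None:
    # Assumption: only vehicles referencing a blocked stop are modified.
    if vp.stop_id in blocked:
        vp.stop_id = None
        vp.current_stop_sequence = None
        vp.current_status = "IN_TRANSIT_TO"


blocked = {"E01", "E07"}
tu = TripUpdate([StopTimeUpdate("E01"), StopTimeUpdate("A10"), StopTimeUpdate("E07")])
vp = VehiclePosition(stop_id="E07", current_stop_sequence=12)

filter_trip_update(tu, blocked)
filter_vehicle(vp, blocked)

remaining = [s.stop_id for s in tu.stop_time_update]
print(remaining)          # ['A10']
print(vp.current_status)  # IN_TRANSIT_TO
```

Both helpers mutate their argument in place, matching the way the realtime builtins operate on the FeedMessage in ctx.output.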

CombineFeeds

Merge entities from additional FeedMessage sources into the primary feed. Entities are deduplicated by entity ID; when IDs collide, the primary feed's entity wins.

from continuous_gtfs.builtins.realtime import CombineFeeds

combine = CombineFeeds(
    additional_feed_keys=["vendor_b_feed", "vendor_c_feed"],
)

The primary feed is whichever output key the pipeline's init step seeded (e.g. ctx.output["vehicle_positions"]). Additional feeds are read from ctx.inputs by the specified slot keys — these are populated by the orchestrator from the asset registry's bound slots.
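The "primary feed wins" rule can be sketched with plain dictionaries standing in for feed entities. combine_entities is a hypothetical helper, not the builtin itself. The sketch also assumes that when two additional feeds share an ID, the one listed first in additional_feed_keys is kept; the text above does not specify this.

```python
def combine_entities(primary: list, additional: list) -> list:
    """Merge entity lists, keeping the first entity seen for each ID."""
    merged = list(primary)
    seen = {entity["id"] for entity in primary}
    for feed in additional:
        for entity in feed:
            if entity["id"] not in seen:
                seen.add(entity["id"])
                merged.append(entity)
    return merged


primary = [{"id": "v1", "source": "primary"}]
vendor_b = [{"id": "v1", "source": "b"}, {"id": "v2", "source": "b"}]
vendor_c = [{"id": "v2", "source": "c"}, {"id": "v3", "source": "c"}]

merged = combine_entities(primary, [vendor_b, vendor_c])
summary = [(e["id"], e["source"]) for e in merged]
print(summary)  # [('v1', 'primary'), ('v2', 'b'), ('v3', 'c')]
```

Because deduplication is by entity ID alone, feeds from different vendors that reuse the same IDs for different vehicles would silently drop entities.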

Use case: Combine inputs from multiple backend systems into a single output feed.

RenameVehicles

Prefix vehicle IDs and labels with a configurable string.

from continuous_gtfs.builtins.realtime import RenameVehicles

rename = RenameVehicles(id_prefix="ST-")

Applies only to entities that carry a vehicle.vehicle descriptor. The transform is idempotent: an ID that already starts with the prefix is left unchanged.
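The idempotence check amounts to a startswith() guard. A minimal sketch, with add_prefix as a hypothetical helper name; leaving empty values untouched is an assumption, not documented behaviour.

```python
def add_prefix(value: str, prefix: str) -> str:
    """Prefix value unless it is empty or already carries the prefix."""
    if not value or value.startswith(prefix):
        return value
    return prefix + value


once = add_prefix("4021", "ST-")
twice = add_prefix(once, "ST-")
print(once, twice)  # ST-4021 ST-4021
```

Running the step twice over the same feed therefore yields the same IDs as running it once.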

Use case: Namespace vehicle IDs when combining feeds from multiple vendors.

UpdateFeedHeader

Set the GTFS-RT version string in the feed header (the gtfs_realtime_version field of FeedHeader).

from continuous_gtfs.builtins.realtime import UpdateFeedHeader

header = UpdateFeedHeader(version="2.0")

TransformTripId

Regex substitution on trip_id across all entity types (TripUpdate, VehiclePosition, and Alert informed_entity references).

from continuous_gtfs.builtins.realtime import TransformTripId

transform_trip = TransformTripId(
    pattern=r"^LONG_PREFIX_(\d+)$",
    replacement=r"T-\1",
)

Use case: Simplify or consolidate trip IDs in the output feed.
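The effect of the pattern and replacement shown above can be checked with Python's re module directly. This sketch assumes the builtin applies re.sub-style substitution to each trip_id; rewrite_trip_id is a hypothetical helper used only for illustration.

```python
import re

pattern = re.compile(r"^LONG_PREFIX_(\d+)$")
replacement = r"T-\1"


def rewrite_trip_id(trip_id: str) -> str:
    # IDs that do not match the pattern are returned unchanged.
    return pattern.sub(replacement, trip_id)


print(rewrite_trip_id("LONG_PREFIX_1042"))  # T-1042
print(rewrite_trip_id("SHUTTLE_7"))         # SHUTTLE_7
```

Anchoring the pattern with ^ and $ keeps the substitution from touching IDs that merely contain the prefix somewhere in the middle.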

Common parameters

All realtime builtins accept these keyword arguments:

Parameter   Type        Default  Description
after       list[Step]  []       Steps that must run before this one
before      list[Step]  []       Steps that must run after this one
priority    int         100      Tiebreak for steps at the same DAG level
enabled     bool        True     Set to False to skip this step
tags        list[str]   []       Arbitrary labels
data_owner  str         None     Email of responsible staff member
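How after and priority might interact can be sketched with the standard library's graphlib. This is an illustration under stated assumptions, not the orchestrator's scheduler: run_order is a hypothetical helper, steps are represented by name only, and the sketch assumes a lower priority number runs first within a DAG level, which the table does not specify.

```python
from graphlib import TopologicalSorter


def run_order(after: dict, priority: dict) -> list:
    """Order steps level by level; `after` maps a step to its prerequisites."""
    sorter = TopologicalSorter(after)
    sorter.prepare()
    order = []
    while sorter.is_active():
        # Every step returned by get_ready() has all prerequisites done.
        # Assumption: lower priority value first, name as a stable fallback.
        ready = sorted(
            sorter.get_ready(),
            key=lambda name: (priority.get(name, 100), name),
        )
        order.extend(ready)
        sorter.done(*ready)
    return order


after = {
    "init": [],
    "rename": ["init"],
    "filter_stops": ["init"],
    "header": ["rename", "filter_stops"],
}
priority = {"filter_stops": 50}  # all other steps use the default of 100

order = run_order(after, priority)
print(order)  # ['init', 'filter_stops', 'rename', 'header']
```

A before=[X] constraint can be expressed in the same graph by adding the current step to X's prerequisites.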