
How does data get into the pipeline?

March 22, 2026 — Experiment: asset-registry

The Question

We originally assumed each pipeline would have one feed hardwired to it: one schedule in, one transformed schedule out. Three things forced us to rethink that. First, schedule transforms need supplemental reference data (like stop name overrides) that doesn't come from the feed itself. Second, Sound Transit anticipates new vendors coming online with partial feeds that need merging. Third, we need to consume both staging and production versions of feeds from the ESB, with the flexibility to route them to different pipeline environments. We needed a model that decouples "where data comes from" from "what pipeline uses it."

What We Tried

  • A generalized asset registry backed by PostgreSQL, where each input data source gets a unique key (like pims/prod/schedule or pims/qa/vehicle_positions)
  • Two versioning strategies: content-hash for schedule feeds (so identical re-exports don't trigger reprocessing) and timestamp for realtime feeds (every fetch is meaningful)
  • Pipeline "slots" that bind named inputs to assets — a pipeline declares what it needs (e.g. "schedule" and "stop_names") and the registry provides the data
  • Per-pipeline debounce so rapid arrivals of multiple inputs collapse into a single pipeline run
  • Secret-backed fetching via GCP Secret Manager for authenticated ESB endpoints
  • A push API for data that doesn't come from a URL (like manually uploaded reference files)
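
The two versioning strategies in the list above can be sketched in a few lines. This is a minimal in-memory illustration, not the experiment's code: the `AssetRegistry` class, its method names, and the strategy labels are hypothetical, a plain dict stands in for PostgreSQL, and the standard library's `blake2b` stands in for BLAKE3 (which is not in the Python standard library). It also assumes dedup compares against the most recent version only.

```python
import hashlib
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Asset:
    key: str                 # e.g. "pims/qa/schedule"
    strategy: str            # "content_hash" or "timestamp" (hypothetical labels)
    versions: list = field(default_factory=list)


class AssetRegistry:
    """In-memory stand-in for the PostgreSQL-backed registry."""

    def __init__(self):
        self.assets = {}

    def register(self, key: str, strategy: str) -> None:
        if strategy not in ("content_hash", "timestamp"):
            raise ValueError(f"unknown versioning strategy: {strategy}")
        self.assets[key] = Asset(key, strategy)

    def record(self, key: str, payload: bytes, now: Optional[float] = None) -> Optional[str]:
        """Store a fetched payload. Returns the new version id, or None if deduplicated."""
        asset = self.assets[key]
        if asset.strategy == "content_hash":
            # blake2b is a stdlib stand-in for BLAKE3 in this sketch.
            version = hashlib.blake2b(payload, digest_size=16).hexdigest()
            if asset.versions and asset.versions[-1] == version:
                return None  # identical re-export: no new version, no reprocessing
        else:
            # Timestamp strategy: every fetch is meaningful, so always version it.
            stamp = time.time() if now is None else now
            version = f"{stamp:.3f}"
        asset.versions.append(version)
        return version


registry = AssetRegistry()
registry.register("pims/qa/schedule", "content_hash")
registry.register("pims/qa/vehicle_positions", "timestamp")

for _ in range(3):  # three fetch rounds with unchanged content
    registry.record("pims/qa/schedule", b"same zip bytes")
    registry.record("pims/qa/vehicle_positions", b"same protobuf bytes")

print(len(registry.assets["pims/qa/schedule"].versions))           # 1
print(len(registry.assets["pims/qa/vehicle_positions"].versions))  # 3
```

Returning `None` on a deduplicated fetch gives the caller a simple signal for "nothing to trigger", which is the property the content-hash strategy exists to provide.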

What We Found

  1. Content-hash dedup works on real feeds. We fetched Sound Transit's 1.5 MB schedule feed three times in a row and only one version was created. BLAKE3 hashing is fast enough that the dedup check adds negligible overhead to the fetch.

  2. QA and production feeds have different content. This sounds obvious, but it confirms the environments are genuinely independent and the hash-based dedup won't accidentally cross-match them. QA schedule hashes to ac46ca..., production to 8941....

  3. PIMS feeds are much smaller than we expected. Trip updates from the direct PIMS endpoint are ~8.5KB, compared to the 52KB we had from the OBA aggregated feed. Vehicle positions are ~6KB. This matters for latency budgets — smaller payloads mean faster fetches.

  4. Debounce naturally handles the "multiple inputs arrive together" case. When we fetch vehicle positions and trip updates back-to-back, the 500ms debounce window collapses both into a single pipeline run. The run's slot snapshot reflects the latest version of every input, not just the one that triggered it.

  5. Fan-out from one fetch to multiple pipelines works. A single push of reference data triggered independent runs for both the QA and production schedule pipelines, each with their own debounce window. This is the pattern that lets us fetch each feed URL once and route it to many consumers.

  6. First fetch is 2-4x slower than subsequent ones. RT feeds take 280-330ms on the first call but drop to 84-125ms after that, thanks to HTTP connection reuse. This is worth budgeting for in cold-start scenarios.
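
The debounce behavior in finding 4 can be illustrated with a small deterministic model. This is a sketch under stated assumptions, not the experiment's implementation: the `DebouncedPipeline` class and its methods are hypothetical, time is passed in explicitly instead of using real timers, the slot names for the realtime pipeline are made up, and it assumes a trailing-edge debounce where each arrival restarts the window.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DebouncedPipeline:
    name: str
    window_ms: int
    latest: dict = field(default_factory=dict)   # slot name -> latest version seen
    deadline_ms: Optional[int] = None            # when the pending run should fire
    runs: list = field(default_factory=list)     # slot snapshots of triggered runs

    def on_new_version(self, slot: str, version: str, now_ms: int) -> None:
        """Record an arrival and (re)start the debounce window."""
        self.latest[slot] = version
        self.deadline_ms = now_ms + self.window_ms

    def tick(self, now_ms: int) -> None:
        """Trigger the pending run once the window has elapsed."""
        if self.deadline_ms is not None and now_ms >= self.deadline_ms:
            # Snapshot every slot, not just the one that triggered the run.
            self.runs.append(dict(self.latest))
            self.deadline_ms = None


rt = DebouncedPipeline("st-realtime-qa", window_ms=500)
rt.on_new_version("vehicle_positions", "vp-1", now_ms=0)
rt.on_new_version("trip_updates", "tu-1", now_ms=120)   # arrives inside the window
rt.tick(now_ms=400)   # window restarted at 120ms, so nothing fires yet
rt.tick(now_ms=620)   # 120 + 500 has elapsed: one run with both inputs

print(len(rt.runs))   # 1
print(rt.runs[0])     # {'vehicle_positions': 'vp-1', 'trip_updates': 'tu-1'}
```

Fan-out (finding 5) falls out of the same model if each pipeline owns its own instance: one new asset version calls `on_new_version` on every pipeline bound to that asset, and each pipeline debounces independently.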

What It Looks Like

Seven assets registered across two environments, bound to four pipelines:

flowchart LR
    VP_QA[pims/qa/vehicle_positions] --> RT_QA[st-realtime-qa\n500ms debounce]
    TU_QA[pims/qa/trip_updates] --> RT_QA
    VP_PROD[pims/prod/vehicle_positions] --> RT_PROD[st-realtime-prod\n500ms debounce]
    TU_PROD[pims/prod/trip_updates] --> RT_PROD
    S_QA[pims/qa/schedule] --> SCHED_QA[st-schedule-qa\n2000ms debounce]
    REF[reference/stop_name_overrides] --> SCHED_QA
    REF --> SCHED_PROD
    S_PROD[pims/prod/schedule] --> SCHED_PROD[st-schedule-prod\n2000ms debounce]

After 3 fetch rounds: 16 asset versions created, 10 pipeline runs triggered. Each schedule feed was fetched 3 times but created only 1 version (content unchanged). RT feeds created a new version on every fetch.

The Decision

We're using a PostgreSQL-backed asset registry as the central data ingestion layer, with content-hash versioning for schedule feeds and timestamp versioning for realtime feeds.

What This Means

  • Pipelines no longer need to know where their data comes from — they just declare named input slots
  • Reference data (stop name overrides, route metadata) is a first-class input alongside feeds, not bundled into the transform build
  • Adding a new vendor feed means registering a new asset and binding it to the appropriate pipeline slots
  • The same feed can be routed to multiple pipelines (staging, production, testing) without redundant fetching
  • The host service will run this registry internally — the host-orchestration experiment will integrate it with Cloud Run worker pools
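
To make the slot model concrete, here is one way the bindings from the diagram above could be written down as data. The asset keys and pipeline names come from the diagram, and the "schedule" and "stop_names" slot names come from the slot description earlier. Everything else is an assumption for illustration: the `BINDINGS` structure, the `consumers_of` helper, the realtime slot names, and the `vendor-x/...` key.

```python
# pipeline name -> {slot name -> asset key}
BINDINGS = {
    "st-realtime-qa": {
        "vehicle_positions": "pims/qa/vehicle_positions",   # slot names assumed
        "trip_updates": "pims/qa/trip_updates",
    },
    "st-realtime-prod": {
        "vehicle_positions": "pims/prod/vehicle_positions",
        "trip_updates": "pims/prod/trip_updates",
    },
    "st-schedule-qa": {
        "schedule": "pims/qa/schedule",
        "stop_names": "reference/stop_name_overrides",
    },
    "st-schedule-prod": {
        "schedule": "pims/prod/schedule",
        "stop_names": "reference/stop_name_overrides",
    },
}


def consumers_of(asset_key: str, bindings: dict = BINDINGS) -> list:
    """Return the pipelines that have a slot bound to this asset, sorted by name."""
    return sorted(
        pipeline for pipeline, slots in bindings.items()
        if asset_key in slots.values()
    )


# One push of reference data fans out to both schedule pipelines.
print(consumers_of("reference/stop_name_overrides"))
# ['st-schedule-prod', 'st-schedule-qa']

# Adding a vendor feed is a new asset plus a new slot binding (key is hypothetical).
BINDINGS["st-schedule-qa"]["vendor_schedule"] = "vendor-x/qa/schedule"
print(consumers_of("vendor-x/qa/schedule"))
# ['st-schedule-qa']
```

Keeping the bindings as plain data means routing changes never touch pipeline code, which is the decoupling the registry is meant to provide.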

Open Questions

  • Smarter schedule hashing. We used a simple BLAKE3 hash of the raw zip bytes for dedup. This catches identical re-downloads, but not the case where the upstream system regenerates a zip with different compression or metadata but identical schedule data. The gtfs-digester-core experiment already validated a canonical fingerprinting technique for exactly this — decompress, strip non-spec fields, sort deterministically, then hash. We should test using that as the versioning strategy for schedule assets.
  • How should the registry handle feed source failures? Retry logic, staleness alerts?
  • Should pipeline runs be triggered on a schedule regardless of new data, or only on new versions?
  • What's the right debounce window for production RT feeds? 500ms worked in testing but real polling intervals may need tuning.
  • Should VehiclePositions and TripUpdates be bound together or run independently? We currently bind both to the same RT pipeline with a shared debounce window, so they arrive together in one run. But they could be separate pipelines — they're independent feeds with independent update cadences. Binding them means a pipeline run always has both, but a delayed fetch of one blocks the other. Independent pipelines would be simpler but couldn't cross-reference trip updates with vehicle positions in the same run.
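
For the first open question, the gap between raw-byte hashing and canonical fingerprinting can be shown with a rough sketch. This is not the gtfs-digester-core technique itself, only an approximation of the idea under stated assumptions: the `canonical_fingerprint` and `make_zip` functions are hypothetical, `blake2b` again stands in for BLAKE3, "sort deterministically" is taken to mean sorting members, columns, and rows, and the "strip non-spec fields" step is omitted because it needs the GTFS field list. The sample rows are made up.

```python
import csv
import hashlib
import io
import zipfile


def canonical_fingerprint(zip_bytes: bytes) -> str:
    """Hash a zip of CSV tables by table contents rather than raw bytes."""
    digest = hashlib.blake2b(digest_size=16)
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for name in sorted(archive.namelist()):            # member order must not matter
            text = archive.read(name).decode("utf-8-sig")  # tolerate a BOM
            rows = [r for r in csv.reader(io.StringIO(text, newline="")) if r]
            if not rows:
                continue
            header, body = rows[0], rows[1:]
            # Column order must not matter: visit columns sorted by header name.
            order = sorted(range(len(header)), key=lambda i: header[i])
            width = len(header)
            # Row order must not matter; short rows are padded with empty cells.
            canonical = sorted(
                [(row + [""] * width)[i] for i in order] for row in body
            )
            digest.update(name.encode("utf-8") + b"\x1e")
            for cells in [[header[i] for i in order]] + canonical:
                digest.update("\x1f".join(cells).encode("utf-8") + b"\x1e")
    return digest.hexdigest()


def make_zip(files: dict, compression: int) -> bytes:
    """Build a zip in memory (demo helper)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=compression) as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    return buffer.getvalue()


# Same schedule data, different compression, column order, and row order.
a = make_zip({"stops.txt": "stop_id,stop_name\n1,Westlake\n2,Capitol Hill\n"},
             zipfile.ZIP_STORED)
b = make_zip({"stops.txt": "stop_name,stop_id\nCapitol Hill,2\nWestlake,1\n"},
             zipfile.ZIP_DEFLATED)
# Genuinely different schedule data.
c = make_zip({"stops.txt": "stop_id,stop_name\n1,Westlake\n2,Capitol Hill Station\n"},
             zipfile.ZIP_STORED)

print(a == b)                                                  # False: raw bytes differ
print(canonical_fingerprint(a) == canonical_fingerprint(b))    # True
print(canonical_fingerprint(a) == canonical_fingerprint(c))    # False
```

A raw-byte hash would treat `a` and `b` as two versions and trigger a reprocessing run, while the canonical fingerprint treats them as one. The cost is a full decompress and parse on every fetch, which would need measuring against the 1.5 MB schedule feed.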