Skip to content

Wiring Up the Orchestrator

March 25, 2026 — Phase 2 Build

The Question

We have a working transform framework that processes feeds locally. But the production system needs a central coordinator that fetches data from PIMS, versions it, decides when to run pipelines, dispatches work to the right worker, and streams results back in real time. How do we build the orchestrator — and how does it talk to the Python pipeline worker?

What We Tried

  • Defined gRPC proto contracts for worker-orchestrator communication (bidirectional streaming) and web-orchestrator queries (unary RPCs)
  • Built the orchestrator as a Bun + TypeScript service with @grpc/grpc-js and the postgres driver
  • Implemented the asset registry DB layer (assets, versions, slots, runs, run_steps), fetcher with secret template resolution, per-pipeline debounce, worker pool manager, and dirty detection
  • Wired the Python pipeline worker to connect via gRPC, self-register, and stream PipelineEvent messages (StepStarted, StepCompleted, ArtifactChunk, RunCompleted) using executor hooks
  • Set up Docker Compose with Postgres 18 for local development

What We Found

  1. Proto-first beats runtime loading for production. The host-orchestration experiment used runtime proto loading for speed. For the real system, writing .proto files first gives type safety on both sides and the proto file is the single source of truth for the contract. We caught several issues in review (reserved keywords, missing pagination, inconsistent enums) that would have been implicit bugs with runtime loading.

  2. The debounce manager had a subtle race condition. It deleted trigger data before the callback could read it — every pipeline run would have been created with an empty triggered_by array. Code review caught this before it hit production. The fix was simple: pass the triggers directly to the callback instead of expecting the callback to read them from shared state.

  3. Schedule and RT runs need different step tracking strategies. Schedule runs write one pipeline_run_steps row per step as it streams back — the orchestrator inserts on StepStarted and updates on StepCompleted. RT runs fire every 20 seconds with 4 steps each (~17K step events/day), so they store step results inline in the run's result JSONB. The Control Console API normalizes both into the same response format.

  4. The worker needed a proper message queue, not a shared list. The first implementation used a Python list as a cross-thread queue between the gRPC background thread and the dispatch handler. This worked in testing but would have disconnected after 30 seconds of idle time (unhandled queue.Empty). Replaced with queue.Queue, a unique sentinel object for shutdown, and proper grpc.RpcError handling.

  5. Dirty detection on startup closes the crash recovery loop. When the orchestrator starts, it scans all pipelines: never run? Last run failed? Inputs changed since last success? Dirty pipelines are queued through the debounce manager with zero delay. If the orchestrator crashes mid-run, restart picks up where it left off.

What It Looks Like

The orchestrator starting up with a connected worker:

$ docker compose up -d
$ bun run index.ts
Orchestrator gRPC listening on :50051
Dirty detection: all pipelines clean

# In another terminal:
$ continuous-gtfs worker tests/fixtures/pipelines/st_schedule --orchestrator localhost:50051
Loaded 16 steps from tests/fixtures/pipelines/st_schedule
Connecting to orchestrator at localhost:50051 as worker-32b20080 (version=dev)

# Back in orchestrator logs:
Worker registered: worker-32b20080 (version=dev, capabilities=[schedule,realtime])

The data flow when a new asset version arrives:

sequenceDiagram participant F as Fetcher participant O as Orchestrator participant DB as PostgreSQL participant W as Worker F->>O: onNewVersion(asset, content) O->>O: Cache content in memory O->>O: Debounce timer starts (2s) Note over O: VP + TU arrive within 2s, collapse into one run O->>DB: Create pipeline_run (pending) O->>DB: Update status → running O->>W: DispatchRequest (run_id, slot_data) W->>O: StepStarted (step 0) O->>DB: Insert run_step (running) W->>O: StepCompleted (step 0, success, 12ms) O->>DB: Update run_step (success) W->>O: ArtifactChunk (feed.pb) W->>O: ArtifactChunk (feed.json) W->>O: RunCompleted (completed, 23ms) O->>DB: Update run (completed, result JSONB)

The Decision

The orchestrator core is built and communicating with the pipeline worker over gRPC. All the pieces exist: asset registry, fetcher, debounce, worker pool, dispatch, dirty detection, and PipelineEvent streaming. What's left is infrastructure (Phase 3), end-to-end integration with real PIMS feeds (Phase 4), and the REST API layer (Phase 5).

What This Means

  • The three-service architecture is implemented (orchestrator + worker done, web service is Phase 5)
  • Local development uses Docker Compose Postgres — start with docker compose up -d
  • The gRPC contract is defined in .proto files — both TypeScript and Python stubs are generated
  • GCS publishing and output asset registration are stubbed but need Phase 3 infrastructure
  • 10 code reviews were processed during this phase, catching bugs that would have been production failures

Open Questions

  • Should we build the E2E dispatch test (seeded assets → fetch → debounce → dispatch → worker → DB result) before moving to infrastructure, or let Phase 4 handle it?
  • The fetcher resolves secrets via environment variables in dev — do we need the GCP Secret Manager integration before deployment?
  • Worker reconnection logic isn't implemented yet — the worker exits on stream close. Should it retry with backoff?