
Epic: Pipeline SDK (livepeer-runner) #8

@rickstaa

Description

Outcome

Developers go from "I have a Python ML model" to a discoverable BYOC capability on Livepeer in under 5 minutes — surfaced in the Developer Dashboard ready for any caller to invoke.

The Pipeline SDK is the authoring surface that makes this possible: write a Python class, get a containerised, BYOC-compatible, schema-described capability.

Spec

Design lives in livepeer-specs / pipeline-sdk.md. Update the spec rather than this issue body when the design moves.

Architecture decisions (monorepo + PEP 420 namespace packages, three distributions livepeer-runner / livepeer-client / livepeer-trickle) are captured in the spec — see the Architecture and the companion Client SDK packaging section.

Roadmap

Each step yields a working SDK strictly more capable than the previous one.

  • C1 — Pipeline base + serve() + hello-world BYOC E2E
  • C2 — setup() lifecycle + HuggingFace sentiment example
  • C3 — FastAPI HTTP layer (/health, /predict, /docs, /openapi.json)
  • C4 — Pydantic BaseModel for inputs / outputs via signature introspection
  • C5 — Image upscale example (binary I/O via Pydantic Base64Bytes)
  • C6 — /health state machine matching go-livepeer's HealthCheck wire format
  • C7 — SSE auto-detection from generator predict() + LLM chat example
  • C8 — LivePipeline for trickle transport (real-time video) — see breakdown below
  • C9 — livepeer push CLI + livepeer.yaml manifest
  • C10 — Schema as Docker image label (org.livepeer.pipeline.schema)
  • C11 — Agent-friendly docs (AGENTS.md, expanded Pipeline docstring, examples/runner/_template/)
  • C12 — Migrate to monorepo with PEP 420 namespace packages — see client SDK packaging spec. Coordinated with #9.
  • C13 — Container self-registration to orch /capability/register — env-gated, wired into serve() lifespan, lenient on failure (degrade /health, keep FastAPI serving). Deregister on shutdown. Retry on conn-refused / timeout / 5xx; fail fast on 400 / 404 / 405.
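A minimal sketch of what the C1/C2 authoring surface implies. The class, method, and function names (Pipeline, setup, predict, serve) mirror the roadmap items above, but the concrete signatures are assumptions, not the SDK's published API:

```python
# Hypothetical stand-in for the livepeer-runner authoring surface (C1/C2).
# Names come from the roadmap; exact signatures are assumptions.

class Pipeline:
    """Minimal stand-in for the SDK's Pipeline base class."""

    def setup(self) -> None:
        # C2: one-time model load, runs before the first predict()
        pass

    def predict(self, text: str) -> dict:
        # C4 would derive a Pydantic schema from this signature
        raise NotImplementedError


class HelloWorld(Pipeline):
    def predict(self, text: str) -> dict:
        return {"echo": text.upper()}


def serve(pipeline: Pipeline) -> None:
    # C3 would wrap this in FastAPI (/health, /predict, /docs, /openapi.json);
    # here we just run the lifecycle once to show the flow.
    pipeline.setup()
    print(pipeline.predict("hello byoc"))


if __name__ == "__main__":
    serve(HelloWorld())
```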

C8 breakdown — LivePipeline

  • Step 1: skeleton (HTTP routes + ABC) — 04cc697
  • Step 2: bytes-through (validate trickle wire) — 831ee44
  • Step 3: frame-loop dispatch + runner.frames namespace — 9688f67
  • Step 5: live_grayscale example + chroma assertion + ffplay viewer — 9ef95d9 series
  • Step 4 — _LiveSession + lifecycle (the only outstanding piece)
    • _LiveSession class encapsulating per-session state
    • Periodic heartbeat on events_url (gateway liveness signal)
    • emit_event(payload) user-facing helper
    • emit_data(payload) helper for data_url when enable_data_output=true
    • on_stream_stop lifecycle hook
    • Drain runner-side state on stop — measured: no leak (RSS plateaus ~170 MB after 25 sessions). State drain unnecessary.
    • Unified error surface — three error sources (subscribe / publish / user process_video raise) log distinctly today with no consistent state propagation. Add _record_error(source, exc, severity): structured ErrorEvent schema (severity ∈ WARN / ERROR / FATAL, source, message, timestamp, consecutive), per-source budget escalating WARN→ERROR after N consecutive failures, flip pipeline._state = ERROR on terminal failures, push events via events_url. Quick first step (~5 LOC): flip _state on TricklePublisherTerminalError. Full schema after live_transcribe surfaces real failure modes.
    • Verify the live-viewer demo can be brought back once heartbeat + state-drain land
    • Pydantic param schema for LivePipeline — today on_stream_start(params) and on_params_update(params) receive dict[str, Any]; users parse / validate manually. The batch Pipeline already supports typed params via signature introspection (C4). Extend the same to LivePipeline: let users type on_stream_start(params: MyParams) and have the runner introspect, validate at the HTTP boundary, and emit a meaningful /openapi.json schema for /stream/start's caller-supplied params (today the schema only describes the orchestrator's protocol fields). Required for the developer dashboard / client SDK to render param controls for live capabilities.
    • Enrich heartbeat payload with PipelineStatus (ai-runner pattern) — today's heartbeat is minimal {"type": "heartbeat", "timestamp"} keep-alive only. ai-runner's report_status_loop uses the same trickle push as both keep-alive AND status report (state, FPS, last_error, restart_count, last_params). One mechanism, dual purpose. Add FPS counters to MediaOutput / MediaPublish and swap heartbeat payload for the rich shape. Keeps /health minimal (k8s contract).
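The unified error surface described above can be sketched in a few lines. Field names follow the ErrorEvent shape in the bullet (severity, source, message, timestamp, consecutive); the budget value and the state flip are illustrative, not the final design:

```python
# Hedged sketch of the proposed _record_error escalation budget.
import time
from dataclasses import dataclass


@dataclass
class ErrorEvent:
    severity: str       # WARN | ERROR | FATAL
    source: str         # subscribe | publish | user process_video
    message: str
    timestamp: float
    consecutive: int


class ErrorSurface:
    def __init__(self, budget: int = 3):
        self.budget = budget              # N consecutive WARNs before ERROR
        self._streak: dict[str, int] = {}
        self.state = "RUNNING"

    def record(self, source: str, exc: Exception, severity: str = "WARN") -> ErrorEvent:
        n = self._streak.get(source, 0) + 1
        self._streak[source] = n
        if severity == "WARN" and n >= self.budget:
            severity = "ERROR"            # escalate after N consecutive failures
        if severity == "FATAL":
            self.state = "ERROR"          # terminal failure flips pipeline state
        return ErrorEvent(severity, source, str(exc), time.time(), n)

    def clear(self, source: str) -> None:
        self._streak[source] = 0          # a success resets the per-source streak
```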

Companion issues — runner / examples polish

Targeted issues spun off from this epic. Two are blocking data-loss bugs needed for full live_transcribe fidelity (5/5 transcripts delivered to SSE); one is cosmetic.

Blocking (data-loss bugs)

  • #12 — SDK-side. _resolve_next_seq returns -1 on probe failure; combined with the publisher's +1 increment this duplicate-POSTs to seg 0 on every trickle channel, dropping the first record. Observable on live_transcribe as missing transcript[1]. Fix: one line (return 0 instead of -1) + demote the warning to debug.
  • Upstream: livepeer/go-livepeer#3924 — gateway's data subscriber tears down too early on /stream/stop, dropping the final emit_data from on_stream_stop. Observable on live_transcribe as missing transcript[4] (orch log shows client disconnected on the final POST). Fix: bounded drain loop in byoc/trickle.go:startDataSubscribe.

Together these account for both observed transcript losses (runner emits 5 → SSE delivers 3 today). Either one fixed independently → 4/5 delivered.
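The #12 interaction is easy to show in miniature. The helpers below are a simplified model, not the SDK's real code; the assumption (from the bullet above) is that seg 0 is already occupied, so starting the publisher's +1 increments from -1 duplicate-POSTs the first record onto seg 0:

```python
# Toy model of the _resolve_next_seq off-by-one (#12). Simplified shapes,
# not the SDK's actual helpers.

def resolve_next_seq_buggy(probe_ok: bool, last_seq: int = -1) -> int:
    return last_seq if probe_ok else -1   # -1 on probe failure (the bug)


def resolve_next_seq_fixed(probe_ok: bool, last_seq: int = -1) -> int:
    return last_seq if probe_ok else 0    # the one-line fix: start at 0


def publish_sequence(start: int, n: int) -> list[int]:
    # The publisher increments by +1 before each POST.
    seqs, seq = [], start
    for _ in range(n):
        seq += 1
        seqs.append(seq)
    return seqs


# Buggy path on a fresh channel: records land on segs 0, 1, 2 — colliding
# with the already-created seg 0, so the first record is lost.
# Fixed path: records land on segs 1, 2, 3.
```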

Cosmetic (log noise, no functional impact)

  • Upstream: livepeer/go-livepeer#3922 — spurious ERROR-level logs at every clean /stream/stop (5 fix sites: ffmpeg subprocess output, trickle preconnect, rtmp2segment probe, orch trickle handler). Operations work; logs just look scary. Pure log-level demotion (if ctx.Err() != nil { debug }).

Examples follow-ups

  • Live viewer tool for live_grayscale — bring back the webcam-pushed live viewer (deleted because the current PyAV decode→user→encode loop can't sustain real-time webcam load: ring buffer drains, mediamtx kicks the egress publisher). Today the example uses synthetic testsrc + capture-to-file + replay. Bring back the webcam viewer once C8 Step 4 lands.

  • Worked example covering full LivePipeline lifecycle — live_grayscale exercises the SDK plumbing but only overrides process_video. live_transcribe (Whisper STT) and live_depth (DepthAnything V2) now exercise more of the lifecycle (setup, on_stream_start, process_audio, emit_data, emit_event, on_stream_stop). Still TODO: a test.sh that subscribes to data_url from the caller side and asserts structured records arrive — needs start_byoc_job from #6.

  • Migrate /stream/params and /stream/stop to control_url subscribe — per the spec, the long-term shape is one HTTP endpoint (/stream/start) plus everything else over the trickle plane. Today BYOC already publishes params + keepalives to control_url (byoc/trickle.go:539) but the orchestrator HTTP-forwards each message via /stream/params (byoc/stream_orchestrator.go:421). Migration must be coordinated upstream: orchestrator drops the HTTP-forwarding step + runner adds trickle subscribe in lockstep. Blocked on: trickle control-channel size / changeover bug (see "Future protocol work" below).

  • Production-grade live transcribe example — live_transcribe is intentionally the minimal lifecycle demo with explicit 3 s chunking + vad_filter=True; first-transcript latency is ~3 s and word boundaries can split mid-window. For users who actually want production live transcription, add a separate example using whisper_streaming's OnlineASRProcessor (LocalAgreement-2 → ~1 s latency, cleaner boundaries) — same LivePipeline lifecycle, different process_audio body. Same folder shape as live_transcribe, marketed as "production transcribe". Possibly also covers VAD-driven segmentation and emit_data partial-vs-final transcript distinction.
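The LocalAgreement-2 policy that whisper_streaming builds on is worth seeing in isolation: only the longest common prefix of the two most recent hypotheses is committed as final, which is what produces the cleaner word boundaries. This is a toy over word lists, not the library's token-timestamp implementation:

```python
# Pure-Python illustration of the LocalAgreement-2 commit policy.

def local_agreement_2(prev_hyp: list[str], curr_hyp: list[str],
                      committed: int) -> int:
    """Return the new count of committed (final) words."""
    agree = 0
    for a, b in zip(prev_hyp, curr_hyp):
        if a != b:
            break                       # hypotheses diverge: stop committing
        agree += 1
    return max(committed, agree)        # never retract already-final words


# Hypotheses stabilise from the front as more audio arrives:
h1 = "the cat sat on".split()
h2 = "the cat sat on the mat".split()
n = local_agreement_2(h1, h2, committed=0)
final_prefix = h2[:n]                   # safe to emit as a final transcript
```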

  • Recover from orchestrator capability drop — gateway sometimes drops the orchestrator from its capability pool after stream failures (Retrying stream with a different orchestrator err=unknown swap reason, followed by no orchestrators available, ending stream). Once dropped, every subsequent /process/stream/start either 400s or kills mid-flight, until register_capability is re-run manually. Investigate (a) a re-register watchdog, (b) a healthcheck-driven re-register hook, or (c) a fix upstream in go-livepeer's gateway swap-orch logic.
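Option (a) could be as small as a background loop. `register_fn` stands in for whatever currently performs the manual register_capability call; the interval and lenient failure handling are assumptions, not a settled design:

```python
# Hypothetical re-register watchdog sketch (option (a) above).
import threading


def start_reregister_watchdog(register_fn, interval_s: float = 60.0,
                              stop=None) -> threading.Thread:
    stop = stop or threading.Event()

    def loop():
        # wait() returns False on timeout, True once stop is set
        while not stop.wait(interval_s):
            try:
                register_fn()   # idempotent re-register keeps the orch in the pool
            except Exception:
                pass            # lenient: a failed attempt just retries next tick

    t = threading.Thread(target=loop, daemon=True)
    t.stop = stop               # expose the kill switch on the thread object
    t.start()
    return t
```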

  • Switch examples to -network offchain once go-livepeer #3906 lands. Current compose files run with -network arbitrum-one-mainnet -ethUrl https://arb1.arbitrum.io/rpc -ethPassword secret-password and rely on pricePerUnit=0 so no real on-chain payment occurs — but the gateway still polls Arbitrum for orchestrator stake lookups (db_discovery.go), and the public RPC throttles with 429 Too Many Requests lines all over the gateway log. Tracked upstream as livepeer/go-livepeer#3905. When the PR merges, drop -network, -ethUrl, -ethPassword from each example's docker-compose.yml (5 files) and run with bare -network offchain. Eliminates the 429 noise entirely.
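The compose change described above is a pure flag swap. This fragment is illustrative only — the service layout of the real compose files may differ:

```yaml
# before (per the bullet above):
#   command: -gateway -network arbitrum-one-mainnet
#            -ethUrl https://arb1.arbitrum.io/rpc -ethPassword secret-password
# after go-livepeer #3906 lands:
command: -gateway -network offchain
```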

  • Assert grayscale, not just bytes-received — live_grayscale/test.sh now extracts U / V plane averages via ffprobe signalstats and asserts ≈128 (chroma-zero = grayscale).
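The idea behind the chroma assertion: in 8-bit YUV, a pure-grayscale frame has U and V planes centred on 128, so the test pulls per-frame UAVG/VAVG from ffprobe's signalstats and checks they sit near that centre. The ffprobe command below is paraphrased from the general signalstats pattern, not copied from test.sh; the tolerance check is the testable part:

```python
# Extraction (paraphrased; run against the captured output file):
#   ffprobe -f lavfi -i "movie=out.ts,signalstats" \
#       -show_entries frame_tags=lavfi.signalstats.UAVG,lavfi.signalstats.VAVG \
#       -of csv=p=0

def is_grayscale(uavg: float, vavg: float, tol: float = 2.0) -> bool:
    """True when both chroma plane averages are within tol of the 128 centre."""
    return abs(uavg - 128.0) <= tol and abs(vavg - 128.0) <= tol
```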

Framework adapters (deferred — build on demand)

Migration paths for users from existing ML frameworks. Each ships as its own pip package with its own foreign dep, isolated from core SDK. Build only when a real migration ask shows up.

  • livepeer-runner-cog — wraps cog.BasePredictor
  • livepeer-runner-fal — wraps fal.App
  • livepeer-runner-modal — wraps Modal @app.function
  • livepeer-runner-bentoml — wraps @bentoml.service

Future protocol work (cross-team, gated on C9 + upstream go-livepeer)

  • Fix trickle control-channel size / segment-changeover bug (upstream) — control_url params updates fail silently or get truncated when the payload is larger than small JSON (~1 MB practical ceiling observed). Hunch is segment-changeover behavior during large writes — possibly FirstByteTimeout, pipe buffering on segment boundaries, or chunk-write semantics across the rollover. Workaround in byoc/stream_gateway.go:1007-1009 switched stop / params to HTTP POST after the bug bit on base64-binary payloads. Blocking dependency for the "migrate to control_url subscribe" follow-up. Upstream go-livepeer change.

  • Capability identity via OCI digest — replace free-form capability names with content-hashed references like byoc/<repo>@sha256:<digest>. Aligns BYOC with Replicate's reproducibility model. SDK side: livepeer push captures the digest at publish time and bakes it into the manifest. Upstream side: orchestrator registration + gateway routing + OrchestratorInfo carry the digest.

  • Cosign / Sigstore signing of capability digests — optional layer on top of digest pinning. Publisher signs the digest, gateway verifies signature against publisher's key.

  • Name:version aliases over digest-pinned wire — Replicate-style mutable names (byoc/text-reverser:v2) that resolve to a digest at lookup time. Wire protocol always pins the digest; aliases are a UX layer.
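How digest pinning plus aliases might surface in the C9 manifest. Field names and the digest value here are purely illustrative assumptions, not the manifest spec:

```yaml
# Illustrative livepeer.yaml shape only — not the C9 spec.
capability: byoc/text-reverser
# `livepeer push` resolves and bakes in the digest at publish time:
image: byoc/text-reverser@sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08
# Mutable alias is a UX layer; the wire protocol always pins the digest:
alias: byoc/text-reverser:v2
```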
