Outcome
Developers go from "I have a Python ML model" to a discoverable BYOC capability on Livepeer in under 5 minutes — surfaced in the Developer Dashboard ready for any caller to invoke.
The Pipeline SDK is the authoring surface that makes this possible: write a Python class, get a containerised, BYOC-compatible, schema-described capability.
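A sketch of that authoring surface, with the base class stubbed inline so the snippet runs standalone (`Pipeline`, `setup`, `predict`, and `serve` are assumed names from the spec sketch, not a settled API):

```python
class Pipeline:  # stand-in for the livepeer-runner base class
    def setup(self) -> None: ...


class TextReverser(Pipeline):
    """Hello-world capability: reverse the input string."""

    def setup(self) -> None:
        # One-time init at container start (load weights, warm caches).
        pass

    def predict(self, text: str) -> str:
        return text[::-1]


# In the real SDK this would be serve(TextReverser): the runner
# introspects predict()'s signature to build /predict and /openapi.json.
p = TextReverser()
p.setup()
print(p.predict(text="livepeer"))  # reepevil
```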
Architecture decisions (monorepo + PEP 420 namespace packages, three distributions livepeer-runner / livepeer-client / livepeer-trickle) are captured in the spec — see the Architecture and the companion Client SDK packaging section.
Spec
Design lives in livepeer-specs / pipeline-sdk.md. Update the spec rather than this issue body when the design moves.
Roadmap
Each step yields a working SDK strictly more capable than the previous one.
C1 — Pipeline base + serve() + hello-world BYOC E2E
C2 — setup() lifecycle + HuggingFace sentiment example (/health, /predict, /docs, /openapi.json)
C4 — typed inputs / outputs via BaseModel signature introspection (incl. Base64Bytes)
/health state machine matching go-livepeer's HealthCheck wire format
predict() + LLM chat example
C8 — LivePipeline for trickle transport (real-time video) — breakdown below
livepeer push CLI + livepeer.yaml manifest (org.livepeer.pipeline.schema)
Docs: AGENTS.md, expanded Pipeline docstring, examples/runner/_template/
/capability/register — env-gated, wired into serve() lifespan, lenient on failure (degrade /health, keep FastAPI serving); deregister on shutdown; retry on conn-refused / timeout / 5xx; fail fast on 400 / 404 / 405
C8 breakdown — LivePipeline
LivePipeline — 04cc6978, 31ee44
runner.frames namespace — 9688f67
Step 5 — live_grayscale example + chroma assertion + ffplay viewer — 9ef95d9 series
Step 4 — _LiveSession + lifecycle (the only outstanding piece)
_LiveSession class encapsulating per-session state
Periodic heartbeat on events_url (gateway liveness signal)
emit_event(payload) user-facing helper
emit_data(payload) helper for data_url when enable_data_output=true
on_stream_stop lifecycle hook
Drain runner-side state on stop — measured: no leak (RSS plateaus ~170 MB after 25 sessions). State drain unnecessary.
Unified error surface — three error sources (subscribe / publish / user process_video raise) log distinctly today with no consistent state propagation. Add _record_error(source, exc, severity): structured ErrorEvent schema (severity ∈ WARN / ERROR / FATAL, source, message, timestamp, consecutive), per-source budget escalating WARN→ERROR after N consecutive failures, flip pipeline._state = ERROR on terminal failures, push events via events_url. Quick first step (~5 LOC): flip _state on TricklePublisherTerminalError. Full schema after live_transcribe surfaces real failure modes.
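The escalation logic above, sketched in Python (field names come straight from the bullet; the budget N and the state values are placeholders, not a settled schema):

```python
import time
from dataclasses import dataclass
from enum import Enum


class Severity(str, Enum):
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"


@dataclass
class ErrorEvent:
    severity: Severity
    source: str        # "subscribe" | "publish" | "process_video"
    message: str
    timestamp: float
    consecutive: int   # consecutive failures from this source


class ErrorSurface:
    ESCALATE_AFTER = 3  # N consecutive WARNs become ERROR (placeholder)

    def __init__(self):
        self._consecutive: dict[str, int] = {}
        self.state = "RUNNING"
        self.events: list[ErrorEvent] = []

    def _record_error(self, source: str, exc: Exception,
                      severity: Severity = Severity.WARN) -> ErrorEvent:
        n = self._consecutive.get(source, 0) + 1
        self._consecutive[source] = n
        if severity is Severity.WARN and n >= self.ESCALATE_AFTER:
            severity = Severity.ERROR  # per-source budget exhausted
        event = ErrorEvent(severity, source, str(exc), time.time(), n)
        self.events.append(event)      # real impl: push via events_url
        if severity is Severity.FATAL:
            self.state = "ERROR"       # terminal failure flips pipeline state
        return event
```

A success from a source would reset `self._consecutive[source] = 0`; that reset is elided here.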
Verify the live-viewer demo can be brought back once heartbeat + state-drain land
Pydantic param schema for LivePipeline — today on_stream_start(params) and on_params_update(params) receive dict[str, Any]; users parse / validate manually. The batch Pipeline already supports typed params via signature introspection (C4). Extend the same to LivePipeline: let users type on_stream_start(params: MyParams) and have the runner introspect, validate at the HTTP boundary, and emit a meaningful /openapi.json schema for /stream/start's caller-supplied params (today the schema only describes the orchestrator's protocol fields). Required for the developer dashboard / client SDK to render param controls for live capabilities.
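One way the runner-side introspection could work, as a sketch with pydantic (`MyParams` and the hook signature mirror the bullet; nothing here is final API):

```python
import inspect
from typing import Any

from pydantic import BaseModel


class MyParams(BaseModel):
    prompt: str
    strength: float = 1.0


class MyLive:  # stands in for a user LivePipeline subclass
    def on_stream_start(self, params: MyParams) -> None:
        self.params = params


def validate_params(pipeline, raw: dict[str, Any]):
    """Introspect on_stream_start's annotation; validate at the HTTP boundary."""
    sig = inspect.signature(pipeline.on_stream_start)
    ann = sig.parameters["params"].annotation
    if isinstance(ann, type) and issubclass(ann, BaseModel):
        # Validates; the same class also yields the JSON schema the
        # runner would merge into /openapi.json for /stream/start.
        return ann(**raw)
    return raw  # untyped hook: pass the dict through unchanged


p = MyLive()
p.on_stream_start(validate_params(p, {"prompt": "sepia", "strength": 0.5}))
print(p.params.strength)  # 0.5
```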
Enrich heartbeat payload with PipelineStatus (ai-runner pattern) — today's heartbeat is minimal {"type": "heartbeat", "timestamp"} keep-alive only. ai-runner's report_status_loop uses the same trickle push as both keep-alive AND status report (state, FPS, last_error, restart_count, last_params). One mechanism, dual purpose. Add FPS counters to MediaOutput / MediaPublish and swap heartbeat payload for the rich shape. Keeps /health minimal (k8s contract).
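For concreteness, the payload swap could look like this (field names are assumptions modeled on ai-runner's PipelineStatus, not a fixed schema):

```python
import json
import time

# Today's minimal keep-alive:
heartbeat = {"type": "heartbeat", "timestamp": time.time()}

# Proposed dual-purpose shape — same trickle push, now also a status report:
status_heartbeat = {
    "type": "heartbeat",
    "timestamp": time.time(),
    "state": "RUNNING",                  # pipeline state machine value
    "fps": {"in": 30.0, "out": 29.6},    # from MediaOutput / MediaPublish counters
    "last_error": None,
    "restart_count": 0,
    "last_params": {"strength": 0.8},
}
payload = json.dumps(status_heartbeat).encode()  # pushed on events_url
```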
Companion issues — runner / examples polish
Targeted issues spun off from this epic. Two are blocking and one is cosmetic on the path to full live_transcribe fidelity (5/5 transcripts delivered to SSE).
Blocking (data-loss bugs)
#12 — SDK-side. _resolve_next_seq returns -1 on probe failure; combined with the publisher's +1 increment this duplicate-POSTs to seg 0 on every trickle channel, dropping the first record. Observable on live_transcribe as missing transcript[1]. Fix: one line (return 0 instead of -1) + demote the warning to debug.
Upstream: livepeer/go-livepeer#3924 — gateway's data subscriber tears down too early on /stream/stop, dropping the final emit_data from on_stream_stop. Observable on live_transcribe as missing transcript[4] (orch log shows client disconnected on the final POST). Fix: bounded drain loop in byoc/trickle.go:startDataSubscribe.
Together these account for both observed transcript losses (runner emits 5 → SSE delivers 3 today). Either one fixed independently → 4/5 delivered.
Cosmetic (log noise, no functional impact)
Upstream: livepeer/go-livepeer#3922 — spurious ERROR-level logs at every clean /stream/stop (5 fix sites: ffmpeg subprocess output, trickle preconnect, rtmp2segment probe, orch trickle handler). Operations work; logs just look scary. Pure log-level demotion (if ctx.Err() != nil { debug }).
Examples follow-ups
Live viewer tool for live_grayscale — bring back the webcam-pushed live viewer (deleted because the current PyAV decode→user→encode loop can't sustain real-time webcam load: ring buffer drains, mediamtx kicks the egress publisher). Today the example uses synthetic testsrc + capture-to-file + replay. Bring back the webcam viewer once C8 Step 4 lands.
Worked example covering full LivePipeline lifecycle — live_grayscale exercises the SDK plumbing but only overrides process_video. live_transcribe (Whisper STT) and live_depth (DepthAnything V2) now exercise more of the lifecycle (setup, on_stream_start, process_audio, emit_data, emit_event, on_stream_stop). Still TODO: a test.sh that subscribes to data_url from the caller side and asserts structured records arrive — needs start_byoc_job from #6.
Migrate /stream/params and /stream/stop to control_url subscribe — per the spec, the long-term shape is one HTTP endpoint (/stream/start) plus everything else over the trickle plane. Today BYOC already publishes params + keepalives to control_url (byoc/trickle.go:539) but the orchestrator HTTP-forwards each message via /stream/params (byoc/stream_orchestrator.go:421). Migration must be coordinated upstream: orchestrator drops the HTTP-forwarding step + runner adds trickle subscribe in lockstep. Blocked on: trickle control-channel size / changeover bug (see "Future protocol work" below).
Production-grade live transcribe example — live_transcribe is intentionally the minimal lifecycle demo with explicit 3 s chunking + vad_filter=True; first-transcript latency is ~3 s and word boundaries can split mid-window. For users who actually want production live transcription, add a separate example using whisper_streaming's OnlineASRProcessor (LocalAgreement-2 → ~1 s latency, cleaner boundaries) — same LivePipeline lifecycle, different process_audio body. Same folder shape as live_transcribe, marketed as "production transcribe". Possibly also covers VAD-driven segmentation and emit_data partial-vs-final transcript distinction.
Recover from orchestrator capability drop — gateway sometimes drops the orchestrator from its capability pool after stream failures (Retrying stream with a different orchestrator err=unknown swap reason → no orchestrators available, ending stream). Once dropped, every subsequent /process/stream/start either 400s or kills mid-flight, until register_capability is re-run manually. Investigate (a) re-register watchdog, (b) healthcheck-driven re-register hook, or (c) push a fix upstream in go-livepeer's gateway swap-orch logic.
Switch examples to -network offchain once go-livepeer #3906 lands. Current compose files run with -network arbitrum-one-mainnet -ethUrl https://arb1.arbitrum.io/rpc -ethPassword secret-password and rely on pricePerUnit=0, so no real on-chain payment occurs — but the gateway still polls Arbitrum for orchestrator stake lookups (db_discovery.go), and the public RPC throttles, filling the gateway log with 429 Too Many Requests. Tracked upstream as livepeer/go-livepeer#3905. When the PR merges, drop -ethUrl and -ethPassword from each example's docker-compose.yml (5 files) and replace the -network value with bare -network offchain. Eliminates the 429 noise entirely.
Assert grayscale, not just bytes-received — live_grayscale/test.sh now extracts U / V plane averages via ffprobe signalstats and asserts ≈128 (chroma-zero = grayscale).
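The check itself is small once the plane averages are extracted; a Python equivalent of the assertion (the ffprobe invocation in the comment is one standard way to surface signalstats frame tags):

```python
# test.sh obtains per-frame chroma averages with ffprobe's signalstats
# filter, roughly:
#   ffprobe -f lavfi -i "movie=out.mp4,signalstats" \
#     -show_entries frame_tags=lavfi.signalstats.UAVG,lavfi.signalstats.VAVG \
#     -of csv=p=0
# In 8-bit YUV, neutral chroma is 128: grayscale video keeps U and V
# pinned there while Y carries the image.

def is_grayscale(uavg: float, vavg: float, tol: float = 2.0) -> bool:
    """True when both chroma plane averages sit at the neutral point."""
    return abs(uavg - 128.0) <= tol and abs(vavg - 128.0) <= tol


assert is_grayscale(128.1, 127.9)      # grayscale output passes
assert not is_grayscale(110.4, 140.2)  # colour survived: the test should fail
```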
Framework adapters (deferred — build on demand)
Migration paths for users from existing ML frameworks. Each ships as its own pip package with its own foreign dep, isolated from core SDK. Build only when a real migration ask shows up.
livepeer-runner-cog — wraps cog.BasePredictor
livepeer-runner-fal — wraps fal.App
livepeer-runner-modal — wraps Modal @app.function
livepeer-runner-bentoml — wraps @bentoml.service
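What such an adapter could amount to, sketched with a stand-in predictor so no cog install is needed (`Pipeline` stands in for the SDK base class; cog's BasePredictor does expose setup() and predict()):

```python
class Pipeline:  # stand-in for the livepeer-runner base class
    def setup(self) -> None: ...


def wrap_cog(predictor_cls):
    """Build a Pipeline subclass that delegates to a cog-style predictor.

    cog predictors implement setup() and predict(**inputs); the adapter
    forwards both, so the runner's signature introspection can still see
    the predictor's own predict() parameters.
    """

    class CogPipeline(Pipeline):
        def setup(self) -> None:
            self._predictor = predictor_cls()
            self._predictor.setup()

        def predict(self, **inputs):
            return self._predictor.predict(**inputs)

    return CogPipeline


# Demo with a minimal stand-in predictor:
class Upcase:
    def setup(self):
        pass

    def predict(self, text: str) -> str:
        return text.upper()


pipe = wrap_cog(Upcase)()
pipe.setup()
print(pipe.predict(text="hi"))  # HI
```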
Future protocol work (cross-team, gated on C9 + upstream go-livepeer)
Fix trickle control-channel size / segment-changeover bug (upstream) — control_url params updates fail silently or get truncated when payload is more than small JSON (~1 MB practical ceiling observed). Hunch is segment-changeover behavior during large writes — possibly FirstByteTimeout, pipe buffering on segment boundaries, or chunk-write semantics across the rollover. Workaround in byoc/stream_gateway.go:1007-1009 switched stop / params to HTTP POST after the bug bit on base64-binary payloads. Blocking dependency for the "migrate to control_url subscribe" follow-up. Upstream go-livepeer change.
Capability identity via OCI digest — replace free-form capability names with content-hashed references like byoc/<repo>@sha256:<digest>. Aligns BYOC with Replicate's reproducibility model. SDK side: livepeer push captures the digest at publish time and bakes it into the manifest. Upstream side: orchestrator registration + gateway routing + OrchestratorInfo carry the digest.
Cosign / Sigstore signing of capability digests — optional layer on top of digest pinning. Publisher signs the digest, gateway verifies signature against publisher's key.
Name:version aliases over digest-pinned wire — Replicate-style mutable names (byoc/text-reverser:v2) that resolve to a digest at lookup time. Wire protocol always pins the digest; aliases are a UX layer.
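The three bullets compose into a small resolution chain; a sketch (the reference grammar is inferred from the byoc/<repo>@sha256:<digest> example above, not from a spec):

```python
import re

DIGEST_REF = re.compile(r"^(?P<name>[a-z0-9/_-]+)@sha256:(?P<digest>[0-9a-f]{64})$")

# Mutable alias table — the UX layer. The wire protocol never carries
# aliases, only the digest-pinned reference they resolve to.
ALIASES = {
    "byoc/text-reverser:v2": "byoc/text-reverser@sha256:" + "ab" * 32,
}


def resolve(ref: str) -> str:
    """Resolve an alias (name:version) to its digest-pinned reference;
    pass already-pinned references through after validating."""
    pinned = ALIASES.get(ref, ref)
    if not DIGEST_REF.match(pinned):
        raise ValueError(f"not a digest-pinned capability reference: {pinned}")
    return pinned


print(resolve("byoc/text-reverser:v2"))  # prints the digest-pinned reference
```

Signature verification (cosign) would slot in after `resolve`, checking the returned digest against the publisher's key before the gateway routes the job.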
byoc/text-reverser:v2) that resolve to a digest at lookup time. Wire protocol always pins the digest; aliases are a UX layer.Related
livepeer-specs/_followups.mduvworkspace docs: https://docs.astral.sh/uv/concepts/projects/workspaces/