
feat(runner): minimal Pipeline SDK + BYOC hello-world E2E #7

Draft

rickstaa wants to merge 31 commits into fix/trickle-cancellation-noise from feat/runner-mvp

Conversation


@rickstaa rickstaa commented Apr 28, 2026

Summary

This PR delivers the Pipeline SDK — both halves: a request/response Pipeline for batch HTTP/SSE and a LivePipeline for real-time bidirectional video/audio over BYOC trickle. Developers write a single Python class and get a containerised, BYOC-compatible, schema-described capability with /health, /docs, /openapi.json, SSE streaming, and (when complete) /stream/* real-time endpoints. No go-livepeer changes required.

Each commit on this branch ships a strictly more capable SDK than the last; test.sh stays green at every step.

Authoring surfaces

Batch / streaming HTTP — Pipeline:

from livepeer_gateway.runner import Pipeline, serve

class Sentiment(Pipeline):
    def setup(self) -> None:
        self.classifier = pipeline("sentiment-analysis", model="...")

    def predict(self, text: str) -> dict:
        return self.classifier(text)[0]

if __name__ == "__main__":
    serve(Sentiment())

For binary I/O, swap text: str for image: Base64Bytes. For streaming, return a generator from predict() — the runtime auto-detects and frames each yielded value as SSE.
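The generator-to-SSE framing described above can be sketched roughly as follows (an illustrative sketch, not the SDK's actual implementation; `sse_frames` and the JSON-per-event shape are assumptions, modelled on the OpenAI-style `data: … [DONE]` convention the PR mentions):

```python
import inspect
import json

def sse_frames(result):
    """Hypothetical sketch: if predict() returned a generator, frame each
    yielded value as an SSE event and close with the [DONE] terminator."""
    if inspect.isgenerator(result):
        for chunk in result:
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"
```

A non-generator result would instead be returned as plain JSON; gateways watching the stream end it when they see the `[DONE]` sentinel.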

Real-time A/V — LivePipeline (in progress in this PR):

from livepeer_gateway.runner import LivePipeline, serve

class GrayscaleFilter(LivePipeline):
    async def process_video(self, frame):
        # frame.frame is an av.VideoFrame; transform and return
        return frame

if __name__ == "__main__":
    serve(GrayscaleFilter())

Subclasses override any of setup, on_stream_start, process_video, process_audio, on_params_update, on_stream_stop (all default to no-op / passthrough). A subclass that overrides nothing is a valid passthrough relay.

What's in this PR

Commit-by-commit progression (each shippable on its own):

  • C1 — minimal Pipeline SDK + hello-world E2E: Pipeline ABC with predict() + aiohttp serve() + first BYOC docker-compose
  • C2 — setup() lifecycle + sentiment example: one-time model loading hook; HuggingFace sentiment classifier example
  • C3 — FastAPI migration: aiohttp → FastAPI; brings /docs, /openapi.json, /redoc for free
  • C4 — Pydantic typed I/O via signature introspection: predict() params with type hints become a Pydantic model automatically; explicit BaseModel parameter also supported; /docs shows real fields
  • C5 — image upscale example (binary via Base64Bytes): Pydantic's Base64Bytes proves the SDK handles binary I/O cleanly
  • C6 — /health state machine: LOADING / OK / ERROR / IDLE matching go-livepeer's HealthCheck wire format
  • C7 — SSE auto-detection + LLM chat example: generator predict() → text/event-stream with [DONE] terminator; Qwen2.5-0.5B example
  • C8 Step 1 — LivePipeline ABC + /stream/* HTTP skeleton (landed): new LivePipeline base class + _make_live_pipeline_app dispatch path; routes accept the orchestrator's wire contract (validated via Pydantic); streaming logic lands in subsequent commits
  • C8 Step 2 — trickle bytes-through on /stream/start|stop (landed): _run_passthrough bridges subscribe → publish via the existing TrickleSubscriber / TricklePublisher, segment-aligned and unmodified. /stream/start spawns the bridge as a background task on the pipeline; /stream/stop cancels and waits up to 5s for cleanup. Single-session for now (409 on double-start).
  • C8 Step 3 — frame loop dispatch + runner.frames namespace (landed): /stream/start dispatches between _run_passthrough (no overrides) and _run_frame_loop (decode → user → encode via the existing MediaOutput / MediaPublish). runner.frames re-exports VideoFrame / AudioFrame as the user-facing namespace, keeping PyAV opt-in via the submodule import. on_stream_start and on_params_update lifecycle hooks wired.
  • C8 Step 4 — remaining lifecycle hooks (planned): on_stream_stop dispatch; emit_event / emit_data over the events / data trickle channels; introduce _LiveSession to encapsulate per-session state
  • C8 Step 5 — examples/runner/live_grayscale/ end-to-end (planned): full BYOC compose with go-livepeer orchestrator, register_capability, and a test.sh exercising the real stream lifecycle

Surface

  • livepeer_gateway.runner.Pipeline — ABC with setup() and abstract predict(**kwargs) -> Any
  • livepeer_gateway.runner.LivePipeline — base class for real-time A/V pipelines on the BYOC trickle protocol; default-passthrough hooks for process_video / process_audio
  • livepeer_gateway.runner.PipelineState — LOADING / OK / ERROR / IDLE enum; matches go-livepeer's HealthCheck format
  • livepeer_gateway.runner.serve(pipeline, *, host, port) — FastAPI app + uvicorn server; dispatches on Pipeline vs LivePipeline
  • livepeer_gateway.runner.make_app(pipeline) — just the FastAPI app (for tests, custom uvicorn config)
  • POST /predict (Pipeline) — body validated via Pydantic from predict()'s signature; returns JSON, or text/event-stream if predict() is a generator
  • POST /stream/start | stop | params (LivePipeline) — real-time stream lifecycle endpoints; bodies match the orchestrator's wire contract
  • GET /health — HealthResponse { status: PipelineState }, orchestrator-aligned
  • GET /docs, GET /openapi.json, GET /redoc — standard FastAPI surface
  • examples/runner/hello_world/ — smoke test: minimal Pipeline + Dockerfile + compose + register_capability + curl test.sh
  • examples/runner/sentiment/ — setup() lifecycle + HF sentiment classifier
  • examples/runner/image_upscale/ — binary I/O via Pydantic Base64Bytes; Swin2SR ~2x super-resolution
  • examples/runner/llm/ — SSE streaming via TextIteratorStreamer; Qwen2.5-0.5B-Instruct
  • examples/runner/live_grayscale/ (planned) — real-time A/V pipeline E2E example via LivePipeline + go-livepeer trickle

The container's /predict and /stream/* paths match the existing go-livepeer BYOC contract, verified against byoc/stream_orchestrator.go. No go-livepeer changes required.

Authoring patterns (Pipeline)

# Plain typed kwargs — Pydantic model built automatically
class Sentiment(Pipeline):
    def predict(self, text: str) -> dict: ...

# Explicit Pydantic input + output — full /docs + typed response
class Request(BaseModel):
    text: str
    threshold: float = 0.5

class Response(BaseModel):
    label: str
    score: float

class Sentiment(Pipeline):
    def predict(self, body: Request) -> Response: ...

# Streaming via generator return
class LLM(Pipeline):
    def predict(self, prompt: str) -> Iterator[ChatChunk]:
        for token in self.streamer:
            yield ChatChunk(token=token)
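The "plain typed kwargs" path above relies on signature introspection; the raw field extraction can be sketched with the standard library alone (the real SDK feeds such fields to pydantic.create_model, per C4; `predict_fields` is an illustrative name, not the SDK's):

```python
import inspect

def predict_fields(pipeline_cls):
    """Sketch: map predict()'s bare typed params to (annotation, default)
    pairs — the raw material pydantic.create_model would consume."""
    sig = inspect.signature(pipeline_cls.predict)
    fields = {}
    for name, param in sig.parameters.items():
        if name == "self":
            continue
        # Ellipsis marks a required field in pydantic's create_model convention
        default = param.default if param.default is not inspect.Parameter.empty else ...
        fields[name] = (param.annotation, default)
    return fields

class Sentiment:
    def predict(self, text: str, threshold: float = 0.5) -> dict: ...
```

For `Sentiment`, this yields `text` as a required `str` and `threshold` as an optional `float` defaulting to 0.5 — exactly what /docs then renders as real fields.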

Test plan

Each example ships its own test.sh that prints PASS on success.

  • Local Python: uv run python examples/runner/hello_world/pipeline.py → curl localhost:5000/{health,predict} returns the expected JSON.
  • Hello world E2E: cd examples/runner/hello_world && docker compose up -d --wait && ./test.sh → PASS. Round-trip: curl → gateway → orchestrator → SDK container.
  • Sentiment: setup() loads HF model once; test.sh exercises POSITIVE / NEGATIVE cases via EXPECTED_LABEL.
  • Image upscale: Base64Bytes round-trip; output asserted to be at least 2x input dimensions.
  • LLM: SSE round-trip via curl -N; assertion validates token framing + [DONE] terminator.
  • OpenAPI: /docs and /openapi.json render for every example with the actual field names (no additionalProp1).
  • Health state machine: /health returns LOADING during setup(), OK after, ERROR on setup failure.
  • LivePipeline skeleton: /stream/start|stop|params accept the orchestrator's wire contract; missing required fields → 422; /docs and /openapi.json show the new routes; batch Pipeline regression-checked.
  • LivePipeline trickle lifecycle: full session lifecycle (idle stop, missing-URL → 400, double-start → 409, cancel + restart, idempotent stop) tested via curl against a local server; trickle code paths exercised end-to-end including aiohttp connection setup, segment fetch loop, cancellation propagation through finally blocks.
  • LivePipeline E2E against go-livepeer: real Docker compose with live_grayscale example, full stream lifecycle (start → frames → stop) — lands with C8 Step 5.

Compose details

Each example's docker-compose.yml mirrors go-livepeer/doc/byoc.md:

  • livepeer/go-livepeer:master for orchestrator + gateway (no local build prerequisite)
  • On-chain mode against the free public Arbitrum RPC, pricePerUnit 0 → no real chain interaction, no funded wallet
  • Drops to bare -network offchain once livepeer/go-livepeer#3906 ships in :master — TODO comments in each compose track the cleanup
  • Examples that load HF models bake them into the image at build time via prepare_models.py so setup() loads from local cache in milliseconds

What's next (separate PRs, after this one merges)

Tracked in #8 — Pipeline SDK roadmap:

  • C9 — livepeer push CLI + livepeer.yaml manifest
  • C10 — Schema as Docker image label (org.livepeer.pipeline.schema)
  • C11 — Agent-friendly docs (AGENTS.md, expanded docstrings, examples/runner/_template/)
  • C12 — Migrate to monorepo with PEP 420 namespace packages (livepeer.runner.*, livepeer.client.*, livepeer.trickle.*)

Related work

🤖 Generated with Claude Code

rickstaa and others added 24 commits May 5, 2026 11:38
Adds livepeer_gateway.runner — a Pipeline ABC and a thin aiohttp serve
layer — plus a hello-world example that runs end-to-end against an
unmodified go-livepeer BYOC stack.

Surface:
- livepeer_gateway.runner.Pipeline — ABC with predict()
- livepeer_gateway.runner.serve(pipeline) → aiohttp app:
  - POST /predict — body JSON kwargs to predict();
                    TypeError → 400, other exception → 500
  - GET  /health  — {"status": "ready"}
- examples/runner/hello_world/ — Pipeline subclass + Dockerfile +
  docker-compose + capability registration + e2e curl test

The container's /predict path matches the existing go-livepeer BYOC
contract — no go-livepeer changes required.
./examples/runner/hello_world/test.sh printing PASS proves the round-trip:
curl → gateway → orchestrator → SDK container → response.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pipeline.setup() is a non-abstract no-op called once before serve()
accepts requests. Subclasses override to load model weights.

Adds examples/runner/sentiment/ — a Pipeline subclass that classifies
text via Hugging Face transformers. setup() loads the distilbert model
from the local HF cache populated at build time by prepare_models.py.

Surface:
- Pipeline.setup() no-op default
- make_app() invokes pipeline.setup() before binding routes
- examples/runner/sentiment/ — pipeline + prepare_models + Dockerfile +
  docker-compose + register + test.sh + README

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tracks operational items not suited to code comments — examples
extraction trigger, BYOC offchain compose cleanup pending #3906,
SDK feature gaps mapped to planned commits, related upstream PRs.

Working surface, drained as items land.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the aiohttp serve layer with FastAPI + uvicorn. Pipeline API
unchanged — Pipeline.predict() and Pipeline.setup() behave identically.

Free additions from FastAPI:
- GET /docs (Swagger UI)
- GET /redoc
- GET /openapi.json (minimal until Input/Output land)

Handler dispatch:
- /predict and /health are sync def, so pipeline.predict() (CPU/GPU
  bound) runs in FastAPI's threadpool and never blocks the event loop.
- Request body parsed via Body(...) — framework handles JSON parse
  errors and dict-type validation, returning HTTP 422.

Notes:
- Error response shape changes from {"error": ...} to {"detail": ...}.
  Body validation errors return 422 (was 400 in aiohttp). Other status
  codes unchanged: TypeError on wrong predict() kwargs → 400; pipeline
  exceptions → 500.
- aiohttp stays in deps; livepeer_gateway.transport's trickle client
  uses aiohttp.ClientSession. FastAPI server + aiohttp client coexist.

Refs #8 (C3)
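The status-code mapping documented above can be pinned down in a tiny sketch (illustrative only; in the real app FastAPI raises the 422s itself during body validation, before predict() runs, and `status_for_predict_error` is a hypothetical helper name):

```python
def status_for_predict_error(exc: BaseException) -> int:
    """Sketch of the documented mapping: TypeError from wrong predict()
    kwargs → 400; any other pipeline exception → 500."""
    return 400 if isinstance(exc, TypeError) else 500
```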
Switches expose: to ports: so /docs, /redoc, and /openapi.json
are browsable on http://localhost:5000 during dev. Example
READMEs updated.
…tion

predict()'s signature drives FastAPI's body type and response model.
Two paths:
- Explicit BaseModel param: pass body to predict() directly
- Bare typed params: auto-derive a Pydantic model via create_model
  and unpack as kwargs

OpenAPI now reflects real types — /docs shows declared fields with
descriptions, examples, constraints, and typed responses when the
return annotation is a BaseModel.

Refs #8 (C4)
…se64Bytes

Swin2SR x2 super-resolution as a BYOC capability. Input image is a
base64-encoded JPEG/PNG; output is a base64-encoded PNG. Pydantic's
Base64Bytes auto-decodes the request body to bytes, so the pipeline
gets bytes directly and the SDK ships zero binary-handling code.

Refs #8 (C5)
Pipeline tracks state across setup() and exposes it via /health,
matching go-livepeer's HealthCheck wire format
(ai/worker/runner.gen.go). Re-raises on setup() failure so the
container still exits fail-fast.

Refs #8 (C6)
When predict() is a generator, the SDK wraps the response with
StreamingResponse(text/event-stream) and frames each yielded value as
an OpenAI-style SSE event terminated by [DONE]. Both go-livepeer's
BYOC gateway and the Python caller-side gateway watch for [DONE] to
end the stream.

Co-authored-by: John | Elite Encoder <john@eliteencoder.net>
pricePerUnit=0 means no orchestrator charges, no ticket settlement,
empty wallet stays unused. Replaces the previous pricePerUnit=1
workaround that relied on tickets rarely firing.
Adds LivePipeline base class with setup/on_stream_start/process_video/
process_audio/on_params_update/on_stream_stop hooks (all default-passthrough)
plus emit_event/emit_data stubs. Splits make_app dispatch into
_make_pipeline_app (Pipeline → /predict) and _make_live_pipeline_app
(LivePipeline → /stream/start|stop|params), sharing _run_setup and
_add_health_route. Routes accept and validate the orchestrator's wire
contract; streaming coordinator (subscribe/publish loops, lifecycle
dispatch) lands in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds _run_passthrough coroutine that bridges subscribe → publish trickle
channels segment-by-segment using the existing TrickleSubscriber and
TricklePublisher. /stream/start spawns it as a background task on the
LivePipeline; /stream/stop cancels and waits up to 5s for graceful
cleanup before returning. Single-session for now (409 on double-start);
data-only / event-only streams (no subscribe_url + publish_url) return
400 — both extensions land in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
/stream/start dispatches between _run_passthrough (no overrides) and
_run_frame_loop (decode → user → encode via existing MediaOutput /
MediaPublish). Adds runner.frames re-exporting VideoFrame / AudioFrame
as the user-facing namespace. Wires on_stream_start (fired before the
frame loop) and on_params_update (per /stream/params). Aligns runner
log messages with the wider gateway _LOG style.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The orchestrator sends `params: null` in the /stream/start body when the
caller provided no params, not an absent field. Field(default_factory=dict)
rejected the request with a 422 validation error. Switching to
`dict[str, Any] | None = None` accepts the actual wire shape; the frame
loop already handles None via `pipeline._session_params or {}`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Full BYOC E2E example for the new LivePipeline real-time path:
GrayscaleFilter zeros the U/V chroma planes per video frame (audio
passes through). Compose stack mirrors the other examples — go-livepeer
master orchestrator + gateway, register_capability one-shot, runner
container built from the project root. test.sh asserts the session
lifecycle (start → stop) end-to-end through the full BYOC stack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Feeds a pre-created MP2T segment through the runner directly (bypasses
gateway, which can't carry media over plain HTTP). _smoke_server.py
sends Lp-Trickle-Seq=0 so TrickleSubscriber's start_seq=-2 advances
correctly; runner reaches the host via extra_hosts. Asserts bytes
flowed + no frame-processor errors. Chroma assertion deferred to demo.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…moke

Replaces the synthetic-fixture/direct-runner smoke from 8beabc1 with a
single E2E test that pushes a colored stream through the full BYOC stack
(ffmpeg → mediamtx → gateway → orch → runner → orch → mediamtx → ffmpeg)
and asserts non-empty bytes come back.

Adds demo.sh: same path with webcam input + ffplay output, so users can
visually verify the GrayscaleFilter works on their own video.

Drops _fixtures/, _smoke_server.py, extra_hosts. Adds mediamtx service +
LIVE_AI_PLAYBACK_HOST env on gateway. Comment marks both scripts for
post-PR-#6 migration to start_byoc_job (the customer-flow SDK).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The 60-iter poll loop in each test.sh was redundant given the documented
prereq is `docker compose up -d --wait --build`. Replace with a single
check + clear error pointing the developer at the missing prereq.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ay viewer

test.sh verifies the runner actually grayscaled (U/V chroma ≈128 via
ffprobe signalstats), instead of bytes-received only. mediamtx is
repackaged from stock bluenviron+alpine+curl with runOnReady wired
to the gateway's BYOC ingest webhook — drops the Livepeer fork
dependency. PASS opens the captured .mts in ffplay (SKIP_VIEWER=1
to skip). demo.sh removed; live webcam viewer was unreliable under
the current runner-loop performance, tracked in issue #8.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the two class attrs (_session_task, _session_params) with a single
_LiveSession instance carrying the wire URLs and the session task.
_run_frame_loop reads URLs and params off the session directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a 10s periodic heartbeat to events_url so the gateway's 30s
events-channel watchdog doesn't tear down the session. Wires
emit_event() and emit_data() to publish JSON on the events and
data trickle channels. Closes the events-timeout cause of the
orch-drop bug; runner-state accumulation is a separate issue
addressed in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
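The heartbeat can be sketched as a cancellable asyncio task (a sketch only; `publish` stands in for the real events-channel publisher, and the 10s default matches the commit):

```python
import asyncio
import json

async def heartbeat_loop(publish, interval: float = 10.0) -> None:
    """Periodically publish a heartbeat record so the gateway's 30s
    events-channel watchdog doesn't tear down the session. `publish` is a
    hypothetical async callable that writes one JSON record to events_url."""
    while True:
        await publish(json.dumps({"type": "heartbeat"}))
        await asyncio.sleep(interval)
```

/stream/stop would cancel this task as part of session teardown; a 10s cadence leaves comfortable margin under the 30s watchdog.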
Fires the user's on_stream_stop() after publishers are closed and the
session task has been cancelled. Errors in the hook are logged but
don't break the stop flow. Pipeline authors can now use on_stream_stop
to release per-session resources like model handles, file descriptors,
or external connections.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rickstaa rickstaa changed the base branch from main to fix/trickle-cancellation-noise May 5, 2026 09:39
Move the user's on_stream_start hook out of _run_frame_loop into the
/stream/start HTTP handler so it covers both the frame-loop and
passthrough dispatch paths and pairs symmetrically with on_stream_stop.

On hook failure, tear down the session and surface 500 — the loop
never starts on a half-initialized pipeline.

Consolidate session teardown into _LiveSession.close() (heartbeat →
dispatch task → publishers, so writers stop before the channels they
write to close). Both /stream/start failure path and /stream/stop
collapse to `await session.close()`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rickstaa rickstaa force-pushed the feat/runner-mvp branch from 40cd04e to dd7d058 on May 5, 2026 09:40
rickstaa and others added 4 commits May 5, 2026 14:55
demo.sh pushes your webcam through the pipeline and shows the
processed result in ffplay (complement to test.sh's CI assertion).
README documents env-overridable WEBCAM_RES/FPS/DEVICE knobs and
the 30fps throughput-ceiling caveat.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per-frame monocular depth estimation. The model output is written to the
Y plane and chroma is zeroed, so the egress reads as a grayscale "bright
= close, dark = far" visualization through the existing BYOC video path.

Same single-hook shape as live_grayscale (process_video only), but with
setup() loading DepthAnything V2 Base on CUDA in fp16. Dockerfile bakes
the model into the image so first stream skips the download. test.sh
auto-downloads a depth-rich basketball clip from the upstream repo
(cached under assets/, gitignored), pushes it through the BYOC stack,
and asserts both that chroma is zeroed AND the luma plane has spatial
variance — so a chroma-only no-op can't pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- pipeline.py: collapse 4-line docstring to a single line.
- docker-compose.yml: healthcheck rationale to one line.
- test.sh: add `-y` to the pull ffmpeg so an existing OUTPUT_FILE gets
  overwritten silently instead of the retry loop reporting "ok" against
  a stale capture. Trim chroma-assertion comment to one line.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Without the regex carve-out the hook also fires for the gateway's own
egress paths (<sid>-out, <sid>-<reqid>-out) — gateway then 404s the
lookup because they're not registered input streams. Same trickle
config now in live_depth, live_grayscale (live_transcribe will follow
when that example lands).
@rickstaa rickstaa force-pushed the feat/runner-mvp branch from 00d745e to 1326e0a on May 6, 2026 13:43
/stream/stop closed publishers and cleared `pipeline._session`
before invoking on_stream_stop, so any final emit_data / emit_event
in that hook saw `session is None` and silently no-op'd.

Split `_LiveSession.close()` into `cancel_tasks()` and
`close_publishers()` and reorder /stream/stop to:

  1. cancel_tasks()             — stop frame loop racing user emits
  2. pipeline.on_stream_stop()  — user can publish final records
  3. close_publishers() + clear session
@rickstaa rickstaa force-pushed the feat/runner-mvp branch from 1326e0a to 9082e64 on May 6, 2026 13:46
New docs/runner-sdk.md gives reviewers a single page covering Pipeline
(batch + SSE) and LivePipeline (trickle) — surface, lifecycle, heartbeat,
coverage, open design questions, and pending work.

TODO.md is removed; its items now live in the SDK epic (#8) so there's
one source of truth instead of two drifting lists.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rickstaa rickstaa force-pushed the feat/runner-mvp branch from 1d88492 to 9aad666 on May 6, 2026 15:07