[SPARK-56413] Add gRPC UDF execution protocol #55657

Open
haiyangsun-db wants to merge 7 commits into apache:master from haiyangsun-db:SPARK-56413

Conversation

@haiyangsun-db (Contributor) commented May 3, 2026

What changes were proposed in this pull request?

Adds udf_protocol.proto, the gRPC wire contract between the Spark engine and a
UDF worker process, as described in the SPIP. It sits next to the existing worker_spec.proto.

Defines a Worker service with two RPCs:

  • Execute(stream UdfRequest) returns (stream UdfResponse) — one bidirectional
    stream per UDF execution. Lifecycle on the stream: Init → 0..N
    DataRequest / DataResponse → exactly one Finish or Cancel.
    PayloadChunk streams oversized UDF bodies.
  • Manage(WorkerRequest) returns (WorkerResponse) — unary, worker-scoped
    (heartbeat, graceful shutdown).

UdfPayload carries the engine-opaque callable bytes plus a format tag,
an eval_type worker-dispatch hint, and optional input/output encoders.
Init carries data_format, schemas, session_conf, task_context, and
timezone (the first graduate from session_conf); a reserved field range
absorbs future graduates.
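
For orientation, a compact sketch of the surface this adds. The service shape and the message names match the proto (they are also quoted in the review below); the envelope layout and every field number shown here are illustrative, not the actual definitions.

    syntax = "proto3";

    // One bidirectional stream per UDF execution, plus a unary
    // worker-scoped management RPC.
    service Worker {
        rpc Execute(stream UdfRequest) returns (stream UdfResponse);
        rpc Manage(WorkerRequest) returns (WorkerResponse);
    }

    // Illustrative envelope only: the proto separates control messages
    // (Init, PayloadChunk, Finish, Cancel) from per-batch data, but this
    // oneof layout and its numbering are guesses.
    message UdfRequest {
        oneof request {
            UdfControlRequest control = 1;
            DataRequest       data    = 2;
        }
    }
    // Referenced messages (Init, PayloadChunk, DataRequest, Finish,
    // Cancel, and the response-side mirrors) are defined in
    // udf_protocol.proto and elided here.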

Also fixes two typos in common.proto (exachanged/bidrectional).

Out of scope

No planning info on the wire (no execution-shape / cardinality enum, no
chained-UDF metadata). Both can be added additively later.

Why are the changes needed?

Spark Connect's UDF support today is Python-only and tied to a Python-specific
socket protocol. Onboarding other client languages requires a structured,
language-neutral wire contract. This PR lands the proto layer; engine and
worker implementations will follow.

Does this PR introduce any user-facing change?

No. Wire contract only; not yet wired into any end-to-end path.

How was this patch tested?

Verified the proto compiles with protoc against common.proto and
worker_spec.proto, and inspected the generated descriptor for field-number
and oneof correctness. End-to-end conformance tests will land with the
engine-side client and first worker implementation.

Was this patch authored or co-authored using generative AI tooling?

Yes

haiyangsun-db marked this pull request as ready for review, May 3, 2026 16:13
haiyangsun-db changed the title from "[SPARK-56413] Introduce the grpc protocol for UDF execution." to "[SPARK-56413] Add gRPC UDF execution protocol", May 3, 2026
Comment thread on udf/worker/proto/src/main/protobuf/udf_protocol.proto (Outdated)

// (Optional) Session timezone, promoted out of [[session_conf]]
// because every eval needs it for timestamp encoding/decoding.
optional string timezone = 7;

Contributor:

Is string the canonical type to represent the timezone? I am afraid all kinds of conversion errors may happen with no schema/enum enforcement.

Contributor (Author):

This is the convention from Spark; the timezone is a string in Spark.

Comment thread on udf/worker/proto/src/main/protobuf/udf_protocol.proto (Outdated)

// (Optional) Session timezone, promoted out of [[session_conf]]
// because every eval needs it for timestamp encoding/decoding.
optional string timezone = 7;

Contributor:

We should specify the exact format in which the timezone will be reported, since it's a string.

Contributor (Author):

The timezone in Spark is a string config; we should get it from Spark and follow the same format.

Contributor:

Ok, great. Thank you for clarifying!

Comment thread on udf/worker/proto/src/main/protobuf/udf_protocol.proto (Outdated)

@sven-weber-db (Contributor) left a comment:

Thank you for addressing the comments

Comment thread on udf/worker/README.md (Outdated)
    session.init(Init.newBuilder()
      .setUdf(UdfPayload.newBuilder()
        .setPayload(ByteString.copyFrom(serializedFunction))
        .setFormat("py-cloudpickle-v3"))

Contributor:

Does this already exist? Or do we still need to create it? The only reason I am bringing it up is that examples are forever :)...

Contributor (Author):

fixed

@haiyangsun-db (Contributor, Author):

@hvanhovell could you please help take another pass?

@dtenedor (Contributor):

PR Review: SPARK-56413 — gRPC UDF execution protocol

Summary

The PR adds udf/worker/proto/src/main/protobuf/udf_protocol.proto, swaps the placeholder InitMessage case class on WorkerSession for the generated Init proto, fixes two typos in common.proto, and updates the README and a test. The protocol defines:

  • service Worker { rpc Execute(stream UdfRequest) returns (stream UdfResponse); rpc Manage(WorkerRequest) returns (WorkerResponse); }
  • Execute wire order: Init → PayloadChunk* → (DataRequest/DataResponse)* → exactly one Finish or Cancel.
  • Manage operations: Heartbeat, ShutdownRequest.

Overall this is a high-quality, well-documented wire contract. The doc comments are unusually thorough (lifecycle, ordering, "Required/Optional" tags, escape-hatch conventions, reserved ranges), and the engine/client split — typed engine-side fields vs. opaque UdfPayload carrying everything the client packs — is a good factoring.

Below are mostly questions / suggested clarifications; only a couple are blocking-ish.


Strengths

  • Clear separation of envelope (UdfRequest/UdfResponse) vs. control (UdfControlRequest/UdfControlResponse) vs. data (DataRequest/DataResponse).
  • Top-level bytes data on DataRequest/DataResponse (not nested) — explicit copy-avoidance rationale is great.
  • Reserved field range 8 to 99 for graduated session_conf keys, with the timezone precedent already showing the pattern.
  • >= 100 convention for opaque escape-hatch fields (parameters, future siblings).
  • PayloadChunk semantics (concatenation order, single InitResponse covering Init + all chunks, first DataRequest ending the chunk phase) are well-specified.
  • Scala-side: the new cancel-lifecycle doc + idempotency of cancel/close is exactly the kind of contract that pays off in practice.

Concerns / suggestions

1. Cancel-vs-finish race contradicts wire-level "mutually exclusive" wording (medium)

Finish doc in udf_protocol.proto says:

// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
// they are mutually exclusive. If the engine has already sent
// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa).
message Finish {}

…but the new Scala doc on WorkerSession describes a benign race:

 * Cancel-vs-finish race: when the session driver has finished
 * sending input (and therefore queued an implicit finish on the
 * underlying transport) and a [[cancel]] arrives concurrently, both
 * are valid stream-terminating actions; the response side carries
 * either a `FinishResponse` or a `CancelResponse` depending on which
 * the worker observes first, and either is acceptable to the caller.

If Finish is already queued on the transport and cancel() then writes Cancel, the engine has, on the wire, sent both — violating the proto's MUST NOT. Two reconciliations are possible; please pick one and state it clearly in the proto:

  • a) The wire really does forbid both, so cancel() after a buffered Finish is a no-op on the transport (just an early gRPC half-close). Reword the Scala "queued an implicit finish" sentence to match.
  • b) The wire tolerates a Cancel after Finish only before any FinishResponse has been observed (the engine raced with itself, not with another client). Then the proto MUST NOT needs softening to "MUST NOT send Cancel after observing FinishResponse" or similar.

As written, the Scala doc invites engine implementations that the proto's strict reading prohibits.
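
If option (b) is chosen, the Finish doc could be softened along these lines (wording illustrative, not a proposed final text):

    // Exactly one of [[Finish]] or [[Cancel]] is normally sent per
    // Execute stream. The engine MUST NOT send [[Cancel]] after it has
    // observed a [[FinishResponse]]; a [[Cancel]] racing with a
    // [[Finish]] the engine itself has already queued on the transport
    // is tolerated, and the worker replies with whichever terminating
    // response it observes first.
    message Finish {}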

2. InitResponse timing vs. PayloadChunk is ambiguous (low/medium)

PayloadChunk says "The single InitResponse covers Init plus all of its chunks together", implying the worker must wait until chunking completes. But the order diagram doesn't say this explicitly, and it interacts with the optional PayloadChunk.last early-completion hint:

    bytes data = 1;

    // (Optional) Set to true on the final chunk. Receivers MAY use
    // this as an early signal that the payload is complete and
    // decoding can begin; receivers that prefer to wait for the
    // first [[DataRequest]] (which marks the end of the chunking
    // phase) MAY ignore this. When unset, the receiver determines
    // completeness by the arrival of the first [[DataRequest]].
    optional bool last = 2;

Two questions worth pinning down:

  • Is the worker permitted to send InitResponse before observing all PayloadChunk messages (e.g., as soon as it's decoded enough to start)? Or must it wait until the chunking phase is complete?
  • If last is unset, the receiver can only detect end-of-chunking by the first DataRequest. That's fine for the worker, but it means InitResponse can never be sent until the first DataRequest arrives — i.e., the engine cannot gate sending data on receiving InitResponse. If that's the intent, say so; otherwise consider making last = true mandatory when chunking is used (one possible shape is sketched below).
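
One possible shape for the mandatory-hint variant, keeping the PR's field layout (the comment text is illustrative):

    message PayloadChunk {
        // (Required.) Next slice of the payload bytes; receivers
        // concatenate chunks in arrival order.
        bytes data = 1;

        // (Required on the final chunk whenever chunking is used.) The
        // worker MAY send [[InitResponse]] as soon as it observes
        // last = true, without waiting for the first [[DataRequest]].
        optional bool last = 2;
    }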

3. No protocol/version field on Init (medium)

WorkerCapabilities (in worker_spec.proto) presumably handles up-front capability negotiation, but a protocol_version (or min_required_version) on Init would make per-stream rollout and rollback much simpler, especially before the engine has fully read the worker spec. Worth at least reserving a field number for it now, even if not populated.
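
For example, with a reserved statement that would be dropped once the field actually lands (the number 6 is hypothetical and would need checking against the real Init layout; timezone is 7 and 8 to 99 is already reserved):

    message Init {
        // Hypothetical: earmark a number for a future protocol_version
        // (or min_required_version) field so it can be added without
        // renumbering anything else.
        reserved 6;
        // ... existing fields elided ...
    }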

4. Unknown UDFWorkerDataFormat values (low)

Init says receivers MUST reject UDF_WORKER_DATA_FORMAT_UNSPECIFIED, which is good, but doesn't say what to do with values outside the worker's supported set (e.g., a newer client picks PARQUET=2 that the worker doesn't speak). proto3 will pass unknown enums through as numeric values; the doc should say the worker MUST reject any value not in its declared WorkerCapabilities.supported_data_formats.
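
Concretely, the data_format doc could be tightened along these lines (the field number is illustrative; the rejection rule is the substance):

    message Init {
        // (Required.) MUST NOT be UDF_WORKER_DATA_FORMAT_UNSPECIFIED.
        // The worker MUST also reject any value not present in its
        // declared WorkerCapabilities.supported_data_formats, including
        // numeric values it does not recognise; proto3 hands unknown
        // enum numbers to the application instead of failing decode.
        UDFWorkerDataFormat data_format = 1;
        // ... other fields elided ...
    }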

5. WorkerResponse should require matching branch (low)

message WorkerResponse {
    // Exactly one branch MUST be set, mirroring the request oneof.
    oneof manage {
        HeartbeatResponse heartbeat = 1;
        ShutdownResponse  shutdown  = 2;
    }
}

"Mirroring" is implicit. Suggest tightening to "The engine MUST receive a response whose oneof branch matches the request's branch; a mismatched response is a protocol error."

6. Empty DataRequest/DataResponse (low)

message DataRequest {
    // (Required, non-empty.) Encoded data bytes for one batch in the
    // session-declared format.
    bytes data = 1;
}

// Worker -> Engine per-batch payload. The worker emits zero or more
// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
// [[CancelResponse]]. Sink-style UDFs (which consume input but
// produce no output rows on the data plane) emit exactly zero.
message DataResponse {
    // (Required, non-empty.) Encoded data bytes for one batch in the
    // session-declared format.
    bytes data = 1;
}

For Arrow this is fine (an empty IPC batch still has a header). For future formats that allow truly empty batches, requiring non-empty may be inconvenient. Either narrow the wording to "non-empty as defined by the session's data_format" or accept zero-length payloads and let the decoder reject. Not blocking.

7. Behavior of cancel() between init and process (low)

 * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in
 * the session's life:
 *  - before [[init]] -- nothing has been sent on the transport yet,
 *    so [[cancel]] is a no-op (the session may still be closed
 *    normally via [[close]]).
 *  - between [[init]] and [[process]] -- transitions the session
 *    into a cancelled state; subsequent [[process]] calls observe
 *    the cancellation.

"Subsequent process calls observe the cancellation" — by throwing? returning an empty iterator? returning a special exception class? Worth nailing down. My read of the surrounding contract is that callers expect an exception, since process returns an iterator and a silent empty result would mask cancellations.

8. Manage(Shutdown) while Execute streams are active (low)

The proto says Manage is "independent of any per-execution stream" but doesn't say what the worker is supposed to do if ShutdownRequest arrives while Execute streams are still open. Options:

  • Reject (return an error in ShutdownResponse?)
  • Accept and let active streams drain.
  • Accept and cancel active streams.

Worth specifying. Today there's no error channel on ShutdownResponse; if the worker can reject, you may want one.
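
If rejection is allowed, one hypothetical shape for that channel (the message exists in the PR; the field name and number here are invented):

    message ShutdownResponse {
        // Unset on success. Populated when the worker refuses to shut
        // down, e.g. because Execute streams are still open and the
        // request did not ask for a drain.
        optional string refusal_reason = 1;
    }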

9. PayloadChunk size validation vs. payload_size (nit)

UdfPayload.payload_size is documented for buffer pre-allocation. It would also be useful for chunk validation (sum of chunk bytes + inline payload == payload_size). Worth mentioning that validation use, or adding a sentence to PayloadChunk that the receiver MAY validate against payload_size.
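
A sentence along these lines on PayloadChunk would cover it (wording illustrative; the arithmetic is inline UdfPayload.payload bytes + concatenated chunk bytes == payload_size):

    message PayloadChunk {
        // ... fields as in the PR ...
        //
        // Receivers MAY validate that the inline [[UdfPayload.payload]]
        // bytes plus the concatenated chunk bytes total exactly
        // [[UdfPayload.payload_size]], failing the stream on mismatch.
    }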

10. Service naming (nit)

service Worker is quite generic in a project the size of Spark; another service named Worker could land elsewhere and confuse generated code lookups. Consider UdfWorker (matches UDFWorkerSpecification, UDFWorkerDataFormat).

11. Builder usage in README example (nit)

.setUdf(UdfPayload.newBuilder()
  .setPayload(ByteString.copyFrom(serializedFunction))
  .setFormat(payloadFormat))   // worker-recognised tag

This works because setUdf accepts a builder, but Spark's other proto-builder snippets tend to call .build() explicitly. Either pattern is fine, just suggest picking one for consistency across docs.

12. eval_type shape (discussion, not blocking)

    // (Optional) Worker / language-specific dispatch hint. A
    // free-form string the worker uses to pick the code path that
    // handles this payload. The protocol does not enumerate eval
    // types because they are language-specific; the client side of
    // the protocol and the worker agree on the namespace and the
    // values.

This deliberately mirrors PySpark's PythonEvalType. The "any string both sides agree on" approach is flexible, but in practice every worker will hard-code recognised values, so this is effectively an enum with no central registry. Worth at least linking from here to a doc that lists the recognised values per worker once the first worker lands, so users don't have to grep server source.

13. Coupling of abstract WorkerSession to generated proto class (discussion)

doInit(message: Init) now binds every WorkerSession implementation directly to the generated class. Pros: zero conversion overhead, single source of truth. Cons: any proto-incompatible change ripples into every implementation. Given the @Experimental annotation this is fine for now, but it does mean we should be conservative about breaking changes in Init going forward (more conservative than the proto itself would suggest).


Nits

  • common.proto: typo fixes look good.
  • The doc-link style [[Init]] / [[FinishResponse]] in proto comments isn't standard protoc-gen-doc markup but is harmless plain text — fine.
  • Heartbeat/HeartbeatResponse empty messages: if you'd like richer health probes later (e.g., a server-side load hint), reserve a field number now to make additive evolution easy (a sketch follows this list).
  • Consider an explicit note on Heartbeat about its relationship to gRPC keepalive — they aren't the same thing, and users will ask.
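
A minimal sketch of the Heartbeat earmarking (the reserved number is hypothetical and would be removed when a real field lands):

    message HeartbeatResponse {
        // Hypothetical: hold a number for a future server-side load
        // hint so it can be added without disturbing existing readers.
        reserved 1;
    }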

Verdict

LGTM in shape; the protocol is well-thought-through and the docs are noticeably above average. I'd want at least the cancel-vs-finish wire contradiction (#1) reconciled and the unknown-data_format rejection rule (#4) made explicit before merge; the rest are doc clarifications and minor questions.
