diff --git a/udf/worker/README.md b/udf/worker/README.md index b843c430d0e04..861f69b898aa0 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -19,7 +19,7 @@ WorkerDispatcher -- manages workers, creates sessions | v WorkerSession -- one UDF execution - | 1. session.init(InitMessage(payload, inputSchema, outputSchema)) + | 1. session.init(Init proto) | 2. val results = session.process(inputBatches) | 3. session.close() ``` @@ -34,12 +34,13 @@ provisioning service or daemon). ``` udf/worker/ ├── proto/ -│ worker_spec.proto -- UDFWorkerSpecification protobuf (+ generated Java classes) +│ worker_spec.proto -- UDFWorkerSpecification protobuf +│ udf_protocol.proto -- UDF execution protocol (Init, UdfPayload, ...) │ common.proto -- shared enums (UDFWorkerDataFormat, etc.) │ └── core/ -- abstract interfaces WorkerDispatcher.scala -- creates sessions, manages worker lifecycle - WorkerSession.scala -- per-UDF init/process/cancel/close + InitMessage + WorkerSession.scala -- per-UDF init/process/cancel/close WorkerConnection.scala -- transport channel abstraction WorkerSecurityScope.scala -- security boundary for worker pooling │ @@ -55,6 +56,19 @@ worker creation where Spark spawns local OS processes. Future packages (e.g., `core/indirect/`) can implement alternative creation modes such as obtaining workers from a provisioning service or daemon. +## Wire protocol + +Each UDF execution uses a single bidirectional `Execute` gRPC stream. + +``` +Engine -> Worker: Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)? + | Cancel +Worker -> Engine: InitResponse -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse) +``` + +See `udf/worker/proto/src/main/protobuf/udf_protocol.proto` for the complete +protocol definition, ordering invariants, and error contract. + ### Direct worker creation `DirectWorkerDispatcher` spawns worker processes locally. 
On the first @@ -76,10 +90,12 @@ Workers are terminated via SIGTERM/SIGKILL when the dispatcher is closed. ```scala import org.apache.spark.udf.worker.{ - DirectWorker, ProcessCallable, UDFProtoCommunicationPattern, - UDFWorkerDataFormat, UDFWorkerProperties, UDFWorkerSpecification, - UnixDomainSocket, WorkerCapabilities, WorkerConnectionSpec, WorkerEnvironment} + DirectWorker, Init, ProcessCallable, UdfPayload, + UDFProtoCommunicationPattern, UDFWorkerDataFormat, UDFWorkerProperties, + UDFWorkerSpecification, UnixDomainSocket, WorkerCapabilities, + WorkerConnectionSpec, WorkerEnvironment} import org.apache.spark.udf.worker.core._ +import com.google.protobuf.ByteString // 1. Define a worker spec (direct creation mode). val spec = UDFWorkerSpecification.newBuilder() @@ -112,10 +128,16 @@ val dispatcher: WorkerDispatcher = ... val session = dispatcher.createSession(securityScope = None) try { // 4. Initialize with the serialized function and schemas. - session.init(InitMessage( - functionPayload = serializedFunction, - inputSchema = arrowInputSchema, - outputSchema = arrowOutputSchema)) + session.init(Init.newBuilder() + .setProtocolVersion(1) + .setUdf(UdfPayload.newBuilder() + .setPayload(ByteString.copyFrom(serializedFunction)) + .setFormat(payloadFormat) // worker-recognised tag + .build()) + .setDataFormat(UDFWorkerDataFormat.ARROW) + .setInputSchema(ByteString.copyFrom(arrowInputSchema)) + .setOutputSchema(ByteString.copyFrom(arrowOutputSchema)) + .build()) // 5. Process data -- Iterator in, Iterator out. 
val results: Iterator[Array[Byte]] = diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala index 008cfc2993a09..e938c3e04be5b 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala @@ -27,6 +27,14 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification * as security scope). It owns the underlying worker processes and connections, * handling pooling, reuse, and lifecycle behind the scenes. Spark interacts with * workers exclusively through the [[WorkerSession]]s returned by [[createSession]]. + * + * '''Worker invalidation:''' if a session terminates with a transport error the + * worker that backed it MUST NOT be returned to any reuse pool. A transport + * error leaves the worker in an unknown state; only workers that complete + * sessions cleanly are eligible for reuse. Implementations are responsible for + * tracking this condition -- typically [[WorkerSession.doProcess]] flags the + * worker as invalid before [[WorkerSession.doClose]] releases it, so the + * dispatcher can distinguish a clean release from a failed one. 
*/ @Experimental trait WorkerDispatcher extends AutoCloseable { diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala index f4c4091688c94..fa063a7963996 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -19,31 +19,7 @@ package org.apache.spark.udf.worker.core import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.annotation.Experimental - -/** - * :: Experimental :: - * Carries all information needed to initialize a UDF execution on a worker. - * - * This message is passed to [[WorkerSession#init]] and contains the function - * definition, schemas, and any additional configuration. - * - * Placeholder: will be replaced by a generated proto message once the - * UDF wire protocol lands. Do not rely on case-class equality -- - * `Array[Byte]` fields compare by reference. - * - * @param functionPayload serialized function (e.g., pickled Python, JVM bytes) - * @param inputSchema serialized input schema (e.g., Arrow schema bytes) - * @param outputSchema serialized output schema (e.g., Arrow schema bytes) - * @param properties additional key-value configuration. Can carry - * protocol-specific or engine-specific metadata that - * does not yet have a dedicated field. 
- */ -@Experimental -case class InitMessage( - functionPayload: Array[Byte], - inputSchema: Array[Byte], - outputSchema: Array[Byte], - properties: Map[String, String] = Map.empty) +import org.apache.spark.udf.worker.Init /** * :: Experimental :: @@ -62,7 +38,11 @@ case class InitMessage( * {{{ * val session = dispatcher.createSession(securityScope = None) * try { - * session.init(InitMessage(functionPayload, inputSchema, outputSchema)) + * session.init(Init.newBuilder() + * .setProtocolVersion(1) + * .setUdf(UdfPayload.newBuilder().setPayload(callable).setFormat(fmt).build()) + * .setDataFormat(UDFWorkerDataFormat.ARROW) + * .build()) * val results = session.process(inputBatches) * results.foreach(handleBatch) * } finally { @@ -74,7 +54,8 @@ case class InitMessage( * - [[init]] must be called exactly once before [[process]]. * - [[process]] must be called at most once per session. * - [[close]] must always be called (use try-finally). - * - [[cancel]] may be called at any time to abort execution. + * - [[cancel]] may be called at any time from any execution context. + * See [[cancel]] for the full contract. * * The lifecycle is enforced here: [[init]] and [[process]] are `final` * and delegate to [[doInit]] / [[doProcess]] after AtomicBoolean guards. @@ -93,10 +74,11 @@ abstract class WorkerSession extends AutoCloseable { * * Throws `IllegalStateException` if called more than once. * - * @param message the initialization parameters including the serialized - * function, input/output schemas, and configuration. + * @param message the [[Init]] message carrying the UDF body, data + * format, optional schemas, and any session context + * the worker needs to start processing. 
*/ - final def init(message: InitMessage): Unit = { + final def init(message: Init): Unit = { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("init has already been called on this session") } @@ -108,7 +90,7 @@ abstract class WorkerSession extends AutoCloseable { * * Follows Spark's Iterator-to-Iterator pattern: input batches are streamed * to the worker, and result batches are lazily pulled from the returned - * iterator. The session sends a Finish signal to the worker when the input + * iterator. The session sends a finish signal to the worker when the input * iterator is exhausted. * * Must be called after [[init]] and at most once per session. @@ -127,8 +109,12 @@ abstract class WorkerSession extends AutoCloseable { doProcess(input) } - /** Subclass hook for [[init]]. Called once, after the guard. */ - protected def doInit(message: InitMessage): Unit + /** + * Subclass hook for [[init]]. Called once, after the guard. + * The session MUST NOT be activated before this call, since + * [[cancel]] before [[init]] is contractually a no-op. + */ + protected def doInit(message: Init): Unit /** Subclass hook for [[process]]. Called at most once, after the guard. */ protected def doProcess(input: Iterator[Array[Byte]]): Iterator[Array[Byte]] @@ -136,13 +122,51 @@ abstract class WorkerSession extends AutoCloseable { /** * Requests cancellation of the current UDF execution. * - * '''Thread-safety:''' implementations must allow [[cancel]] to be called - * from a thread different from the one driving [[process]] (typically a - * task interruption thread). It may be invoked at any point after - * [[init]] and should be a no-op if execution has already finished. + * '''Thread-safety:''' [[cancel]] may be called concurrently with + * [[process]] from any execution context. 
+ * + * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in + * the session's life: + * - before [[init]] -- a no-op; the session may still be closed + * normally via [[close]]. + * - between [[init]] and [[process]] -- signals that the session + * should be terminated; the caller should not invoke [[process]] + * and should call [[close]] to release resources. + * Implementations SHOULD surface this as an error if [[process]] + * is subsequently invoked despite the cancellation. + * - during [[process]] (data flowing or awaiting completion) + * -- requests the worker to abort on a best-effort basis. + * - after [[process]] has returned (session already terminated) + * -- a no-op. + * + * Implementations are responsible for the lifecycle-aware behavior + * described above so that the caller does not need to coordinate + * with the execution context driving [[process]]. */ def cancel(): Unit - /** Closes this session and releases resources. */ - override def close(): Unit + /** + * Closes this session and releases resources. Idempotent; safe to + * call from a `finally` block regardless of whether [[init]], + * [[process]], or [[cancel]] have been invoked. + * + * If [[init]] was called but [[process]] was not (e.g. an exception + * was thrown between the two), [[close]] signals cancellation to the + * worker before releasing resources so it can clean up + * deterministically. Subclasses implement [[doClose]] for resource + * teardown; the base class handles the cancel-before-close guarantee + * automatically. + */ + final override def close(): Unit = { + if (initialized.get() && !processed.get()) { + cancel() + } + doClose() + } + + /** Subclass hook for [[close]]. The base class guarantees that + * [[cancel]] has already been called if [[init]] was invoked but + * [[process]] was not. 
+ */ + protected def doClose(): Unit } diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala index afaf23791d80f..14db8da7ac89e 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala @@ -373,7 +373,8 @@ abstract class DirectWorkerDispatcher( "DirectWorker.runner must have at least one entry in command or arguments") val workerId = UUID.randomUUID().toString val address = newEndpointAddress(workerId) - // Proto contract: the engine must pass --id and --connection. + // The engine injects --connection (the socket address it manages) and + // --id (an internal correlation identifier) into the worker command. val cmd = baseCmd ++ Seq("--id", workerId, "--connection", address) val env = runner.getEnvironmentVariablesMap.asScala.toMap val outputFile = Files.createTempFile("udf-worker-", ".log") diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala index 7cdc5329350e3..de1dc45b5a8d3 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala @@ -27,7 +27,7 @@ import org.apache.spark.udf.worker.core.{WorkerConnection, WorkerSession} * * This is the session type returned by [[DirectWorkerDispatcher]]. 
It ties * the session lifecycle to the worker's ref-count: the dispatcher increments - * the count before construction, and [[close]] decrements it, so the + * the count before construction, and [[doClose]] decrements it, so the * dispatcher knows when a worker process is idle and can be terminated or * reused. * @@ -48,7 +48,7 @@ abstract class DirectWorkerSession( /** The connection to the worker for this session. */ def connection: WorkerConnection = workerProcess.connection - override def close(): Unit = { + override protected def doClose(): Unit = { if (released.compareAndSet(false, true)) { workerProcess.releaseSession() } diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala index 60f5e2211b702..7302c697d93c0 100644 --- a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala +++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.udf.worker.{ - DirectWorker, LocalTcpConnection, ProcessCallable, UDFWorkerProperties, + DirectWorker, Init, LocalTcpConnection, ProcessCallable, UDFWorkerProperties, UDFWorkerSpecification, UnixDomainSocket, WorkerConnectionSpec, WorkerEnvironment} import org.apache.spark.udf.worker.core.direct.{DirectUnixSocketWorkerDispatcher, @@ -51,14 +51,14 @@ class SocketFileConnection(socketPath: String) * TODO: [[cancel]] is a no-op here. Once a concrete [[DirectWorkerSession]] * with real data-plane wiring lands, add tests exercising cancel() in * particular: cancel from a different thread than process(), cancel - * after process() has returned, and cancel before init (should be a - * no-op). 
Tracking the thread-safety contract in the docstring on + * after process() has returned, and cancel before init (should be a no-op). + * See the thread-safety contract in the docstring on * [[org.apache.spark.udf.worker.core.WorkerSession.cancel]]. */ class StubWorkerSession( workerProcess: DirectWorkerProcess) extends DirectWorkerSession(workerProcess) { - override protected def doInit(message: InitMessage): Unit = {} + override protected def doInit(message: Init): Unit = {} override protected def doProcess( input: Iterator[Array[Byte]]): Iterator[Array[Byte]] = diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala new file mode 100644 index 0000000000000..1684a5e789b25 --- /dev/null +++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.udf.worker.core + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import com.google.protobuf.ByteString + +// Requires grpc-stub and grpc-inprocess dependencies, plus grpc-java codegen +// in udf/worker/proto/pom.xml to generate UdfWorkerGrpc. +import io.grpc.stub.StreamObserver +import io.grpc.{ManagedChannel, Server, Status} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.apache.spark.udf.worker.UdfWorkerGrpc + +import org.apache.spark.udf.worker.{ + Cancel, CancelResponse, DataRequest, DataResponse, + ExecutionError, UserError, WorkerError, ProtocolError, + Finish, FinishResponse, Heartbeat, HeartbeatResponse, + Init, InitResponse, ShutdownRequest, ShutdownResponse, + UDFWorkerDataFormat, UdfControlRequest, UdfControlResponse, + UdfPayload, UdfRequest, UdfResponse, WorkerRequest, WorkerResponse +} + +// scalastyle:off funsuite +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.BeforeAndAfterEach + +/** + * Protocol validation test for the UDF gRPC execution protocol. + * + * Implements a minimal echo worker (gRPC server) and engine client to verify + * the full Execute stream lifecycle: init, data streaming, finish, cancel, + * error handling, and the Manage RPC. The worker echoes each DataRequest + * batch back as a DataResponse; error paths are triggered by a sentinel + * payload value. + */ +class EchoProtocolSuite extends AnyFunSuite with BeforeAndAfterEach { +// scalastyle:on funsuite + + private val SUPPORTED_VERSION: Int = 1 + // A DataRequest whose payload equals this value triggers an ExecutionError. 
+ private val ERROR_TRIGGER: ByteString = ByteString.copyFromUtf8("ERROR") + + private var server: Server = _ + private var channel: ManagedChannel = _ + private var stub: UdfWorkerGrpc.UdfWorkerStub = _ + + override def beforeEach(): Unit = { + val serverName = InProcessServerBuilder.generateName() + server = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(new EchoWorkerService) + .build() + .start() + channel = InProcessChannelBuilder.forName(serverName).directExecutor().build() + stub = UdfWorkerGrpc.newStub(channel) + } + + override def afterEach(): Unit = { + channel.shutdownNow() + server.shutdownNow() + } + + // =========================================================================== + // WORKER SIDE (gRPC server) + // =========================================================================== + + /** + * Worker state machine for one Execute stream. + * + * State Meaning Enters on Exits on + * AwaitingInit Stream open, no Init received yet stream open Init received + * AwaitingData Chunking: Init received, awaiting first chunk Init(is_chunking_payload=true) PayloadChunk(last=true) → Data; PayloadChunk(last=false) → Chunking; Finish → Finishing; Cancel → Done + * Chunking Non-final chunks accumulating PayloadChunk(last=false) PayloadChunk(last=true) → Data; Finish → Finishing; Cancel → Done + * Data Processing DataRequests / emitting DataResponse Init (inline) or last chunk ExecutionError sent → PostError; Finish → Finishing; Cancel → Done + * PostError Data-phase error sent; finish callback skipped ExecutionError sent Finish or Cancel → CancelResponse → Done + * Finishing Finish callback (if any) running Finish (from Data/AwaitingData/ callback completes → FinishResponse → Done; + * Chunking) or Cancel wins → CancelResponse → Done + * Done Stream closed FinishResponse or CancelResponse — + * sent, or onError + * + * Any protocol violation sends ExecutionError(ProtocolError) + CancelResponse → Done. 
+ * A gRPC transport error (onError) transitions to Done without sending any response. + */ + private sealed trait WorkerState + private case object AwaitingInit extends WorkerState + private case class AwaitingData(initPayload: ByteString) extends WorkerState + private case class Chunking(accumulated: ByteString) extends WorkerState + private case object Data extends WorkerState + private case object PostError extends WorkerState + private case object Finishing extends WorkerState + private case object Done extends WorkerState + + private class EchoWorkerService extends UdfWorkerGrpc.UdfWorkerImplBase { + + override def execute( + responseObserver: StreamObserver[UdfResponse]): StreamObserver[UdfRequest] = + new ExecuteStreamHandler(responseObserver) + + override def manage( + request: WorkerRequest, + responseObserver: StreamObserver[WorkerResponse]): Unit = { + request.getManage match { + case WorkerRequest.Manage.Heartbeat(_) => + responseObserver.onNext(WorkerResponse.newBuilder() + .setHeartbeat(HeartbeatResponse.getDefaultInstance) + .build()) + responseObserver.onCompleted() + + case WorkerRequest.Manage.Shutdown(_) => + responseObserver.onNext(WorkerResponse.newBuilder() + .setShutdown(ShutdownResponse.newBuilder().setSessionsSettled(true).build()) + .build()) + responseObserver.onCompleted() + + case _ => + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription("empty manage request") + .asRuntimeException()) + } + } + } + + private class ExecuteStreamHandler( + responseObserver: StreamObserver[UdfResponse]) extends StreamObserver[UdfRequest] { + + @volatile private var state: WorkerState = AwaitingInit + // gRPC does not permit concurrent calls to the response StreamObserver. + // All writes are serialized through this lock. 
+ private val responseLock = new Object + + override def onNext(request: UdfRequest): Unit = { + request.getRequest match { + case UdfRequest.Request.Control(ctrl) => handleControl(ctrl) + case UdfRequest.Request.Data(data) => handleDataRequest(data) + case _ => closeWithProtocolError("empty request oneof") + } + } + + private def handleControl(ctrl: UdfControlRequest): Unit = { + ctrl.getControl match { + case UdfControlRequest.Control.Init(init) => handleInit(init) + case UdfControlRequest.Control.Payload(chunk) => handleChunk(chunk) + case UdfControlRequest.Control.Finish(_) => handleFinish() + case UdfControlRequest.Control.Cancel(cancel) => handleCancel(cancel) + case _ => closeWithProtocolError("empty control oneof") + } + } + + private def handleInit(init: Init): Unit = state match { + case AwaitingInit => + if (init.hasProtocolVersion && init.getProtocolVersion != SUPPORTED_VERSION) { + closeWithProtocolError(s"unsupported protocol version: ${init.getProtocolVersion}") + return + } + if (init.getIsChunkingPayload) { + // Payload will arrive via PayloadChunk messages; wait for last=true + // before sending InitResponse. + state = AwaitingData(init.getUdf.getPayload) + } else { + // Payload is fully inline; send InitResponse immediately. 
+ sendInitResponse() + state = Data + } + + case _ => closeWithProtocolError(s"Init received in state $state") + } + + private def handleChunk(chunk: org.apache.spark.udf.worker.PayloadChunk): Unit = + state match { + case AwaitingData(existing) => + if (chunk.hasLast && chunk.getLast) { + sendInitResponse() + state = Data + } else { + state = Chunking(existing.concat(chunk.getData)) + } + + case Chunking(existing) => + if (chunk.hasLast && chunk.getLast) { + sendInitResponse() + state = Data + } else { + state = Chunking(existing.concat(chunk.getData)) + } + + case _ => closeWithProtocolError(s"PayloadChunk received in state $state") + } + + private def handleDataRequest(data: DataRequest): Unit = state match { + case Data => processEcho(data) + + case _ => closeWithProtocolError(s"DataRequest received in state $state") + } + + private def processEcho(data: DataRequest): Unit = { + if (data.getData == ERROR_TRIGGER) { + // Data-phase error: send ExecutionError and enter PostError. + // The finish callback will not run; the engine must send Cancel + // (or Cancel-after-Finish) and the terminator is CancelResponse. + sendControl(UdfControlResponse.newBuilder() + .setError(ExecutionError.newBuilder() + .setUser(UserError.newBuilder() + .setMessage("simulated user-code error") + .setErrorClass("SimulatedError") + .build()) + .build()) + .build()) + state = PostError + } else { + responseLock.synchronized { + responseObserver.onNext(UdfResponse.newBuilder() + .setData(DataResponse.newBuilder().setData(data.getData).build()) + .build()) + } + } + } + + private def handleFinish(): Unit = state match { + case AwaitingData(_) | Chunking(_) => + // Chunking path: engine sends Finish before all chunks arrived. + // Send InitResponse now (treating the partial payload as complete) + // then finish normally. + sendInitResponse() + drainAndFinish() + + case Data => + drainAndFinish() + + case PostError => + // ExecutionError was already sent; finish callback does not run. 
+ // Respond with CancelResponse to complete the stream. + sendCancelResponse() + + case _ => closeWithProtocolError(s"Finish received in state $state") + } + + // Transitions to Finishing, drains any remaining output (instant for the + // echo worker), then sends FinishResponse. An async worker would remain in + // Finishing until the drain completes, allowing a concurrent Cancel to win. + private def drainAndFinish(): Unit = { + state = Finishing + sendControl(UdfControlResponse.newBuilder() + .setFinish(FinishResponse.newBuilder() + .putMetrics("status", "ok") + .build()) + .build()) + state = Done + responseLock.synchronized { responseObserver.onCompleted() } + } + + private def handleCancel(cancel: Cancel): Unit = state match { + case Data | AwaitingData(_) | Chunking(_) | PostError | Finishing => + // Finishing: Cancel arrived while finish callback was in progress -- abort. + // PostError: engine sent Cancel after receiving ExecutionError (expected). + sendCancelResponse() + + case Done => + // FinishResponse already sent; Cancel arrived too late and is ignored. + + case _ => closeWithProtocolError(s"Cancel received in state $state") + } + + override def onError(t: Throwable): Unit = { + // gRPC transport error: the engine-side connection dropped. + // The stream is already dead so the worker MUST NOT attempt to send + // CancelResponse or any other message. However, the worker MUST still + // run the same cleanup it would perform on an explicit Cancel: + // - stop any in-progress UDF execution + // - release file handles, locks, and temporary state + // - free any buffers holding input batches not yet processed + // A real worker would invoke its internal cancel/cleanup path here. + // The echo worker has no such resources, so only the state is updated. 
+ state = Done + } + + override def onCompleted(): Unit = state match { + case Done => // normal: engine half-closed after session terminated + case _ => + closeWithProtocolError( + s"request stream closed by engine in unexpected state $state") + } + + private def sendInitResponse(): Unit = + sendControl(UdfControlResponse.newBuilder() + .setInit(InitResponse.getDefaultInstance) + .build()) + + private def sendCancelResponse(): Unit = { + sendControl(UdfControlResponse.newBuilder() + .setCancel(CancelResponse.getDefaultInstance) + .build()) + state = Done + responseLock.synchronized { responseObserver.onCompleted() } + } + + private def sendControl(ctrl: UdfControlResponse): Unit = + responseLock.synchronized { + responseObserver.onNext( + UdfResponse.newBuilder().setControl(ctrl).build()) + } + + private def closeWithProtocolError(msg: String): Unit = { + sendControl(UdfControlResponse.newBuilder() + .setError(ExecutionError.newBuilder() + .setProtocol(ProtocolError.newBuilder().setMessage(msg).build()) + .build()) + .build()) + sendCancelResponse() + } + } + + // =========================================================================== + // ENGINE SIDE (gRPC client) + // =========================================================================== + + /** + * Minimal engine client that drives the Execute stream and collects results. + * + * The request stream is half-closed (onCompleted) only after the session + * outcome is known from the server: on receiving FinishResponse, + * CancelResponse, or a gRPC error. This keeps the stream open long enough + * for Cancel to follow Finish when needed. 
+ */ + private class EngineClient(stub: UdfWorkerGrpc.UdfWorkerStub) { + private val results = new LinkedBlockingQueue[Array[Byte]]() + private val done = new CountDownLatch(1) + @volatile var executionError: Option[ExecutionError] = None + @volatile var streamError: Option[Throwable] = None + private val requestCompleted = new AtomicBoolean(false) + + private val responseObserver = new StreamObserver[UdfResponse] { + override def onNext(response: UdfResponse): Unit = { + response.getResponse match { + case UdfResponse.Response.Data(data) => + results.add(data.getData.toByteArray) + + case UdfResponse.Response.Control(ctrl) => + ctrl.getControl match { + case UdfControlResponse.Control.Init(_) => + // InitResponse received: data phase can begin. + + case UdfControlResponse.Control.Error(err) => + // Record the error. If the request stream is still open, send + // Cancel so the worker can abort cleanly. The error is surfaced + // after the response terminator (FinishResponse or CancelResponse). 
+ executionError = Some(err) + if (!requestCompleted.get()) { + sendCancel("aborting after ExecutionError") + } + + case UdfControlResponse.Control.Finish(_) => + completeRequestStream() + done.countDown() + + case UdfControlResponse.Control.Cancel(_) => + completeRequestStream() + done.countDown() + + case _ => + } + + case _ => + } + } + + override def onError(t: Throwable): Unit = { + streamError = Some(t) + completeRequestStream() + done.countDown() + } + + override def onCompleted(): Unit = { + done.countDown() + } + } + + private val requestObserver: StreamObserver[UdfRequest] = stub.execute(responseObserver) + + def sendInit(payloadBytes: Array[Byte], sendChunked: Boolean = false): Unit = { + if (sendChunked) { + requestObserver.onNext(UdfRequest.newBuilder() + .setControl(UdfControlRequest.newBuilder() + .setInit(Init.newBuilder() + .setProtocolVersion(SUPPORTED_VERSION) + .setIsChunkingPayload(true) + .setDataFormat(UDFWorkerDataFormat.ARROW) + .setUdf(UdfPayload.newBuilder() + .setPayload(ByteString.EMPTY) + .setFormat("echo") + .build()) + .build()) + .build()) + .build()) + requestObserver.onNext(UdfRequest.newBuilder() + .setControl(UdfControlRequest.newBuilder() + .setPayload(org.apache.spark.udf.worker.PayloadChunk.newBuilder() + .setData(ByteString.copyFrom(payloadBytes)) + .setLast(true) + .build()) + .build()) + .build()) + } else { + requestObserver.onNext(UdfRequest.newBuilder() + .setControl(UdfControlRequest.newBuilder() + .setInit(Init.newBuilder() + .setProtocolVersion(SUPPORTED_VERSION) + .setDataFormat(UDFWorkerDataFormat.ARROW) + .setUdf(UdfPayload.newBuilder() + .setPayload(ByteString.copyFrom(payloadBytes)) + .setFormat("echo") + .build()) + .build()) + .build()) + .build()) + } + } + + def sendData(data: Array[Byte]): Unit = + requestObserver.onNext(UdfRequest.newBuilder() + .setData(DataRequest.newBuilder() + .setData(ByteString.copyFrom(data)) + .build()) + .build()) + + def sendFinish(): Unit = { + 
requestObserver.onNext(UdfRequest.newBuilder() + .setControl(UdfControlRequest.newBuilder() + .setFinish(Finish.getDefaultInstance) + .build()) + .build()) + // Request stream stays open: Cancel may still follow Finish. + // completeRequestStream() is called by the response observer. + } + + def sendCancel(reason: String = ""): Unit = { + requestObserver.onNext(UdfRequest.newBuilder() + .setControl(UdfControlRequest.newBuilder() + .setCancel(Cancel.newBuilder().setReason(reason).build()) + .build()) + .build()) + // Request stream stays open until the response terminator arrives; + // completeRequestStream() is called by the response observer. + } + + def completeRequestStream(): Unit = { + if (requestCompleted.compareAndSet(false, true)) { + requestObserver.onCompleted() + } + } + + def awaitDone(timeoutMs: Long = 5000): Boolean = + done.await(timeoutMs, TimeUnit.MILLISECONDS) + + def drainResults(): Seq[Array[Byte]] = { + val buf = new java.util.ArrayList[Array[Byte]]() + results.drainTo(buf) + import scala.jdk.CollectionConverters._ + buf.asScala.toSeq + } + } + + // =========================================================================== + // TESTS + // =========================================================================== + + test("echo: single DataRequest round-trip") { + val client = new EngineClient(stub) + client.sendInit("dummy-payload".getBytes) + client.sendData("hello".getBytes) + client.sendFinish() + + assert(client.awaitDone(), "stream did not complete in time") + assert(client.streamError.isEmpty, s"unexpected stream error: ${client.streamError}") + assert(client.executionError.isEmpty, s"unexpected execution error: ${client.executionError}") + val results = client.drainResults() + assert(results.length == 1) + assert(new String(results.head) == "hello") + } + + test("echo: multiple DataRequest batches are all echoed") { + val client = new EngineClient(stub) + client.sendInit("dummy-payload".getBytes) + Seq("batch1", "batch2", 
"batch3").foreach(b => client.sendData(b.getBytes)) + client.sendFinish() + + assert(client.awaitDone()) + assert(client.streamError.isEmpty) + val results = client.drainResults().map(new String(_)) + assert(results == Seq("batch1", "batch2", "batch3")) + } + + // The engine drives the request side from a producer thread while the + // response observer fires on a gRPC-managed callback thread. gRPC's + // bidirectional streaming and HTTP/2 flow control manage the interleaving; + // no explicit coordination is needed beyond the protocol ordering invariants. + test("echo: concurrent sending and receiving (producer/consumer pattern)") { + val asyncStub = UdfWorkerGrpc.newStub(channel) + + val receivedCount = new java.util.concurrent.atomic.AtomicInteger(0) + val doneLatch = new CountDownLatch(1) + @volatile var streamErr: Option[Throwable] = None + val requestCompleted = new AtomicBoolean(false) + // reqObs is assigned after responseObs is created; @volatile ensures + // the response observer (which runs on a gRPC callback thread) sees the + // assignment made by the test thread. 
+    @volatile var reqObs: StreamObserver[UdfRequest] = null
+
+    val responseObs = new StreamObserver[UdfResponse] {
+      private def completeRequestStream(): Unit =
+        if (requestCompleted.compareAndSet(false, true)) reqObs.onCompleted()
+
+      override def onNext(r: UdfResponse): Unit = r.getResponse match {
+        case UdfResponse.Response.Data(_) => receivedCount.incrementAndGet()
+        case UdfResponse.Response.Control(c) =>
+          c.getControl match {
+            case UdfControlResponse.Control.Finish(_) =>
+              completeRequestStream()
+              doneLatch.countDown()
+            case _ =>
+          }
+        case _ =>
+      }
+      override def onError(t: Throwable): Unit = {
+        streamErr = Some(t)
+        completeRequestStream()
+        doneLatch.countDown()
+      }
+      override def onCompleted(): Unit = doneLatch.countDown()
+    }
+    reqObs = asyncStub.execute(responseObs)
+
+    val producer = new Thread(() => {
+      reqObs.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setInit(Init.newBuilder()
+            .setProtocolVersion(SUPPORTED_VERSION)
+            .setDataFormat(UDFWorkerDataFormat.ARROW)
+            .setUdf(UdfPayload.newBuilder()
+              .setPayload(ByteString.copyFromUtf8("payload"))
+              .setFormat("echo").build())
+            .build())
+          .build())
+        .build())
+      (1 to 5).foreach { i =>
+        reqObs.onNext(UdfRequest.newBuilder()
+          .setData(DataRequest.newBuilder()
+            .setData(ByteString.copyFromUtf8(s"batch-$i")).build())
+          .build())
+      }
+      reqObs.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setFinish(Finish.getDefaultInstance).build())
+        .build())
+      // Request stream stays open; completeRequestStream() is called by
+      // the response observer on FinishResponse or gRPC error.
+ }, "producer") + producer.start() + + assert(doneLatch.await(10, TimeUnit.SECONDS), "stream did not complete") + assert(streamErr.isEmpty, s"unexpected error: $streamErr") + assert(receivedCount.get() == 5, s"expected 5 echoes, got ${receivedCount.get()}") + } + + test("echo: chunked payload delivery") { + val client = new EngineClient(stub) + client.sendInit("chunked-payload".getBytes, sendChunked = true) + client.sendData("data".getBytes) + client.sendFinish() + + assert(client.awaitDone()) + assert(client.streamError.isEmpty) + assert(new String(client.drainResults().head) == "data") + } + + test("echo: generator-style UDF (zero DataRequests, engine sends Finish after Init)") { + val client = new EngineClient(stub) + client.sendInit("generator-payload".getBytes) + client.sendFinish() + + assert(client.awaitDone()) + assert(client.streamError.isEmpty) + assert(client.drainResults().isEmpty) + } + + test("cancel: engine cancels mid-stream before sending Finish") { + val client = new EngineClient(stub) + client.sendInit("dummy-payload".getBytes) + client.sendData("batch1".getBytes) + client.sendCancel("task interrupted") + + assert(client.awaitDone()) + assert(client.streamError.isEmpty) + } + + // Cancel MAY follow Finish. The worker sends CancelResponse if Cancel arrives + // before FinishResponse is sent, or FinishResponse if it arrived too late. + // The engine must accept either outcome. 
+ test("cancel: engine sends Cancel after Finish -- accepts FinishResponse or CancelResponse") { + val client = new EngineClient(stub) + client.sendInit("dummy-payload".getBytes) + client.sendData("data".getBytes) + client.sendFinish() + client.sendCancel("task interrupted after finish") + + assert(client.awaitDone(), "stream did not complete") + assert(client.streamError.isEmpty, + s"Cancel-after-Finish must not cause a gRPC error: ${client.streamError}") + } + + test("ExecutionError: worker signals UserError, engine sends Cancel and receives CancelResponse") { + val client = new EngineClient(stub) + client.sendInit("dummy-payload".getBytes) + client.sendData(ERROR_TRIGGER.toByteArray) + + assert(client.awaitDone()) + assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}") + assert(client.executionError.isDefined, "expected an ExecutionError") + assert(client.executionError.get.hasUser, "expected UserError kind") + assert(client.executionError.get.getUser.getErrorClass == "SimulatedError") + } + + test("protocol error: second Init is rejected with ProtocolError + FinishResponse") { + val client = new EngineClient(stub) + client.sendInit("payload".getBytes) + client.sendInit("second-init".getBytes) + + assert(client.awaitDone()) + assert(client.streamError.isEmpty, "expected ProtocolError, not a gRPC stream error") + assert(client.executionError.isDefined, "expected an ExecutionError") + assert(client.executionError.get.hasProtocol, "expected ProtocolError kind") + } + + test("Manage: heartbeat is acknowledged") { + val blockingStub = UdfWorkerGrpc.newBlockingStub(channel) + val resp = blockingStub.manage(WorkerRequest.newBuilder() + .setHeartbeat(Heartbeat.getDefaultInstance) + .build()) + assert(resp.hasHeartbeat, "expected HeartbeatResponse") + } + + test("Manage: ShutdownRequest is acknowledged") { + val blockingStub = UdfWorkerGrpc.newBlockingStub(channel) + val resp = blockingStub.manage(WorkerRequest.newBuilder() + 
.setShutdown(ShutdownRequest.newBuilder().setReason("test done").build()) + .build()) + assert(resp.hasShutdown, "expected ShutdownResponse") + } +} diff --git a/udf/worker/proto/src/main/protobuf/common.proto b/udf/worker/proto/src/main/protobuf/common.proto index ee032def73efe..7028b13571874 100644 --- a/udf/worker/proto/src/main/protobuf/common.proto +++ b/udf/worker/proto/src/main/protobuf/common.proto @@ -26,7 +26,7 @@ option java_multiple_files = true; // The UDF in & output data format. enum UDFWorkerDataFormat { UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0; - + // The worker accepts and produces Apache arrow batches. ARROW = 1; } @@ -42,7 +42,7 @@ enum UDFWorkerDataFormat { enum UDFProtoCommunicationPattern { UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0; - // Data exachanged as a bidrectional + // Data exchanged as a bidirectional // stream of bytes. BIDIRECTIONAL_STREAMING = 1; } diff --git a/udf/worker/proto/src/main/protobuf/udf_protocol.proto b/udf/worker/proto/src/main/protobuf/udf_protocol.proto new file mode 100644 index 0000000000000..1377a5bb68cdf --- /dev/null +++ b/udf/worker/proto/src/main/protobuf/udf_protocol.proto @@ -0,0 +1,742 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = "proto3"; + +import "common.proto"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// ===================================================================== +// Language-agnostic UDF execution protocol. +// +// The Spark engine acts as the gRPC client; a UDF worker (in any +// language) acts as the gRPC server. +// ===================================================================== + +// The default UDF gRPC service. A worker that exposes this service +// MUST do so over the default connection of the worker specification. +// +// Future revisions of the worker specification may introduce additional +// dedicated connections for specific purposes (e.g. a separate channel +// for streaming state store access in stateful UDFs). +service UdfWorker { + // Per-execution stream. See [[UdfControlRequest]] for the complete + // wire protocol and ordering invariants. + // + // Error contract: a gRPC error on this stream indicates a transport + // or connection failure. Application-level errors (user code exceptions, + // worker errors, protocol violations) are communicated via + // [[ExecutionError]] so the stream lifecycle remains intact. + // Note: a hanging UDF keeps the connection alive and therefore does + // not produce a gRPC error. Worker-side per-batch timeouts are the + // appropriate mechanism for detecting and surfacing hung user code; + // the engine should also apply client-side timeouts as a safety net. + // + // Stream lifecycle: the engine MUST half-close the request stream + // (call onCompleted() on the gRPC request side) only after the session + // outcome is known: on receiving [[FinishResponse]] or [[CancelResponse]] + // (clean termination), or on receiving a gRPC error. Deferring the + // half-close keeps the stream open long enough for [[Cancel]] to follow + // [[Finish]] when needed (see [[Finish]] for the full contract). 
+ // + // For stateful execution, the state is maintained per bi-directional + // stream, mapping to a `WorkerSession` on the engine side. + rpc Execute(stream UdfRequest) returns (stream UdfResponse); + + // Worker-scoped management RPC for heartbeat, capability query, and + // graceful shutdown. Workers MUST ensure this RPC remains serviceable + // regardless of how many [[Execute]] streams are in flight; failing to + // do so can prevent the engine from detecting a hung worker or + // initiating a clean shutdown. + rpc Manage(WorkerRequest) returns (WorkerResponse); +} + +// ===================================================================== +// Execute stream -- envelope +// ===================================================================== + +// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]] +// / [[Finish]] / [[Cancel]]) or a data message. +message UdfRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof request { + UdfControlRequest control = 1; + DataRequest data = 2; + } +} + +// Worker -> Engine. Either a control response ([[InitResponse]] / +// [[FinishResponse]] / [[CancelResponse]] / [[ExecutionError]]) or a +// data response message. +message UdfResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof response { + UdfControlResponse control = 1; + DataResponse data = 2; + } +} + +// Engine -> Worker control messages. +// +// Wire protocol for one Execute stream (both directions): +// +// Engine -> Worker: Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)? +// | Cancel +// Worker -> Engine: InitResponse -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse) +// +// DataRequest and DataResponse are independent streams: the worker +// may emit DataResponse messages at any point after InitResponse, +// including before the first DataRequest arrives. 
For generator-style +// UDFs that produce output without consuming input, there may be zero +// DataRequest messages -- the engine sends Finish directly after Init. +// The arrows above denote ordering constraints within each direction, +// not a request/response pairing. +// +// The engine MAY send DataRequests before receiving InitResponse (pipeline +// mode). The worker MUST buffer such DataRequests and process them in +// arrival order once init succeeds. They MAY be discarded only if init +// fails (i.e. the worker sends ExecutionError before InitResponse). +// +// Ordering invariants: +// - PayloadChunk* only after Init and before the first DataRequest. +// [[Init.is_chunking_payload]] = true signals that chunks will follow; +// [[PayloadChunk.last]] = true is the canonical end-of-chunking signal. +// When [[Init.is_chunking_payload]] is false or absent, [[InitResponse]] +// MAY be sent immediately after [[Init]] without waiting for chunks. +// - InitResponse MUST be emitted before any DataResponse. +// - ExecutionError (if any) MUST be emitted after all DataResponse +// messages and only from the init or data-processing phase (not from +// the finish or cancel callback). It signals that the execution is +// aborted; the terminator is always CancelResponse. When sent before +// [[InitResponse]] it indicates an init failure (no [[InitResponse]] +// will follow). +// The engine MUST send Cancel upon receiving ExecutionError (using +// Cancel-after-Finish if Finish was already sent), and the worker +// MUST respond with CancelResponse. At most one ExecutionError is +// sent per stream; the worker aggregates multiple errors internally. +// - Errors in the finish callback (if any) are reported via +// [[FinishResponse.error]]; errors in the cancel callback (if any) +// via [[CancelResponse.error]]. These are distinct from +// ExecutionError, which covers data-processing errors only. 
+// - The engine terminates with one of: +// (a) Finish alone -> worker sends FinishResponse. +// (b) Cancel alone -> worker sends CancelResponse. +// (c) Finish then Cancel -> worker sends CancelResponse if it has not +// yet sent FinishResponse, otherwise FinishResponse (see [[Finish]]). +// - Cancel MUST NOT precede Finish on the same stream; if the engine +// cancels before all data is submitted, it sends Cancel alone (case b). +// +// A worker that receives messages out of order (e.g. a second Init, +// a PayloadChunk after the first DataRequest, a DataRequest before Init, +// or a Cancel before Init) MUST send [[ExecutionError]] with a +// [[ProtocolError]] kind, followed by [[FinishResponse]] or +// [[CancelResponse]] to close the stream cleanly. +message UdfControlRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + Init init = 1; + PayloadChunk payload = 2; + Finish finish = 3; + Cancel cancel = 4; + } +} + +// Worker -> Engine control messages. +message UdfControlResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + InitResponse init = 1; + FinishResponse finish = 2; + CancelResponse cancel = 3; + ExecutionError error = 4; + } +} + +// ===================================================================== +// Init phase +// ===================================================================== + +// Sent once, as the first message on an Execute stream. Describes +// the UDF body to run plus the minimum metadata the worker needs to +// start processing it. +// +// Today the protocol mandates exactly one Init per UDF execution +// (one Init -> data -> Finish). This is the simplest contract and +// covers all currently supported UDF kinds. In the future we may +// evolve to support multiple init phases on the same stream -- e.g. 
+// when worker setup requires an interactive handshake (negotiate a +// schema, exchange capabilities, fetch driver-side metadata, ...) +// before the data plane opens. Such an extension would be additive +// and would not change the single-Init semantics already in use. +// +// Engine vs. client split: +// * Most fields on Init are engine-side. They describe what +// flows on the wire for this session ([[data_format]] / +// [[input_schema]] / [[output_schema]] -- matching the worker +// spec, not the function's view) and what per-session +// context the worker needs ([[timezone]], [[session_conf]], +// [[task_context]], [[parameters]]). +// * [[UdfPayload]] carries everything the client side of Spark +// (where the UDF is defined and serialized) packs -- the +// serialized callable, an opaque format tag, and any encoder +// metadata bundled with the callable. The wire protocol does +// not enumerate encoder shapes; that is left to the client and +// worker to agree on per UDF type. +message Init { + // (Optional) Protocol version declared by the engine for this stream. + // Allows the worker to detect version mismatches early and reject + // streams using a protocol revision it does not support. When not set, + // the worker SHOULD assume the initial protocol version. + optional uint32 protocol_version = 1; + + // (Required) Wire format used for [[DataRequest.data]] and + // [[DataResponse.data]] for the life of this session. Must be + // one of the formats the worker declared in + // [[WorkerCapabilities.supported_data_formats]]; the client side + // of the protocol picks one at planning time and sticks with it. + // + // Workers MUST reject an [[Init]] whose [[data_format]] is + // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, or whose value is not + // present in their declared + // [[WorkerCapabilities.supported_data_formats]]. The latter covers + // unknown enum values that proto3 passes through as numeric + // constants -- e.g. 
a newer engine selecting a format the worker
+  // does not implement.
+  UDFWorkerDataFormat data_format = 2;
+
+  // (Required) The UDF body to execute on the worker for this
+  // session. Exactly one payload per Execute stream.
+  UdfPayload udf = 3;
+
+  // (Optional) Schema of the input data plane in the wire format
+  // declared by [[data_format]] -- e.g. an Arrow IPC schema when
+  // data_format = ARROW. This is an engine-side requirement: it
+  // describes the bytes the engine will actually put on
+  // [[DataRequest.data]] for this session, matching what the
+  // worker advertised in its spec. It is NOT necessarily the
+  // schema the function definer expressed; the UDF's own type
+  // information lives inside [[UdfPayload]], typically embedded
+  // alongside the callable in [[UdfPayload.payload]] (e.g. as
+  // input/output encoders chosen per UDF type).
+  //
+  // Left unset when the worker can derive the schema from the
+  // payload alone.
+  optional bytes input_schema = 4;
+
+  // (Optional) Schema of the output data plane in the wire format
+  // declared by [[data_format]]. Same semantics as
+  // [[input_schema]] -- engine-side requirement describing the
+  // bytes the engine expects on [[DataResponse.data]].
+  optional bytes output_schema = 5;
+
+  // (Optional; defaults to an empty map.) Per-task context
+  // provided by the engine. Common keys identify the task instance
+  // for diagnostics, logging, and stateful workers -- e.g.
+  // partition id, task attempt id, stage id, micro-batch id.
+  // Engine and worker agree on the keys they share; the protocol
+  // does not enumerate them.
+  map<string, string> task_context = 6;
+
+  // (Optional; defaults to an empty map.) Worker-private knobs not
+  // already captured by typed fields above. Free-form; both sides
+  // agree on the keys they need.
+  //
+  // Any key that two languages converge on is a candidate for
+  // promotion to a structured proto field -- once promoted, it gets
+  // a typed field number from the reserved range right after this
+  // block and is removed from [[session_conf]]. [[timezone]] below
+  // is an example of a key that has already been promoted.
+  map<string, string> session_conf = 7;
+
+  // (Optional) Session timezone, promoted out of [[session_conf]]
+  // because every eval needs it for timestamp encoding/decoding.
+  //
+  // Format follows Spark's `spark.sql.session.timeZone` config --
+  // typically an IANA TZ id (e.g. "America/Los_Angeles") or a
+  // fixed offset (e.g. "+08:00"). The engine MUST pass the value
+  // it would resolve from the session conf without further
+  // transformation, so the worker can interpret it the same way
+  // Spark does.
+  optional string timezone = 8;
+
+  // (Optional) When true, the UDF payload will be delivered via
+  // [[PayloadChunk]] messages rather than inline in [[UdfPayload.payload]].
+  // The worker MUST wait for [[PayloadChunk.last]] = true before sending
+  // [[InitResponse]]. When false or absent, the payload is fully contained
+  // in [[UdfPayload.payload]] and the worker MAY send [[InitResponse]]
+  // immediately after [[Init]] without waiting for any [[PayloadChunk]].
+  optional bool is_chunking_payload = 9;
+
+  // Reserved for future typed Init fields, in particular keys
+  // graduated from [[session_conf]] (see the [[timezone]] precedent
+  // above). Numbers >= 100 are intentionally NOT reserved here; if
+  // a future revision needs an opaque escape-hatch field, give it a
+  // number >= 100 alongside [[parameters]] and add a field-level
+  // comment so the convention stays visible.
+  reserved 10 to 99;
+
+  // (Optional) Engine-packed opaque parameters specific to a
+  // particular kind of UDF execution.
The escape hatch for + // anything the engine needs the worker to see at init time + // that is not already captured by the typed fields above and + // does not fit naturally into [[task_context]]. The encoding + // is agreed between the engine and the worker; the protocol + // does not interpret it. The matching response, also opaque + // bytes, is returned via [[InitResponse.data]]. + // + // Numbers >= 100 are reserved by convention for opaque + // escape-hatch fields like this one; new typed fields use the + // reserved 10..99 range. + // + // Client-side init data (anything packed by the layer that + // defines and serializes the UDF) does NOT belong here -- it + // travels inside [[UdfPayload.payload]] instead. + optional bytes parameters = 100; +} + +// Acknowledgment for [[Init]] on success. The worker MUST send exactly +// one [[InitResponse]] before any [[DataResponse]]. When [[PayloadChunk]] +// is used to deliver the UDF payload, the worker MUST also wait until +// end-of-chunking to emit it (see [[PayloadChunk]]). +// +// On init failure (e.g. unsupported payload format, failed deserialization), +// the worker sends [[ExecutionError]] instead -- no [[InitResponse]] is +// emitted. The engine treats a received [[ExecutionError]] without a +// prior [[InitResponse]] as an init failure, sends [[Cancel]], and waits +// for [[CancelResponse]]. +// +// The init phase is a bidirectional handshake: the worker can return +// inline bytes for the engine to consume before data starts flowing. +// This enables certain UDF execution types to communicate init-time +// results back to the engine early -- for example, signalling that +// execution should be skipped entirely (e.g. the UDF determined during +// init that its output is empty), or returning an output schema derived +// from the payload. The semantics of those bytes are agreed between the +// client side of the protocol and the worker; this message itself is +// otherwise opaque. 
+message InitResponse { + // (Optional) Inline init result returned by the worker. Opaque + // to the protocol; the client side of the protocol and the + // worker agree on what (if anything) it carries. + optional bytes data = 1; +} + +// Optional. Used to stream the single UDF payload when it does not +// fit in a single gRPC message. The default is to send the payload +// inline on [[UdfPayload.payload]]; chunking is only needed when a +// payload exceeds the gRPC message size limit. +// +// When used, at least one chunk MUST be sent after [[Init]] and +// before the first [[DataRequest]], with the final chunk carrying +// [[PayloadChunk.last]] = true. The worker concatenates the +// inline [[UdfPayload.payload]] (if any) followed by all chunks in +// arrival order to form the final payload. +// +// Chunks are part of the Init handshake, not standalone control +// messages: they extend [[Init.udf.payload]] and are not +// individually acknowledged. The single [[InitResponse]] covers +// Init plus all of its chunks together. [[PayloadChunk.last]] = true +// is the canonical end-of-chunking signal; the worker MUST NOT send +// [[InitResponse]] before receiving it. +// +// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers +// MAY validate that the total assembled payload (inline +// [[UdfPayload.payload]] bytes plus all chunk bytes) matches it; a +// mismatch is a protocol error. +message PayloadChunk { + // (Required, non-empty.) Bytes appended to the [[Init.udf]] + // payload. + bytes data = 1; + + // Marks the final chunk. When the engine uses [[PayloadChunk]] + // at all, it MUST set `last = true` on the last chunk. This is + // the canonical end-of-chunking signal: the worker MUST wait for + // it before emitting [[InitResponse]] and before treating any + // subsequent message as a [[DataRequest]]. Non-final chunks omit + // this field. 
+ // + // Kept `optional` so future revisions can distinguish "engine did + // not set this field" from "engine set false" without renumbering. + optional bool last = 2; +} + +// ===================================================================== +// Data phase +// +// `data` is intentionally a top-level `bytes` field on both request +// and response messages -- not nested inside a wrapper -- so that +// implementations can avoid an extra copy when reading or writing +// the payload. The wire format (Arrow IPC etc.) is declared once per +// session via [[Init.data_format]] and stays the same for the life +// of the stream. +// +// Backpressure: this protocol currently relies on gRPC's transport-level +// (HTTP/2) flow control for backpressure. +// ===================================================================== + +// Engine -> Worker per-batch payload. +message DataRequest { + // (Required) Encoded data bytes for one batch in the + // session-declared format. What "empty" means for a batch is + // defined by the session's [[Init.data_format]] -- for Arrow IPC + // even a zero-row batch carries a non-empty header, while future + // formats may permit truly zero-length payloads. Validation + // beyond "non-empty bytes" is delegated to the format decoder. + bytes data = 1; +} + +// Worker -> Engine per-batch payload. The worker emits zero or more +// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] / +// [[CancelResponse]]. Sink-style UDFs (which consume input but +// produce no output rows on the data plane) emit exactly zero. +message DataResponse { + // (Required) Encoded data bytes for one batch in the + // session-declared format. See [[DataRequest.data]] for the + // meaning of "empty". 
+ bytes data = 1; +} + +// ===================================================================== +// Finish / Cancel phase +// ===================================================================== + +// Sent by the engine when all input data has been submitted and normal +// completion is expected. The worker MUST drain any remaining output, +// then emit [[FinishResponse]] and close the response stream. +// +// [[Cancel]] MAY follow [[Finish]] on the same stream if the engine +// wants to abort processing of already-submitted data (e.g. a Spark +// task is interrupted after all input batches were sent). [[Cancel]] +// MUST NOT precede [[Finish]]; if the engine cancels before sending +// all data it sends [[Cancel]] without [[Finish]]. +// +// Worker behavior when [[Cancel]] follows [[Finish]]: +// - If [[FinishResponse]] has not yet been sent, the worker MUST +// abort output, run cleanup, and send [[CancelResponse]]. +// - If [[FinishResponse]] has already been sent, [[Cancel]] arrives +// too late and is ignored; the engine receives [[FinishResponse]]. +// The engine MUST therefore be prepared to receive either +// [[FinishResponse]] or [[CancelResponse]] when it sends both. +message Finish {} + +// Worker -> Engine completion message. Carries per-execution summary metrics. +// +// Metrics design: +// - [[FinishResponse.metrics]] / [[CancelResponse.metrics]]: per-execution +// metrics accumulated up to the point of stream termination (e.g. rows +// processed, time per phase). Emitted once per [[Execute]] stream, +// regardless of whether the stream ended cleanly or was cancelled. +// - [[HeartbeatResponse.metrics]]: worker-global metrics aggregated across +// all sessions (e.g. total rows processed, memory usage). Emitted +// periodically via the [[Manage]] RPC. +// - Real-time per-execution metrics (e.g. incremental progress during a +// long-running UDF) are not yet defined. 
They may be introduced as an
+// optional message in a future revision without changing this contract.
+message FinishResponse {
+  // Per-execution metrics accumulated over the session. Free-form;
+  // names are worker-defined.
+  map<string, int64> metrics = 1;
+
+  // (Optional) Inline finish result returned by the worker.
+  // Mirrors [[InitResponse.data]] -- the finish phase allows the
+  // engine to interact with the UDF after data has stopped
+  // flowing, with the worker returning opaque bytes the engine (or
+  // higher-level code) may consume during teardown. The semantics
+  // of those bytes are agreed between the client side of the
+  // protocol and the worker.
+  optional bytes data = 2;
+
+  // (Optional) Error raised by the finish callback (if any), invoked
+  // after all input data has been consumed. This field is only set on
+  // the non-error execution path: if [[ExecutionError]] was sent during
+  // data processing, the terminator is [[CancelResponse]], not
+  // [[FinishResponse]]. The engine SHOULD surface this as an exception.
+  optional ExecutionError error = 3;
+}
+
+// Engine -> Worker explicit cancel. Distinct from a gRPC stream error
+// so the worker can run cleanup deterministically (release file
+// handles, drop temp state, etc.). After receiving [[Cancel]] the
+// worker MUST stop emitting [[DataResponse]] messages, run cleanup,
+// and emit [[CancelResponse]] before closing.
+//
+// [[Cancel]] is the cooperative cancellation path and may be sent
+// either instead of [[Finish]] (engine cancels before all data is
+// submitted) or after [[Finish]] (engine aborts processing of
+// already-submitted data -- see [[Finish]] for the full contract).
+// A broken gRPC connection is the involuntary fallback -- in that
+// case gRPC surfaces an error on the stream (see [[Execute]]).
+// +// Cancellation latency: [[Cancel]] is delivered in-order on the same +// stream as [[DataRequest]] messages, so it takes effect only after all +// preceding batches in the gRPC receive queue have been processed. The +// delay is therefore (queue depth) × (batch processing time), where +// the queue depth is bounded by HTTP/2 flow control. Workers SHOULD +// set a cancellation flag as soon as [[Cancel]] is received so that +// any in-flight processing thread can abort at the next safe checkpoint. +// +// Future: for use cases that require lower-latency cancellation (e.g. +// interrupting a long-running batch), an out-of-band cancel signal may +// be added to the [[Manage]] RPC. Because [[Manage]] uses a separate +// connection it bypasses the Execute stream queue and can signal a +// processing thread immediately, independent of buffered batches. +// +// Worker behavior on involuntary stream error: when the worker observes +// a gRPC error on the Execute stream (i.e. the engine-side connection +// dropped), it MUST treat this as equivalent to [[Cancel]] for cleanup +// purposes -- stop producing output and release resources. The worker +// MUST NOT attempt to send [[CancelResponse]] or any other message, +// since the stream is already dead. +message Cancel { + // (Optional) Free-form reason for diagnostics. + optional string reason = 1; +} + +// Worker -> Engine acknowledgment of [[Cancel]], or the terminator after +// [[ExecutionError]] (the execution is always considered aborted when an +// error occurred during data processing). Carries any per-execution metrics +// accumulated up to the point of cancellation -- even a partial execution +// may produce useful diagnostics (e.g. rows processed before abort). +message CancelResponse { + // Per-execution metrics accumulated up to cancellation. Free-form; + // names are worker-defined. See [[FinishResponse.metrics]] for the + // full metrics design. 
+ map metrics = 1; + + // (Optional) Error raised by the cancel callback (if any), invoked + // when [[Cancel]] is received. The engine SHOULD surface this + // alongside any prior [[ExecutionError]] (e.g. as a suppressed + // exception). + optional ExecutionError error = 2; +} + +// Worker -> Engine. Signals an application-level error that occurred +// during data processing. Distinct from a gRPC stream error: gRPC errors +// indicate transport or connection failures; [[ExecutionError]] carries +// errors that should be raised as user-facing exceptions. +// +// Wire position: emitted at most once, after all [[DataResponse]] messages, +// and only from the data-processing phase. Errors in the finish or cancel +// callback (if any) are reported via [[FinishResponse.error]] or +// [[CancelResponse.error]] respectively, not as a separate ExecutionError. +// +// Terminator: after [[ExecutionError]] the terminator is always +// [[CancelResponse]] -- the execution is considered aborted regardless of +// whether the engine already sent [[Finish]]. The engine MUST send +// [[Cancel]] (using Cancel-after-Finish if [[Finish]] was already sent) +// and wait for [[CancelResponse]], then raise the error. +// +// If more than one [[ExecutionError]] arrives (protocol violation), +// the engine SHOULD surface the first as the primary exception and +// attach subsequent ones as suppressed exceptions. +message ExecutionError { + // Exactly one kind MUST be set. + oneof kind { + UserError user = 1; + WorkerError worker = 2; + ProtocolError protocol = 3; + } +} + +// Error raised by the user's UDF code. +message UserError { + // (Required) Human-readable error message from the UDF. + string message = 1; + + // (Optional) Full stack trace or traceback in the worker's + // language-specific format. Forwarded to the user as-is. + optional string traceback = 2; + + // (Optional) Language-specific error class name (e.g. "ValueError" + // in Python, "RuntimeException" in Java). 
+ optional string error_class = 3; +} + +// Error originating from the worker implementation itself, not user code. +message WorkerError { + // (Required) Human-readable description of the worker error. + string message = 1; + + // (Optional) Stack trace for diagnostics. + optional string traceback = 2; +} + +// Protocol violation detected by the worker (e.g. messages received out +// of order, unsupported [[Init.protocol_version]]). Sending this type +// instead of closing with a gRPC error keeps the stream lifecycle intact: +// [[FinishResponse]] or [[CancelResponse]] still follows. +message ProtocolError { + // (Required) Description of the protocol violation. + string message = 1; +} + +// The single UDF body delivered to the worker on [[Init]]. Opaque to +// the engine: the engine forwards [[payload]] and [[format]] +// unchanged, and the worker decodes them per the format the client +// and worker have agreed on. +message UdfPayload { + // (Required, may be empty when chunked.) Serialized UDF bundle, + // opaque to the engine. The encoding is declared in [[format]]. + // + // The bundle is not necessarily just the serialized callable; + // it is up to the client side of the protocol and the worker to + // agree on what is packed inside it -- e.g. custom encoders for + // user-defined types, type hints, or any other metadata the + // worker needs to invoke the UDF. + // + // For payloads too large to fit on a single gRPC message, this + // field MAY be left empty (zero-length bytes) and the bytes + // delivered via the [[PayloadChunk]] mechanism instead. See + // [[PayloadChunk]] for chunking semantics. + bytes payload = 1; + + // (Required, non-empty.) Format tag identifying the encoding of + // [[payload]]. The protocol does not enumerate the values: the + // client side of the protocol and the worker agree on the + // namespace, and each worker recognises the tags it knows how + // to decode. The engine forwards this string unchanged. 
+ string format = 2; + + // (Optional) Total payload size in bytes. Useful when chunked + // streaming is used so the worker can pre-allocate buffers. + optional int64 payload_size = 3; + + // (Optional) Human-readable name for diagnostics and metrics. + optional string name = 4; + + // (Optional) Worker / language-specific dispatch hint. A + // free-form string the worker uses to pick the code path that + // handles this payload. The protocol does not enumerate eval + // types because they are language-specific; the client side of + // the protocol and the worker agree on the namespace and the + // values. + // + // When the worker can derive the eval type from the payload + // itself (embedded metadata, format tag, etc.), this field is + // left unset. Otherwise the client side of the protocol sets it + // explicitly. + optional string eval_type = 5; +} + +// ===================================================================== +// Manage RPC -- worker-scoped operations independent of Execute +// ===================================================================== + +// Engine -> Worker. Wraps the manage operations in a oneof so the RPC +// is a single typed call, leaving room for future operations +// (capability query, profiling, ...). +message WorkerRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof manage { + Heartbeat heartbeat = 1; + ShutdownRequest shutdown = 2; + } +} + +// Worker -> Engine. +message WorkerResponse { + // Exactly one branch MUST be set, and it MUST be the branch + // matching the request's oneof (e.g. [[Heartbeat]] is answered + // with [[HeartbeatResponse]], not [[ShutdownResponse]]). A + // mismatched response is a protocol error. + oneof manage { + HeartbeatResponse heartbeat = 1; + ShutdownResponse shutdown = 2; + } +} + +// Liveness probe. The engine may send this periodically to detect a +// hung worker process. The worker SHOULD reply within a small bounded +// time. 
+// +// This is an application-level liveness check distinct from gRPC's +// transport-level keepalive: gRPC keepalive proves the TCP connection +// is alive, whereas [[Heartbeat]] proves the worker's request-handling +// thread is responsive. Deployments may use either or both; they do +// not replace each other. +// +// What the engine does in response to a missed heartbeat (e.g., +// tearing down the worker) is outside the scope of this protocol and +// depends on the worker management mode defined in the worker +// specification. +// +// Note: [[Heartbeat]] can only detect a hung worker process (one +// whose request-handling thread is unresponsive). It cannot detect +// user code that is executing but taking unexpectedly long -- during +// init, data processing, or finish -- because such code is +// indistinguishable from legitimately slow UDF execution. Handling +// hanging user code (e.g. via UDF-level timeouts) is the +// responsibility of the UDF author or the worker implementation, and +// is outside the scope of this protocol. +message Heartbeat { + // Reserved for future additive fields (e.g. an engine-side + // sequence number or a request-id tag for correlating heartbeats + // when sent over a long-lived connection). + reserved 1; +} + +// Acknowledgment for [[Heartbeat]]. +message HeartbeatResponse { + // (Optional) Worker-global metrics aggregated across all active sessions + // (e.g. total rows processed, active session count, memory usage). + // Complements the per-execution metrics in [[FinishResponse.metrics]]. + // Free-form; names are worker-defined. + map metrics = 1; +} + +// Engine-initiated graceful shutdown request. This lets the worker +// know the engine has finished with it and intends no further Execute +// streams. OS-level process management is outside the scope of this +// protocol and is defined in the worker specification. 
+// +// Interaction with in-flight Execute streams: when [[cancel_sessions]] +// is false (the default), in-flight Execute streams MUST be allowed to +// complete normally (via [[Finish]] or [[Cancel]]). When [[cancel_sessions]] +// is true, the worker MUST cancel all in-flight streams immediately by +// sending [[ExecutionError]] with [[WorkerError]] kind followed by the +// appropriate terminator on each stream, then exit. The engine SHOULD NOT +// send [[ShutdownRequest]] while it still intends to start new Execute +// streams. Once all Execute streams have terminated (cleanly or via gRPC +// error), the worker SHOULD exit. +message ShutdownRequest { + // (Optional) Free-form reason for diagnostics. + optional string reason = 1; + + // (Optional; defaults to false.) When true, the worker MUST cancel + // all in-flight Execute streams immediately rather than waiting for + // them to complete naturally. + optional bool cancel_sessions = 2; +} + +// Worker -> Engine acknowledgment of [[ShutdownRequest]]. +message ShutdownResponse { + // True when all Execute streams were fully settled (finished or + // cancelled) before this response was sent. False when the worker + // acknowledged the request but streams were still in flight (only + // possible when [[ShutdownRequest.cancel_sessions]] is false and the + // worker sends the response before draining). + bool sessions_settled = 1; +}