From 9ccc77ae7d9fae16c1fac20428e0696f35175fa7 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 15 May 2026 04:16:57 -0700 Subject: [PATCH 1/6] untrack generated python proto, regenerate in build --- .github/workflows/build.yml | 20 + .gitignore | 3 + AGENTS.md | 8 + amber/build.sbt | 16 + amber/dev-requirements.txt | 4 + amber/src/main/python/proto/__init__.py | 0 amber/src/main/python/proto/org/__init__.py | 0 .../main/python/proto/org/apache/__init__.py | 0 .../proto/org/apache/texera/__init__.py | 0 .../proto/org/apache/texera/amber/__init__.py | 0 .../org/apache/texera/amber/core/__init__.py | 146 -- .../apache/texera/amber/engine/__init__.py | 0 .../amber/engine/architecture/__init__.py | 0 .../amber/engine/architecture/rpc/__init__.py | 2204 ----------------- .../architecture/sendsemantics/__init__.py | 66 - .../engine/architecture/worker/__init__.py | 49 - .../texera/amber/engine/common/__init__.py | 156 -- .../proto/org/apache/texera/web/__init__.py | 158 -- .../src/main/python/proto/scalapb/__init__.py | 421 ---- bin/computing-unit-master.dockerfile | 18 +- bin/computing-unit-worker.dockerfile | 16 +- bin/python-proto-gen.sh | 3 + bin/texera-web-application.dockerfile | 18 +- 23 files changed, 101 insertions(+), 3205 deletions(-) delete mode 100644 amber/src/main/python/proto/__init__.py delete mode 100644 amber/src/main/python/proto/org/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py delete mode 100644 
amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py delete mode 100644 amber/src/main/python/proto/org/apache/texera/web/__init__.py delete mode 100644 amber/src/main/python/proto/scalapb/__init__.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8b0a5ea5222..c3aa4841b8e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -315,11 +315,19 @@ jobs: # mirrors a subset of common deps (e.g. pillow); without this # flag a dependabot bump to a version not yet mirrored there # fails to resolve even though PyPI has it. + # dev-requirements.txt provides the betterproto plugin used by + # genPythonProto (wired into amber/Compile in amber/build.sbt). run: | python -m pip install uv if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi + - name: Install protoc 3.19.4 + # Matches PB.protocVersion in amber/build.sbt. 
+ run: | + curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc - name: Create Databases run: | psql -h localhost -U postgres -f sql/texera_ddl.sql @@ -621,6 +629,14 @@ jobs: run: | python -m pip install uv if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi + - name: Install protoc 3.19.4 + # Matches PB.protocVersion in amber/build.sbt. + run: | + curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc + - name: Generate Python proto bindings + run: bash bin/python-proto-gen.sh - name: Test with pytest run: | cd amber && pytest -m "not integration" --cov=src/main/python --cov-report=xml -sv @@ -664,6 +680,10 @@ jobs: if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi + - name: Install protoc + run: brew install protobuf + - name: Generate Python proto bindings + run: bash bin/python-proto-gen.sh - name: Run state-materialization integration tests run: | cd amber && pytest -sv \ diff --git a/.gitignore b/.gitignore index d17fe084cf3..ee570fceea8 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,9 @@ coverage.xml *.model *.pkl +# Regenerated by sbt amber/compile. 
+amber/src/main/python/proto/ + # Ingoring user generated resources user-resources/ diff --git a/AGENTS.md b/AGENTS.md index ba40082d595..f517eb71979 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -83,8 +83,16 @@ One Python venv shared across worktrees, sibling of the texera checkout: ```bash python3.12 -m venv ../venv312 && source ../venv312/bin/activate pip install -r amber/requirements.txt -r amber/operator-requirements.txt +# For pytest or sbt-driven Python codegen, also install dev deps: +pip install -r amber/dev-requirements.txt ``` +`amber/src/main/python/proto/` is gitignored and regenerated by +[`bin/python-proto-gen.sh`](bin/python-proto-gen.sh) on `sbt amber/compile` +(see `genPythonProto` in [`amber/build.sbt`](amber/build.sbt)). Requires +`protoc` on PATH (pin to `3.19.4` to match `PB.protocVersion`); skipped +with a warning when `protoc` is missing. + Tests that spawn Python workers need an interpreter path. Edit `python.path` in [`udf.conf`](common/config/src/main/resources/udf.conf) or `export UDF_PYTHON_PATH="$(pwd)/../venv312/bin/python"` (env var overrides). diff --git a/amber/build.sbt b/amber/build.sbt index 1f363e73e91..ce0bb1dfab2 100644 --- a/amber/build.sbt +++ b/amber/build.sbt @@ -200,6 +200,22 @@ libraryDependencies += "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0" // enable protobuf compilation in Test Test / PB.protoSources += PB.externalSourcePath.value +// Skipped with a warning if protoc is missing. +val genPythonProto = taskKey[Unit]("Generate Python betterproto bindings from .proto sources.") +genPythonProto := { + val log = streams.value.log + val repoRoot = (ThisBuild / baseDirectory).value + val script = repoRoot / "bin" / "python-proto-gen.sh" + val protocOnPath = scala.sys.process.Process(Seq("bash", "-c", "command -v protoc >/dev/null 2>&1")).! == 0 + if (!protocOnPath) { + log.warn("protoc not found on PATH; skipping Python proto generation. 
Install protoc and `pip install betterproto[compiler]` before running pytest.") + } else { + val exit = scala.sys.process.Process(Seq("bash", script.getAbsolutePath), repoRoot).!(log) + if (exit != 0) sys.error(s"python-proto-gen.sh failed with exit code $exit") + } +} +Compile / compile := (Compile / compile).dependsOn(genPythonProto).value + ///////////////////////////////////////////////////////////////////////////// // Test related // https://mvnrepository.com/artifact/org.scalamock/scalamock diff --git a/amber/dev-requirements.txt b/amber/dev-requirements.txt index 1bbacb78d63..7ae68cf4a2c 100644 --- a/amber/dev-requirements.txt +++ b/amber/dev-requirements.txt @@ -23,3 +23,7 @@ # Coverage instrumentation for pytest; emits coverage.xml consumed by # Codecov's Phase 1 upload. pytest-cov==5.0.0 + +# protoc plugin for bin/python-proto-gen.sh; runtime needs only the +# base `betterproto` in requirements.txt. +betterproto[compiler]==2.0.0b7 diff --git a/amber/src/main/python/proto/__init__.py b/amber/src/main/python/proto/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/__init__.py b/amber/src/main/python/proto/org/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/__init__.py b/amber/src/main/python/proto/org/apache/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/texera/__init__.py b/amber/src/main/python/proto/org/apache/texera/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/texera/amber/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py deleted file mode 
100644 index 2d21638c263..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py +++ /dev/null @@ -1,146 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: org/apache/texera/amber/core/executor.proto, org/apache/texera/amber/core/virtualidentity.proto, org/apache/texera/amber/core/workflow.proto, org/apache/texera/amber/core/workflowruntimestate.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from datetime import datetime -from typing import ( - List, -) - -import betterproto - - -class OutputPortOutputMode(betterproto.Enum): - SET_SNAPSHOT = 0 - """outputs complete result set snapshot for each update""" - - SET_DELTA = 1 - """outputs incremental result set delta for each update""" - - SINGLE_SNAPSHOT = 2 - """ - outputs a single snapshot for the entire execution, - used explicitly to support visualization operators that may exceed the memory limit - TODO: remove this mode after we have a better solution for output size limit - """ - - -class FatalErrorType(betterproto.Enum): - COMPILATION_ERROR = 0 - EXECUTION_FAILURE = 1 - - -@dataclass(eq=False, repr=False) -class WorkflowIdentity(betterproto.Message): - id: int = betterproto.int64_field(1) - - -@dataclass(eq=False, repr=False) -class ExecutionIdentity(betterproto.Message): - id: int = betterproto.int64_field(1) - - -@dataclass(eq=False, repr=False) -class ActorVirtualIdentity(betterproto.Message): - name: str = betterproto.string_field(1) - - -@dataclass(eq=False, repr=False) -class ChannelIdentity(betterproto.Message): - from_worker_id: "ActorVirtualIdentity" = betterproto.message_field(1) - to_worker_id: "ActorVirtualIdentity" = betterproto.message_field(2) - is_control: bool = betterproto.bool_field(3) - - -@dataclass(eq=False, repr=False) -class OperatorIdentity(betterproto.Message): - id: str = betterproto.string_field(1) - - -@dataclass(eq=False, repr=False) -class 
PhysicalOpIdentity(betterproto.Message): - logical_op_id: "OperatorIdentity" = betterproto.message_field(1) - layer_name: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class EmbeddedControlMessageIdentity(betterproto.Message): - id: str = betterproto.string_field(1) - - -@dataclass(eq=False, repr=False) -class PortIdentity(betterproto.Message): - id: int = betterproto.int32_field(1) - internal: bool = betterproto.bool_field(2) - - -@dataclass(eq=False, repr=False) -class GlobalPortIdentity(betterproto.Message): - op_id: "PhysicalOpIdentity" = betterproto.message_field(1) - port_id: "PortIdentity" = betterproto.message_field(2) - input: bool = betterproto.bool_field(3) - - -@dataclass(eq=False, repr=False) -class InputPort(betterproto.Message): - id: "PortIdentity" = betterproto.message_field(1) - display_name: str = betterproto.string_field(2) - disallow_multi_links: bool = betterproto.bool_field(3) - dependencies: List["PortIdentity"] = betterproto.message_field(4) - - -@dataclass(eq=False, repr=False) -class OutputPort(betterproto.Message): - id: "PortIdentity" = betterproto.message_field(1) - display_name: str = betterproto.string_field(2) - blocking: bool = betterproto.bool_field(3) - mode: "OutputPortOutputMode" = betterproto.enum_field(4) - - -@dataclass(eq=False, repr=False) -class PhysicalLink(betterproto.Message): - from_op_id: "PhysicalOpIdentity" = betterproto.message_field(1) - from_port_id: "PortIdentity" = betterproto.message_field(2) - to_op_id: "PhysicalOpIdentity" = betterproto.message_field(3) - to_port_id: "PortIdentity" = betterproto.message_field(4) - - -@dataclass(eq=False, repr=False) -class OpExecWithCode(betterproto.Message): - code: str = betterproto.string_field(1) - language: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class OpExecWithClassName(betterproto.Message): - class_name: str = betterproto.string_field(1) - desc_string: str = betterproto.string_field(2) - - 
-@dataclass(eq=False, repr=False) -class OpExecSource(betterproto.Message): - storage_key: str = betterproto.string_field(1) - workflow_identity: "WorkflowIdentity" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class OpExecInitInfo(betterproto.Message): - op_exec_with_class_name: "OpExecWithClassName" = betterproto.message_field( - 1, group="sealed_value" - ) - op_exec_with_code: "OpExecWithCode" = betterproto.message_field( - 2, group="sealed_value" - ) - op_exec_source: "OpExecSource" = betterproto.message_field(3, group="sealed_value") - - -@dataclass(eq=False, repr=False) -class WorkflowFatalError(betterproto.Message): - type: "FatalErrorType" = betterproto.enum_field(1) - timestamp: datetime = betterproto.message_field(2) - message: str = betterproto.string_field(3) - details: str = betterproto.string_field(4) - operator_id: str = betterproto.string_field(5) - worker_id: str = betterproto.string_field(6) diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py deleted file mode 100644 index 2bad2b0bfbc..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ /dev/null @@ -1,2204 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# sources: org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto, org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto, org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto, org/apache/texera/amber/engine/architecture/rpc/testerservice.proto, org/apache/texera/amber/engine/architecture/rpc/workerservice.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from datetime import datetime -from typing import ( - TYPE_CHECKING, - Dict, - List, - Optional, -) - -import betterproto -import grpclib -from betterproto.grpc.grpclib_server import ServiceBase - -from .... import core as ___core__ -from .. import ( - sendsemantics as _sendsemantics__, - worker as _worker__, -) - - -if TYPE_CHECKING: - import grpclib.server - from betterproto.grpc.grpclib_client import MetadataLike - from grpclib.metadata import Deadline - - -class EmbeddedControlMessageType(betterproto.Enum): - ALL_ALIGNMENT = 0 - NO_ALIGNMENT = 1 - PORT_ALIGNMENT = 2 - - -class ConsoleMessageType(betterproto.Enum): - PRINT = 0 - ERROR = 1 - COMMAND = 2 - DEBUGGER = 3 - - -class StatisticsUpdateTarget(betterproto.Enum): - BOTH_UI_AND_PERSISTENCE = 0 - UI_ONLY = 1 - PERSISTENCE_ONLY = 2 - - -class ErrorLanguage(betterproto.Enum): - PYTHON = 0 - SCALA = 1 - - -class WorkflowAggregatedState(betterproto.Enum): - UNINITIALIZED = 0 - READY = 1 - RUNNING = 2 - PAUSING = 3 - PAUSED = 4 - RESUMING = 5 - COMPLETED = 6 - FAILED = 7 - UNKNOWN = 8 - KILLED = 9 - TERMINATED = 10 - - -@dataclass(eq=False, repr=False) -class ControlRequest(betterproto.Message): - propagate_embedded_control_message_request: ( - "PropagateEmbeddedControlMessageRequest" - ) = betterproto.message_field(1, group="sealed_value") - """request for controller""" - - take_global_checkpoint_request: "TakeGlobalCheckpointRequest" = ( - betterproto.message_field(2, group="sealed_value") - ) - debug_command_request: "DebugCommandRequest" = 
betterproto.message_field( - 3, group="sealed_value" - ) - evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = ( - betterproto.message_field(4, group="sealed_value") - ) - retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field( - 5, group="sealed_value" - ) - console_message_triggered_request: "ConsoleMessageTriggeredRequest" = ( - betterproto.message_field(6, group="sealed_value") - ) - port_completed_request: "PortCompletedRequest" = betterproto.message_field( - 7, group="sealed_value" - ) - worker_state_updated_request: "WorkerStateUpdatedRequest" = ( - betterproto.message_field(8, group="sealed_value") - ) - link_workers_request: "LinkWorkersRequest" = betterproto.message_field( - 9, group="sealed_value" - ) - workflow_reconfigure_request: "WorkflowReconfigureRequest" = ( - betterproto.message_field(10, group="sealed_value") - ) - jump_to_operator_region_request: "JumpToOperatorRegionRequest" = betterproto.message_field( - 11, group="sealed_value" - ) - add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( - 50, group="sealed_value" - ) - """request for worker""" - - add_partitioning_request: "AddPartitioningRequest" = betterproto.message_field( - 51, group="sealed_value" - ) - assign_port_request: "AssignPortRequest" = betterproto.message_field( - 52, group="sealed_value" - ) - finalize_checkpoint_request: "FinalizeCheckpointRequest" = ( - betterproto.message_field(53, group="sealed_value") - ) - initialize_executor_request: "InitializeExecutorRequest" = ( - betterproto.message_field(54, group="sealed_value") - ) - update_executor_request: "UpdateExecutorRequest" = betterproto.message_field( - 55, group="sealed_value" - ) - empty_request: "EmptyRequest" = betterproto.message_field(56, group="sealed_value") - prepare_checkpoint_request: "PrepareCheckpointRequest" = betterproto.message_field( - 57, group="sealed_value" - ) - query_statistics_request: "QueryStatisticsRequest" = 
betterproto.message_field( - 58, group="sealed_value" - ) - ping: "Ping" = betterproto.message_field(100, group="sealed_value") - """request for testing""" - - pong: "Pong" = betterproto.message_field(101, group="sealed_value") - nested: "Nested" = betterproto.message_field(102, group="sealed_value") - pass_: "Pass" = betterproto.message_field(103, group="sealed_value") - error_command: "ErrorCommand" = betterproto.message_field(104, group="sealed_value") - recursion: "Recursion" = betterproto.message_field(105, group="sealed_value") - collect: "Collect" = betterproto.message_field(106, group="sealed_value") - generate_number: "GenerateNumber" = betterproto.message_field( - 107, group="sealed_value" - ) - multi_call: "MultiCall" = betterproto.message_field(108, group="sealed_value") - chain: "Chain" = betterproto.message_field(109, group="sealed_value") - - -@dataclass(eq=False, repr=False) -class EmptyRequest(betterproto.Message): - pass - - -@dataclass(eq=False, repr=False) -class AsyncRpcContext(betterproto.Message): - sender: "___core__.ActorVirtualIdentity" = betterproto.message_field(1) - receiver: "___core__.ActorVirtualIdentity" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class ControlInvocation(betterproto.Message): - method_name: str = betterproto.string_field(1) - command: "ControlRequest" = betterproto.message_field(2) - context: "AsyncRpcContext" = betterproto.message_field(3) - command_id: int = betterproto.int64_field(4) - - -@dataclass(eq=False, repr=False) -class EmbeddedControlMessage(betterproto.Message): - id: "___core__.EmbeddedControlMessageIdentity" = betterproto.message_field(1) - ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(2) - scope: List["___core__.ChannelIdentity"] = betterproto.message_field(3) - command_mapping: Dict[str, "ControlInvocation"] = betterproto.map_field( - 4, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class 
PropagateEmbeddedControlMessageRequest(betterproto.Message): - source_op_to_start_prop: List["___core__.PhysicalOpIdentity"] = ( - betterproto.message_field(1) - ) - id: "___core__.EmbeddedControlMessageIdentity" = betterproto.message_field(2) - ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(3) - scope: List["___core__.PhysicalOpIdentity"] = betterproto.message_field(4) - target_ops: List["___core__.PhysicalOpIdentity"] = betterproto.message_field(5) - command: "ControlRequest" = betterproto.message_field(6) - method_name: str = betterproto.string_field(7) - - -@dataclass(eq=False, repr=False) -class TakeGlobalCheckpointRequest(betterproto.Message): - estimation_only: bool = betterproto.bool_field(1) - checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = ( - betterproto.message_field(2) - ) - destination: str = betterproto.string_field(3) - - -@dataclass(eq=False, repr=False) -class WorkflowReconfigureRequest(betterproto.Message): - reconfiguration: List["UpdateExecutorRequest"] = betterproto.message_field(1) - reconfiguration_id: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class DebugCommandRequest(betterproto.Message): - worker_id: str = betterproto.string_field(1) - cmd: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class EvaluatePythonExpressionRequest(betterproto.Message): - expression: str = betterproto.string_field(1) - operator_id: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class RetryWorkflowRequest(betterproto.Message): - workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class ConsoleMessage(betterproto.Message): - worker_id: str = betterproto.string_field(1) - timestamp: datetime = betterproto.message_field(2) - msg_type: "ConsoleMessageType" = betterproto.enum_field(3) - source: str = betterproto.string_field(4) - title: str = betterproto.string_field(5) - message: str = 
betterproto.string_field(6) - - -@dataclass(eq=False, repr=False) -class ConsoleMessageTriggeredRequest(betterproto.Message): - console_message: "ConsoleMessage" = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class PortCompletedRequest(betterproto.Message): - port_id: "___core__.PortIdentity" = betterproto.message_field(1) - input: bool = betterproto.bool_field(2) - - -@dataclass(eq=False, repr=False) -class WorkerStateUpdatedRequest(betterproto.Message): - state: "_worker__.WorkerState" = betterproto.enum_field(1) - - -@dataclass(eq=False, repr=False) -class LinkWorkersRequest(betterproto.Message): - link: "___core__.PhysicalLink" = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class Ping(betterproto.Message): - """Ping message""" - - i: int = betterproto.int32_field(1) - end: int = betterproto.int32_field(2) - to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3) - - -@dataclass(eq=False, repr=False) -class Pong(betterproto.Message): - """Pong message""" - - i: int = betterproto.int32_field(1) - end: int = betterproto.int32_field(2) - to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3) - - -@dataclass(eq=False, repr=False) -class Pass(betterproto.Message): - """Pass message""" - - value: str = betterproto.string_field(1) - - -@dataclass(eq=False, repr=False) -class Nested(betterproto.Message): - """Nested message""" - - k: int = betterproto.int32_field(1) - - -@dataclass(eq=False, repr=False) -class MultiCall(betterproto.Message): - """MultiCall message""" - - seq: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class ErrorCommand(betterproto.Message): - """ErrorCommand message""" - - pass - - -@dataclass(eq=False, repr=False) -class Collect(betterproto.Message): - """Collect message""" - - workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class 
GenerateNumber(betterproto.Message): - """GenerateNumber message""" - - pass - - -@dataclass(eq=False, repr=False) -class Chain(betterproto.Message): - """Chain message""" - - nexts: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class Recursion(betterproto.Message): - """Recursion message""" - - i: int = betterproto.int32_field(1) - - -@dataclass(eq=False, repr=False) -class AddInputChannelRequest(betterproto.Message): - """Messages for the commands""" - - channel_id: "___core__.ChannelIdentity" = betterproto.message_field(1) - port_id: "___core__.PortIdentity" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class AddPartitioningRequest(betterproto.Message): - tag: "___core__.PhysicalLink" = betterproto.message_field(1) - partitioning: "_sendsemantics__.Partitioning" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class AssignPortRequest(betterproto.Message): - port_id: "___core__.PortIdentity" = betterproto.message_field(1) - input: bool = betterproto.bool_field(2) - schema: Dict[str, str] = betterproto.map_field( - 3, betterproto.TYPE_STRING, betterproto.TYPE_STRING - ) - storage_uris: List[str] = betterproto.string_field(4) - partitionings: List["_sendsemantics__.Partitioning"] = betterproto.message_field(5) - - -@dataclass(eq=False, repr=False) -class FinalizeCheckpointRequest(betterproto.Message): - checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = ( - betterproto.message_field(1) - ) - write_to: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class InitializeExecutorRequest(betterproto.Message): - total_worker_count: int = betterproto.int32_field(1) - op_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2) - is_source: bool = betterproto.bool_field(3) - - -@dataclass(eq=False, repr=False) -class UpdateExecutorRequest(betterproto.Message): - target_op_id: "___core__.PhysicalOpIdentity" = 
betterproto.message_field(1) - new_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class PrepareCheckpointRequest(betterproto.Message): - checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = ( - betterproto.message_field(1) - ) - estimation_only: bool = betterproto.bool_field(2) - - -@dataclass(eq=False, repr=False) -class QueryStatisticsRequest(betterproto.Message): - filter_by_workers: List["___core__.ActorVirtualIdentity"] = ( - betterproto.message_field(1) - ) - update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2) - - -@dataclass(eq=False, repr=False) -class JumpToOperatorRegionRequest(betterproto.Message): - target_operator_id: "___core__.OperatorIdentity" = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class ControlReturn(betterproto.Message): - """The generic return message""" - - retrieve_workflow_state_response: "RetrieveWorkflowStateResponse" = ( - betterproto.message_field(1, group="sealed_value") - ) - """controller responses""" - - propagate_embedded_control_message_response: ( - "PropagateEmbeddedControlMessageResponse" - ) = betterproto.message_field(2, group="sealed_value") - take_global_checkpoint_response: "TakeGlobalCheckpointResponse" = ( - betterproto.message_field(3, group="sealed_value") - ) - evaluate_python_expression_response: "EvaluatePythonExpressionResponse" = ( - betterproto.message_field(4, group="sealed_value") - ) - start_workflow_response: "StartWorkflowResponse" = betterproto.message_field( - 5, group="sealed_value" - ) - worker_state_response: "WorkerStateResponse" = betterproto.message_field( - 50, group="sealed_value" - ) - """worker responses""" - - worker_metrics_response: "WorkerMetricsResponse" = betterproto.message_field( - 51, group="sealed_value" - ) - finalize_checkpoint_response: "FinalizeCheckpointResponse" = ( - betterproto.message_field(52, group="sealed_value") - ) - control_error: "ControlError" = 
betterproto.message_field(101, group="sealed_value") - """common responses""" - - empty_return: "EmptyReturn" = betterproto.message_field(102, group="sealed_value") - string_response: "StringResponse" = betterproto.message_field( - 103, group="sealed_value" - ) - int_response: "IntResponse" = betterproto.message_field(104, group="sealed_value") - - -@dataclass(eq=False, repr=False) -class EmptyReturn(betterproto.Message): - pass - - -@dataclass(eq=False, repr=False) -class ControlError(betterproto.Message): - error_message: str = betterproto.string_field(1) - error_details: str = betterproto.string_field(2) - stack_trace: str = betterproto.string_field(3) - language: "ErrorLanguage" = betterproto.enum_field(4) - - -@dataclass(eq=False, repr=False) -class ReturnInvocation(betterproto.Message): - command_id: int = betterproto.int64_field(1) - return_value: "ControlReturn" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class StringResponse(betterproto.Message): - value: str = betterproto.string_field(1) - - -@dataclass(eq=False, repr=False) -class IntResponse(betterproto.Message): - value: int = betterproto.int32_field(1) - - -@dataclass(eq=False, repr=False) -class RetrieveWorkflowStateResponse(betterproto.Message): - state: Dict[str, str] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_STRING - ) - - -@dataclass(eq=False, repr=False) -class FinalizeCheckpointResponse(betterproto.Message): - size: int = betterproto.int64_field(1) - - -@dataclass(eq=False, repr=False) -class PropagateEmbeddedControlMessageResponse(betterproto.Message): - returns: Dict[str, "ControlReturn"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class TakeGlobalCheckpointResponse(betterproto.Message): - total_size: int = betterproto.int64_field(1) - - -@dataclass(eq=False, repr=False) -class TypedValue(betterproto.Message): - expression: str = betterproto.string_field(1) - 
value_ref: str = betterproto.string_field(2) - value_str: str = betterproto.string_field(3) - value_type: str = betterproto.string_field(4) - expandable: bool = betterproto.bool_field(5) - - -@dataclass(eq=False, repr=False) -class EvaluatedValue(betterproto.Message): - value: "TypedValue" = betterproto.message_field(1) - attributes: List["TypedValue"] = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class EvaluatePythonExpressionResponse(betterproto.Message): - values: List["EvaluatedValue"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class StartWorkflowResponse(betterproto.Message): - workflow_state: "WorkflowAggregatedState" = betterproto.enum_field(1) - - -@dataclass(eq=False, repr=False) -class WorkerStateResponse(betterproto.Message): - state: "_worker__.WorkerState" = betterproto.enum_field(1) - - -@dataclass(eq=False, repr=False) -class WorkerMetricsResponse(betterproto.Message): - metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1) - - -class RpcTesterStub(betterproto.ServiceStub): - async def send_ping( - self, - ping: "Ping", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing", - ping, - IntResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_pong( - self, - pong: "Pong", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong", - pong, - IntResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_nested( - self, - nested: "Nested", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: 
Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested", - nested, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_pass( - self, - pass_: "Pass", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass", - pass_, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_error_command( - self, - error_command: "ErrorCommand", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand", - error_command, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_recursion( - self, - recursion: "Recursion", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion", - recursion, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_collect( - self, - collect: "Collect", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect", - collect, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_generate_number( - self, - generate_number: 
"GenerateNumber", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber", - generate_number, - IntResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_multi_call( - self, - multi_call: "MultiCall", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall", - multi_call, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def send_chain( - self, - chain: "Chain", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain", - chain, - StringResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - -class WorkerServiceStub(betterproto.ServiceStub): - async def add_input_channel( - self, - add_input_channel_request: "AddInputChannelRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel", - add_input_channel_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def add_partitioning( - self, - add_partitioning_request: "AddPartitioningRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await 
self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning", - add_partitioning_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def assign_port( - self, - assign_port_request: "AssignPortRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort", - assign_port_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def finalize_checkpoint( - self, - finalize_checkpoint_request: "FinalizeCheckpointRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "FinalizeCheckpointResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint", - finalize_checkpoint_request, - FinalizeCheckpointResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def flush_network_buffer( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def initialize_executor( - self, - initialize_executor_request: "InitializeExecutorRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor", - 
initialize_executor_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def open_executor( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def pause_worker( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker", - empty_request, - WorkerStateResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def prepare_checkpoint( - self, - prepare_checkpoint_request: "PrepareCheckpointRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", - prepare_checkpoint_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def query_statistics( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "WorkerMetricsResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics", - empty_request, - WorkerMetricsResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def resume_worker( - self, - empty_request: "EmptyRequest", - *, - timeout: 
Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker", - empty_request, - WorkerStateResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def retrieve_state( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def retry_current_tuple( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def start_worker( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker", - empty_request, - WorkerStateResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def end_worker( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker", 
- empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def start_channel( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def end_channel( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def debug_command( - self, - debug_command_request: "DebugCommandRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand", - debug_command_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def evaluate_python_expression( - self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EvaluatedValue": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatedValue, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def no_operation( - self, - empty_request: "EmptyRequest", - *, - 
timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def update_executor( - self, - update_executor_request: "UpdateExecutorRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor", - update_executor_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - -class ControllerServiceStub(betterproto.ServiceStub): - async def retrieve_workflow_state( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "RetrieveWorkflowStateResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState", - empty_request, - RetrieveWorkflowStateResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def propagate_embedded_control_message( - self, - propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "PropagateEmbeddedControlMessageResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage", - propagate_embedded_control_message_request, - PropagateEmbeddedControlMessageResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def 
take_global_checkpoint( - self, - take_global_checkpoint_request: "TakeGlobalCheckpointRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "TakeGlobalCheckpointResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint", - take_global_checkpoint_request, - TakeGlobalCheckpointResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def debug_command( - self, - debug_command_request: "DebugCommandRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand", - debug_command_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def evaluate_python_expression( - self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EvaluatePythonExpressionResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatePythonExpressionResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def console_message_triggered( - self, - console_message_triggered_request: "ConsoleMessageTriggeredRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered", - console_message_triggered_request, - EmptyReturn, - 
timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def port_completed( - self, - port_completed_request: "PortCompletedRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted", - port_completed_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def start_workflow( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "StartWorkflowResponse": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow", - empty_request, - StartWorkflowResponse, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def resume_workflow( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def pause_workflow( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def worker_state_updated( - self, - worker_state_updated_request: "WorkerStateUpdatedRequest", - *, - timeout: Optional[float] = None, - 
deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated", - worker_state_updated_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def worker_execution_completed( - self, - empty_request: "EmptyRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted", - empty_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def jump_to_operator_region( - self, - jump_to_operator_region_request: "JumpToOperatorRegionRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion", - jump_to_operator_region_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def link_workers( - self, - link_workers_request: "LinkWorkersRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers", - link_workers_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def controller_initiate_query_statistics( - self, - query_statistics_request: "QueryStatisticsRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> 
"EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics", - query_statistics_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def retry_workflow( - self, - retry_workflow_request: "RetryWorkflowRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow", - retry_workflow_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - async def reconfigure_workflow( - self, - workflow_reconfigure_request: "WorkflowReconfigureRequest", - *, - timeout: Optional[float] = None, - deadline: Optional["Deadline"] = None, - metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": - return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow", - workflow_reconfigure_request, - EmptyReturn, - timeout=timeout, - deadline=deadline, - metadata=metadata, - ) - - -class RpcTesterBase(ServiceBase): - - async def send_ping(self, ping: "Ping") -> "IntResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_pong(self, pong: "Pong") -> "IntResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_nested(self, nested: "Nested") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_pass(self, pass_: "Pass") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_error_command( - self, error_command: "ErrorCommand" - ) -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_recursion(self, recursion: "Recursion") -> 
"StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_collect(self, collect: "Collect") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_generate_number( - self, generate_number: "GenerateNumber" - ) -> "IntResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def send_chain(self, chain: "Chain") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def __rpc_send_ping( - self, stream: "grpclib.server.Stream[Ping, IntResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_ping(request) - await stream.send_message(response) - - async def __rpc_send_pong( - self, stream: "grpclib.server.Stream[Pong, IntResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_pong(request) - await stream.send_message(response) - - async def __rpc_send_nested( - self, stream: "grpclib.server.Stream[Nested, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_nested(request) - await stream.send_message(response) - - async def __rpc_send_pass( - self, stream: "grpclib.server.Stream[Pass, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_pass(request) - await stream.send_message(response) - - async def __rpc_send_error_command( - self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_error_command(request) - await stream.send_message(response) - - async def __rpc_send_recursion( - self, stream: "grpclib.server.Stream[Recursion, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await 
self.send_recursion(request) - await stream.send_message(response) - - async def __rpc_send_collect( - self, stream: "grpclib.server.Stream[Collect, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_collect(request) - await stream.send_message(response) - - async def __rpc_send_generate_number( - self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_generate_number(request) - await stream.send_message(response) - - async def __rpc_send_multi_call( - self, stream: "grpclib.server.Stream[MultiCall, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_multi_call(request) - await stream.send_message(response) - - async def __rpc_send_chain( - self, stream: "grpclib.server.Stream[Chain, StringResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.send_chain(request) - await stream.send_message(response) - - def __mapping__(self) -> Dict[str, grpclib.const.Handler]: - return { - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler( - self.__rpc_send_ping, - grpclib.const.Cardinality.UNARY_UNARY, - Ping, - IntResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler( - self.__rpc_send_pong, - grpclib.const.Cardinality.UNARY_UNARY, - Pong, - IntResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler( - self.__rpc_send_nested, - grpclib.const.Cardinality.UNARY_UNARY, - Nested, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": grpclib.const.Handler( - self.__rpc_send_pass, - grpclib.const.Cardinality.UNARY_UNARY, - Pass, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler( - 
self.__rpc_send_error_command, - grpclib.const.Cardinality.UNARY_UNARY, - ErrorCommand, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler( - self.__rpc_send_recursion, - grpclib.const.Cardinality.UNARY_UNARY, - Recursion, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler( - self.__rpc_send_collect, - grpclib.const.Cardinality.UNARY_UNARY, - Collect, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler( - self.__rpc_send_generate_number, - grpclib.const.Cardinality.UNARY_UNARY, - GenerateNumber, - IntResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": grpclib.const.Handler( - self.__rpc_send_multi_call, - grpclib.const.Cardinality.UNARY_UNARY, - MultiCall, - StringResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler( - self.__rpc_send_chain, - grpclib.const.Cardinality.UNARY_UNARY, - Chain, - StringResponse, - ), - } - - -class WorkerServiceBase(ServiceBase): - - async def add_input_channel( - self, add_input_channel_request: "AddInputChannelRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def add_partitioning( - self, add_partitioning_request: "AddPartitioningRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def assign_port( - self, assign_port_request: "AssignPortRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def finalize_checkpoint( - self, finalize_checkpoint_request: "FinalizeCheckpointRequest" - ) -> "FinalizeCheckpointResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def flush_network_buffer( - self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": - 
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def initialize_executor( - self, initialize_executor_request: "InitializeExecutorRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def pause_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def prepare_checkpoint( - self, prepare_checkpoint_request: "PrepareCheckpointRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def query_statistics( - self, empty_request: "EmptyRequest" - ) -> "WorkerMetricsResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def resume_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def retry_current_tuple(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def start_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def debug_command( - self, debug_command_request: 
"DebugCommandRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def evaluate_python_expression( - self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatedValue": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def update_executor( - self, update_executor_request: "UpdateExecutorRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def __rpc_add_input_channel( - self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_input_channel(request) - await stream.send_message(response) - - async def __rpc_add_partitioning( - self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_partitioning(request) - await stream.send_message(response) - - async def __rpc_assign_port( - self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.assign_port(request) - await stream.send_message(response) - - async def __rpc_finalize_checkpoint( - self, - stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.finalize_checkpoint(request) - await stream.send_message(response) - - async def __rpc_flush_network_buffer( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.flush_network_buffer(request) - await stream.send_message(response) - - async def __rpc_initialize_executor( - self, stream: 
"grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.initialize_executor(request) - await stream.send_message(response) - - async def __rpc_open_executor( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.open_executor(request) - await stream.send_message(response) - - async def __rpc_pause_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.pause_worker(request) - await stream.send_message(response) - - async def __rpc_prepare_checkpoint( - self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.prepare_checkpoint(request) - await stream.send_message(response) - - async def __rpc_query_statistics( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.query_statistics(request) - await stream.send_message(response) - - async def __rpc_resume_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.resume_worker(request) - await stream.send_message(response) - - async def __rpc_retrieve_state( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.retrieve_state(request) - await stream.send_message(response) - - async def __rpc_retry_current_tuple( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.retry_current_tuple(request) - await stream.send_message(response) - - async def __rpc_start_worker( - self, 
stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.start_worker(request) - await stream.send_message(response) - - async def __rpc_end_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.end_worker(request) - await stream.send_message(response) - - async def __rpc_start_channel( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.start_channel(request) - await stream.send_message(response) - - async def __rpc_end_channel( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.end_channel(request) - await stream.send_message(response) - - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.debug_command(request) - await stream.send_message(response) - - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]", - ) -> None: - request = await stream.recv_message() - response = await self.evaluate_python_expression(request) - await stream.send_message(response) - - async def __rpc_no_operation( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.no_operation(request) - await stream.send_message(response) - - async def __rpc_update_executor( - self, stream: "grpclib.server.Stream[UpdateExecutorRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.update_executor(request) - await stream.send_message(response) - - def __mapping__(self) -> Dict[str, 
grpclib.const.Handler]: - return { - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( - self.__rpc_add_input_channel, - grpclib.const.Cardinality.UNARY_UNARY, - AddInputChannelRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler( - self.__rpc_add_partitioning, - grpclib.const.Cardinality.UNARY_UNARY, - AddPartitioningRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler( - self.__rpc_assign_port, - grpclib.const.Cardinality.UNARY_UNARY, - AssignPortRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler( - self.__rpc_finalize_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - FinalizeCheckpointRequest, - FinalizeCheckpointResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler( - self.__rpc_flush_network_buffer, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler( - self.__rpc_initialize_executor, - grpclib.const.Cardinality.UNARY_UNARY, - InitializeExecutorRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler( - self.__rpc_open_executor, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler( - self.__rpc_pause_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler( - self.__rpc_prepare_checkpoint, - 
grpclib.const.Cardinality.UNARY_UNARY, - PrepareCheckpointRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler( - self.__rpc_query_statistics, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerMetricsResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler( - self.__rpc_resume_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler( - self.__rpc_retrieve_state, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler( - self.__rpc_retry_current_tuple, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler( - self.__rpc_start_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler( - self.__rpc_end_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler( - self.__rpc_start_channel, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler( - self.__rpc_end_channel, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, - grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, - 
EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, - grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatedValue, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler( - self.__rpc_no_operation, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor": grpclib.const.Handler( - self.__rpc_update_executor, - grpclib.const.Cardinality.UNARY_UNARY, - UpdateExecutorRequest, - EmptyReturn, - ), - } - - -class ControllerServiceBase(ServiceBase): - - async def retrieve_workflow_state( - self, empty_request: "EmptyRequest" - ) -> "RetrieveWorkflowStateResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def propagate_embedded_control_message( - self, - propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", - ) -> "PropagateEmbeddedControlMessageResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def take_global_checkpoint( - self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest" - ) -> "TakeGlobalCheckpointResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def debug_command( - self, debug_command_request: "DebugCommandRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def evaluate_python_expression( - self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatePythonExpressionResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def console_message_triggered( - self, console_message_triggered_request: "ConsoleMessageTriggeredRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - 
async def port_completed( - self, port_completed_request: "PortCompletedRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def start_workflow( - self, empty_request: "EmptyRequest" - ) -> "StartWorkflowResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def worker_state_updated( - self, worker_state_updated_request: "WorkerStateUpdatedRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def worker_execution_completed( - self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def jump_to_operator_region( - self, jump_to_operator_region_request: "JumpToOperatorRegionRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def link_workers( - self, link_workers_request: "LinkWorkersRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def controller_initiate_query_statistics( - self, query_statistics_request: "QueryStatisticsRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def retry_workflow( - self, retry_workflow_request: "RetryWorkflowRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def reconfigure_workflow( - self, workflow_reconfigure_request: "WorkflowReconfigureRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def __rpc_retrieve_workflow_state( - self, - stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.retrieve_workflow_state(request) - await stream.send_message(response) - - async def __rpc_propagate_embedded_control_message( - self, - stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.propagate_embedded_control_message(request) - await stream.send_message(response) - - async def __rpc_take_global_checkpoint( - self, - stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.take_global_checkpoint(request) - await stream.send_message(response) - - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.debug_command(request) - await stream.send_message(response) - - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.evaluate_python_expression(request) - await stream.send_message(response) - - async def __rpc_console_message_triggered( - self, - stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]", - ) -> None: - request = await stream.recv_message() - response = await self.console_message_triggered(request) - await stream.send_message(response) - - async def __rpc_port_completed( - self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.port_completed(request) - await stream.send_message(response) - - async def __rpc_start_workflow( - self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.start_workflow(request) - await stream.send_message(response) - - async def __rpc_resume_workflow( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.resume_workflow(request) - await stream.send_message(response) - - async def __rpc_pause_workflow( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.pause_workflow(request) - await stream.send_message(response) - - async def __rpc_worker_state_updated( - self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.worker_state_updated(request) - await stream.send_message(response) - - async def __rpc_worker_execution_completed( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.worker_execution_completed(request) - await stream.send_message(response) - - async def __rpc_jump_to_operator_region( - self, stream: "grpclib.server.Stream[JumpToOperatorRegionRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.jump_to_operator_region(request) - await stream.send_message(response) - - async def __rpc_link_workers( - self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.link_workers(request) - await stream.send_message(response) - - async def __rpc_controller_initiate_query_statistics( - self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.controller_initiate_query_statistics(request) - await stream.send_message(response) - 
- async def __rpc_retry_workflow( - self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.retry_workflow(request) - await stream.send_message(response) - - async def __rpc_reconfigure_workflow( - self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.reconfigure_workflow(request) - await stream.send_message(response) - - def __mapping__(self) -> Dict[str, grpclib.const.Handler]: - return { - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( - self.__rpc_retrieve_workflow_state, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - RetrieveWorkflowStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler( - self.__rpc_propagate_embedded_control_message, - grpclib.const.Cardinality.UNARY_UNARY, - PropagateEmbeddedControlMessageRequest, - PropagateEmbeddedControlMessageResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler( - self.__rpc_take_global_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - TakeGlobalCheckpointRequest, - TakeGlobalCheckpointResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, - grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, - grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatePythonExpressionResponse, - ), - 
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler( - self.__rpc_console_message_triggered, - grpclib.const.Cardinality.UNARY_UNARY, - ConsoleMessageTriggeredRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( - self.__rpc_port_completed, - grpclib.const.Cardinality.UNARY_UNARY, - PortCompletedRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler( - self.__rpc_start_workflow, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - StartWorkflowResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler( - self.__rpc_resume_workflow, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler( - self.__rpc_pause_workflow, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler( - self.__rpc_worker_state_updated, - grpclib.const.Cardinality.UNARY_UNARY, - WorkerStateUpdatedRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler( - self.__rpc_worker_execution_completed, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion": grpclib.const.Handler( - self.__rpc_jump_to_operator_region, - grpclib.const.Cardinality.UNARY_UNARY, - JumpToOperatorRegionRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( - self.__rpc_link_workers, - 
grpclib.const.Cardinality.UNARY_UNARY, - LinkWorkersRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler( - self.__rpc_controller_initiate_query_statistics, - grpclib.const.Cardinality.UNARY_UNARY, - QueryStatisticsRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler( - self.__rpc_retry_workflow, - grpclib.const.Cardinality.UNARY_UNARY, - RetryWorkflowRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow": grpclib.const.Handler( - self.__rpc_reconfigure_workflow, - grpclib.const.Cardinality.UNARY_UNARY, - WorkflowReconfigureRequest, - EmptyReturn, - ), - } diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py deleted file mode 100644 index bc241806b5c..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py +++ /dev/null @@ -1,66 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from typing import ( - List, -) - -import betterproto - -from .... 
import core as ___core__ - - -@dataclass(eq=False, repr=False) -class Partitioning(betterproto.Message): - one_to_one_partitioning: "OneToOnePartitioning" = betterproto.message_field( - 1, group="sealed_value" - ) - round_robin_partitioning: "RoundRobinPartitioning" = betterproto.message_field( - 2, group="sealed_value" - ) - hash_based_shuffle_partitioning: "HashBasedShufflePartitioning" = ( - betterproto.message_field(3, group="sealed_value") - ) - range_based_shuffle_partitioning: "RangeBasedShufflePartitioning" = ( - betterproto.message_field(4, group="sealed_value") - ) - broadcast_partitioning: "BroadcastPartitioning" = betterproto.message_field( - 5, group="sealed_value" - ) - - -@dataclass(eq=False, repr=False) -class OneToOnePartitioning(betterproto.Message): - batch_size: int = betterproto.int32_field(1) - channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class RoundRobinPartitioning(betterproto.Message): - batch_size: int = betterproto.int32_field(1) - channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class HashBasedShufflePartitioning(betterproto.Message): - batch_size: int = betterproto.int32_field(1) - channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2) - hash_attribute_names: List[str] = betterproto.string_field(3) - - -@dataclass(eq=False, repr=False) -class RangeBasedShufflePartitioning(betterproto.Message): - batch_size: int = betterproto.int32_field(1) - channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2) - range_attribute_names: List[str] = betterproto.string_field(3) - range_min: int = betterproto.int64_field(4) - range_max: int = betterproto.int64_field(5) - - -@dataclass(eq=False, repr=False) -class BroadcastPartitioning(betterproto.Message): - batch_size: int = betterproto.int32_field(1) - channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2) diff 
--git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py deleted file mode 100644 index 6a7b210e185..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py +++ /dev/null @@ -1,49 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: org/apache/texera/amber/engine/architecture/worker/statistics.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from typing import ( - List, -) - -import betterproto - -from .... import core as ___core__ - - -class WorkerState(betterproto.Enum): - UNINITIALIZED = 0 - READY = 1 - RUNNING = 2 - PAUSED = 3 - COMPLETED = 4 - TERMINATED = 5 - - -@dataclass(eq=False, repr=False) -class PortTupleMetricsMapping(betterproto.Message): - port_id: "___core__.PortIdentity" = betterproto.message_field(1) - tuple_metrics: "TupleMetrics" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class TupleMetrics(betterproto.Message): - count: int = betterproto.int64_field(1) - size: int = betterproto.int64_field(2) - - -@dataclass(eq=False, repr=False) -class WorkerStatistics(betterproto.Message): - input_tuple_metrics: List["PortTupleMetricsMapping"] = betterproto.message_field(1) - output_tuple_metrics: List["PortTupleMetricsMapping"] = betterproto.message_field(2) - data_processing_time: int = betterproto.int64_field(3) - control_processing_time: int = betterproto.int64_field(4) - idle_time: int = betterproto.int64_field(5) - - -@dataclass(eq=False, repr=False) -class WorkerMetrics(betterproto.Message): - worker_state: "WorkerState" = betterproto.enum_field(1) - worker_statistics: "WorkerStatistics" = betterproto.message_field(2) diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py deleted file mode 100644 index 55c789aa395..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py +++ /dev/null @@ -1,156 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: org/apache/texera/amber/engine/common/actormessage.proto, org/apache/texera/amber/engine/common/ambermessage.proto, org/apache/texera/amber/engine/common/executionruntimestate.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from typing import ( - Dict, - List, -) - -import betterproto - -from ... import core as __core__ -from ..architecture import ( - rpc as _architecture_rpc__, - worker as _architecture_worker__, -) - - -@dataclass(eq=False, repr=False) -class DirectControlMessagePayloadV2(betterproto.Message): - control_invocation: "_architecture_rpc__.ControlInvocation" = ( - betterproto.message_field(1, group="value") - ) - return_invocation: "_architecture_rpc__.ReturnInvocation" = ( - betterproto.message_field(2, group="value") - ) - - -@dataclass(eq=False, repr=False) -class PythonDataHeader(betterproto.Message): - tag: "__core__.ChannelIdentity" = betterproto.message_field(1) - payload_type: str = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class PythonControlMessage(betterproto.Message): - tag: "__core__.ChannelIdentity" = betterproto.message_field(1) - payload: "DirectControlMessagePayloadV2" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class BreakpointFault(betterproto.Message): - worker_name: str = betterproto.string_field(1) - faulted_tuple: "BreakpointFaultBreakpointTuple" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class BreakpointFaultBreakpointTuple(betterproto.Message): - id: int = betterproto.int64_field(1) - is_input: bool = betterproto.bool_field(2) - tuple: List[str] = betterproto.string_field(3) 
- - -@dataclass(eq=False, repr=False) -class OperatorBreakpoints(betterproto.Message): - unresolved_breakpoints: List["BreakpointFault"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class ExecutionBreakpointStore(betterproto.Message): - operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class EvaluatedValueList(betterproto.Message): - values: List["_architecture_rpc__.EvaluatedValue"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class OperatorConsole(betterproto.Message): - console_messages: List["_architecture_rpc__.ConsoleMessage"] = ( - betterproto.message_field(1) - ) - evaluate_expr_results: Dict[str, "EvaluatedValueList"] = betterproto.map_field( - 2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class ExecutionConsoleStore(betterproto.Message): - operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class OperatorWorkerMapping(betterproto.Message): - operator_id: str = betterproto.string_field(1) - worker_ids: List[str] = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class OperatorStatistics(betterproto.Message): - input_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = ( - betterproto.message_field(1) - ) - output_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = ( - betterproto.message_field(2) - ) - num_workers: int = betterproto.int32_field(3) - data_processing_time: int = betterproto.int64_field(4) - control_processing_time: int = betterproto.int64_field(5) - idle_time: int = betterproto.int64_field(6) - - -@dataclass(eq=False, repr=False) -class OperatorMetrics(betterproto.Message): - operator_state: "_architecture_rpc__.WorkflowAggregatedState" = ( - 
betterproto.enum_field(1) - ) - operator_statistics: "OperatorStatistics" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class ExecutionStatsStore(betterproto.Message): - start_time_stamp: int = betterproto.int64_field(1) - end_time_stamp: int = betterproto.int64_field(2) - operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field( - 3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - operator_worker_mapping: List["OperatorWorkerMapping"] = betterproto.message_field( - 4 - ) - - -@dataclass(eq=False, repr=False) -class ExecutionMetadataStore(betterproto.Message): - state: "_architecture_rpc__.WorkflowAggregatedState" = betterproto.enum_field(1) - fatal_errors: List["__core__.WorkflowFatalError"] = betterproto.message_field(2) - execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3) - is_recovering: bool = betterproto.bool_field(4) - - -@dataclass(eq=False, repr=False) -class Backpressure(betterproto.Message): - enable_backpressure: bool = betterproto.bool_field(1) - - -@dataclass(eq=False, repr=False) -class CreditUpdate(betterproto.Message): - pass - - -@dataclass(eq=False, repr=False) -class ActorCommand(betterproto.Message): - backpressure: "Backpressure" = betterproto.message_field(1, group="sealed_value") - credit_update: "CreditUpdate" = betterproto.message_field(2, group="sealed_value") - - -@dataclass(eq=False, repr=False) -class PythonActorMessage(betterproto.Message): - payload: "ActorCommand" = betterproto.message_field(1) diff --git a/amber/src/main/python/proto/org/apache/texera/web/__init__.py b/amber/src/main/python/proto/org/apache/texera/web/__init__.py deleted file mode 100644 index adb5848bb0c..00000000000 --- a/amber/src/main/python/proto/org/apache/texera/web/__init__.py +++ /dev/null @@ -1,158 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: org/apache/texera/workflowruntimestate.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from datetime import datetime -from typing import ( - Dict, - List, -) - -import betterproto - -from ...amber.engine import common as __amber_engine_common__ -from ...amber.engine.architecture import worker as __amber_engine_architecture_worker__ - - -class FatalErrorType(betterproto.Enum): - COMPILATION_ERROR = 0 - EXECUTION_FAILURE = 1 - - -class WorkflowAggregatedState(betterproto.Enum): - UNINITIALIZED = 0 - READY = 1 - RUNNING = 2 - PAUSING = 3 - PAUSED = 4 - RESUMING = 5 - COMPLETED = 6 - FAILED = 7 - UNKNOWN = 8 - KILLED = 9 - - -@dataclass(eq=False, repr=False) -class BreakpointFault(betterproto.Message): - worker_name: str = betterproto.string_field(1) - faulted_tuple: "BreakpointFaultBreakpointTuple" = betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class BreakpointFaultBreakpointTuple(betterproto.Message): - id: int = betterproto.int64_field(1) - is_input: bool = betterproto.bool_field(2) - tuple: List[str] = betterproto.string_field(3) - - -@dataclass(eq=False, repr=False) -class OperatorBreakpoints(betterproto.Message): - 
unresolved_breakpoints: List["BreakpointFault"] = betterproto.message_field(1) - - -@dataclass(eq=False, repr=False) -class ExecutionBreakpointStore(betterproto.Message): - operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class EvaluatedValueList(betterproto.Message): - values: List["__amber_engine_architecture_worker__.EvaluatedValue"] = ( - betterproto.message_field(1) - ) - - -@dataclass(eq=False, repr=False) -class OperatorConsole(betterproto.Message): - console_messages: List["__amber_engine_architecture_worker__.ConsoleMessage"] = ( - betterproto.message_field(1) - ) - evaluate_expr_results: Dict[str, "EvaluatedValueList"] = betterproto.map_field( - 2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class ExecutionConsoleStore(betterproto.Message): - operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - - -@dataclass(eq=False, repr=False) -class OperatorWorkerMapping(betterproto.Message): - operator_id: str = betterproto.string_field(1) - worker_ids: List[str] = betterproto.string_field(2) - - -@dataclass(eq=False, repr=False) -class OperatorStatistics(betterproto.Message): - input_count: List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = ( - betterproto.message_field(1) - ) - output_count: List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = ( - betterproto.message_field(2) - ) - num_workers: int = betterproto.int32_field(3) - data_processing_time: int = betterproto.int64_field(4) - control_processing_time: int = betterproto.int64_field(5) - idle_time: int = betterproto.int64_field(6) - - -@dataclass(eq=False, repr=False) -class OperatorMetrics(betterproto.Message): - operator_state: "WorkflowAggregatedState" = betterproto.enum_field(1) - operator_statistics: "OperatorStatistics" = 
betterproto.message_field(2) - - -@dataclass(eq=False, repr=False) -class ExecutionStatsStore(betterproto.Message): - start_time_stamp: int = betterproto.int64_field(1) - end_time_stamp: int = betterproto.int64_field(2) - operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field( - 3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) - operator_worker_mapping: List["OperatorWorkerMapping"] = betterproto.message_field( - 4 - ) - - -@dataclass(eq=False, repr=False) -class WorkflowFatalError(betterproto.Message): - type: "FatalErrorType" = betterproto.enum_field(1) - timestamp: datetime = betterproto.message_field(2) - message: str = betterproto.string_field(3) - details: str = betterproto.string_field(4) - operator_id: str = betterproto.string_field(5) - worker_id: str = betterproto.string_field(6) - - -@dataclass(eq=False, repr=False) -class ExecutionMetadataStore(betterproto.Message): - state: "WorkflowAggregatedState" = betterproto.enum_field(1) - fatal_errors: List["WorkflowFatalError"] = betterproto.message_field(2) - execution_id: "__amber_engine_common__.ExecutionIdentity" = ( - betterproto.message_field(3) - ) - is_recovering: bool = betterproto.bool_field(4) diff --git a/amber/src/main/python/proto/scalapb/__init__.py b/amber/src/main/python/proto/scalapb/__init__.py deleted file mode 100644 index 49c713815a5..00000000000 --- a/amber/src/main/python/proto/scalapb/__init__.py +++ /dev/null @@ -1,421 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# sources: scalapb/scalapb.proto -# plugin: python-betterproto -# This file has been @generated - -from dataclasses import dataclass -from typing import ( - Dict, - List, -) - -import betterproto -import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf - - -class MatchType(betterproto.Enum): - CONTAINS = 0 - EXACT = 1 - PRESENCE = 2 - - -class ScalaPbOptionsOptionsScope(betterproto.Enum): - """ - Whether to apply the options only to this file, or for the entire package (and its subpackages) - """ - - FILE = 0 - """Apply the options for this file only (default)""" - - PACKAGE = 1 - """Apply the options for the entire package and its subpackages.""" - - -class ScalaPbOptionsEnumValueNaming(betterproto.Enum): - """Naming convention for generated enum values""" - - AS_IN_PROTO = 0 - CAMEL_CASE = 1 - - -@dataclass(eq=False, repr=False) -class ScalaPbOptions(betterproto.Message): - package_name: str = betterproto.string_field(1) - """If set then it overrides the java_package and package.""" - - flat_package: bool = betterproto.bool_field(2) - """ - If true, the compiler does not append the proto base file name - into the generated package name. If false (the default), the - generated scala package name is the package_name.basename where - basename is the proto file name without the .proto extension. - """ - - import_: List[str] = betterproto.string_field(3) - """ - Adds the following imports at the top of the file (this is meant - to provide implicit TypeMappers) - """ - - preamble: List[str] = betterproto.string_field(4) - """ - Text to add to the generated scala file. This can be used only - when single_file is true. - """ - - single_file: bool = betterproto.bool_field(5) - """ - If true, all messages and enums (but not services) will be written - to a single Scala file. 
- """ - - no_primitive_wrappers: bool = betterproto.bool_field(7) - """ - By default, wrappers defined at - https://github.com/google/protobuf/blob/master/src/google/protobuf/wrappers.proto, - are mapped to an Option[T] where T is a primitive type. When this field - is set to true, we do not perform this transformation. - """ - - primitive_wrappers: bool = betterproto.bool_field(6) - """ - DEPRECATED. In ScalaPB <= 0.5.47, it was necessary to explicitly enable - primitive_wrappers. This field remains here for backwards compatibility, - but it has no effect on generated code. It is an error to set both - `primitive_wrappers` and `no_primitive_wrappers`. - """ - - collection_type: str = betterproto.string_field(8) - """ - Scala type to be used for repeated fields. If unspecified, - `scala.collection.Seq` will be used. - """ - - preserve_unknown_fields: bool = betterproto.bool_field(9) - """ - If set to true, all generated messages in this file will preserve unknown - fields. - """ - - object_name: str = betterproto.string_field(10) - """ - If defined, sets the name of the file-level object that would be generated. This - object extends `GeneratedFileObject` and contains descriptors, and list of message - and enum companions. - """ - - scope: "ScalaPbOptionsOptionsScope" = betterproto.enum_field(11) - """Experimental: scope to apply the given options.""" - - lenses: bool = betterproto.bool_field(12) - """If true, lenses will be generated.""" - - retain_source_code_info: bool = betterproto.bool_field(13) - """ - If true, then source-code info information will be included in the - generated code - normally the source code info is cleared out to reduce - code size. The source code info is useful for extracting source code - location from the descriptors as well as comments. - """ - - map_type: str = betterproto.string_field(14) - """ - Scala type to be used for maps. If unspecified, - `scala.collection.immutable.Map` will be used. 
- """ - - no_default_values_in_constructor: bool = betterproto.bool_field(15) - """ - If true, no default values will be generated in message constructors. - """ - - enum_value_naming: "ScalaPbOptionsEnumValueNaming" = betterproto.enum_field(16) - enum_strip_prefix: bool = betterproto.bool_field(17) - """ - Indicate if prefix (enum name + optional underscore) should be removed in scala code - Strip is applied before enum value naming changes. - """ - - bytes_type: str = betterproto.string_field(21) - """Scala type to use for bytes fields.""" - - java_conversions: bool = betterproto.bool_field(23) - """Enable java conversions for this file.""" - - aux_message_options: List["ScalaPbOptionsAuxMessageOptions"] = ( - betterproto.message_field(18) - ) - """List of message options to apply to some messages.""" - - aux_field_options: List["ScalaPbOptionsAuxFieldOptions"] = ( - betterproto.message_field(19) - ) - """List of message options to apply to some fields.""" - - aux_enum_options: List["ScalaPbOptionsAuxEnumOptions"] = betterproto.message_field( - 20 - ) - """List of message options to apply to some enums.""" - - aux_enum_value_options: List["ScalaPbOptionsAuxEnumValueOptions"] = ( - betterproto.message_field(22) - ) - """List of enum value options to apply to some enum values.""" - - preprocessors: List[str] = betterproto.string_field(24) - """List of preprocessors to apply.""" - - field_transformations: List["FieldTransformation"] = betterproto.message_field(25) - ignore_all_transformations: bool = betterproto.bool_field(26) - """ - Ignores all transformations for this file. This is meant to allow specific files to - opt out from transformations inherited through package-scoped options. - """ - - getters: bool = betterproto.bool_field(27) - """If true, getters will be generated.""" - - test_only_no_java_conversions: bool = betterproto.bool_field(999) - """ - For use in tests only. Inhibit Java conversions even when when generator parameters - request for it. 
- """ - - -@dataclass(eq=False, repr=False) -class ScalaPbOptionsAuxMessageOptions(betterproto.Message): - """ - AuxMessageOptions enables you to set message-level options through package-scoped options. - This is useful when you can't add a dependency on scalapb.proto from the proto file that - defines the message. - """ - - target: str = betterproto.string_field(1) - """The fully-qualified name of the message in the proto name space.""" - - options: "MessageOptions" = betterproto.message_field(2) - """ - Options to apply to the message. If there are any options defined on the target message - they take precedence over the options. - """ - - -@dataclass(eq=False, repr=False) -class ScalaPbOptionsAuxFieldOptions(betterproto.Message): - """ - AuxFieldOptions enables you to set field-level options through package-scoped options. - This is useful when you can't add a dependency on scalapb.proto from the proto file that - defines the field. - """ - - target: str = betterproto.string_field(1) - """The fully-qualified name of the field in the proto name space.""" - - options: "FieldOptions" = betterproto.message_field(2) - """ - Options to apply to the field. If there are any options defined on the target message - they take precedence over the options. - """ - - -@dataclass(eq=False, repr=False) -class ScalaPbOptionsAuxEnumOptions(betterproto.Message): - """ - AuxEnumOptions enables you to set enum-level options through package-scoped options. - This is useful when you can't add a dependency on scalapb.proto from the proto file that - defines the enum. - """ - - target: str = betterproto.string_field(1) - """The fully-qualified name of the enum in the proto name space.""" - - options: "EnumOptions" = betterproto.message_field(2) - """ - Options to apply to the enum. If there are any options defined on the target enum - they take precedence over the options. 
- """ - - -@dataclass(eq=False, repr=False) -class ScalaPbOptionsAuxEnumValueOptions(betterproto.Message): - """ - AuxEnumValueOptions enables you to set enum value level options through package-scoped - options. This is useful when you can't add a dependency on scalapb.proto from the proto - file that defines the enum. - """ - - target: str = betterproto.string_field(1) - """The fully-qualified name of the enum value in the proto name space.""" - - options: "EnumValueOptions" = betterproto.message_field(2) - """ - Options to apply to the enum value. If there are any options defined on - the target enum value they take precedence over the options. - """ - - -@dataclass(eq=False, repr=False) -class MessageOptions(betterproto.Message): - extends: List[str] = betterproto.string_field(1) - """Additional classes and traits to mix in to the case class.""" - - companion_extends: List[str] = betterproto.string_field(2) - """Additional classes and traits to mix in to the companion object.""" - - annotations: List[str] = betterproto.string_field(3) - """Custom annotations to add to the generated case class.""" - - type: str = betterproto.string_field(4) - """ - All instances of this message will be converted to this type. An implicit TypeMapper - must be present. - """ - - companion_annotations: List[str] = betterproto.string_field(5) - """ - Custom annotations to add to the companion object of the generated class. - """ - - sealed_oneof_extends: List[str] = betterproto.string_field(6) - """ - Additional classes and traits to mix in to generated sealed_oneof base trait. - """ - - no_box: bool = betterproto.bool_field(7) - """ - If true, when this message is used as an optional field, do not wrap it in an `Option`. - This is equivalent of setting `(field).no_box` to true on each field with the message type. - """ - - unknown_fields_annotations: List[str] = betterproto.string_field(8) - """ - Custom annotations to add to the generated `unknownFields` case class field. 
- """ - - -@dataclass(eq=False, repr=False) -class Collection(betterproto.Message): - """ - Represents a custom Collection type in Scala. This allows ScalaPB to integrate with - collection types that are different enough from the ones in the standard library. - """ - - type: str = betterproto.string_field(1) - """Type of the collection""" - - non_empty: bool = betterproto.bool_field(2) - """ - Set to true if this collection type is not allowed to be empty, for example - cats.data.NonEmptyList. When true, ScalaPB will not generate `clearX` for the repeated - field and not provide a default argument in the constructor. - """ - - adapter: str = betterproto.string_field(3) - """ - An Adapter is a Scala object available at runtime that provides certain static methods - that can operate on this collection type. - """ - - -@dataclass(eq=False, repr=False) -class FieldOptions(betterproto.Message): - type: str = betterproto.string_field(1) - scala_name: str = betterproto.string_field(2) - collection_type: str = betterproto.string_field(3) - """ - Can be specified only if this field is repeated. If unspecified, - it falls back to the file option named `collection_type`, which defaults - to `scala.collection.Seq`. - """ - - collection: "Collection" = betterproto.message_field(8) - key_type: str = betterproto.string_field(4) - """ - If the field is a map, you can specify custom Scala types for the key - or value. - """ - - value_type: str = betterproto.string_field(5) - annotations: List[str] = betterproto.string_field(6) - """Custom annotations to add to the field.""" - - map_type: str = betterproto.string_field(7) - """ - Can be specified only if this field is a map. If unspecified, - it falls back to the file option named `map_type` which defaults to - `scala.collection.immutable.Map` - """ - - no_box: bool = betterproto.bool_field(30) - """ - Do not box this value in Option[T]. 
If set, this overrides MessageOptions.no_box - """ - - required: bool = betterproto.bool_field(31) - """ - Like no_box it does not box a value in Option[T], but also fails parsing when a value - is not provided. This enables to emulate required fields in proto3. - """ - - -@dataclass(eq=False, repr=False) -class EnumOptions(betterproto.Message): - extends: List[str] = betterproto.string_field(1) - """Additional classes and traits to mix in to the base trait""" - - companion_extends: List[str] = betterproto.string_field(2) - """Additional classes and traits to mix in to the companion object.""" - - type: str = betterproto.string_field(3) - """ - All instances of this enum will be converted to this type. An implicit TypeMapper - must be present. - """ - - base_annotations: List[str] = betterproto.string_field(4) - """Custom annotations to add to the generated enum's base class.""" - - recognized_annotations: List[str] = betterproto.string_field(5) - """Custom annotations to add to the generated trait.""" - - unrecognized_annotations: List[str] = betterproto.string_field(6) - """Custom annotations to add to the generated Unrecognized case class.""" - - -@dataclass(eq=False, repr=False) -class EnumValueOptions(betterproto.Message): - extends: List[str] = betterproto.string_field(1) - """Additional classes and traits to mix in to an individual enum value.""" - - scala_name: str = betterproto.string_field(2) - """Name in Scala to use for this enum value.""" - - annotations: List[str] = betterproto.string_field(3) - """ - Custom annotations to add to the generated case object for this enum value. 
- """ - - -@dataclass(eq=False, repr=False) -class OneofOptions(betterproto.Message): - extends: List[str] = betterproto.string_field(1) - """Additional traits to mix in to a oneof.""" - - scala_name: str = betterproto.string_field(2) - """Name in Scala to use for this oneof field.""" - - -@dataclass(eq=False, repr=False) -class FieldTransformation(betterproto.Message): - when: "betterproto_lib_google_protobuf.FieldDescriptorProto" = ( - betterproto.message_field(1) - ) - match_type: "MatchType" = betterproto.enum_field(2) - set: "betterproto_lib_google_protobuf.FieldOptions" = betterproto.message_field(3) - - -@dataclass(eq=False, repr=False) -class PreprocessorOutput(betterproto.Message): - options_by_file: Dict[str, "ScalaPbOptions"] = betterproto.map_field( - 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE - ) diff --git a/bin/computing-unit-master.dockerfile b/bin/computing-unit-master.dockerfile index 191d23f2ddf..c303604a4bf 100644 --- a/bin/computing-unit-master.dockerfile +++ b/bin/computing-unit-master.dockerfile @@ -36,20 +36,34 @@ COPY project/ project/ COPY build.sbt build.sbt COPY .jvmopts .jvmopts -# Update system and install dependencies. python3-minimal is needed by -# bin/licensing/concat_license_binary.py below. +# python3-minimal is needed by bin/licensing/concat_license_binary.py +# below; python3-pip + curl are for the protoc + betterproto[compiler] +# install below. RUN apt-get update && apt-get install -y \ netcat \ unzip \ + curl \ libpq-dev \ python3-minimal \ + python3-pip \ && apt-get clean +# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the +# betterproto plugin are required by the genPythonProto sbt task so the +# generated amber/src/main/python/proto/ tree is populated before the +# dist is packaged. 
+RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ + && unzip -o /tmp/protoc.zip -d /usr/local \ + && chmod +x /usr/local/bin/protoc \ + && rm /tmp/protoc.zip \ + && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh RUN sbt clean WorkflowExecutionService/dist diff --git a/bin/computing-unit-worker.dockerfile b/bin/computing-unit-worker.dockerfile index 28d3b4cf0c2..0fd375f926e 100644 --- a/bin/computing-unit-worker.dockerfile +++ b/bin/computing-unit-worker.dockerfile @@ -37,19 +37,33 @@ COPY build.sbt build.sbt COPY .jvmopts .jvmopts # Update system and install dependencies. python3-minimal is needed by -# bin/licensing/concat_license_binary.py below. +# bin/licensing/concat_license_binary.py below; python3-pip + curl are +# for the protoc + betterproto[compiler] install below. RUN apt-get update && apt-get install -y \ netcat \ unzip \ + curl \ libpq-dev \ python3-minimal \ + python3-pip \ && apt-get clean +# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the +# betterproto plugin are required by the genPythonProto sbt task so the +# generated amber/src/main/python/proto/ tree is populated before the +# dist is packaged. 
+RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ + && unzip -o /tmp/protoc.zip -d /usr/local \ + && chmod +x /usr/local/bin/protoc \ + && rm /tmp/protoc.zip \ + && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh RUN sbt clean WorkflowExecutionService/dist diff --git a/bin/python-proto-gen.sh b/bin/python-proto-gen.sh index 0faf33eb9b7..f512c54ee38 100755 --- a/bin/python-proto-gen.sh +++ b/bin/python-proto-gen.sh @@ -17,6 +17,8 @@ # assuming inside the pytexera executing Python ENV +set -euo pipefail + # dirs TEXERA_HOME="$(git rev-parse --show-toplevel)" AMBER_DIR="$TEXERA_HOME/amber" @@ -27,6 +29,7 @@ CORE_DIR="$TEXERA_HOME/common/workflow-core" PROTOBUF_CORE_DIR="$CORE_DIR/src/main/protobuf" # proto-gen +mkdir -p "$PYAMBER_DIR/proto" protoc --python_betterproto_out="$PYAMBER_DIR/proto" \ -I="$PROTOBUF_AMBER_DIR" \ -I="$PROTOBUF_CORE_DIR" \ diff --git a/bin/texera-web-application.dockerfile b/bin/texera-web-application.dockerfile index a829fb16aa1..f73e55dd138 100644 --- a/bin/texera-web-application.dockerfile +++ b/bin/texera-web-application.dockerfile @@ -50,20 +50,34 @@ COPY project/ project/ COPY build.sbt build.sbt COPY .jvmopts .jvmopts -# Update system and install dependencies. python3-minimal is needed by -# bin/licensing/concat_license_binary.py below. +# python3-minimal is needed by bin/licensing/concat_license_binary.py +# below; python3-pip + curl are for the protoc + betterproto[compiler] +# install below. 
RUN apt-get update && apt-get install -y \ netcat \ unzip \ + curl \ libpq-dev \ python3-minimal \ + python3-pip \ && apt-get clean +# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the +# betterproto plugin are required by the genPythonProto sbt task so the +# generated amber/src/main/python/proto/ tree is populated before the +# WorkflowExecutionService dist is packaged. +RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ + && unzip -o /tmp/protoc.zip -d /usr/local \ + && chmod +x /usr/local/bin/protoc \ + && rm /tmp/protoc.zip \ + && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh # Bring frontend/LICENSE-binary into this build stage so the per-image # LICENSE merge below can union it with amber/LICENSE-binary-java. From 8afe6bd3ebd519f3deb9cf1cef6e7650ff782951 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 15 May 2026 04:53:45 -0700 Subject: [PATCH 2/6] make python proto-gen robust to varying protoc install layouts --- amber/build.sbt | 9 +++++---- bin/python-proto-gen.sh | 3 +++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/amber/build.sbt b/amber/build.sbt index ce0bb1dfab2..77911eeeda8 100644 --- a/amber/build.sbt +++ b/amber/build.sbt @@ -200,15 +200,16 @@ libraryDependencies += "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0" // enable protobuf compilation in Test Test / PB.protoSources += PB.externalSourcePath.value -// Skipped with a warning if protoc is missing. +// Skipped with a warning if protoc or the betterproto plugin is missing. 
val genPythonProto = taskKey[Unit]("Generate Python betterproto bindings from .proto sources.") genPythonProto := { val log = streams.value.log val repoRoot = (ThisBuild / baseDirectory).value val script = repoRoot / "bin" / "python-proto-gen.sh" - val protocOnPath = scala.sys.process.Process(Seq("bash", "-c", "command -v protoc >/dev/null 2>&1")).! == 0 - if (!protocOnPath) { - log.warn("protoc not found on PATH; skipping Python proto generation. Install protoc and `pip install betterproto[compiler]` before running pytest.") + def onPath(bin: String): Boolean = + scala.sys.process.Process(Seq("bash", "-c", s"command -v $bin >/dev/null 2>&1")).! == 0 + if (!onPath("protoc") || !onPath("protoc-gen-python_betterproto")) { + log.warn("protoc or protoc-gen-python_betterproto not found on PATH; skipping Python proto generation. Install protoc and `pip install betterproto[compiler]` before running pytest.") } else { val exit = scala.sys.process.Process(Seq("bash", script.getAbsolutePath), repoRoot).!(log) if (exit != 0) sys.error(s"python-proto-gen.sh failed with exit code $exit") diff --git a/bin/python-proto-gen.sh b/bin/python-proto-gen.sh index f512c54ee38..68c096fa54b 100755 --- a/bin/python-proto-gen.sh +++ b/bin/python-proto-gen.sh @@ -28,9 +28,12 @@ PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf" CORE_DIR="$TEXERA_HOME/common/workflow-core" PROTOBUF_CORE_DIR="$CORE_DIR/src/main/protobuf" +PROTOC_INCLUDE_DIR="$(dirname "$(dirname "$(command -v protoc)")")/include" + # proto-gen mkdir -p "$PYAMBER_DIR/proto" protoc --python_betterproto_out="$PYAMBER_DIR/proto" \ + -I="$PROTOC_INCLUDE_DIR" \ -I="$PROTOBUF_AMBER_DIR" \ -I="$PROTOBUF_CORE_DIR" \ $(find "$PROTOBUF_AMBER_DIR" -iname "*.proto") \ From ce43bfbfa364e40ac9bd7b0616bf53215de41f97 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 15 May 2026 05:04:15 -0700 Subject: [PATCH 3/6] make protoc include dir readable to non-root runner --- .github/workflows/build.yml | 2 ++ 1 file changed, 2 insertions(+) diff 
--git a/.github/workflows/build.yml b/.github/workflows/build.yml index c3aa4841b8e..12ee508ba7f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -328,6 +328,7 @@ jobs: curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip sudo unzip -o /tmp/protoc.zip -d /usr/local sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include/google - name: Create Databases run: | psql -h localhost -U postgres -f sql/texera_ddl.sql @@ -635,6 +636,7 @@ jobs: curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip sudo unzip -o /tmp/protoc.zip -d /usr/local sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include/google - name: Generate Python proto bindings run: bash bin/python-proto-gen.sh - name: Test with pytest From 2207055cd290097058c073235f26383bdc9e6726 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 25 May 2026 19:35:20 -0700 Subject: [PATCH 4/6] use pip constraint to install betterproto[compiler] without dev-deps bloat --- .github/workflows/build.yml | 45 ++++++++++++++++++++++----- .gitignore | 2 +- AGENTS.md | 8 +---- amber/build.sbt | 21 +++---------- bin/computing-unit-master.dockerfile | 23 ++++++++------ bin/computing-unit-worker.dockerfile | 23 ++++++++------ bin/protoc-version.txt | 1 + bin/python-proto-gen.sh | 8 ++--- bin/texera-web-application.dockerfile | 24 ++++++++------ 9 files changed, 89 insertions(+), 66 deletions(-) create mode 100644 bin/protoc-version.txt diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 12ee508ba7f..944ee7bd40a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -180,6 +180,24 @@ jobs: with: distribution: "temurin" java-version: 17 + - name: Setup Python for proto-gen + # bin/python-proto-gen.sh (run below before sbt dist) needs + # python3 and the betterproto 
plugin from amber/dev-requirements.txt. + uses: actions/setup-python@v6 + with: + python-version: "3.11" + - name: Install Python dependencies for proto-gen + run: | + python -m pip install uv + uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt + - name: Install protoc + # Version pinned in bin/protoc-version.txt. + run: | + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include/google - name: Create Databases # Must run before any sbt compile step: the build's JOOQ source # generators connect to texera_db while compiling. @@ -194,6 +212,11 @@ jobs: - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0 with: extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]' + - name: Generate Python proto bindings + # Must run before `WorkflowExecutionService/dist` so the amber + # dist packages amber/src/main/python/proto/. Independent of sbt + # and the JDK. + run: bash bin/python-proto-gen.sh - name: Lint and build amber distributable bundle # Single sbt invocation: scalafmt -> scalafix -> amber dist. # scalafmtCheckAll and scalafixAll cover every Scala module, so the @@ -316,16 +339,17 @@ jobs: # flag a dependabot bump to a version not yet mirrored there # fails to resolve even though PyPI has it. # dev-requirements.txt provides the betterproto plugin used by - # genPythonProto (wired into amber/Compile in amber/build.sbt). + # bin/python-proto-gen.sh. 
run: | python -m pip install uv if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi - - name: Install protoc 3.19.4 - # Matches PB.protocVersion in amber/build.sbt. + - name: Install protoc + # Version pinned in bin/protoc-version.txt. run: | - curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" sudo unzip -o /tmp/protoc.zip -d /usr/local sudo chmod +x /usr/local/bin/protoc sudo chmod -R a+rX /usr/local/include/google @@ -436,6 +460,10 @@ jobs: } } EOF + - name: Generate Python proto bindings + # Integration specs spawn Python UDF workers that import the + # generated betterproto bindings. Independent of sbt and the JDK. + run: bash bin/python-proto-gen.sh - name: Lint and run amber integration tests # AMBER_TEST_FILTER=integration-only tells amber/build.sbt to # keep only @org.apache.texera.amber.tags.IntegrationTest @@ -630,10 +658,11 @@ jobs: run: | python -m pip install uv if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi - - name: Install protoc 3.19.4 - # Matches PB.protocVersion in amber/build.sbt. + - name: Install protoc + # Version pinned in bin/protoc-version.txt. 
run: | - curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" sudo unzip -o /tmp/protoc.zip -d /usr/local sudo chmod +x /usr/local/bin/protoc sudo chmod -R a+rX /usr/local/include/google @@ -683,6 +712,8 @@ jobs: if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi - name: Install protoc + # Homebrew protoc; this job doesn't exercise scalapb so the + # bin/protoc-version.txt pin doesn't apply here. run: brew install protobuf - name: Generate Python proto bindings run: bash bin/python-proto-gen.sh diff --git a/.gitignore b/.gitignore index ee570fceea8..6e772fee5ab 100644 --- a/.gitignore +++ b/.gitignore @@ -60,7 +60,7 @@ coverage.xml *.model *.pkl -# Regenerated by sbt amber/compile. +# Regenerated by bin/python-proto-gen.sh. 
amber/src/main/python/proto/ # Ingoring user generated resources diff --git a/AGENTS.md b/AGENTS.md index f517eb71979..eef0aea460d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -83,16 +83,10 @@ One Python venv shared across worktrees, sibling of the texera checkout: ```bash python3.12 -m venv ../venv312 && source ../venv312/bin/activate pip install -r amber/requirements.txt -r amber/operator-requirements.txt -# For pytest or sbt-driven Python codegen, also install dev deps: +# For pytest or running bin/python-proto-gen.sh, also install dev deps: pip install -r amber/dev-requirements.txt ``` -`amber/src/main/python/proto/` is gitignored and regenerated by -[`bin/python-proto-gen.sh`](bin/python-proto-gen.sh) on `sbt amber/compile` -(see `genPythonProto` in [`amber/build.sbt`](amber/build.sbt)). Requires -`protoc` on PATH (pin to `3.19.4` to match `PB.protocVersion`); skipped -with a warning when `protoc` is missing. - Tests that spawn Python workers need an interpreter path. Edit `python.path` in [`udf.conf`](common/config/src/main/resources/udf.conf) or `export UDF_PYTHON_PATH="$(pwd)/../venv312/bin/python"` (env var overrides). diff --git a/amber/build.sbt b/amber/build.sbt index 77911eeeda8..2cd20ed79b2 100644 --- a/amber/build.sbt +++ b/amber/build.sbt @@ -179,7 +179,7 @@ libraryDependencies ++= hadoopDependencies // protobuf related // run the following with sbt to have protobuf codegen -PB.protocVersion := "3.19.4" +PB.protocVersion := IO.read((ThisBuild / baseDirectory).value / "bin" / "protoc-version.txt").trim enablePlugins(Fs2Grpc) @@ -200,22 +200,9 @@ libraryDependencies += "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0" // enable protobuf compilation in Test Test / PB.protoSources += PB.externalSourcePath.value -// Skipped with a warning if protoc or the betterproto plugin is missing. 
-val genPythonProto = taskKey[Unit]("Generate Python betterproto bindings from .proto sources.") -genPythonProto := { - val log = streams.value.log - val repoRoot = (ThisBuild / baseDirectory).value - val script = repoRoot / "bin" / "python-proto-gen.sh" - def onPath(bin: String): Boolean = - scala.sys.process.Process(Seq("bash", "-c", s"command -v $bin >/dev/null 2>&1")).! == 0 - if (!onPath("protoc") || !onPath("protoc-gen-python_betterproto")) { - log.warn("protoc or protoc-gen-python_betterproto not found on PATH; skipping Python proto generation. Install protoc and `pip install betterproto[compiler]` before running pytest.") - } else { - val exit = scala.sys.process.Process(Seq("bash", script.getAbsolutePath), repoRoot).!(log) - if (exit != 0) sys.error(s"python-proto-gen.sh failed with exit code $exit") - } -} -Compile / compile := (Compile / compile).dependsOn(genPythonProto).value +// Python betterproto bindings (amber/src/main/python/proto/) are regenerated +// out-of-band by bin/python-proto-gen.sh so dockerfiles, CI, and local +// devs can refresh bindings without sbt or a JDK. ///////////////////////////////////////////////////////////////////////////// // Test related diff --git a/bin/computing-unit-master.dockerfile b/bin/computing-unit-master.dockerfile index c303604a4bf..0d9d60b79f5 100644 --- a/bin/computing-unit-master.dockerfile +++ b/bin/computing-unit-master.dockerfile @@ -36,9 +36,8 @@ COPY project/ project/ COPY build.sbt build.sbt COPY .jvmopts .jvmopts -# python3-minimal is needed by bin/licensing/concat_license_binary.py -# below; python3-pip + curl are for the protoc + betterproto[compiler] -# install below. +# python3-minimal is needed by bin/licensing/concat_license_binary.py; +# python3-pip installs the betterproto plugin; unzip + curl fetch protoc. 
RUN apt-get update && apt-get install -y \ netcat \ unzip \ @@ -48,22 +47,26 @@ RUN apt-get update && apt-get install -y \ python3-pip \ && apt-get clean -# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the -# betterproto plugin are required by the genPythonProto sbt task so the -# generated amber/src/main/python/proto/ tree is populated before the -# dist is packaged. -RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ +# Install protoc (version pinned in bin/protoc-version.txt) and the +# betterproto plugin (version pinned via amber/requirements.txt as a +# pip constraint, so the runtime base `betterproto` and the build-time +# `betterproto[compiler]` stay in lockstep), then regenerate +# amber/src/main/python/proto/ before `sbt dist`. +COPY bin/protoc-version.txt bin/protoc-version.txt +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh +RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \ + && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \ && unzip -o /tmp/protoc.zip -d /usr/local \ && chmod +x /usr/local/bin/protoc \ && rm /tmp/protoc.zip \ - && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \ + && bash bin/python-proto-gen.sh # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ -COPY bin/python-proto-gen.sh bin/python-proto-gen.sh RUN sbt clean WorkflowExecutionService/dist diff --git a/bin/computing-unit-worker.dockerfile b/bin/computing-unit-worker.dockerfile index 0fd375f926e..fc80998888f 100644 --- a/bin/computing-unit-worker.dockerfile +++ b/bin/computing-unit-worker.dockerfile @@ -36,9 +36,8 @@ COPY project/ project/ COPY build.sbt build.sbt 
COPY .jvmopts .jvmopts -# Update system and install dependencies. python3-minimal is needed by -# bin/licensing/concat_license_binary.py below; python3-pip + curl are -# for the protoc + betterproto[compiler] install below. +# python3-minimal is needed by bin/licensing/concat_license_binary.py; +# python3-pip installs the betterproto plugin; unzip + curl fetch protoc. RUN apt-get update && apt-get install -y \ netcat \ unzip \ @@ -48,22 +47,26 @@ RUN apt-get update && apt-get install -y \ python3-pip \ && apt-get clean -# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the -# betterproto plugin are required by the genPythonProto sbt task so the -# generated amber/src/main/python/proto/ tree is populated before the -# dist is packaged. -RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ +# Install protoc (version pinned in bin/protoc-version.txt) and the +# betterproto plugin (version pinned via amber/requirements.txt as a +# pip constraint, so the runtime base `betterproto` and the build-time +# `betterproto[compiler]` stay in lockstep), then regenerate +# amber/src/main/python/proto/ before `sbt dist`. 
+COPY bin/protoc-version.txt bin/protoc-version.txt +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh +RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \ + && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \ && unzip -o /tmp/protoc.zip -d /usr/local \ && chmod +x /usr/local/bin/protoc \ && rm /tmp/protoc.zip \ - && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \ + && bash bin/python-proto-gen.sh # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ -COPY bin/python-proto-gen.sh bin/python-proto-gen.sh RUN sbt clean WorkflowExecutionService/dist diff --git a/bin/protoc-version.txt b/bin/protoc-version.txt new file mode 100644 index 00000000000..de24deecf37 --- /dev/null +++ b/bin/protoc-version.txt @@ -0,0 +1 @@ +3.19.4 diff --git a/bin/python-proto-gen.sh b/bin/python-proto-gen.sh index 68c096fa54b..db51bb76265 100755 --- a/bin/python-proto-gen.sh +++ b/bin/python-proto-gen.sh @@ -15,12 +15,12 @@ # specific language governing permissions and limitations # under the License. -# assuming inside the pytexera executing Python ENV - set -euo pipefail -# dirs -TEXERA_HOME="$(git rev-parse --show-toplevel)" +# Resolve repo root from this script's location (avoids git/CWD assumptions +# so the script works inside Docker build stages before .git is copied). +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEXERA_HOME="$(cd "$SCRIPT_DIR/.." 
&& pwd)" AMBER_DIR="$TEXERA_HOME/amber" PYAMBER_DIR="$AMBER_DIR/src/main/python" PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf" diff --git a/bin/texera-web-application.dockerfile b/bin/texera-web-application.dockerfile index f73e55dd138..efaee5699a8 100644 --- a/bin/texera-web-application.dockerfile +++ b/bin/texera-web-application.dockerfile @@ -50,9 +50,8 @@ COPY project/ project/ COPY build.sbt build.sbt COPY .jvmopts .jvmopts -# python3-minimal is needed by bin/licensing/concat_license_binary.py -# below; python3-pip + curl are for the protoc + betterproto[compiler] -# install below. +# python3-minimal is needed by bin/licensing/concat_license_binary.py; +# python3-pip installs the betterproto plugin; unzip + curl fetch protoc. RUN apt-get update && apt-get install -y \ netcat \ unzip \ @@ -62,22 +61,27 @@ RUN apt-get update && apt-get install -y \ python3-pip \ && apt-get clean -# protoc 3.19.4 (matches PB.protocVersion in amber/build.sbt) and the -# betterproto plugin are required by the genPythonProto sbt task so the -# generated amber/src/main/python/proto/ tree is populated before the -# WorkflowExecutionService dist is packaged. -RUN curl -fsSL -o /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip \ +# Install protoc (version pinned in bin/protoc-version.txt) and the +# betterproto plugin (version pinned via amber/requirements.txt as a +# pip constraint, so the runtime base `betterproto` and the build-time +# `betterproto[compiler]` stay in lockstep), then regenerate +# amber/src/main/python/proto/ before the WorkflowExecutionService dist +# is packaged. 
+COPY bin/protoc-version.txt bin/protoc-version.txt +COPY bin/python-proto-gen.sh bin/python-proto-gen.sh +RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \ + && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \ && unzip -o /tmp/protoc.zip -d /usr/local \ && chmod +x /usr/local/bin/protoc \ && rm /tmp/protoc.zip \ - && pip3 install --no-cache-dir 'betterproto[compiler]==2.0.0b7' + && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \ + && bash bin/python-proto-gen.sh # Add .git for runtime calls to jgit from OPversion COPY .git .git COPY LICENSE NOTICE DISCLAIMER ./ COPY licenses/ licenses/ COPY bin/licensing/ bin/licensing/ -COPY bin/python-proto-gen.sh bin/python-proto-gen.sh # Bring frontend/LICENSE-binary into this build stage so the per-image # LICENSE merge below can union it with amber/LICENSE-binary-java. From 313cf5493698532d7b799a5a1ed320100bf0eabe Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 26 May 2026 00:19:16 -0700 Subject: [PATCH 5/6] Removed comment --- amber/build.sbt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/amber/build.sbt b/amber/build.sbt index 2cd20ed79b2..dc9b5d8f301 100644 --- a/amber/build.sbt +++ b/amber/build.sbt @@ -200,10 +200,6 @@ libraryDependencies += "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0" // enable protobuf compilation in Test Test / PB.protoSources += PB.externalSourcePath.value -// Python betterproto bindings (amber/src/main/python/proto/) are regenerated -// out-of-band by bin/python-proto-gen.sh so dockerfiles, CI, and local -// devs can refresh bindings without sbt or a JDK. 
- ///////////////////////////////////////////////////////////////////////////// // Test related // https://mvnrepository.com/artifact/org.scalamock/scalamock From 3077c09d05903e553a949ec568a1410924f47edd Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 26 May 2026 01:01:37 -0700 Subject: [PATCH 6/6] Removed all four proto-gen steps from amber --- .github/workflows/build.yml | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d856fa40629..2b93c555764 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -206,24 +206,6 @@ jobs: with: distribution: "temurin" java-version: 17 - - name: Setup Python for proto-gen - # bin/python-proto-gen.sh (run below before sbt dist) needs - # python3 and the betterproto plugin from amber/dev-requirements.txt. - uses: actions/setup-python@v6 - with: - python-version: "3.11" - - name: Install Python dependencies for proto-gen - run: | - python -m pip install uv - uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt - - name: Install protoc - # Version pinned in bin/protoc-version.txt. - run: | - PROTOC_VERSION=$(cat bin/protoc-version.txt) - curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" - sudo unzip -o /tmp/protoc.zip -d /usr/local - sudo chmod +x /usr/local/bin/protoc - sudo chmod -R a+rX /usr/local/include/google - name: Create Databases # Must run before any sbt compile step: the build's JOOQ source # generators connect to texera_db while compiling.
@@ -238,11 +220,6 @@ jobs: - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0 with: extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]' - - name: Generate Python proto bindings - # Must run before `WorkflowExecutionService/dist` so the amber - # dist packages amber/src/main/python/proto/. Independent of sbt - # and the JDK. - run: bash bin/python-proto-gen.sh - name: Lint and build amber distributable bundle # Single sbt invocation: scalafmt -> scalafix -> amber dist. # scalafmtCheckAll and scalafixAll cover every Scala module, so the