Skip to content

Commit 005cb2d

Browse files
committed
Enable SDK flag, pick up initiate_shutdown changes
1 parent 496a650 commit 005cb2d

File tree

7 files changed

+13
-9
lines changed

7 files changed

+13
-9
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -683,10 +683,12 @@ impl WorkerRef {
683683
.map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}")))
684684
}
685685

686-
fn initiate_shutdown(&self) -> PyResult<()> {
686+
fn initiate_shutdown<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
687687
let worker = self.worker.as_ref().unwrap().clone();
688-
worker.initiate_shutdown();
689-
Ok(())
688+
self.runtime.future_into_py(py, async move {
689+
worker.initiate_shutdown().await;
690+
Ok(())
691+
})
690692
}
691693

692694
fn finalize_shutdown<'p>(&mut self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {

temporalio/bridge/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ def replace_client(self, client: temporalio.bridge.client.Client) -> None:
265265
"""Replace the worker client."""
266266
self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess]
267267

268-
def initiate_shutdown(self) -> None:
268+
async def initiate_shutdown(self) -> None:
269269
"""Start shutdown of the worker."""
270-
self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess]
270+
await self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess]
271271

272272
async def finalize_shutdown(self) -> None:
273273
"""Finalize the worker.

temporalio/worker/_replayer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
390390
# We must shutdown here
391391
try:
392392
if bridge_worker_scope is not None:
393-
bridge_worker_scope.initiate_shutdown()
393+
await bridge_worker_scope.initiate_shutdown()
394394
await bridge_worker_scope.finalize_shutdown()
395395
except Exception:
396396
logger.warning("Failed to finalize shutdown", exc_info=True)

temporalio/worker/_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ async def raise_on_shutdown():
837837
)
838838

839839
# Initiate core worker shutdown
840-
self._bridge_worker.initiate_shutdown()
840+
await self._bridge_worker.initiate_shutdown()
841841

842842
# If any worker task had an exception, replace that task with a queue drain
843843
for worker, task in tasks.items():

temporalio/worker/_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ async def _handle_cache_eviction(
622622
except Exception as e:
623623
self._throw_after_activation = e
624624
logger.debug("Shutting down worker on eviction hook exception")
625-
self._bridge_worker().initiate_shutdown()
625+
await self._bridge_worker().initiate_shutdown()
626626

627627
def _create_workflow_instance(
628628
self,

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0"
1+
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6"

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
124124
"history.enableChasm=true",
125125
"--dynamic-config-value",
126126
"history.enableTransitionHistory=true",
127+
"--dynamic-config-value",
128+
"frontend.enableCancelWorkerPollsOnShutdown=true",
127129
],
128130
dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION,
129131
)

0 commit comments

Comments
 (0)