Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions src/conductor/client/automator/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import AuthorizationException, ApiException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
Expand Down Expand Up @@ -111,6 +111,7 @@ def __init__(
# Semaphore will be created in run() within the event loop
self._semaphore = None
self._shutdown = False # Flag to indicate graceful shutdown
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint

async def run(self) -> None:
"""Main async loop - runs continuously in single event loop."""
Expand Down Expand Up @@ -583,6 +584,11 @@ async def __async_execute_and_update_task(self, task: Task) -> None:
return
# Update task and get next task from v2 response
task = await self.__async_update_task(task_result)
# v2 returns the next task; if v1 was used (returns None), immediately
# poll for the next task to preserve tight-loop behaviour on older servers
if task is None and not self._use_update_v2 and not self._shutdown:
tasks = await self.__async_batch_poll(1)
task = tasks[0] if tasks else None
except Exception as e:
logger.error(
"Error executing/updating task %s: %s",
Expand Down Expand Up @@ -815,15 +821,55 @@ async def __async_update_task(self, task_result: TaskResult):
# Linear backoff: [10s, 20s, 30s] before retry
await asyncio.sleep(attempt * 10)
try:
next_task = await self.async_task_client.update_task_v2(body=task_result)
logger.debug(
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
if self._use_update_v2:
next_task = await self.async_task_client.update_task_v2(body=task_result)
logger.debug(
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
)
return next_task
else:
await self.async_task_client.update_task(body=task_result)
logger.debug(
"Updated async task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
)
return None
except ApiException as e:
if e.status == 404 and self._use_update_v2:
logger.warning(
"Server does not support update-task-v2 endpoint (HTTP 404). "
"Falling back to v1 update endpoint. "
"Upgrade your Conductor instance to v5+ to enable the v2 endpoint."
)
self._use_update_v2 = False
# Retry immediately with v1
try:
await self.async_task_client.update_task(body=task_result)
return None
except Exception as fallback_e:
last_exception = fallback_e
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
"Failed to update async task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
traceback.format_exc()
)
return next_task
continue
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
Expand Down
58 changes: 52 additions & 6 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import AuthorizationException, ApiException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
Expand Down Expand Up @@ -92,6 +92,7 @@ def __init__(
self._last_poll_time = 0 # Track last poll to avoid excessive polling when queue is empty
self._consecutive_empty_polls = 0 # Track empty polls to implement backoff
self._shutdown = False # Flag to indicate graceful shutdown
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint

def run(self) -> None:
if self.configuration is not None:
Expand Down Expand Up @@ -523,6 +524,11 @@ def __execute_and_update_task(self, task: Task) -> None:
return
# Update task and get next task from v2 response
task = self.__update_task(task_result)
# v2 returns the next task; if v1 was used (returns None), immediately
# poll for the next task to preserve tight-loop behaviour on older servers
if task is None and not self._use_update_v2 and not self._shutdown:
tasks = self.__batch_poll_tasks(1)
task = tasks[0] if tasks else None
except Exception as e:
logger.error(
"Error executing/updating task %s: %s",
Expand Down Expand Up @@ -845,15 +851,55 @@ def __update_task(self, task_result: TaskResult):
# Linear backoff: [10s, 20s, 30s] before retry
time.sleep(attempt * 10)
try:
next_task = self.task_client.update_task_v2(body=task_result)
logger.debug(
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
if self._use_update_v2:
next_task = self.task_client.update_task_v2(body=task_result)
logger.debug(
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
)
return next_task
else:
self.task_client.update_task(body=task_result)
logger.debug(
"Updated task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
)
return None
except ApiException as e:
if e.status == 404 and self._use_update_v2:
logger.warning(
"Server does not support update-task-v2 endpoint (HTTP 404). "
"Falling back to v1 update endpoint. "
"Upgrade your Orkes instance to v5+ to enable the v2 endpoint."
)
self._use_update_v2 = False
# Retry immediately with v1
try:
self.task_client.update_task(body=task_result)
return None
except Exception as fallback_e:
last_exception = fallback_e
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
traceback.format_exc()
)
return next_task
continue
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
Expand Down
146 changes: 145 additions & 1 deletion tests/unit/automator/test_task_runner_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import AuthorizationException, ApiException
from conductor.client.worker.worker_interface import WorkerInterface


Expand Down Expand Up @@ -803,6 +803,150 @@ def test_update_task_with_metrics_on_error(self):
4
)

# ========================================
# v1 Fallback Tests (backward compat with Orkes Conductor < v5)
# ========================================

@patch('time.sleep', Mock(return_value=None))
def test_update_task_v2_404_falls_back_to_v1(self):
    """A 404 from the v2 update call leads to exactly one v1 update and a None result."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)
    completed_result = TaskResult(
        task_id='test_id',
        workflow_instance_id='wf_id',
        worker_id=mock_worker.get_identity(),
        status=TaskResultStatus.COMPLETED,
    )
    not_found = ApiException(status=404)

    # The v2 endpoint raises 404 on every call; the v1 endpoint succeeds.
    with patch.object(TaskResourceApi, 'update_task_v2', side_effect=not_found) as v2_update:
        with patch.object(TaskResourceApi, 'update_task', return_value='ok') as v1_update:
            next_task = runner._TaskRunner__update_task(completed_result)

    # One attempt on each endpoint, and v1 never hands back a follow-up task.
    self.assertEqual(v2_update.call_count, 1)
    self.assertEqual(v1_update.call_count, 1)
    self.assertIsNone(next_task)

@patch('time.sleep', Mock(return_value=None))
def test_update_task_v2_404_sets_v1_flag(self):
    """The _use_update_v2 flag starts True and is False once v2 has answered 404."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)

    # Precondition: a fresh runner prefers the v2 endpoint.
    self.assertTrue(runner._use_update_v2)

    completed_result = TaskResult(
        task_id='test_id',
        workflow_instance_id='wf_id',
        worker_id=mock_worker.get_identity(),
        status=TaskResultStatus.COMPLETED,
    )
    not_found = ApiException(status=404)

    with patch.object(TaskResourceApi, 'update_task_v2', side_effect=not_found):
        with patch.object(TaskResourceApi, 'update_task', return_value='ok'):
            runner._TaskRunner__update_task(completed_result)

    # Postcondition: the 404 flipped the runner over to v1.
    self.assertFalse(runner._use_update_v2)

@patch('time.sleep', Mock(return_value=None))
def test_update_task_uses_v1_only_after_flag_set(self):
    """With _use_update_v2 already False, only the v1 endpoint is invoked."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)
    # Simulate a runner that has previously fallen back to v1.
    runner._use_update_v2 = False

    completed_result = TaskResult(
        task_id='test_id',
        workflow_instance_id='wf_id',
        worker_id=mock_worker.get_identity(),
        status=TaskResultStatus.COMPLETED,
    )

    with patch.object(TaskResourceApi, 'update_task_v2') as v2_update:
        with patch.object(TaskResourceApi, 'update_task', return_value='ok') as v1_update:
            next_task = runner._TaskRunner__update_task(completed_result)

    self.assertEqual(v2_update.call_count, 0)
    self.assertEqual(v1_update.call_count, 1)
    self.assertIsNone(next_task)

@patch('time.sleep', Mock(return_value=None))
def test_update_task_non_404_api_exception_does_not_fallback(self):
    """An ApiException with status 500 exhausts the v2 retries without touching v1."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)
    completed_result = TaskResult(
        task_id='test_id',
        workflow_instance_id='wf_id',
        worker_id=mock_worker.get_identity(),
        status=TaskResultStatus.COMPLETED,
    )
    server_error = ApiException(status=500)

    with patch.object(TaskResourceApi, 'update_task_v2', side_effect=server_error) as v2_update:
        with patch.object(TaskResourceApi, 'update_task') as v1_update:
            next_task = runner._TaskRunner__update_task(completed_result)

    # Every one of the 4 attempts goes to v2; v1 stays unused and the flag is untouched.
    self.assertEqual(v2_update.call_count, 4)
    self.assertEqual(v1_update.call_count, 0)
    self.assertTrue(runner._use_update_v2)
    self.assertIsNone(next_task)

@patch('time.sleep', Mock(return_value=None))
def test_execute_and_update_task_tight_loop_with_v1_polls_for_next(self):
    """On v1, the execute/update loop polls for a follow-up task after each update."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)
    # Simulate a runner that has already fallen back to v1.
    runner._use_update_v2 = False

    initial_task = Task(task_id='task_1', workflow_instance_id='wf_1')
    follow_up_task = Task(task_id='task_2', workflow_instance_id='wf_1')
    # First poll hands out the follow-up task, second poll finds the queue empty.
    poll_results = [[follow_up_task], []]

    with patch.object(TaskResourceApi, 'update_task', return_value='ok') as v1_update:
        with patch.object(TaskResourceApi, 'batch_poll', side_effect=poll_results) as poll:
            runner._TaskRunner__execute_and_update_task(initial_task)

    # Two tasks were updated via v1, and each update was followed by one poll.
    self.assertEqual(v1_update.call_count, 2)
    self.assertEqual(poll.call_count, 2)

@patch('time.sleep', Mock(return_value=None))
def test_execute_and_update_task_tight_loop_stops_when_queue_empty_on_v1(self):
    """On v1, an empty poll result ends the loop after a single update and a single poll."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)
    runner._use_update_v2 = False

    only_task = Task(task_id='task_1', workflow_instance_id='wf_1')

    with patch.object(TaskResourceApi, 'update_task', return_value='ok') as v1_update:
        with patch.object(TaskResourceApi, 'batch_poll', return_value=[]) as poll:
            runner._TaskRunner__execute_and_update_task(only_task)

    self.assertEqual(v1_update.call_count, 1)
    self.assertEqual(poll.call_count, 1)

@patch('time.sleep', Mock(return_value=None))
def test_execute_and_update_task_tight_loop_not_pollled_when_v2(self):
    """On v2, the loop relies on the update response and never calls batch_poll itself."""
    mock_worker = MockWorker('test_task')
    runner = TaskRunner(worker=mock_worker)

    initial_task = Task(task_id='task_1', workflow_instance_id='wf_1')

    # v2 reports no next task, so the loop should finish after one update.
    with patch.object(TaskResourceApi, 'update_task_v2', return_value=None) as v2_update:
        with patch.object(TaskResourceApi, 'batch_poll') as poll:
            runner._TaskRunner__execute_and_update_task(initial_task)

    self.assertEqual(v2_update.call_count, 1)
    self.assertEqual(poll.call_count, 0)

# ========================================
# Property and Environment Tests
# ========================================
Expand Down
Loading