diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS deleted file mode 100644 index ee4902b..0000000 --- a/.github/CODEOWNERS +++ /dev/null @@ -1 +0,0 @@ -* @yaythomas @wangyb-A diff --git a/src/aws_durable_execution_sdk_python/concurrency/executor.py b/src/aws_durable_execution_sdk_python/concurrency/executor.py index da1a5cd..1a6c608 100644 --- a/src/aws_durable_execution_sdk_python/concurrency/executor.py +++ b/src/aws_durable_execution_sdk_python/concurrency/executor.py @@ -58,8 +58,9 @@ def __init__( self, resubmit_callback: Callable[[ExecutableWithState], None] ) -> None: self.resubmit_callback = resubmit_callback - self._pending_resumes: list[tuple[float, ExecutableWithState]] = [] + self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = [] self._lock = threading.Lock() + self._schedule_counter = 0 self._shutdown = threading.Event() self._timer_thread = threading.Thread(target=self._timer_loop, daemon=True) self._timer_thread.start() @@ -73,9 +74,18 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: def schedule_resume( self, exe_state: ExecutableWithState, resume_time: float ) -> None: - """Schedule a task to resume at the specified time.""" + """Schedule a task to resume at the specified time. + + Uses a counter as a tie-breaker to ensure FIFO ordering when multiple + tasks have the same resume_time, preventing TypeError from comparing + ExecutableWithState objects. + """ with self._lock: - heapq.heappush(self._pending_resumes, (resume_time, exe_state)) + heapq.heappush( + self._pending_resumes, + (resume_time, self._schedule_counter, exe_state), + ) + self._schedule_counter += 1 def shutdown(self) -> None: """Shutdown the timer thread and cancel all pending resumes.""" @@ -108,7 +118,7 @@ def _timer_loop(self) -> None: self._pending_resumes and self._pending_resumes[0][0] <= current_time ): - _, exe_state = heapq.heappop(self._pending_resumes) + _, _, exe_state = heapq.heappop(self._pending_resumes) if exe_state.can_resume: exe_state.reset_to_pending() self.resubmit_callback(exe_state) diff --git a/tests/concurrency_test.py b/tests/concurrency_test.py index cb2f0ba..b94f39f 100644 --- a/tests/concurrency_test.py +++ b/tests/concurrency_test.py @@ -2817,7 +2817,10 @@ def task_func(ctx, item, idx, items): items = list(range(100)) config = MapConfig( - max_concurrency=10, completion_config=CompletionConfig(min_successful=99) + max_concurrency=10, + completion_config=CompletionConfig( + min_successful=99, tolerated_failure_count=1 + ), ) executor = MapExecutor.from_items(items=items, func=task_func, config=config) @@ -2830,8 +2833,8 @@ def task_func(ctx, item, idx, items): result = executor.execute(execution_state, executor_context) - # With concurrency=1, only 2 tasks should execute before terminating - # min_successful(99) + failure_count(2) = 101 > total_tasks(100) + # With tolerated_failure_count=1, executor stops when failure_count > 1 (at 2 failures) + # Executor terminates early rather than executing all 100 tasks assert executed_count["value"] < 100 assert ( result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED @@ -3031,3 +3034,122 @@ def slow_func(): assert result.failure_count == 0 assert result.started_count == 1 assert result.total_count == 2 + + +# region TimerScheduler edge cases with exact same reschedule time + + +def test_timer_scheduler_same_timestamp_with_counter_tiebreaker(): + """ + Test that scheduling two tasks with the exact same resume_time works. + + This verifies the fix where a counter is used as a tie-breaker to prevent + TypeError when heapq tries to compare ExecutableWithState objects. + """ + resubmit_callback = Mock() + + with TimerScheduler(resubmit_callback) as scheduler: + # Create two different ExecutableWithState objects + exe_state1 = ExecutableWithState(Executable(index=0, func=lambda: "test1")) + exe_state2 = ExecutableWithState(Executable(index=1, func=lambda: "test2")) + + # Use the exact same timestamp for both + same_timestamp = time.time() + 10.0 + + # Both schedules should work fine now + scheduler.schedule_resume(exe_state1, same_timestamp) + scheduler.schedule_resume(exe_state2, same_timestamp) + + # Verify both are in the heap + assert len(scheduler._pending_resumes) == 2 # noqa: SLF001 + + # Verify FIFO ordering (first scheduled should be first in heap) + first_item = scheduler._pending_resumes[0] # noqa: SLF001 + assert first_item[0] == same_timestamp # timestamp + assert first_item[1] == 0 # counter (first scheduled) + assert first_item[2] == exe_state1 # first exe_state + + +def test_timer_scheduler_multiple_same_timestamps(): + """ + Test that scheduling many tasks with the same timestamp works correctly. + + Verifies FIFO ordering is maintained when multiple tasks have identical timestamps. + """ + resubmit_callback = Mock() + + with TimerScheduler(resubmit_callback) as scheduler: + same_timestamp = time.time() + 10.0 + + # Create and schedule 10 tasks with the same timestamp + exe_states = [ + ExecutableWithState(Executable(index=i, func=lambda i=i: f"test{i}")) + for i in range(10) + ] + + for exe_state in exe_states: + scheduler.schedule_resume(exe_state, same_timestamp) + + # All should be scheduled successfully + assert len(scheduler._pending_resumes) == 10 # noqa: SLF001 + + # Verify the heap maintains proper ordering + # The first item should have counter 0 + assert scheduler._pending_resumes[0][1] == 0 # noqa: SLF001 + + +def test_timer_scheduler_counter_increments(): + """Test that the schedule counter increments correctly.""" + resubmit_callback = Mock() + + with TimerScheduler(resubmit_callback) as scheduler: + exe_state1 = ExecutableWithState(Executable(0, lambda: "test1")) + exe_state2 = ExecutableWithState(Executable(1, lambda: "test2")) + exe_state3 = ExecutableWithState(Executable(2, lambda: "test3")) + + # Schedule with different times + scheduler.schedule_resume(exe_state1, time.time() + 1.0) + scheduler.schedule_resume(exe_state2, time.time() + 2.0) + scheduler.schedule_resume(exe_state3, time.time() + 3.0) + + # Counter should have incremented to 3 + assert scheduler._schedule_counter == 3 # noqa: SLF001 + + +def test_timer_scheduler_fifo_ordering_with_same_timestamp(): + """ + Test that FIFO ordering is maintained when timestamps are equal. + + When multiple tasks have the same timestamp, they should be processed + in the order they were scheduled (FIFO). The timer thread processes + items synchronously, so callback order is deterministic. + """ + results = [] + resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index)) + + with TimerScheduler(resubmit_callback) as scheduler: + # Use a past timestamp so they trigger immediately + past_time = time.time() - 0.1 + + exe_state1 = ExecutableWithState(Executable(0, lambda: "first")) + exe_state2 = ExecutableWithState(Executable(1, lambda: "second")) + exe_state3 = ExecutableWithState(Executable(2, lambda: "third")) + + # Make them all resumable + exe_state1.suspend() + exe_state2.suspend() + exe_state3.suspend() + + # Schedule all with same timestamp + scheduler.schedule_resume(exe_state1, past_time) + scheduler.schedule_resume(exe_state2, past_time) + scheduler.schedule_resume(exe_state3, past_time) + + # Wait for timer thread to process them + time.sleep(0.3) + + # Verify FIFO order - they should be resubmitted in order 0, 1, 2 + assert results == [0, 1, 2] + + +# endregion TimerScheduler edge cases with exact same reschedule time