Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/CODEOWNERS

This file was deleted.

18 changes: 14 additions & 4 deletions src/aws_durable_execution_sdk_python/concurrency/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def __init__(
self, resubmit_callback: Callable[[ExecutableWithState], None]
) -> None:
self.resubmit_callback = resubmit_callback
self._pending_resumes: list[tuple[float, ExecutableWithState]] = []
self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = []
self._lock = threading.Lock()
self._schedule_counter = 0
self._shutdown = threading.Event()
self._timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
self._timer_thread.start()
Expand All @@ -73,9 +74,18 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
def schedule_resume(
self, exe_state: ExecutableWithState, resume_time: float
) -> None:
"""Schedule a task to resume at the specified time."""
"""Schedule a task to resume at the specified time.

Uses a counter as a tie-breaker to ensure FIFO ordering when multiple
tasks have the same resume_time, preventing TypeError from comparing
ExecutableWithState objects.
"""
with self._lock:
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
heapq.heappush(
self._pending_resumes,
(resume_time, self._schedule_counter, exe_state),
)
self._schedule_counter += 1
Comment on lines +84 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
heapq.heappush(
self._pending_resumes,
(resume_time, self._schedule_counter, exe_state),
)
self._schedule_counter += 1
heapq.heappush(self._pending_resumes, (resume_time, next(self._counter), exe_state))


def shutdown(self) -> None:
"""Shutdown the timer thread and cancel all pending resumes."""
Expand Down Expand Up @@ -108,7 +118,7 @@ def _timer_loop(self) -> None:
self._pending_resumes
and self._pending_resumes[0][0] <= current_time
):
_, exe_state = heapq.heappop(self._pending_resumes)
_, _, exe_state = heapq.heappop(self._pending_resumes)
if exe_state.can_resume:
exe_state.reset_to_pending()
self.resubmit_callback(exe_state)
Expand Down
128 changes: 125 additions & 3 deletions tests/concurrency_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2817,7 +2817,10 @@ def task_func(ctx, item, idx, items):

items = list(range(100))
config = MapConfig(
max_concurrency=10, completion_config=CompletionConfig(min_successful=99)
max_concurrency=10,
completion_config=CompletionConfig(
min_successful=99, tolerated_failure_count=1
),
)

executor = MapExecutor.from_items(items=items, func=task_func, config=config)
Expand All @@ -2830,8 +2833,8 @@ def task_func(ctx, item, idx, items):

result = executor.execute(execution_state, executor_context)

# With concurrency=1, only 2 tasks should execute before terminating
# min_successful(99) + failure_count(2) = 101 > total_tasks(100)
# With tolerated_failure_count=1, executor stops when failure_count > 1 (at 2 failures)
# Executor terminates early rather than executing all 100 tasks
assert executed_count["value"] < 100
assert (
result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED
Expand Down Expand Up @@ -3031,3 +3034,122 @@ def slow_func():
assert result.failure_count == 0
assert result.started_count == 1
assert result.total_count == 2


# region TimerScheduler edge cases with exact same reschedule time


def test_timer_scheduler_same_timestamp_with_counter_tiebreaker():
    """
    Scheduling two tasks for an identical resume_time must not raise.

    The scheduler stores (resume_time, counter, exe_state) heap entries, so
    the counter decides ties and heapq never has to compare two
    ExecutableWithState objects (which would raise TypeError).
    """
    callback = Mock()

    with TimerScheduler(callback) as scheduler:
        # Two distinct executables that will share one resume time.
        first = ExecutableWithState(Executable(index=0, func=lambda: "test1"))
        second = ExecutableWithState(Executable(index=1, func=lambda: "test2"))

        shared_time = time.time() + 10.0

        # Neither push may fail, even though the timestamps tie.
        for state in (first, second):
            scheduler.schedule_resume(state, shared_time)

        heap = scheduler._pending_resumes  # noqa: SLF001
        assert len(heap) == 2

        # The head of the heap is the entry that was scheduled first.
        head_time, head_counter, head_state = heap[0]
        assert head_time == shared_time
        assert head_counter == 0
        assert head_state == first


def test_timer_scheduler_multiple_same_timestamps():
    """
    Test that scheduling many tasks with the same timestamp works correctly.

    Verifies FIFO ordering is maintained when multiple tasks have identical
    timestamps by checking the order of every pending entry, not only the
    entry at the head of the heap.
    """
    resubmit_callback = Mock()

    with TimerScheduler(resubmit_callback) as scheduler:
        # Scheduled well into the future; the timer loop only pops entries
        # whose resume time has passed, so the heap should stay untouched
        # while it is inspected below.
        same_timestamp = time.time() + 10.0

        # Create and schedule 10 tasks with the same timestamp
        exe_states = [
            ExecutableWithState(Executable(index=i, func=lambda i=i: f"test{i}"))
            for i in range(10)
        ]

        for exe_state in exe_states:
            scheduler.schedule_resume(exe_state, same_timestamp)

        # schedule_resume mutates the heap under this lock, so copy the
        # entries under it to get a consistent snapshot.
        with scheduler._lock:  # noqa: SLF001
            pending = list(scheduler._pending_resumes)  # noqa: SLF001

        # All should be scheduled successfully
        assert len(pending) == 10

        # The first item in the heap should have counter 0
        assert pending[0][1] == 0

        # Sorting by (timestamp, counter) reproduces the order in which the
        # heap would pop entries. Counters are unique, so the comparison
        # never reaches the ExecutableWithState element.
        ordered = sorted(pending)
        assert [counter for _, counter, _ in ordered] == list(range(10))
        assert all(
            scheduled is expected
            for (_, _, scheduled), expected in zip(ordered, exe_states)
        )


def test_timer_scheduler_counter_increments():
    """Check that every schedule_resume call advances the schedule counter."""
    callback = Mock()

    with TimerScheduler(callback) as scheduler:
        tasks = [
            ExecutableWithState(Executable(0, lambda: "test1")),
            ExecutableWithState(Executable(1, lambda: "test2")),
            ExecutableWithState(Executable(2, lambda: "test3")),
        ]

        # Distinct resume times: 1, 2 and 3 seconds from now.
        for delay, task in enumerate(tasks, start=1):
            scheduler.schedule_resume(task, time.time() + float(delay))

        # Three schedules -> counter has moved from 0 to 3.
        assert scheduler._schedule_counter == 3  # noqa: SLF001


def test_timer_scheduler_fifo_ordering_with_same_timestamp():
    """
    Test that FIFO ordering is maintained when timestamps are equal.

    When multiple tasks have the same timestamp, they should be resubmitted
    in the order they were scheduled (FIFO). The test records the index of
    every executable passed to the resubmit callback and compares the
    recorded sequence with the scheduling order.

    Instead of sleeping for a fixed interval, the test polls until all
    callbacks have fired (bounded by a deadline), so it neither fails on a
    slow machine nor waits longer than necessary on a fast one.
    """
    results = []
    resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index))

    with TimerScheduler(resubmit_callback) as scheduler:
        # Use a past timestamp so they trigger immediately
        past_time = time.time() - 0.1

        exe_states = [
            ExecutableWithState(Executable(0, lambda: "first")),
            ExecutableWithState(Executable(1, lambda: "second")),
            ExecutableWithState(Executable(2, lambda: "third")),
        ]

        # Make them all resumable; the timer loop only resubmits entries
        # whose can_resume flag is set.
        for exe_state in exe_states:
            exe_state.suspend()

        # Schedule all with same timestamp
        for exe_state in exe_states:
            scheduler.schedule_resume(exe_state, past_time)

        # Wait for the timer thread to process them, giving up after a
        # generous deadline so a broken scheduler cannot hang the test.
        deadline = time.monotonic() + 5.0
        while len(results) < len(exe_states) and time.monotonic() < deadline:
            time.sleep(0.01)

    # Verify FIFO order - they should be resubmitted in order 0, 1, 2
    assert results == [0, 1, 2]


# endregion TimerScheduler edge cases with exact same reschedule time
Loading