Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aws_durable_execution_sdk_python/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def from_log_info(cls, logger: LoggerInterface, info: LogInfo) -> Logger:
# Use 'operation_name' instead of 'name' as key because the stdlib LogRecord internally reserved 'name' parameter
extra["operationName"] = info.name
if info.attempt is not None:
extra["attempt"] = info.attempt + 1
extra["attempt"] = info.attempt
if info.operation_id:
extra["operationId"] = info.operation_id
return cls(
Expand Down
5 changes: 3 additions & 2 deletions src/aws_durable_execution_sdk_python/operation/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ def execute(self, checkpointed_result: CheckpointedResult) -> T:
ExecutionError: For fatal errors that should not be retried
May raise other exceptions that will be handled by retry_handler
"""
attempt: int = 0
# Get current attempt - checkpointed attempts + 1
attempt: int = 1
if checkpointed_result.operation and checkpointed_result.operation.step_details:
attempt = checkpointed_result.operation.step_details.attempt
attempt = checkpointed_result.operation.step_details.attempt + 1

step_context: StepContext = StepContext(
logger=self.context_logger.with_log_info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ def execute(self, checkpointed_result: CheckpointedResult) -> T:
else:
current_state = self.config.initial_state

# Get attempt number
# Get attempt number - current attempt is checkpointed attempts + 1
# The checkpoint stores completed attempts, so the current attempt being executed is one more
attempt: int = 1
if checkpointed_result.operation and checkpointed_result.operation.step_details:
attempt = checkpointed_result.operation.step_details.attempt
attempt = checkpointed_result.operation.step_details.attempt + 1

try:
# Execute the check function with the injected logger
Expand Down
4 changes: 2 additions & 2 deletions tests/logger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def test_logger_from_log_info_full():
"parentId": "parent123",
"operationId": "op123",
"operationName": "test_name",
"attempt": 6,
"attempt": 5,
}
assert logger._default_extra == expected_extra # noqa: SLF001
assert logger._logger is mock_logger # noqa: SLF001
Expand All @@ -202,7 +202,7 @@ def test_logger_from_log_info_partial_fields():
# Test with attempt but no parent_id or name
log_info = LogInfo(EXECUTION_STATE, None, None, None, 5)
logger = Logger.from_log_info(mock_logger, log_info)
expected_extra = {"executionArn": "arn:aws:test", "attempt": 6}
expected_extra = {"executionArn": "arn:aws:test", "attempt": 5}
assert logger._default_extra == expected_extra # noqa: SLF001


Expand Down
122 changes: 121 additions & 1 deletion tests/operation/wait_for_condition_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,127 @@ def wait_strategy(state, attempt):
context_logger=mock_logger,
)

assert captured_attempt == 3
assert captured_attempt == 4


def test_wait_for_condition_attempt_sequence_is_monotonic():
"""Test that attempt numbers form a monotonically increasing sequence: 1, 2, 3, 4...

This test validates the fix for the attempt counting bug where:
- First execution (no checkpoint): attempt = 1
- After first retry (checkpoint.attempt = 1): attempt = 2
- After second retry (checkpoint.attempt = 2): attempt = 3
- After third retry (checkpoint.attempt = 3): attempt = 4

The current attempt should always be: checkpointed_attempts + 1
"""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "arn:aws:test"

mock_logger = Mock(spec=Logger)
mock_logger.with_log_info.return_value = mock_logger

op_id = OperationIdentifier("op1", None, "test_wait")

def check_func(state, context):
return state + 1

captured_attempts = []

def wait_strategy(state, attempt):
captured_attempts.append(attempt)
return WaitForConditionDecision.stop_polling()

config = WaitForConditionConfig(initial_state=5, wait_strategy=wait_strategy)

# Test 1: First execution (no checkpoint exists)
mock_state.get_checkpoint_result.return_value = (
CheckpointedResult.create_not_found()
)

wait_for_condition_handler(
state=mock_state,
operation_identifier=op_id,
check=check_func,
config=config,
context_logger=mock_logger,
)

assert captured_attempts[-1] == 1, "First execution should have attempt=1"

# Test 2: After first retry (checkpoint has attempt=1)
operation = Operation(
operation_id="op1",
operation_type=OperationType.STEP,
status=OperationStatus.STARTED,
step_details=StepDetails(result=json.dumps(10), attempt=1),
)
mock_result = CheckpointedResult.create_from_operation(operation)
mock_state.get_checkpoint_result.return_value = mock_result

wait_for_condition_handler(
state=mock_state,
operation_identifier=op_id,
check=check_func,
config=config,
context_logger=mock_logger,
)

assert (
captured_attempts[-1] == 2
), "After first retry (checkpoint.attempt=1), current attempt should be 2"

# Test 3: After second retry (checkpoint has attempt=2)
operation = Operation(
operation_id="op1",
operation_type=OperationType.STEP,
status=OperationStatus.STARTED,
step_details=StepDetails(result=json.dumps(10), attempt=2),
)
mock_result = CheckpointedResult.create_from_operation(operation)
mock_state.get_checkpoint_result.return_value = mock_result

wait_for_condition_handler(
state=mock_state,
operation_identifier=op_id,
check=check_func,
config=config,
context_logger=mock_logger,
)

assert (
captured_attempts[-1] == 3
), "After second retry (checkpoint.attempt=2), current attempt should be 3"

# Test 4: After third retry (checkpoint has attempt=3)
operation = Operation(
operation_id="op1",
operation_type=OperationType.STEP,
status=OperationStatus.STARTED,
step_details=StepDetails(result=json.dumps(10), attempt=3),
)
mock_result = CheckpointedResult.create_from_operation(operation)
mock_state.get_checkpoint_result.return_value = mock_result

wait_for_condition_handler(
state=mock_state,
operation_identifier=op_id,
check=check_func,
config=config,
context_logger=mock_logger,
)

assert (
captured_attempts[-1] == 4
), "After third retry (checkpoint.attempt=3), current attempt should be 4"

# Verify the complete sequence is monotonically increasing
assert captured_attempts == [
1,
2,
3,
4,
], f"Expected [1, 2, 3, 4] but got {captured_attempts}"


def test_wait_for_condition_state_passed_to_strategy():
Expand Down
Loading