feat: add per-context replay status tracking#393
Conversation
| ) | ||
|
|
||
| def peek_next_step_id(self): | ||
| next_step = self._operation_counter.get_current() + 1 |
There was a problem hiding this comment.
Do we need to look at _virtual_operation_counter?
There was a problem hiding this comment.
This method is used to determine the replay status. _virtual_operation_counter is not used for that.
There was a problem hiding this comment.
If next operation is a virtual operation then what would self._operation_counter.get_current() + 1 return?
There was a problem hiding this comment.
it returns the id of the next "real" operation
| This allows us to recover operation ids or even look | ||
| forward without changing the internal state of this context. | ||
| """ | ||
| parts = [self._step_id_prefix, "v" if is_virtual else None, step] |
There was a problem hiding this comment.
Isn't this a breaking change?
There was a problem hiding this comment.
How SDK generates the operation id is internal to the SDK. Users will only see a hash value.
There was a problem hiding this comment.
I think we still want deterministic Ids AND unique Id for all virtual operations. With the current proposal, if there are multiple virtual child context between "real" steps, wouldn't they all have the same ID?
[UPDATE] sorry I misunderstood the counter. Currently, a virtual child context in the user context (pre hashing) would look like the following:
v-1
v-2
1
2
v-3
3-v-1
The only thing that bugs me here is that the virtual id doesn't preserve order within the context as a whole, but I guess it doesn't really matter in python since it's all hashed anyway.
| ).increment() | ||
| return self.create_step_id_for_logical_step(new_counter, is_virtual=is_virtual) | ||
|
|
||
| def create_step_id_for_logical_step(self, step: int, is_virtual: bool) -> str: |
There was a problem hiding this comment.
nit: personally I think it would have been better to keep the virtual child context id tracking in a separate PR
| executor_context._create_step_id_for_logical_step = lambda *args: "1" # noqa SLF001 | ||
| executor_context._operation_id_generator.create_step_id_for_logical_step = ( | ||
| lambda *args: "1" | ||
| ) # noqa SLF001 |
There was a problem hiding this comment.
nit: interesting pattern, since it looks like we have to override the linter, I do wonder if there are other (preferred) testing patterns.
| mock_state.get_checkpoint_result.assert_called_with( | ||
| "1ced8f5be2db23a6513eba4d819c73806424748a7bc6fa0d792cc1c7d1775a97" | ||
| ) | ||
|
|
There was a problem hiding this comment.
nit: are we missing new test asserting the behaviour of _track_replay() with a virtual child context as the next operation?
|
Abandoned in favor of a backend enhancement to help SDK track the updated operation during the execution is suspended. That would be nicer than whatever SDK can use to guess that. |
| ), | ||
| context_logger=self.logger, | ||
| ) | ||
| self._track_replay() |
There was a problem hiding this comment.
_track_replay() runs after create_step_id() increments the counter, so peek_next_step_id resolves to N+1 instead of N. The check asks "is the op after the current one checkpointed?" rather than "is the current one?" JS does the inverse order on purpose (checkAndUpdateReplayMode runs before createStepId)
Same pattern for the other operations.
This will result in is_replaying being incorrect during the last replayed op's processing window.
No customer-observable effect today because the only internal reader of the flag is Logger._should_log.
|
|
||
| def _should_log(self) -> bool: | ||
| return not self._execution_state.is_replaying() | ||
| return not self._context.is_replaying |
There was a problem hiding this comment.
Logger holds a DurableContext reference, and with_log_info (line 83) preserves it across Logger.from_log_info(..., context=self._context).
When create_child_context builds a child logger via parent_logger.with_log_info(...), the child's _context is permanently the parent. _should_log reads not parent.is_replaying, never the child's own flag.
This means this PR doesn't actually fix what it intends to, which is to say it keeps the broken behaviour where FLAT parallel/map branches log identically to each other, gated on the parent context's flag. Previously this happened via the global state.is_replaying().
| self._replay_status = ReplayStatus.NEW | ||
| def transition_replay_status(self) -> None: | ||
| """Transition to NEW status""" | ||
| if self._replay_status is ReplayStatus.REPLAY: |
There was a problem hiding this comment.
Shouldn't this be under a lock? Two threads can both pass the if, both serially acquire the lock, both log "Transitioning…", both reassign.
since _track_replay is now called per-context-per-op this could potentially happen more frequently too.
Issue #, if available:
Fixes #389
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.