Skip to content

Commit c58e381

Browse files
Alex Wangwangyb-A
authored andcommitted
feat: Implement json serdes for Operation
- Implement json compatiable serdes for Operation and InvocationInput class
1 parent afd4083 commit c58e381

4 files changed

Lines changed: 1264 additions & 3 deletions

File tree

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ def from_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
5959
next_marker=input_dict.get("NextMarker", ""),
6060
)
6161

62+
@staticmethod
63+
def from_json_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
64+
operations = []
65+
if input_operations := input_dict.get("Operations"):
66+
operations = [Operation.from_json_dict(op) for op in input_operations]
67+
return InitialExecutionState(
68+
operations=operations,
69+
next_marker=input_dict.get("NextMarker", ""),
70+
)
71+
6272
def get_execution_operation(self) -> Operation | None:
6373
if not self.operations:
6474
# Due to payload size limitations we may have an empty operations list.
@@ -91,6 +101,12 @@ def to_dict(self) -> MutableMapping[str, Any]:
91101
"NextMarker": self.next_marker,
92102
}
93103

104+
def to_json_dict(self) -> MutableMapping[str, Any]:
105+
return {
106+
"Operations": [op.to_json_dict() for op in self.operations],
107+
"NextMarker": self.next_marker,
108+
}
109+
94110

95111
@dataclass(frozen=True)
96112
class DurableExecutionInvocationInput:
@@ -110,13 +126,32 @@ def from_dict(
110126
),
111127
)
112128

129+
@staticmethod
130+
def from_json_dict(
131+
input_dict: MutableMapping[str, Any],
132+
) -> DurableExecutionInvocationInput:
133+
return DurableExecutionInvocationInput(
134+
durable_execution_arn=input_dict["DurableExecutionArn"],
135+
checkpoint_token=input_dict["CheckpointToken"],
136+
initial_execution_state=InitialExecutionState.from_json_dict(
137+
input_dict.get("InitialExecutionState", {})
138+
),
139+
)
140+
113141
def to_dict(self) -> MutableMapping[str, Any]:
114142
return {
115143
"DurableExecutionArn": self.durable_execution_arn,
116144
"CheckpointToken": self.checkpoint_token,
117145
"InitialExecutionState": self.initial_execution_state.to_dict(),
118146
}
119147

148+
def to_json_dict(self) -> MutableMapping[str, Any]:
149+
return {
150+
"DurableExecutionArn": self.durable_execution_arn,
151+
"CheckpointToken": self.checkpoint_token,
152+
"InitialExecutionState": self.initial_execution_state.to_json_dict(),
153+
}
154+
120155

121156
@dataclass(frozen=True)
122157
class DurableExecutionInvocationInputWithClient(DurableExecutionInvocationInput):

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import copy
34
import datetime
45
import logging
56
from dataclasses import dataclass, field
@@ -692,6 +693,24 @@ def create_wait_start(
692693
# endregion wait
693694

694695

696+
class TimestampConverter:
697+
"""Converter for datetime/Unix timestamp conversions."""
698+
699+
@staticmethod
700+
def to_unix_millis(dt: datetime.datetime | None) -> int | None:
701+
"""Convert datetime to Unix timestamp in milliseconds."""
702+
return int(dt.timestamp() * 1000) if dt else None
703+
704+
@staticmethod
705+
def from_unix_millis(ms: int | None) -> datetime.datetime | None:
706+
"""Convert Unix timestamp in milliseconds to datetime."""
707+
return (
708+
datetime.datetime.fromtimestamp(ms / 1000, tz=datetime.UTC)
709+
if ms is not None
710+
else None
711+
)
712+
713+
695714
@dataclass(frozen=True)
696715
class Operation:
697716
"""Represent the Operation type for GetDurableExecutionState and CheckpointDurableExecution."""
@@ -805,9 +824,11 @@ def to_dict(self) -> MutableMapping[str, Any]:
805824
step_dict["Error"] = self.step_details.error.to_dict()
806825
result["StepDetails"] = step_dict
807826
if self.wait_details:
808-
result["WaitDetails"] = {
809-
"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp
810-
}
827+
result["WaitDetails"] = (
828+
{"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp}
829+
if self.wait_details.scheduled_end_timestamp
830+
else {}
831+
)
811832
if self.callback_details:
812833
callback_dict: MutableMapping[str, Any] = {
813834
"CallbackId": self.callback_details.callback_id
@@ -826,6 +847,79 @@ def to_dict(self) -> MutableMapping[str, Any]:
826847
result["ChainedInvokeDetails"] = invoke_dict
827848
return result
828849

850+
def to_json_dict(self) -> MutableMapping[str, Any]:
851+
"""Convert the Operation to a JSON-serializable dictionary.
852+
853+
Converts datetime objects to millisecond timestamps for JSON compatibility.
854+
855+
Returns:
856+
A dictionary with JSON-serializable values
857+
"""
858+
# Start with the regular to_dict output
859+
result = self.to_dict()
860+
861+
# Convert datetime objects to millisecond timestamps
862+
if ts := result.get("StartTimestamp"):
863+
result["StartTimestamp"] = TimestampConverter.to_unix_millis(ts)
864+
865+
if ts := result.get("EndTimestamp"):
866+
result["EndTimestamp"] = TimestampConverter.to_unix_millis(ts)
867+
868+
if (step_details := result.get("StepDetails")) and (
869+
ts := step_details.get("NextAttemptTimestamp")
870+
):
871+
result["StepDetails"]["NextAttemptTimestamp"] = (
872+
TimestampConverter.to_unix_millis(ts)
873+
)
874+
875+
if (wait_details := result.get("WaitDetails")) and (
876+
ts := wait_details.get("ScheduledEndTimestamp")
877+
):
878+
result["WaitDetails"]["ScheduledEndTimestamp"] = (
879+
TimestampConverter.to_unix_millis(ts)
880+
)
881+
882+
return result
883+
884+
@classmethod
885+
def from_json_dict(cls, data: MutableMapping[str, Any]) -> Operation:
886+
"""Create an Operation from a JSON-serializable dictionary.
887+
888+
Converts millisecond timestamps back to datetime objects.
889+
890+
Args:
891+
data: Dictionary with JSON-serializable values (millisecond timestamps)
892+
893+
Returns:
894+
An Operation instance with datetime objects
895+
"""
896+
# Make a copy to avoid modifying the original data
897+
data_copy = copy.deepcopy(data)
898+
899+
# Convert millisecond timestamps back to datetime objects
900+
if ms := data_copy.get("StartTimestamp"):
901+
data_copy["StartTimestamp"] = TimestampConverter.from_unix_millis(ms)
902+
903+
if ms := data_copy.get("EndTimestamp"):
904+
data_copy["EndTimestamp"] = TimestampConverter.from_unix_millis(ms)
905+
906+
if (step_details := data_copy.get("StepDetails")) and (
907+
ms := step_details.get("NextAttemptTimestamp")
908+
):
909+
step_details["NextAttemptTimestamp"] = TimestampConverter.from_unix_millis(
910+
ms
911+
)
912+
913+
if (wait_details := data_copy.get("WaitDetails")) and (
914+
ms := wait_details.get("ScheduledEndTimestamp")
915+
):
916+
wait_details["ScheduledEndTimestamp"] = TimestampConverter.from_unix_millis(
917+
ms
918+
)
919+
920+
# Use the existing from_dict method with the converted data
921+
return cls.from_dict(data_copy)
922+
829923

830924
@dataclass(frozen=True)
831925
class CheckpointUpdatedExecutionState:

0 commit comments

Comments
 (0)