Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
ADDED

- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
- Improved distributed tracing support with full span coverage for orchestrations, activities, sub-orchestrations, timers, and events

FIXED:

Expand Down
50 changes: 29 additions & 21 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
import durabletask.internal.tracing as tracing
from durabletask import task
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

Expand Down Expand Up @@ -169,20 +170,26 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
version: Optional[str] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
resolved_version = version if version else self.default_version

with tracing.start_create_orchestration_span(
name, resolved_instance_id, version=resolved_version,
):
req = pb.CreateInstanceRequest(
name=name,
instanceId=resolved_instance_id,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(resolved_version),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags,
parentTraceContext=tracing.get_current_trace_context(),
)

req = pb.CreateInstanceRequest(
name=name,
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(version if version else self.default_version),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
Expand Down Expand Up @@ -269,14 +276,15 @@ def wait_for_orchestration_completion(self, instance_id: str, *,

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
)
with tracing.start_raise_event_span(event_name, instance_id):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
)

self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Optional[Any] = None,
Expand Down Expand Up @@ -355,7 +363,7 @@ def signal_entity(self,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
requestId=str(uuid.uuid4()),
scheduledTime=None,
parentTraceContext=None,
parentTraceContext=tracing.get_current_trace_context(),
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
Expand Down
12 changes: 8 additions & 4 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,13 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction


def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
tags: Optional[dict[str, str]],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input),
tags=tags
tags=tags,
parentTraceContext=parent_trace_context,
))


Expand Down Expand Up @@ -302,12 +304,14 @@ def new_create_sub_orchestration_action(
name: str,
instance_id: Optional[str],
encoded_input: Optional[str],
version: Optional[str]) -> pb.OrchestratorAction:
version: Optional[str],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
input=get_string_value(encoded_input),
version=get_string_value(version)
version=get_string_value(version),
parentTraceContext=parent_trace_context,
))


Expand Down
Loading