forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_dts_activity_sequence.py
More file actions
69 lines (54 loc) · 2.73 KB
/
test_dts_activity_sequence.py
File metadata and controls
69 lines (54 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
import pytest
from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
pytestmark = pytest.mark.dts
def hello(ctx: task.ActivityContext, name: str) -> str:
"""Activity function that returns a greeting"""
return f'Hello {name}!'
def sequence(ctx: task.OrchestrationContext, _):
"""Orchestrator function that calls the 'hello' activity function in a sequence"""
# call "hello" activity function in a sequence
result1 = yield ctx.call_activity(hello, input='Tokyo')
result2 = yield ctx.call_activity(hello, input='Seattle')
result3 = yield ctx.call_activity(hello, input='London')
# return an array of results
return [result1, result2, result3]
# Read the environment variable
taskhub_name = os.getenv("TASKHUB")
# Check if the variable exists
if taskhub_name:
print(f"The value of TASKHUB is: {taskhub_name}")
else:
print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use")
print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"")
print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"")
exit()
# Read the environment variable
endpoint = os.getenv("ENDPOINT")
# Check if the variable exists
if endpoint:
print(f"The value of ENDPOINT is: {endpoint}")
else:
print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler")
print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<schedulerEndpoint>\"")
print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"")
exit()
# configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=None) as w:
w.add_orchestrator(sequence)
w.add_activity(hello)
w.start()
# Construct the client and run the orchestrations
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=None)
instance_id = c.schedule_new_orchestration(sequence)
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration completed! Result: {state.serialized_output}')
elif state:
print(f'Orchestration failed: {state.failure_details}')