-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathtask_context_example.py
More file actions
287 lines (232 loc) · 8.01 KB
/
task_context_example.py
File metadata and controls
287 lines (232 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
Task Context Example
Demonstrates how to use TaskContext to access task information and modify
task results during execution.
The TaskContext provides:
- Access to task metadata (task_id, workflow_id, retry_count, etc.)
- Ability to add logs visible in Conductor UI
- Ability to set callback delays for polling/retry patterns
- Access to input parameters
Run:
python examples/task_context_example.py
"""
import asyncio
import signal
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.context.task_context import get_task_context
from conductor.client.worker.worker_task import worker_task
# Example 1: Basic TaskContext usage - accessing task info
@worker_task(
    task_definition_name='task_info_example',
    thread_count=5
)
def task_info_example(data: dict) -> dict:
    """
    Show how to read task metadata (IDs and counters) via TaskContext.
    """
    context = get_task_context()

    # Collect the metadata fields exposed by the context.
    info = {
        "task_id": context.get_task_id(),
        "workflow_id": context.get_workflow_instance_id(),
        "retry_count": context.get_retry_count(),
    }
    poll_count = context.get_poll_count()

    print(f"Task ID: {info['task_id']}")
    print(f"Workflow ID: {info['workflow_id']}")
    print(f"Retry Count: {info['retry_count']}")
    print(f"Poll Count: {poll_count}")

    # Echo the metadata back in the task result.
    return {**info, "result": "processed"}
# Example 2: Adding logs via TaskContext
@worker_task(
    task_definition_name='logging_example',
    thread_count=5
)
async def logging_example(order_id: str, items: list) -> dict:
    """
    Show how to attach logs to a task so they appear in the Conductor UI.
    """
    context = get_task_context()
    total = len(items)

    context.add_log(f"Starting to process order {order_id}")
    context.add_log(f"Order has {total} items")

    # Log progress after each simulated unit of work.
    for index, item in enumerate(items, start=1):
        await asyncio.sleep(0.1)  # Simulate processing
        context.add_log(f"Processed item {index}/{total}: {item}")

    context.add_log("Order processing completed")

    return {
        "order_id": order_id,
        "items_processed": total,
        "status": "completed",
    }
# Example 3: Callback pattern - polling external service
@worker_task(
    task_definition_name='polling_example',
    thread_count=10
)
async def polling_example(job_id: str) -> dict:
    """
    Show the callback_after polling pattern.

    Check whether an (simulated) external job has finished; if not,
    ask Conductor to poll this task again in 30 seconds.
    """
    import random

    context = get_task_context()
    context.add_log(f"Checking status of job {job_id}")

    # Simulate querying an external service; ~30% chance it is done.
    if random.random() > 0.7:
        context.add_log(f"Job {job_id} is complete!")
        return {
            "job_id": job_id,
            "status": "completed",
            "result": "Job finished successfully",
        }

    # Not finished yet — schedule the next check 30 seconds out.
    context.add_log(f"Job {job_id} still running, will check again in 30s")
    context.set_callback_after(30)
    return {
        "job_id": job_id,
        "status": "in_progress",
        "message": "Job still running",
    }
# Example 4: Retry logic with context awareness
@worker_task(
    task_definition_name='retry_aware_example',
    thread_count=5
)
def retry_aware_example(operation: str) -> dict:
    """
    Demonstrate adjusting behavior based on the task's retry count.

    Args:
        operation: Name of the simulated operation to execute.

    Returns:
        ``{"status": "success", "operation": <operation>}`` on success.

    Raises:
        RuntimeError: When the simulated operation fails, so Conductor
            can schedule a retry.
    """
    import random

    ctx = get_task_context()
    retry_count = ctx.get_retry_count()
    if retry_count > 0:
        ctx.add_log(f"This is retry attempt #{retry_count}")
        # Could implement exponential backoff, different logic, etc.

    ctx.add_log(f"Executing operation: {operation}")

    # Simulate an operation that succeeds ~70% of the time.
    if random.random() > 0.3:
        ctx.add_log("Operation succeeded")
        return {"status": "success", "operation": operation}

    ctx.add_log("Operation failed, will retry")
    # Raise RuntimeError instead of bare Exception (bare Exception is an
    # anti-pattern); any `except Exception` handler still catches it, so
    # this is backward-compatible for callers.
    raise RuntimeError("Operation failed")
# Example 5: Combining context with async operations
@worker_task(
    task_definition_name='async_context_example',
    thread_count=10
)
async def async_context_example(urls: list) -> dict:
    """
    Show TaskContext usage inside an async worker that fetches URLs,
    logging per-URL progress and recording per-URL success or failure.
    """
    context = get_task_context()
    total = len(urls)
    context.add_log(f"Starting to fetch {total} URLs")
    context.add_log(f"Task ID: {context.get_task_id()}")

    results = []
    try:
        import httpx

        async with httpx.AsyncClient(timeout=10.0) as client:
            for position, url in enumerate(urls, start=1):
                context.add_log(f"Fetching URL {position}/{total}: {url}")
                # A per-URL failure is recorded but does not abort the run.
                try:
                    response = await client.get(url)
                except Exception as exc:
                    results.append({
                        "url": url,
                        "error": str(exc),
                        "success": False,
                    })
                    context.add_log(f"✗ {url} - Error: {exc}")
                else:
                    results.append({
                        "url": url,
                        "status": response.status_code,
                        "success": True,
                    })
                    context.add_log(f"✓ {url} - {response.status_code}")
    except Exception as exc:
        # Anything outside the per-URL scope (e.g. client setup) is fatal.
        context.add_log(f"Fatal error: {exc}")
        raise

    context.add_log(f"Completed fetching {len(results)} URLs")
    return {
        "total": total,
        "successful": sum(1 for item in results if item.get("success")),
        "results": results,
    }
# Example 6: Accessing input parameters via context
@worker_task(
    task_definition_name='input_access_example',
    thread_count=5
)
def input_access_example() -> dict:
    """
    Show how to read the raw task input via TaskContext.

    Useful for inspecting input dynamically instead of declaring
    explicit parameters on the worker function.
    """
    context = get_task_context()

    input_data = context.get_input()
    keys = list(input_data.keys())
    context.add_log(f"Received input parameters: {keys}")

    # Log every key/value pair that arrived with the task.
    for name, value in input_data.items():
        context.add_log(f"  {name} = {value}")

    return {
        "processed_keys": keys,
        "input_count": len(input_data),
    }
def main():
    """
    Entry point: print an overview of the example workers, then run
    them until interrupted.
    """
    api_config = Configuration()

    divider = "=" * 60
    print(divider)
    print("Conductor TaskContext Examples")
    print(divider)
    print(f"Server: {api_config.host}")
    print()
    print("Workers demonstrating TaskContext usage:")
    print(" • task_info_example - Access task metadata")
    print(" • logging_example - Add logs to task")
    print(" • polling_example - Use callback_after for polling")
    print(" • retry_aware_example - Handle retries intelligently")
    print(" • async_context_example - TaskContext in async workers")
    print(" • input_access_example - Access task input via context")
    print()
    print("Key TaskContext Features:")
    print(" ✓ Access task metadata (ID, workflow ID, retry count)")
    print(" ✓ Add logs visible in Conductor UI")
    print(" ✓ Set callback delays for polling patterns")
    print(" ✓ Thread-safe and async-safe (uses contextvars)")
    print(divider)
    print("\nStarting workers... Press Ctrl+C to stop\n")

    try:
        # The context manager takes care of handler cleanup on exit.
        handler = TaskHandler(
            configuration=api_config,
            scan_for_annotated_workers=True
        )
        with handler as task_handler:
            task_handler.start_processes()
            task_handler.join_processes()
    except KeyboardInterrupt:
        print("\n\nShutting down gracefully...")
    except Exception as error:
        print(f"\n\nError: {error}")
        raise

    print("\nWorkers stopped. Goodbye!")
if __name__ == '__main__':
    # Run the TaskContext examples; swallow Ctrl+C for a quiet exit.
    try:
        main()
    except KeyboardInterrupt:
        pass