-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdecorators.py
More file actions
280 lines (225 loc) · 9.91 KB
/
decorators.py
File metadata and controls
280 lines (225 loc) · 9.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# SPDX-FileCopyrightText: 2026 The Botanu Authors
# SPDX-License-Identifier: Apache-2.0
"""Decorators for automatic run span creation and context propagation.
The ``@botanu_use_case`` decorator is the primary integration point.
It creates a "run span" that:
- Generates a UUIDv7 run_id
- Emits ``run.started`` and ``run.completed`` events
- Propagates run context via W3C Baggage
- Records outcome at completion
"""
from __future__ import annotations
import functools
import hashlib
import inspect
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional, TypeVar, Union
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode
from botanu.models.run_context import RunContext, RunStatus
from botanu.sdk.context import get_baggage, set_baggage
T = TypeVar("T")
tracer = trace.get_tracer("botanu_sdk")
def _compute_workflow_version(func: Callable[..., Any]) -> str:
try:
source = inspect.getsource(func)
code_hash = hashlib.sha256(source.encode()).hexdigest()
return f"v:{code_hash[:12]}"
except (OSError, TypeError):
return "v:unknown"
def _get_parent_run_id() -> Optional[str]:
return get_baggage("botanu.run_id")
def botanu_use_case(
name: str,
workflow: Optional[str] = None,
*,
environment: Optional[str] = None,
tenant_id: Optional[str] = None,
auto_outcome_on_success: bool = True,
span_kind: SpanKind = SpanKind.SERVER,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""Decorator to create a run span with automatic context propagation.
This is the primary integration point. It:
1. Creates a UUIDv7 ``run_id`` (sortable, globally unique)
2. Creates a ``botanu.run`` span as the root of the run
3. Emits ``run.started`` event
4. Propagates run context via W3C Baggage
5. On completion: emits ``run.completed`` event with outcome
Args:
name: Use case name (low cardinality, e.g. ``"Customer Support"``).
workflow: Workflow name (defaults to function qualified name).
environment: Deployment environment.
tenant_id: Tenant identifier for multi-tenant apps.
auto_outcome_on_success: Emit ``"success"`` if no exception.
span_kind: OpenTelemetry span kind (default: ``SERVER``).
Example::
@botanu_use_case("Customer Support")
async def handle_ticket(ticket_id: str):
result = await process_ticket(ticket_id)
emit_outcome("success", value_type="tickets_resolved", value_amount=1)
return result
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
workflow_name = workflow or func.__qualname__
workflow_version = _compute_workflow_version(func)
is_async = inspect.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> T:
parent_run_id = _get_parent_run_id()
run_ctx = RunContext.create(
use_case=name,
workflow=workflow_name,
workflow_version=workflow_version,
environment=environment,
tenant_id=tenant_id,
parent_run_id=parent_run_id,
)
with tracer.start_as_current_span(
name=f"botanu.run/{name}",
kind=span_kind,
) as span:
for key, value in run_ctx.to_span_attributes().items():
span.set_attribute(key, value)
span.add_event(
"botanu.run.started",
attributes={
"run_id": run_ctx.run_id,
"use_case": run_ctx.use_case,
"workflow": workflow_name,
},
)
for key, value in run_ctx.to_baggage_dict().items():
set_baggage(key, value)
try:
result = await func(*args, **kwargs)
span_attrs = getattr(span, "attributes", None)
existing_outcome = span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, dict) else None
if existing_outcome is None and auto_outcome_on_success:
run_ctx.complete(RunStatus.SUCCESS)
span.set_status(Status(StatusCode.OK))
_emit_run_completed(span, run_ctx, RunStatus.SUCCESS)
return result
except Exception as exc:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
run_ctx.complete(RunStatus.FAILURE, error_class=exc.__class__.__name__)
_emit_run_completed(
span,
run_ctx,
RunStatus.FAILURE,
error_class=exc.__class__.__name__,
)
raise
@functools.wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> T:
parent_run_id = _get_parent_run_id()
run_ctx = RunContext.create(
use_case=name,
workflow=workflow_name,
workflow_version=workflow_version,
environment=environment,
tenant_id=tenant_id,
parent_run_id=parent_run_id,
)
with tracer.start_as_current_span(
name=f"botanu.run/{name}",
kind=span_kind,
) as span:
for key, value in run_ctx.to_span_attributes().items():
span.set_attribute(key, value)
span.add_event(
"botanu.run.started",
attributes={
"run_id": run_ctx.run_id,
"use_case": run_ctx.use_case,
"workflow": workflow_name,
},
)
for key, value in run_ctx.to_baggage_dict().items():
set_baggage(key, value)
try:
result = func(*args, **kwargs)
span_attrs = getattr(span, "attributes", None)
existing_outcome = span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, dict) else None
if existing_outcome is None and auto_outcome_on_success:
run_ctx.complete(RunStatus.SUCCESS)
span.set_status(Status(StatusCode.OK))
_emit_run_completed(span, run_ctx, RunStatus.SUCCESS)
return result
except Exception as exc:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
run_ctx.complete(RunStatus.FAILURE, error_class=exc.__class__.__name__)
_emit_run_completed(
span,
run_ctx,
RunStatus.FAILURE,
error_class=exc.__class__.__name__,
)
raise
if is_async:
return async_wrapper # type: ignore[return-value]
return sync_wrapper # type: ignore[return-value]
return decorator
def _emit_run_completed(
span: trace.Span,
run_ctx: RunContext,
status: RunStatus,
error_class: Optional[str] = None,
) -> None:
duration_ms = (datetime.now(timezone.utc) - run_ctx.start_time).total_seconds() * 1000
event_attrs: Dict[str, Union[str, float]] = {
"run_id": run_ctx.run_id,
"use_case": run_ctx.use_case,
"status": status.value,
"duration_ms": duration_ms,
}
if error_class:
event_attrs["error_class"] = error_class
if run_ctx.outcome and run_ctx.outcome.value_type:
event_attrs["value_type"] = run_ctx.outcome.value_type
if run_ctx.outcome and run_ctx.outcome.value_amount is not None:
event_attrs["value_amount"] = run_ctx.outcome.value_amount
span.add_event("botanu.run.completed", attributes=event_attrs)
span.set_attribute("botanu.outcome.status", status.value)
span.set_attribute("botanu.run.duration_ms", duration_ms)
# Alias
use_case = botanu_use_case
def botanu_outcome(
success: Optional[str] = None,
partial: Optional[str] = None,
failed: Optional[str] = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""Decorator to automatically emit outcomes based on function result.
This is a convenience decorator for sub-functions within a use case.
It does NOT create a new run — use ``@botanu_use_case`` for that.
"""
from botanu.sdk.span_helpers import emit_outcome
def decorator(func: Callable[..., T]) -> Callable[..., T]:
is_async = inspect.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> T:
try:
result = await func(*args, **kwargs)
span = trace.get_current_span()
if not span.attributes or "botanu.outcome.status" not in span.attributes:
emit_outcome("success")
return result
except Exception as exc:
emit_outcome("failed", reason=exc.__class__.__name__)
raise
@functools.wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> T:
try:
result = func(*args, **kwargs)
span = trace.get_current_span()
if not span.attributes or "botanu.outcome.status" not in span.attributes:
emit_outcome("success")
return result
except Exception as exc:
emit_outcome("failed", reason=exc.__class__.__name__)
raise
if is_async:
return async_wrapper # type: ignore[return-value]
return sync_wrapper # type: ignore[return-value]
return decorator