Skip to content

Commit 425d4bc

Browse files
fix: linear accumulation of span processors across job runs (#1487)
1 parent 77f40df commit 425d4bc

6 files changed

Lines changed: 271 additions & 13 deletions

File tree

packages/uipath-core/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-core"
3-
version = "0.5.7"
3+
version = "0.5.8"
44
description = "UiPath Core abstractions"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

packages/uipath-core/src/uipath/core/tracing/trace_manager.py

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from __future__ import annotations
44

55
import contextlib
6-
from typing import Any, Generator, Optional
6+
import logging
7+
import threading
8+
from typing import Any, ClassVar, Generator, Optional
79

10+
from opentelemetry import context as context_api
811
from opentelemetry import trace
9-
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, TracerProvider
12+
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor, TracerProvider
1013
from opentelemetry.sdk.trace.export import SpanExporter
1114
from opentelemetry.util._decorator import _AgnosticContextManager
1215

@@ -17,22 +20,85 @@
1720
)
1821
from uipath.core.tracing.types import UiPathTraceSettings
1922

23+
logger = logging.getLogger(__name__)
24+
25+
26+
class _DelegatingSpanProcessor(SpanProcessor):
27+
"""A span processor that delegates to a mutable list of children.
28+
29+
Registered once on the global TracerProvider. Children can be added
30+
and cleared between jobs without touching the provider's internal state.
31+
32+
WORKAROUND: This exists because OTel's global TracerProvider is set once
33+
and has no public API to remove span processors. When a pod runs multiple
34+
jobs, each job creates a new UiPathTraceManager that adds processors to
35+
the same provider, causing linear accumulation of HTTP calls.
36+
The proper fix is for cli_server to create the TracerProvider and
37+
trace manager once, and register processors only at startup rather than
38+
per-job.
39+
"""
40+
41+
_instance: ClassVar[_DelegatingSpanProcessor | None] = None
42+
_init_lock: ClassVar[threading.Lock] = threading.Lock()
43+
44+
def __init__(self) -> None:
45+
self._processors: list[SpanProcessor] = []
46+
47+
@classmethod
48+
def get_instance(cls, provider: TracerProvider) -> _DelegatingSpanProcessor:
49+
"""Get or create the singleton, registering it on the provider once."""
50+
if cls._instance is None:
51+
with cls._init_lock:
52+
if cls._instance is None:
53+
cls._instance = cls()
54+
provider.add_span_processor(cls._instance)
55+
return cls._instance
56+
57+
def add(self, processor: SpanProcessor) -> None:
58+
"""Add a child processor."""
59+
self._processors.append(processor)
60+
61+
def clear(self) -> list[SpanProcessor]:
62+
"""Remove and return all child processors."""
63+
removed = self._processors.copy()
64+
self._processors.clear()
65+
return removed
66+
67+
def on_start(
68+
self, span: Span, parent_context: context_api.Context | None = None
69+
) -> None:
70+
for p in self._processors:
71+
p.on_start(span, parent_context)
72+
73+
def on_end(self, span: ReadableSpan) -> None:
74+
for p in self._processors:
75+
p.on_end(span)
76+
77+
def force_flush(self, timeout_millis: int = 30000) -> bool:
78+
return all(p.force_flush(timeout_millis) for p in self._processors)
79+
80+
def shutdown(self) -> None:
81+
for p in self._processors:
82+
p.shutdown()
83+
2084

2185
class UiPathTraceManager:
2286
"""Trace manager.
2387
24-
NOTE: Instantiate trace manager only once.
88+
Uses a single delegating processor on the global TracerProvider so that
89+
child processors can be added and removed between jobs without accumulation.
2590
"""
2691

27-
def __init__(self):
92+
def __init__(self) -> None:
2893
"""Initialize a trace manager."""
2994
trace.set_tracer_provider(TracerProvider())
30-
# If a previous provider set, reuse it.
95+
# If a previous provider was already set, reuse it.
3196
current_provider = trace.get_tracer_provider()
3297
assert isinstance(current_provider, TracerProvider), (
3398
"An incompatible Otel TracerProvider was instantiated. Please check runtime configuration."
3499
)
35100
self.tracer_provider: TracerProvider = current_provider
101+
self._delegating = _DelegatingSpanProcessor.get_instance(current_provider)
36102
self.tracer_span_processors: list[SpanProcessor] = []
37103
self.execution_span_exporter = UiPathRuntimeExecutionSpanExporter()
38104
self.add_span_exporter(self.execution_span_exporter)
@@ -58,7 +124,7 @@ def add_span_exporter(
58124
span_exporter, settings
59125
)
60126
self.tracer_span_processors.append(span_processor)
61-
self.tracer_provider.add_span_processor(span_processor)
127+
self._delegating.add(span_processor)
62128
return self
63129

64130
def add_span_processor(
@@ -67,7 +133,7 @@ def add_span_processor(
67133
) -> UiPathTraceManager:
68134
"""Add a span processor to the tracer provider."""
69135
self.tracer_span_processors.append(span_processor)
70-
self.tracer_provider.add_span_processor(span_processor)
136+
self._delegating.add(span_processor)
71137
return self
72138

73139
def get_execution_spans(
@@ -104,5 +170,26 @@ def flush_spans(self) -> None:
104170
for span_processor in self.tracer_span_processors:
105171
span_processor.force_flush()
106172

173+
def shutdown(self) -> None:
174+
"""Flush, shutdown, and remove all span processors registered by this manager.
175+
176+
Removes child processors from the delegating processor so they no longer
177+
receive span events. Must be called between jobs when the same process
178+
handles multiple executions to prevent linear accumulation.
179+
"""
180+
for processor in self.tracer_span_processors:
181+
try:
182+
processor.force_flush()
183+
processor.shutdown()
184+
except Exception:
185+
logger.warning(
186+
"Failed to shutdown processor %s",
187+
type(processor).__name__,
188+
exc_info=True,
189+
)
190+
self._delegating.clear()
191+
self.tracer_span_processors.clear()
192+
self.execution_span_exporter.clear()
193+
107194

108195
__all__ = ["UiPathTraceManager"]

packages/uipath-core/tests/tracing/test_trace_manager.py

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,23 @@
1-
"""Simple test for runtime factory and executor span capture."""
1+
"""Tests for UiPathTraceManager"""
2+
3+
from unittest.mock import MagicMock
24

35
import pytest
46
from opentelemetry import trace
7+
from opentelemetry.sdk.trace import SpanProcessor
8+
9+
from uipath.core.tracing.trace_manager import (
10+
UiPathTraceManager,
11+
_DelegatingSpanProcessor,
12+
)
13+
514

6-
from uipath.core.tracing.trace_manager import UiPathTraceManager
15+
@pytest.fixture(autouse=True)
16+
def _reset_delegating_singleton():
17+
"""Reset the singleton between tests so each test starts clean."""
18+
_DelegatingSpanProcessor._instance = None
19+
yield
20+
_DelegatingSpanProcessor._instance = None
721

822

923
@pytest.mark.asyncio
@@ -31,3 +45,160 @@ async def test_multiple_factories_same_executor():
3145

3246
assert spans[1].name == "root-span"
3347
assert spans[1].attributes == {"execution.id": "test"}
48+
49+
50+
class TestDelegatingSpanProcessor:
51+
"""Tests for _DelegatingSpanProcessor."""
52+
53+
def test_add_and_clear(self) -> None:
54+
dp = _DelegatingSpanProcessor()
55+
p1 = MagicMock(spec=SpanProcessor)
56+
p2 = MagicMock(spec=SpanProcessor)
57+
58+
dp.add(p1)
59+
dp.add(p2)
60+
assert len(dp._processors) == 2
61+
62+
removed = dp.clear()
63+
assert removed == [p1, p2]
64+
assert dp._processors == []
65+
66+
def test_delegates_on_start_and_on_end(self) -> None:
67+
dp = _DelegatingSpanProcessor()
68+
p1 = MagicMock(spec=SpanProcessor)
69+
p2 = MagicMock(spec=SpanProcessor)
70+
dp.add(p1)
71+
dp.add(p2)
72+
73+
mock_span = MagicMock()
74+
dp.on_start(mock_span, parent_context=None)
75+
p1.on_start.assert_called_once_with(mock_span, None)
76+
p2.on_start.assert_called_once_with(mock_span, None)
77+
78+
mock_readable_span = MagicMock()
79+
dp.on_end(mock_readable_span)
80+
p1.on_end.assert_called_once_with(mock_readable_span)
81+
p2.on_end.assert_called_once_with(mock_readable_span)
82+
83+
def test_clear_stops_delegation(self) -> None:
84+
dp = _DelegatingSpanProcessor()
85+
p1 = MagicMock(spec=SpanProcessor)
86+
dp.add(p1)
87+
dp.clear()
88+
89+
mock_span = MagicMock()
90+
dp.on_start(mock_span, parent_context=None)
91+
dp.on_end(mock_span)
92+
p1.on_start.assert_not_called()
93+
p1.on_end.assert_not_called()
94+
95+
def test_get_instance_returns_singleton(self) -> None:
96+
provider = MagicMock()
97+
inst1 = _DelegatingSpanProcessor.get_instance(provider)
98+
inst2 = _DelegatingSpanProcessor.get_instance(provider)
99+
assert inst1 is inst2
100+
provider.add_span_processor.assert_called_once_with(inst1)
101+
102+
def test_force_flush_delegates(self) -> None:
103+
dp = _DelegatingSpanProcessor()
104+
p1 = MagicMock(spec=SpanProcessor)
105+
dp.add(p1)
106+
107+
result = dp.force_flush(timeout_millis=5000)
108+
assert result is True
109+
p1.force_flush.assert_called_once_with(5000)
110+
111+
def test_force_flush_returns_false_on_child_failure(self) -> None:
112+
dp = _DelegatingSpanProcessor()
113+
p1 = MagicMock(spec=SpanProcessor)
114+
p2 = MagicMock(spec=SpanProcessor)
115+
p1.force_flush.return_value = True
116+
p2.force_flush.return_value = False
117+
dp.add(p1)
118+
dp.add(p2)
119+
120+
assert dp.force_flush(timeout_millis=5000) is False
121+
122+
def test_shutdown_delegates_to_children(self) -> None:
123+
dp = _DelegatingSpanProcessor()
124+
p1 = MagicMock(spec=SpanProcessor)
125+
p2 = MagicMock(spec=SpanProcessor)
126+
dp.add(p1)
127+
dp.add(p2)
128+
dp.shutdown()
129+
p1.shutdown.assert_called_once()
130+
p2.shutdown.assert_called_once()
131+
assert len(dp._processors) == 2
132+
133+
134+
class TestTraceManagerShutdown:
135+
"""Tests for the multi-job accumulation fix."""
136+
137+
def test_shutdown_clears_processors(self) -> None:
138+
tm = UiPathTraceManager()
139+
p1 = MagicMock(spec=SpanProcessor)
140+
tm.add_span_processor(p1)
141+
142+
tm.shutdown()
143+
144+
assert tm.tracer_span_processors == []
145+
assert tm._delegating._processors == []
146+
p1.force_flush.assert_called_once()
147+
p1.shutdown.assert_called_once()
148+
149+
def test_successive_managers_do_not_accumulate(self) -> None:
150+
"""Simulates multiple jobs on the same pod.
151+
152+
Each job creates a new UiPathTraceManager and adds a processor.
153+
After shutdown, the next job's processors should not stack with
154+
the previous ones.
155+
"""
156+
processor_counts: list[int] = []
157+
158+
for _ in range(5):
159+
tm = UiPathTraceManager()
160+
mock_processor = MagicMock(spec=SpanProcessor)
161+
tm.add_span_processor(mock_processor)
162+
163+
# Count processors visible to the delegator (minus the batch
164+
# processor added by __init__ for execution_span_exporter)
165+
processor_counts.append(len(tm._delegating._processors))
166+
tm.shutdown()
167+
168+
# Every job should see the same number of processors (2: the
169+
# execution batch processor from __init__ + the one we added),
170+
# NOT a linearly growing count.
171+
assert all(c == processor_counts[0] for c in processor_counts), (
172+
f"Processor counts should be constant across jobs, got: {processor_counts}"
173+
)
174+
175+
def test_shutdown_tolerates_processor_error(self) -> None:
176+
tm = UiPathTraceManager()
177+
bad_processor = MagicMock(spec=SpanProcessor)
178+
bad_processor.force_flush.side_effect = RuntimeError("flush failed")
179+
tm.add_span_processor(bad_processor)
180+
181+
# Should not raise
182+
tm.shutdown()
183+
assert tm.tracer_span_processors == []
184+
assert tm._delegating._processors == []
185+
186+
def test_spans_not_sent_to_old_processors_after_shutdown(self) -> None:
187+
"""After shutdown, old processors must not receive new span events."""
188+
tm1 = UiPathTraceManager()
189+
old_processor = MagicMock(spec=SpanProcessor)
190+
tm1.add_span_processor(old_processor)
191+
tm1.shutdown()
192+
193+
# Simulate next job
194+
tm2 = UiPathTraceManager()
195+
new_processor = MagicMock(spec=SpanProcessor)
196+
tm2.add_span_processor(new_processor)
197+
198+
mock_span = MagicMock()
199+
tm2._delegating.on_start(mock_span, parent_context=None)
200+
201+
new_processor.on_start.assert_called_once()
202+
old_processor.on_start.assert_not_called()
203+
204+
tm2.shutdown()

packages/uipath-core/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/uipath-platform/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/uipath/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)