diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 881c8931ee44..3ead7de8e666 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -7,6 +7,9 @@ ([#46096](https://github.com/Azure/azure-sdk-for-python/pull/46096)) - Add support for populating SDK version from distro and Microsoft OpenTelemetry distro environment variables ([#46613](https://github.com/Azure/azure-sdk-for-python/pull/46613)) +- Add GenAI main-agent attribution processors to propagate `microsoft.gen_ai.main_agent.*` attributes + across spans and log records in multi-agent systems per [spec](https://github.com/aep-health-and-standards/Telemetry-Collection-Spec/blob/main/ApplicationInsights/genai_main_agent_attribution.md) + ([#46700](https://github.com/Azure/azure-sdk-for-python/pull/46700)) ### Breaking Changes - Dropped support for Python 3.9. This package now supports Python 3.10+. [Follows upstream otel dropping support](https://github.com/open-telemetry/opentelemetry-python/pull/5076) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 9f7d3479f41d..218340e3a13d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -371,4 +371,27 @@ class _RP_Names(Enum): "gen_ai.evaluation.explanation", ) +# Gen AI main-agent attribution constants +# Attribute mapping for main-agent propagation in OnStart +_MAIN_AGENT_ATTRIBUTES = ( + ("microsoft.gen_ai.main_agent.name", "microsoft.gen_ai.main_agent.name", "gen_ai.agent.name"), + ("microsoft.gen_ai.main_agent.id", "microsoft.gen_ai.main_agent.id", "gen_ai.agent.id"), + ("microsoft.gen_ai.main_agent.version", "microsoft.gen_ai.main_agent.version", "gen_ai.agent.version"), + ( + "microsoft.gen_ai.main_agent.conversation_id", + "microsoft.gen_ai.main_agent.conversation_id", + "gen_ai.conversation.id", + ), +) + +# OnEnd self-attribution mapping (for root invoke_agent spans) +_MAIN_AGENT_SELF_ATTRIBUTES = ( + ("microsoft.gen_ai.main_agent.name", "gen_ai.agent.name"), + ("microsoft.gen_ai.main_agent.id", "gen_ai.agent.id"), + ("microsoft.gen_ai.main_agent.version", "gen_ai.agent.version"), + ("microsoft.gen_ai.main_agent.conversation_id", "gen_ai.conversation.id"), +) + +_MAIN_AGENT_PREFIX = "microsoft.gen_ai.main_agent." + # cSpell:disable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/__init__.py new file mode 100644 index 000000000000..0bdee620366b --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/__init__.py @@ -0,0 +1,5 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License in the project root for +# license information. +# -------------------------------------------------------------------------- diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/_processor.py new file mode 100644 index 000000000000..29cc5af9e73c --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_gen_ai/_processor.py @@ -0,0 +1,117 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License in the project root for +# license information. +# -------------------------------------------------------------------------- + +from typing import Optional + +from opentelemetry.context import Context +from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import get_current_span, Span + +from azure.monitor.opentelemetry.exporter._constants import ( + _MAIN_AGENT_ATTRIBUTES, + _MAIN_AGENT_PREFIX, + _MAIN_AGENT_SELF_ATTRIBUTES, +) + + +# pylint: disable=protected-access +class _GenAIMainAgentSpanProcessor(SpanProcessor): + """Propagates main-agent context in GenAI multi-agent systems. + + In OnStart, copies microsoft.gen_ai.main_agent.* attributes from the parent span + to the child span (with fallback to gen_ai.agent.* on the parent). + + In OnEnd, self-attributes root invoke_agent spans that have no main_agent context. + """ + + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: # type: ignore + if parent_context is None: + return + parent_span = get_current_span(parent_context) + parent_span_context = parent_span.get_span_context() + if not parent_span_context.is_valid: + return + + parent_attributes = getattr(parent_span, "attributes", None) + if parent_attributes is None: + return + + for target, primary_source, fallback_source in _MAIN_AGENT_ATTRIBUTES: + value = parent_attributes.get(primary_source) + if value is None: + value = parent_attributes.get(fallback_source) + if value is not None: + span.set_attribute(target, value) + + def on_end(self, span: ReadableSpan) -> None: + attributes = span.attributes + if attributes is None: + return + + # Only apply to spans with gen_ai.operation.name = "invoke_agent" + if attributes.get("gen_ai.operation.name") != "invoke_agent": + return + + # If span already has any microsoft.gen_ai.main_agent.* attribute, return + for key in attributes: + if key.startswith(_MAIN_AGENT_PREFIX): + return + + # Self-attribute from the span's own gen_ai attributes + for target, source in _MAIN_AGENT_SELF_ATTRIBUTES: + value = attributes.get(source) + if value is not None: + span._attributes[target] = value # type: ignore + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000): + return True + + +class _GenAIMainAgentLogRecordProcessor(LogRecordProcessor): + """Copies microsoft.gen_ai.main_agent.* attributes from the current span onto log records.""" + + def on_emit(self, log_record: ReadWriteLogRecord) -> None: # type: ignore # pylint: disable=arguments-renamed + current_span = get_current_span() + span_context = current_span.get_span_context() + if not span_context.is_valid: + return + + span_attributes = getattr(current_span, "attributes", None) + if span_attributes is None: + return + + # Collect all microsoft.gen_ai.main_agent.* attributes from the current span + main_agent_attrs = {key: value for key, value in span_attributes.items() if key.startswith(_MAIN_AGENT_PREFIX)} + + if not main_agent_attrs: + return + + # Copy them onto the log record without overwriting any existing log-level values + if hasattr(log_record, "log_record") and log_record.log_record is not None: + if log_record.log_record.attributes is None: + log_record.log_record.attributes = {} + for key, value in main_agent_attrs.items(): + if key not in log_record.log_record.attributes: + log_record.log_record.attributes[key] = value # type: ignore[index] + elif hasattr(log_record, "attributes"): + if log_record.attributes is None: # type: ignore[union-attr] + log_record.attributes = {} # type: ignore[union-attr] + for key, value in main_agent_attrs.items(): + if key not in log_record.attributes: # type: ignore[operator] + log_record.attributes[key] = value # type: ignore[index] + + def emit(self, log_record: ReadWriteLogRecord) -> None: # pylint: disable=arguments-renamed + self.on_emit(log_record) + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000): + return True diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_gen_ai_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_gen_ai_processor.py new file mode 100644 index 000000000000..7dc78504838d --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_gen_ai_processor.py @@ -0,0 +1,357 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import unittest +from unittest.mock import MagicMock, patch + +from opentelemetry.context import Context +from opentelemetry.trace import INVALID_SPAN_CONTEXT, SpanContext, TraceFlags + +from azure.monitor.opentelemetry.exporter._constants import ( + _MAIN_AGENT_ATTRIBUTES, + _MAIN_AGENT_PREFIX, + _MAIN_AGENT_SELF_ATTRIBUTES, +) +from azure.monitor.opentelemetry.exporter._gen_ai._processor import ( + _GenAIMainAgentLogRecordProcessor, + _GenAIMainAgentSpanProcessor, +) + + +class TestGenAIMainAgentSpanProcessorOnStart(unittest.TestCase): + def setUp(self): + self.processor = _GenAIMainAgentSpanProcessor() + + def test_on_start_no_parent_context(self): + """on_start should no-op when parent_context is None.""" + span = MagicMock() + self.processor.on_start(span, parent_context=None) + span.set_attribute.assert_not_called() + + def test_on_start_invalid_parent_span(self): + """on_start should no-op when parent span context is invalid.""" + span = MagicMock() + parent_span = MagicMock() + parent_span.get_span_context.return_value = INVALID_SPAN_CONTEXT + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + span.set_attribute.assert_not_called() + + def test_on_start_propagates_primary_source(self): + """on_start should copy microsoft.gen_ai.main_agent.* from parent (primary source).""" + span = MagicMock() + parent_span = MagicMock() + parent_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + parent_span.attributes = { + "microsoft.gen_ai.main_agent.name": "MainAgent", + "microsoft.gen_ai.main_agent.id": "agent-123", + "microsoft.gen_ai.main_agent.version": "1.0", + "microsoft.gen_ai.main_agent.conversation_id": "conv-456", + } + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.name", "MainAgent") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.id", "agent-123") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.version", "1.0") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.conversation_id", "conv-456") + + def test_on_start_propagates_fallback_source(self): + """on_start should use gen_ai.agent.* as fallback when primary not present.""" + span = MagicMock() + parent_span = MagicMock() + parent_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + parent_span.attributes = { + "gen_ai.agent.name": "RootAgent", + "gen_ai.agent.id": "root-789", + "gen_ai.agent.version": "2.0", + "gen_ai.conversation.id": "conv-101", + } + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.name", "RootAgent") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.id", "root-789") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.version", "2.0") + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.conversation_id", "conv-101") + + def test_on_start_primary_takes_precedence_over_fallback(self): + """on_start should prefer primary source over fallback.""" + span = MagicMock() + parent_span = MagicMock() + parent_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + parent_span.attributes = { + "microsoft.gen_ai.main_agent.name": "MainAgent", + "gen_ai.agent.name": "SubAgent", + "gen_ai.agent.id": "sub-999", + } + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + # Primary source takes precedence for name + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.name", "MainAgent") + # Fallback used for id (no primary source present) + span.set_attribute.assert_any_call("microsoft.gen_ai.main_agent.id", "sub-999") + + def test_on_start_no_attributes_on_parent(self): + """on_start should no-op when parent has no relevant attributes.""" + span = MagicMock() + parent_span = MagicMock() + parent_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + parent_span.attributes = {"http.method": "GET"} + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + span.set_attribute.assert_not_called() + + def test_on_start_parent_has_no_attributes_property(self): + """on_start should no-op when parent span has no attributes.""" + span = MagicMock() + parent_span = MagicMock(spec=[]) # No attributes at all + parent_span.get_span_context = MagicMock( + return_value=SpanContext(trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1)) + ) + parent_context = MagicMock(spec=Context) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=parent_span, + ): + self.processor.on_start(span, parent_context=parent_context) + + span.set_attribute.assert_not_called() + + +class TestGenAIMainAgentSpanProcessorOnEnd(unittest.TestCase): + def setUp(self): + self.processor = _GenAIMainAgentSpanProcessor() + + def test_on_end_no_attributes(self): + """on_end should no-op when span has no attributes.""" + span = MagicMock() + span.attributes = None + self.processor.on_end(span) + + def test_on_end_not_invoke_agent(self): + """on_end should no-op when gen_ai.operation.name is not invoke_agent.""" + span = MagicMock() + span.attributes = {"gen_ai.operation.name": "chat"} + span._attributes = {} + self.processor.on_end(span) + self.assertEqual(span._attributes, {}) + + def test_on_end_no_operation_name(self): + """on_end should no-op when gen_ai.operation.name is missing.""" + span = MagicMock() + span.attributes = {"gen_ai.agent.name": "Agent1"} + span._attributes = {} + self.processor.on_end(span) + self.assertEqual(span._attributes, {}) + + def test_on_end_already_has_main_agent_attributes(self): + """on_end should no-op when span already has microsoft.gen_ai.main_agent.* attrs.""" + span = MagicMock() + span.attributes = { + "gen_ai.operation.name": "invoke_agent", + "microsoft.gen_ai.main_agent.name": "AlreadySet", + "gen_ai.agent.name": "Agent1", + } + span._attributes = dict(span.attributes) + self.processor.on_end(span) + # Should not have been changed + self.assertEqual(span._attributes.get("microsoft.gen_ai.main_agent.name"), "AlreadySet") + + def test_on_end_self_attributes_invoke_agent(self): + """on_end should self-attribute root invoke_agent spans.""" + span = MagicMock() + span.attributes = { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.name": "RootAgent", + "gen_ai.agent.id": "agent-001", + "gen_ai.agent.version": "3.0", + "gen_ai.conversation.id": "conv-xyz", + } + span._attributes = dict(span.attributes) + self.processor.on_end(span) + self.assertEqual(span._attributes["microsoft.gen_ai.main_agent.name"], "RootAgent") + self.assertEqual(span._attributes["microsoft.gen_ai.main_agent.id"], "agent-001") + self.assertEqual(span._attributes["microsoft.gen_ai.main_agent.version"], "3.0") + self.assertEqual(span._attributes["microsoft.gen_ai.main_agent.conversation_id"], "conv-xyz") + + def test_on_end_partial_self_attribution(self): + """on_end should only copy attributes that exist on the span.""" + span = MagicMock() + span.attributes = { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.name": "PartialAgent", + } + span._attributes = dict(span.attributes) + self.processor.on_end(span) + self.assertEqual(span._attributes["microsoft.gen_ai.main_agent.name"], "PartialAgent") + self.assertNotIn("microsoft.gen_ai.main_agent.id", span._attributes) + self.assertNotIn("microsoft.gen_ai.main_agent.version", span._attributes) + self.assertNotIn("microsoft.gen_ai.main_agent.conversation_id", span._attributes) + + +class TestGenAIMainAgentLogRecordProcessor(unittest.TestCase): + def setUp(self): + self.processor = _GenAIMainAgentLogRecordProcessor() + + def test_on_emit_no_current_span(self): + """on_emit should no-op when there is no valid current span.""" + log_record = MagicMock() + invalid_span = MagicMock() + invalid_span.get_span_context.return_value = INVALID_SPAN_CONTEXT + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=invalid_span, + ): + self.processor.on_emit(log_record) + + def test_on_emit_span_has_no_main_agent_attributes(self): + """on_emit should no-op when span has no microsoft.gen_ai.main_agent.* attributes.""" + log_record = MagicMock() + current_span = MagicMock() + current_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + current_span.attributes = {"http.method": "GET"} + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=current_span, + ): + self.processor.on_emit(log_record) + + def test_on_emit_copies_main_agent_attributes_to_log_record(self): + """on_emit should copy microsoft.gen_ai.main_agent.* from span to log record.""" + log_record = MagicMock() + log_record.log_record = MagicMock() + log_record.log_record.attributes = {} + + current_span = MagicMock() + current_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + current_span.attributes = { + "microsoft.gen_ai.main_agent.name": "MainAgent", + "microsoft.gen_ai.main_agent.id": "agent-123", + "microsoft.gen_ai.main_agent.version": "1.0", + "microsoft.gen_ai.main_agent.conversation_id": "conv-456", + "other.attribute": "should_not_be_copied", + } + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=current_span, + ): + self.processor.on_emit(log_record) + + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.name"], "MainAgent") + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.id"], "agent-123") + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.version"], "1.0") + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.conversation_id"], "conv-456") + self.assertNotIn("other.attribute", log_record.log_record.attributes) + + def test_on_emit_creates_attributes_dict_if_none(self): + """on_emit should initialize attributes dict if None.""" + log_record = MagicMock() + log_record.log_record = MagicMock() + log_record.log_record.attributes = None + + current_span = MagicMock() + current_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + current_span.attributes = { + "microsoft.gen_ai.main_agent.name": "TestAgent", + } + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=current_span, + ): + self.processor.on_emit(log_record) + + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.name"], "TestAgent") + + def test_on_emit_does_not_overwrite_existing_log_record_attributes(self): + """on_emit should not overwrite existing microsoft.gen_ai.main_agent.* on the log record.""" + log_record = MagicMock() + log_record.log_record = MagicMock() + log_record.log_record.attributes = { + "microsoft.gen_ai.main_agent.name": "ExistingAgent", + } + + current_span = MagicMock() + current_span.get_span_context.return_value = SpanContext( + trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1) + ) + current_span.attributes = { + "microsoft.gen_ai.main_agent.name": "SpanAgent", + "microsoft.gen_ai.main_agent.id": "span-id-123", + } + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=current_span, + ): + self.processor.on_emit(log_record) + + # Existing value should not be overwritten + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.name"], "ExistingAgent") + # Missing value should be populated + self.assertEqual(log_record.log_record.attributes["microsoft.gen_ai.main_agent.id"], "span-id-123") + + def test_on_emit_span_has_no_attributes(self): + """on_emit should no-op when span has no attributes property.""" + log_record = MagicMock() + current_span = MagicMock(spec=[]) + current_span.get_span_context = MagicMock( + return_value=SpanContext(trace_id=1, span_id=2, is_remote=False, trace_flags=TraceFlags(1)) + ) + + with patch( + "azure.monitor.opentelemetry.exporter._gen_ai._processor.get_current_span", + return_value=current_span, + ): + self.processor.on_emit(log_record) + + +if __name__ == "__main__": + unittest.main()