diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_base.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_base.py index 0785f01e36ba..7ecd56a2c508 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_base.py +++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_base.py @@ -188,11 +188,13 @@ def __init__( # Observability (logging + tracing) -------------------------------- _conn_str = applicationinsights_connection_string or self.config.appinsights_connection_string + _sensitive_data = os.environ.get("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true").lower() not in ("false", "0") if configure_observability is not None: try: configure_observability( connection_string=_conn_str, log_level=log_level, + enable_sensitive_data=_sensitive_data, ) except ValueError: raise # invalid log_level etc. — user should fix their config @@ -326,55 +328,24 @@ def _build_server_version(self) -> str: # Tracing (for protocol subclasses) # ------------------------------------------------------------------ - #: Default instrumentation scope for tracing spans. - #: Protocol subclasses should override this per the spec. - _INSTRUMENTATION_SCOPE = "Azure.AI.AgentServer" - @contextlib.contextmanager - def request_span( + def request_context( self, headers: Any, - request_id: str, - operation: str, - *, - operation_name: Optional[str] = None, - session_id: str = "", - end_on_exit: bool = True, ) -> Any: - """Create a request-scoped span with this host's identity attributes. + """Extract W3C trace context and attach as the current OTel context. - Delegates to :func:`_tracing.request_span` with pre-populated - agent identity from environment variables. + Delegates to :func:`_tracing.request_context`. No span is created — + this only ensures downstream framework spans are correctly parented + under the caller's trace context. :param headers: HTTP request headers. :type headers: any - :param request_id: The request/invocation ID. - :type request_id: str - :param operation: Span operation (e.g. ``"invoke_agent"``). - :type operation: str - :keyword operation_name: Optional ``gen_ai.operation.name`` value. - :paramtype operation_name: str or None - :keyword session_id: Session ID. - :paramtype session_id: str - :keyword end_on_exit: Whether to end the span when the context exits. - :paramtype end_on_exit: bool - :return: Context manager yielding the OTel span. + :return: Context manager (yields nothing). :rtype: any """ - with _tracing.request_span( - headers, - request_id, - operation, - agent_id=self.config.agent_id, - agent_name=self.config.agent_name, - agent_version=self.config.agent_version, - project_id=self.config.project_id, - operation_name=operation_name, - session_id=session_id, - end_on_exit=end_on_exit, - instrumentation_scope=self._INSTRUMENTATION_SCOPE, - ) as span: - yield span + with _tracing.request_context(headers): + yield # ------------------------------------------------------------------ # Shutdown handler (server-level lifecycle) diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_config.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_config.py index e22bc1ff1cf6..95111f467b91 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_config.py +++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_config.py @@ -24,6 +24,9 @@ _ENV_FOUNDRY_AGENT_NAME = "FOUNDRY_AGENT_NAME" _ENV_FOUNDRY_AGENT_VERSION = "FOUNDRY_AGENT_VERSION" +_ENV_FOUNDRY_AGENT_INSTANCE_CLIENT_ID = "FOUNDRY_AGENT_INSTANCE_CLIENT_ID" +_ENV_FOUNDRY_AGENT_BLUEPRINT_CLIENT_ID = "FOUNDRY_AGENT_BLUEPRINT_CLIENT_ID" +_ENV_FOUNDRY_AGENT_TENANT_ID = "FOUNDRY_AGENT_TENANT_ID" _ENV_FOUNDRY_HOSTING_ENVIRONMENT = "FOUNDRY_HOSTING_ENVIRONMENT" _ENV_FOUNDRY_PROJECT_ENDPOINT = "FOUNDRY_PROJECT_ENDPOINT" _ENV_FOUNDRY_PROJECT_ARM_ID = "FOUNDRY_PROJECT_ARM_ID" @@ -283,6 +286,46 @@ def resolve_agent_version() -> str: return os.environ.get(_ENV_FOUNDRY_AGENT_VERSION, "") +def resolve_agent_id() -> str: + """Resolve the agent ID. + + Resolution order: + 1. ``FOUNDRY_AGENT_INSTANCE_CLIENT_ID`` environment variable. + 2. ``:`` if both are set. + 3. ```` if only name is set. + 4. Empty string if nothing is available. + + :return: The resolved agent ID, or an empty string if not determinable. + :rtype: str + """ + agent_id = os.environ.get(_ENV_FOUNDRY_AGENT_INSTANCE_CLIENT_ID, "") + if agent_id: + return agent_id + agent_name = os.environ.get(_ENV_FOUNDRY_AGENT_NAME, "") + agent_version = os.environ.get(_ENV_FOUNDRY_AGENT_VERSION, "") + if agent_name and agent_version: + return f"{agent_name}:{agent_version}" + return agent_name + + +def resolve_agent_blueprint_id() -> str: + """Resolve the agent blueprint client ID from the ``FOUNDRY_AGENT_BLUEPRINT_CLIENT_ID`` environment variable. + + :return: The agent blueprint client ID, or an empty string if not set. + :rtype: str + """ + return os.environ.get(_ENV_FOUNDRY_AGENT_BLUEPRINT_CLIENT_ID, "") + + +def resolve_agent_tenant_id() -> str: + """Resolve the agent tenant ID from the ``FOUNDRY_AGENT_TENANT_ID`` environment variable. + + :return: The agent tenant ID, or an empty string if not set. + :rtype: str + """ + return os.environ.get(_ENV_FOUNDRY_AGENT_TENANT_ID, "") + + def resolve_project_id() -> str: """Resolve the Foundry project ARM resource ID from the ``FOUNDRY_PROJECT_ARM_ID`` environment variable. diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_constants.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_constants.py index 74b7c0708931..93e017f0ca8b 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_constants.py +++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_constants.py @@ -19,6 +19,8 @@ class Constants: # Tracing APPLICATIONINSIGHTS_CONNECTION_STRING = "APPLICATIONINSIGHTS_CONNECTION_STRING" OTEL_EXPORTER_OTLP_ENDPOINT = "OTEL_EXPORTER_OTLP_ENDPOINT" + FOUNDRY_AGENT365_TRACING_ENABLED = "FOUNDRY_AGENT365_TRACING_ENABLED" + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT" # SSE keep-alive SSE_KEEPALIVE_INTERVAL = "SSE_KEEPALIVE_INTERVAL" diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_tracing.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_tracing.py index faf5d23d7aaf..b5fba3d41169 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_tracing.py +++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_tracing.py @@ -24,7 +24,8 @@ **Span operations:** -- :func:`request_span` — create a request-scoped span with GenAI attributes +- :func:`request_context` — extract W3C trace context from headers and attach + as the current OTel context (no span is created) - :func:`end_span` / :func:`record_error` — span lifecycle helpers - :func:`trace_stream` — wrap streaming responses with span lifecycle - :func:`set_current_span` / :func:`detach_context` — explicit context management @@ -53,6 +54,8 @@ _ATTR_GEN_AI_SYSTEM = "gen_ai.system" _ATTR_GEN_AI_PROVIDER_NAME = "gen_ai.provider.name" _ATTR_GEN_AI_AGENT_ID = "gen_ai.agent.id" +_ATTR_GEN_AI_AGENT_BLUEPRINT_ID = "gen_ai.agent.blueprint.id" +_ATTR_GEN_AI_AGENT_TENANT_ID = "microsoft.tenant.id" _ATTR_GEN_AI_AGENT_NAME = "gen_ai.agent.name" _ATTR_GEN_AI_AGENT_VERSION = "gen_ai.agent.version" _ATTR_GEN_AI_RESPONSE_ID = "gen_ai.response.id" @@ -68,6 +71,9 @@ # the calling service may carry either key as W3C baggage. _BAGGAGE_SESSION_ID = "azure.ai.agentserver.session_id" _BAGGAGE_CONVERSATION_ID = "azure.ai.agentserver.conversation_id" +_BAGGAGE_INVOCATION_ID = "azure.ai.agentserver.invocation_id" + +_ATTR_INVOCATION_ID = "azure.ai.agentserver.invocations.invocation_id" _SERVICE_NAME_VALUE = "azure.ai.agentserver" _GEN_AI_SYSTEM_VALUE = "azure.ai.agentserver" @@ -95,6 +101,7 @@ def configure_observability( *, connection_string: Optional[str] = None, log_level: Optional[str] = None, + enable_sensitive_data: bool = False, ) -> None: """Default observability setup: console logging + tracing/OTel export. @@ -111,6 +118,10 @@ def configure_observability( :paramtype connection_string: str or None :keyword log_level: Log level name (e.g. ``"INFO"``, ``"DEBUG"``). :paramtype log_level: str or None + :keyword enable_sensitive_data: Enable sensitive data recording + (prompts, tool arguments, results) for Agent Framework SDK + instrumentation. Defaults to False. + :paramtype enable_sensitive_data: bool """ # Console logging on the root logger so user logs are also visible. resolved_level = _config.resolve_log_level(log_level) @@ -135,10 +146,10 @@ def configure_observability( logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING) # Tracing and OTel export - _configure_tracing(connection_string=connection_string) + _configure_tracing(connection_string=connection_string, enable_sensitive_data=enable_sensitive_data) -def _configure_tracing(connection_string: Optional[str] = None) -> None: +def _configure_tracing(connection_string: Optional[str] = None, enable_sensitive_data: bool = False) -> None: """Configure OpenTelemetry exporters via the microsoft-opentelemetry distro. Internal helper called by :func:`configure_observability`. @@ -146,6 +157,9 @@ def _configure_tracing(connection_string: Optional[str] = None) -> None: :param connection_string: Application Insights connection string. When provided, traces and logs are exported to Azure Monitor. :type connection_string: str or None + :param enable_sensitive_data: Enable sensitive data recording for + Agent Framework SDK instrumentation. + :type enable_sensitive_data: bool """ resource = _create_resource() if resource is None: @@ -156,18 +170,16 @@ def _configure_tracing(connection_string: Optional[str] = None) -> None: agent_name = _config.resolve_agent_name() or None agent_version = _config.resolve_agent_version() or None project_id = _config.resolve_project_id() or None - - if agent_name and agent_version: - agent_id = f"{agent_name}:{agent_version}" - elif agent_name: - agent_id = agent_name - else: - agent_id = None + agent_id = _config.resolve_agent_id() or None + agent_blueprint_id = _config.resolve_agent_blueprint_id() or None + agent_tenant_id = _config.resolve_agent_tenant_id() or None span_processors = [ _FoundryEnrichmentSpanProcessor( agent_name=agent_name, agent_version=agent_version, agent_id=agent_id, project_id=project_id, + agent_blueprint_id=agent_blueprint_id, + agent_tenant_id=agent_tenant_id, ), ] log_record_processors = [_BaggageLogRecordProcessor()] # type: ignore[list-item] @@ -178,6 +190,7 @@ def _configure_tracing(connection_string: Optional[str] = None) -> None: span_processors=span_processors, log_record_processors=log_record_processors, connection_string=connection_string, + enable_sensitive_data=enable_sensitive_data, ) logger.info("Tracing configured successfully via microsoft-opentelemetry distro.") except ImportError: @@ -192,6 +205,7 @@ def _setup_distro_export( span_processors: list[Any], log_record_processors: list[Any], connection_string: Optional[str] = None, + enable_sensitive_data: bool = False, ) -> None: """Delegate to microsoft-opentelemetry distro for exporter configuration. @@ -202,6 +216,8 @@ def _setup_distro_export( :keyword span_processors: Span processors to register. :keyword log_record_processors: Log record processors to register. :keyword connection_string: Application Insights connection string. + :keyword enable_sensitive_data: Enable sensitive data recording for + Agent Framework SDK instrumentation. """ from microsoft.opentelemetry import use_microsoft_opentelemetry @@ -209,6 +225,7 @@ def _setup_distro_export( "resource": resource, "span_processors": span_processors, "log_record_processors": log_record_processors, + "enable_sensitive_data": enable_sensitive_data, } # Azure Monitor export is off by default in the distro — enable it @@ -217,6 +234,16 @@ def _setup_distro_export( kwargs["enable_azure_monitor"] = True kwargs["azure_monitor_connection_string"] = connection_string + # A365 tracing export — enabled only in hosted environments. + if ( + os.environ.get("FOUNDRY_HOSTING_ENVIRONMENT", "") + and os.environ.get("FOUNDRY_AGENT365_TRACING_ENABLED", "").lower() in ("true", "1") + ): + kwargs["enable_a365"] = True + kwargs["a365_use_s2s_endpoint"] = True + kwargs["a365_enable_observability_exporter"] = True + kwargs["a365_observability_scope_override"] = "api://9b975845-388f-4429-889e-eab1ef63949c/.default" + use_microsoft_opentelemetry(**kwargs) @@ -226,98 +253,41 @@ def _setup_distro_export( @contextmanager -def request_span( +def request_context( headers: Mapping[str, str], - request_id: str, - operation: str, - *, - agent_id: str = "", - agent_name: str = "", - agent_version: str = "", - project_id: str = "", - operation_name: Optional[str] = None, - session_id: str = "", - end_on_exit: bool = True, - instrumentation_scope: str = "Azure.AI.AgentServer", -) -> Iterator[Any]: - """Create a request-scoped span with GenAI semantic convention attributes. - - Extracts W3C trace context from *headers* and creates a span set as - current in context (child spans are correctly parented). - - For **non-streaming** requests use ``end_on_exit=True`` (default). - For **streaming** use ``end_on_exit=False`` and end via :func:`trace_stream`. +) -> Iterator[None]: + """Extract W3C trace context from *headers* and attach as the current context. + + No span is created — this only propagates the incoming ``traceparent``, + ``tracestate``, and ``baggage`` so that spans created by downstream + frameworks (e.g. LangChain, Semantic Kernel) are correctly parented + under the caller's span. + + Also propagates ``x-request-id`` as baggage for downstream services. :param headers: HTTP request headers. :type headers: Mapping[str, str] - :param request_id: The request/invocation ID. - :type request_id: str - :param operation: Span operation (e.g. ``"invoke_agent"``). - :type operation: str - :keyword agent_id: Agent identifier (``"name:version"`` or ``"name"``). - :paramtype agent_id: str - :keyword agent_name: Agent name from FOUNDRY_AGENT_NAME. - :paramtype agent_name: str - :keyword agent_version: Agent version from FOUNDRY_AGENT_VERSION. - :paramtype agent_version: str - :keyword project_id: Foundry project ARM resource ID. - :paramtype project_id: str - :keyword operation_name: Optional ``gen_ai.operation.name`` value. - :paramtype operation_name: str or None - :keyword session_id: Session ID (empty string if absent). - :paramtype session_id: str - :keyword end_on_exit: Whether to end the span when the context exits. - :paramtype end_on_exit: bool - :keyword instrumentation_scope: OpenTelemetry instrumentation scope name. - :paramtype instrumentation_scope: str - :return: Context manager yielding the OTel span. - :rtype: Iterator[any] + :return: Context manager (yields nothing). + :rtype: Iterator[None] """ - tracer = trace.get_tracer(instrumentation_scope) - - # Build span name - name = f"{operation} {agent_id}" if agent_id else operation - - # Build attributes - attrs: dict[str, str] = { - _ATTR_SERVICE_NAME: agent_name or _SERVICE_NAME_VALUE, - _ATTR_GEN_AI_SYSTEM: _GEN_AI_SYSTEM_VALUE, - _ATTR_GEN_AI_PROVIDER_NAME: _GEN_AI_PROVIDER_NAME_VALUE, - _ATTR_GEN_AI_RESPONSE_ID: request_id, - _ATTR_GEN_AI_AGENT_ID: agent_id, - } - if agent_name: - attrs[_ATTR_GEN_AI_AGENT_NAME] = agent_name - if agent_version: - attrs[_ATTR_GEN_AI_AGENT_VERSION] = agent_version - if operation_name: - attrs[_ATTR_GEN_AI_OPERATION_NAME] = operation_name - if session_id: - attrs[_ATTR_SESSION_ID] = session_id - if project_id: - attrs[_ATTR_FOUNDRY_PROJECT_ID] = project_id - - # Propagate platform request correlation ID as span attribute AND baggage - x_request_id = headers.get("x-request-id") - if x_request_id: - attrs["x_request_id"] = x_request_id - # Extract W3C trace context (traceparent + tracestate + baggage) carrier = _extract_w3c_carrier(headers) ctx = _propagator.extract(carrier=carrier) if carrier else None # Add x-request-id to baggage for downstream propagation + x_request_id = headers.get("x-request-id") if x_request_id: ctx = _otel_baggage.set_baggage("x_request_id", x_request_id, context=ctx) - with tracer.start_as_current_span( # type: ignore[reportGeneralTypeIssues] - name=name, - attributes=attrs, - kind=trace.SpanKind.SERVER, - context=ctx, - end_on_exit=end_on_exit, - ) as otel_span: - yield otel_span + token = _otel_context.attach(ctx) if ctx else None + try: + yield + finally: + if token is not None: + try: + _otel_context.detach(token) + except ValueError: + pass def end_span(span: Any, exc: Optional[BaseException] = None) -> None: @@ -460,15 +430,20 @@ class _FoundryEnrichmentSpanProcessor: def __init__( self, + *, agent_name: Optional[str] = None, agent_version: Optional[str] = None, agent_id: Optional[str] = None, project_id: Optional[str] = None, + agent_blueprint_id: Optional[str] = None, + agent_tenant_id: Optional[str] = None, ) -> None: self.agent_name = agent_name self.agent_version = agent_version self.agent_id = agent_id self.project_id = project_id + self.agent_blueprint_id = agent_blueprint_id + self.agent_tenant_id = agent_tenant_id def on_start(self, span: Any, parent_context: Any = None) -> None: if self.project_id: @@ -483,6 +458,9 @@ def on_start(self, span: Any, parent_context: Any = None) -> None: conversation_id = _otel_baggage.get_baggage(_BAGGAGE_CONVERSATION_ID, context=ctx) if conversation_id: span.set_attribute(_ATTR_GEN_AI_CONVERSATION_ID, conversation_id) + invocation_id = _otel_baggage.get_baggage(_BAGGAGE_INVOCATION_ID, context=ctx) + if invocation_id: + span.set_attribute(_ATTR_INVOCATION_ID, invocation_id) def _on_ending(self, span: Any) -> None: # Set agent identity attributes at span end so they cannot be @@ -504,6 +482,10 @@ def _on_ending(self, span: Any) -> None: attrs[_ATTR_GEN_AI_AGENT_VERSION] = self.agent_version if self.agent_id: attrs[_ATTR_GEN_AI_AGENT_ID] = self.agent_id + if self.agent_blueprint_id: + attrs[_ATTR_GEN_AI_AGENT_BLUEPRINT_ID] = self.agent_blueprint_id + if self.agent_tenant_id: + attrs[_ATTR_GEN_AI_AGENT_TENANT_ID] = self.agent_tenant_id except Exception: # pylint: disable=broad-exception-caught logger.debug("Failed to enrich span attributes in _on_ending", exc_info=True) diff --git a/sdk/agentserver/azure-ai-agentserver-core/samples/selfhosted_invocation/selfhosted_invocation.py b/sdk/agentserver/azure-ai-agentserver-core/samples/selfhosted_invocation/selfhosted_invocation.py index 9fc296ef775b..cb0e8d55d40b 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/samples/selfhosted_invocation/selfhosted_invocation.py +++ b/sdk/agentserver/azure-ai-agentserver-core/samples/selfhosted_invocation/selfhosted_invocation.py @@ -37,7 +37,7 @@ from starlette.responses import JSONResponse, Response from starlette.routing import Route -from azure.ai.agentserver.core import AgentServerHost, record_error +from azure.ai.agentserver.core import AgentServerHost logger = logging.getLogger("azure.ai.agentserver") @@ -61,10 +61,7 @@ async def _invoke(self, request: Request) -> Response: or str(uuid.uuid4()) ) - with self.request_span( - request.headers, invocation_id, "invoke_agent", - operation_name="invoke_agent", session_id=session_id, - ) as otel_span: + with self.request_context(dict(request.headers)): logger.info("Processing invocation %s in session %s", invocation_id, session_id) try: @@ -72,7 +69,6 @@ async def _invoke(self, request: Request) -> Response: name = data.get("name", "World") result = {"greeting": f"Hello, {name}!"} except Exception as exc: - record_error(otel_span, exc) logger.error("Invocation %s failed: %s", invocation_id, exc) raise diff --git a/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing.py b/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing.py index 2b3531b552d1..5eefa9ac2a27 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing.py +++ b/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing.py @@ -74,6 +74,7 @@ def test_observability_receives_constructor_connection_string(self) -> None: mock_configure.assert_called_once_with( connection_string="InstrumentationKey=ctor", log_level=None, + enable_sensitive_data=True, ) def test_observability_disabled_when_none(self) -> None: @@ -160,6 +161,7 @@ def test_constructor_passes_connection_string(self) -> None: mock_configure.assert_called_once_with( connection_string="InstrumentationKey=ctor", log_level=None, + enable_sensitive_data=True, ) @@ -336,6 +338,54 @@ def test_baggage_ids_propagate_to_child_spans(self) -> None: assert spans_by_name["parent"]["microsoft.session.id"] == "session-456" assert spans_by_name["parent"]["gen_ai.conversation.id"] == "conv-789" + def test_invocation_id_from_baggage(self) -> None: + """invocation_id baggage is stamped as azure.ai.agentserver.invocations.invocation_id.""" + proc = _FoundryEnrichmentSpanProcessor() + provider, collector = self._create_provider(proc) + tracer = provider.get_tracer("test") + + ctx = _otel_baggage.set_baggage( + "azure.ai.agentserver.invocation_id", "inv-abc-123", + ) + with tracer.start_as_current_span("span", context=ctx): + pass + + attrs = dict(collector.spans[0].attributes) + assert attrs["azure.ai.agentserver.invocations.invocation_id"] == "inv-abc-123" + + def test_invocation_id_not_set_when_no_baggage(self) -> None: + """invocation_id attr is not set when no invocation_id baggage is present.""" + proc = _FoundryEnrichmentSpanProcessor() + provider, collector = self._create_provider(proc) + tracer = provider.get_tracer("test") + + with tracer.start_as_current_span("span"): + pass + + attrs = dict(collector.spans[0].attributes) + assert "azure.ai.agentserver.invocations.invocation_id" not in attrs + + def test_invocation_id_propagates_to_child_spans(self) -> None: + """Child spans inherit invocation_id from baggage.""" + proc = _FoundryEnrichmentSpanProcessor() + provider, collector = self._create_provider(proc) + tracer = provider.get_tracer("test") + + ctx = _otel_baggage.set_baggage( + "azure.ai.agentserver.invocation_id", "inv-xyz-789", + ) + token = _otel_context.attach(ctx) + try: + with tracer.start_as_current_span("parent"): + with tracer.start_as_current_span("child"): + pass + finally: + _otel_context.detach(token) + + spans_by_name = {s.name: dict(s.attributes) for s in collector.spans} + assert spans_by_name["child"]["azure.ai.agentserver.invocations.invocation_id"] == "inv-xyz-789" + assert spans_by_name["parent"]["azure.ai.agentserver.invocations.invocation_id"] == "inv-xyz-789" + # ------------------------------------------------------------------ # # Agent name / version resolution with new env vars diff --git a/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing_e2e.py b/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing_e2e.py index d1c428e2bfa3..f698ae050422 100644 --- a/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing_e2e.py +++ b/sdk/agentserver/azure-ai-agentserver-core/tests/test_tracing_e2e.py @@ -10,11 +10,8 @@ The connection string is picked up automatically from the environment variable ``APPLICATIONINSIGHTS_CONNECTION_STRING`` by ``AgentServerHost.__init__``. -Each test correlates its specific span in App Insights using a unique request ID -stamped as ``gen_ai.response.id`` in customDimensions. - -Since the span is created with ``SpanKind.SERVER``, it lands in the ``requests`` -table in Application Insights. +With context-only propagation (no invoke_agent span), these tests verify that +framework-created child spans are properly exported to App Insights. """ import time import uuid @@ -36,9 +33,6 @@ _APPINSIGHTS_POLL_TIMEOUT = 300 _APPINSIGHTS_POLL_INTERVAL = 15 -# KQL attribute key for the response/request ID stamped on each span. -_RESPONSE_ID_ATTR = "gen_ai.response.id" - def _flush_provider(): """Force-flush all span processors so live exporters send data to App Insights. @@ -71,11 +65,11 @@ def _poll_appinsights(logs_client, resource_id, query, *, timeout=_APPINSIGHTS_P # --------------------------------------------------------------------------- -# Minimal echo app factories using core's AgentServerHost + request_span() +# Minimal echo app factories using core's AgentServerHost + request_context() # --------------------------------------------------------------------------- def _make_echo_app(): - """Create an AgentServerHost with a POST /echo route that creates a traced span. + """Create an AgentServerHost with a POST /echo route that uses request_context. Returns (app, request_ids) where request_ids is a list that collects the unique ID assigned to each request (for later App Insights correlation). @@ -85,7 +79,7 @@ def _make_echo_app(): async def echo_handler(request: Request) -> Response: req_id = str(uuid.uuid4()) request_ids.append(req_id) - with app.request_span(dict(request.headers), req_id, "invoke_agent"): + with app.request_context(dict(request.headers)): body = await request.body() resp = Response(content=body, media_type="application/octet-stream") resp.headers["x-request-id"] = req_id @@ -103,7 +97,7 @@ def _make_streaming_echo_app(): async def stream_handler(request: Request) -> StreamingResponse: req_id = str(uuid.uuid4()) request_ids.append(req_id) - with app.request_span(dict(request.headers), req_id, "invoke_agent"): + with app.request_context(dict(request.headers)): async def generate(): for chunk in [b"chunk1\n", b"chunk2\n", b"chunk3\n"]: yield chunk @@ -116,10 +110,10 @@ async def generate(): def _make_echo_app_with_child_span(): - """Create an AgentServerHost whose handler creates a child span inside request_span. + """Create an AgentServerHost whose handler creates a child span inside request_context. Returns (app, request_ids, child_span_ids). The child span simulates a - framework creating its own span inside the invoke_agent span context. + framework creating its own span inside the propagated context. ``child_span_ids`` captures the hex span-id of each child so the test can query App Insights by that value. """ @@ -130,7 +124,7 @@ def _make_echo_app_with_child_span(): async def echo_handler(request: Request) -> Response: req_id = str(uuid.uuid4()) request_ids.append(req_id) - with app.request_span(dict(request.headers), req_id, "invoke_agent"): + with app.request_context(dict(request.headers)): with child_tracer.start_as_current_span("framework_child") as child: child_span_ids.append(format(child.context.span_id, "016x")) body = await request.body() @@ -144,21 +138,19 @@ async def echo_handler(request: Request) -> Response: def _make_failing_echo_app(): - """Create an app whose handler raises inside request_span. Returns (app, request_ids).""" + """Create an app whose handler raises inside request_context. Returns (app, request_ids).""" request_ids: list[str] = [] async def fail_handler(request: Request) -> Response: req_id = str(uuid.uuid4()) request_ids.append(req_id) - try: - with app.request_span(dict(request.headers), req_id, "invoke_agent") as span: + with app.request_context(dict(request.headers)): + try: raise ValueError("e2e error test") - except ValueError: - span.set_status(trace.StatusCode.ERROR, "e2e error test") - span.record_exception(ValueError("e2e error test")) - resp = JSONResponse({"error": "e2e error test"}, status_code=500) - resp.headers["x-request-id"] = req_id - return resp + except ValueError: + resp = JSONResponse({"error": "e2e error test"}, status_code=500) + resp.headers["x-request-id"] = req_id + return resp routes = [Route("/echo", fail_handler, methods=["POST"])] app = AgentServerHost(routes=routes) @@ -170,110 +162,73 @@ async def fail_handler(request: Request) -> Response: # --------------------------------------------------------------------------- class TestAppInsightsIngestionE2E: - """Query Application Insights ``requests`` table to confirm spans were - actually ingested, correlating via gen_ai.response.id.""" + """Query Application Insights to confirm spans created inside + ``request_context`` are actually ingested and enriched.""" - def test_invoke_span_in_appinsights( + def test_child_span_in_appinsights( self, appinsights_connection_string, appinsights_resource_id, logs_query_client, ): - """Send an echo request and verify its span appears in App Insights ``requests`` table.""" - app, request_ids = _make_echo_app() + """Create a framework child span inside request_context and verify it + appears in the App Insights ``dependencies`` table.""" + app, request_ids, child_span_ids = _make_echo_app_with_child_span() client = TestClient(app) - resp = client.post("/echo", content=b"hello e2e") + resp = client.post("/echo", content=b"child e2e") assert resp.status_code == 200 - req_id = request_ids[-1] + child_span_id = child_span_ids[-1] _flush_provider() query = ( - "requests " - f"| where tostring(customDimensions['{_RESPONSE_ID_ATTR}']) == '{req_id}' " - "| project name, timestamp, duration, success, customDimensions " + "dependencies " + f"| where id == '{child_span_id}' " + "| where name == 'framework_child' " + "| project id, name, operation_Id " "| take 1" ) rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) assert len(rows) > 0, ( - f"invoke_agent span with response_id={req_id} not found in " - f"App Insights requests table after {_APPINSIGHTS_POLL_TIMEOUT}s" + f"Child framework_child span (id={child_span_id}) not found in " + f"dependencies table after {_APPINSIGHTS_POLL_TIMEOUT}s" ) - def test_streaming_span_in_appinsights( + def test_echo_request_succeeds( self, appinsights_connection_string, appinsights_resource_id, logs_query_client, ): - """Send a streaming request and verify its span appears in App Insights.""" - app, request_ids = _make_streaming_echo_app() + """Verify basic echo request succeeds with context-only propagation.""" + app, request_ids = _make_echo_app() client = TestClient(app) - resp = client.post("/echo", content=b"stream e2e") + resp = client.post("/echo", content=b"hello e2e") assert resp.status_code == 200 - req_id = request_ids[-1] - _flush_provider() - - query = ( - "requests " - f"| where tostring(customDimensions['{_RESPONSE_ID_ATTR}']) == '{req_id}' " - "| take 1" - ) - rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) - assert len(rows) > 0, ( - f"Streaming span with response_id={req_id} not found in App Insights" - ) + assert resp.content == b"hello e2e" - def test_error_span_in_appinsights( + def test_streaming_request_succeeds( self, appinsights_connection_string, appinsights_resource_id, logs_query_client, ): - """Send a failing request and verify the error span appears with success=false.""" - app, request_ids = _make_failing_echo_app() + """Verify streaming echo request succeeds with context-only propagation.""" + app, _request_ids = _make_streaming_echo_app() client = TestClient(app) - resp = client.post("/echo", content=b"fail e2e") - req_id = request_ids[-1] - _flush_provider() - - query = ( - "requests " - f"| where tostring(customDimensions['{_RESPONSE_ID_ATTR}']) == '{req_id}' " - "| where success == false " - "| take 1" - ) - rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) - assert len(rows) > 0, ( - f"Error span with response_id={req_id} not found in App Insights" - ) + resp = client.post("/echo", content=b"stream e2e") + assert resp.status_code == 200 - def test_genai_attributes_in_appinsights( + def test_error_request_returns_500( self, appinsights_connection_string, appinsights_resource_id, logs_query_client, ): - """Verify GenAI semantic convention attributes are present on the ingested span.""" - app, request_ids = _make_echo_app() + """Verify failing request returns 500 with context-only propagation.""" + app, _request_ids = _make_failing_echo_app() client = TestClient(app) - resp = client.post("/echo", content=b"genai attr e2e") - req_id = request_ids[-1] - _flush_provider() - - query = ( - "requests " - f"| where tostring(customDimensions['{_RESPONSE_ID_ATTR}']) == '{req_id}' " - "| where isnotempty(customDimensions['gen_ai.system']) " - "| project name, " - " genai_system=tostring(customDimensions['gen_ai.system']), " - " genai_provider=tostring(customDimensions['gen_ai.provider.name']) " - "| take 1" - ) - rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) - assert len(rows) > 0, ( - f"Span with response_id={req_id} and gen_ai.system attribute " - "not found in App Insights" - ) + resp = client.post("/echo", content=b"fail e2e") + assert resp.status_code == 500 def test_span_parenting_in_appinsights( self, @@ -281,23 +236,19 @@ def test_span_parenting_in_appinsights( appinsights_resource_id, logs_query_client, ): - """Verify a child span created inside request_span is parented correctly in App Insights. + """Verify a child span created inside request_context is exported to App Insights. - The parent (invoke_agent, SpanKind.SERVER) lands in ``requests``. - The child (framework_child, SpanKind.INTERNAL) lands in ``dependencies``. - We capture the child's span-id locally, use it to find the child row in - ``dependencies``, then follow its ``operation_ParentId`` back to the - parent row in ``requests``. + With context-only propagation, the child (framework_child, SpanKind.INTERNAL) + lands in ``dependencies``. We verify it appears using its locally-captured span-id. """ app, request_ids, child_span_ids = _make_echo_app_with_child_span() client = TestClient(app) resp = client.post("/echo", content=b"parenting e2e") assert resp.status_code == 200 - req_id = request_ids[-1] child_span_id = child_span_ids[-1] _flush_provider() - # Step 1: Find the child span in the dependencies table using its span-id. + # Find the child span in the dependencies table using its span-id. child_query = ( "dependencies " f"| where id == '{child_span_id}' " @@ -310,24 +261,3 @@ def test_span_parenting_in_appinsights( f"Child framework_child span (id={child_span_id}) not found in " f"dependencies table after {_APPINSIGHTS_POLL_TIMEOUT}s" ) - - operation_id = child_rows[0][2] # operation_Id column - child_parent_id = child_rows[0][3] # operation_ParentId column - - # Step 2: Find the parent span in the requests table using the child's operation_ParentId. - parent_query = ( - "requests " - f"| where id == '{child_parent_id}' " - f"| where operation_Id == '{operation_id}' " - "| project id, name, operation_Id " - "| take 1" - ) - parent_rows = _poll_appinsights(logs_query_client, appinsights_resource_id, parent_query) - assert len(parent_rows) > 0, ( - f"Parent span (id={child_parent_id}) referenced by child's " - f"operation_ParentId not found in requests table" - ) - - assert parent_rows[0][1] == "invoke_agent", ( - f"Expected parent span name 'invoke_agent', got '{parent_rows[0][1]}'" - ) diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py b/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py index bf3120974fa0..ee392a02f9d8 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py @@ -6,6 +6,7 @@ Provides the invocation protocol endpoints and handler decorators as a :class:`~azure.ai.agentserver.core.AgentServerHost` subclass. """ +import contextlib import contextvars import inspect import logging @@ -16,6 +17,7 @@ from typing import Any, Optional from opentelemetry import baggage as _otel_baggage, context as _otel_context +from opentelemetry.baggage.propagation import W3CBaggagePropagator from starlette.requests import Request from starlette.responses import JSONResponse, Response, StreamingResponse from starlette.routing import Route @@ -23,11 +25,6 @@ from azure.ai.agentserver.core import ( # pylint: disable=no-name-in-module AgentServerHost, create_error_response, - detach_context, - end_span, - record_error, - set_current_span, - trace_stream, ) from ._constants import InvocationConstants @@ -269,63 +266,6 @@ def get_openapi_spec(self) -> Optional[dict[str, Any]]: # Span attribute helper # ------------------------------------------------------------------ - @staticmethod - def _safe_set_attrs(span: Any, attrs: dict[str, str]) -> None: - if span is None: - return - try: - for key, value in attrs.items(): - span.set_attribute(key, value) - except Exception: # pylint: disable=broad-exception-caught - logger.debug("Failed to set span attributes: %s", list(attrs.keys()), exc_info=True) - - # ------------------------------------------------------------------ - # Streaming response helpers - # ------------------------------------------------------------------ - - def _wrap_streaming_response( - self, - response: StreamingResponse, - otel_span: Any, - ) -> StreamingResponse: - """Wrap a streaming response's body iterator with span lifecycle and context. - - Two layers of wrapping are applied: - - 1. **Inner (tracing):** ``trace_stream`` wraps the body iterator so - the OTel span covers the full streaming duration and is ended - when iteration completes. - 2. **Outer (context):** A second async generator re-attaches the span - as the current context for the duration of streaming, so that - child spans created by user handler code (e.g. Agent Framework) - are correctly parented under this span. - - :param response: The ``StreamingResponse`` returned by the user handler. - :type response: ~starlette.responses.StreamingResponse - :param otel_span: The OTel span (or *None* when tracing is disabled). - :type otel_span: any - :return: The same response object, with its body_iterator replaced. - :rtype: ~starlette.responses.StreamingResponse - """ - if otel_span is None: - return response - - # Inner wrap: trace_stream ends the span when iteration completes. - traced = trace_stream(response.body_iterator, otel_span) - - # Outer wrap: re-attach span as current context during streaming - # so child spans are correctly parented. - async def _iter_with_context(): # type: ignore[return-value] - token = set_current_span(otel_span) - try: - async for chunk in traced: - yield chunk - finally: - detach_context(token) - - response.body_iterator = _iter_with_context() - return response - # ------------------------------------------------------------------ # Endpoint handlers # ------------------------------------------------------------------ @@ -355,19 +295,17 @@ async def _create_invocation_endpoint(self, request: Request) -> Response: request.state.user_isolation_key = request.headers.get("x-agent-user-isolation-key", "") request.state.chat_isolation_key = request.headers.get("x-agent-chat-isolation-key", "") - with self.request_span( - request.headers, invocation_id, "invoke_agent", - operation_name="invoke_agent", session_id=session_id, - end_on_exit=False, - ) as otel_span: - self._safe_set_attrs(otel_span, { - InvocationConstants.ATTR_SPAN_INVOCATION_ID: invocation_id, - InvocationConstants.ATTR_SPAN_SESSION_ID: session_id, - }) - + with self.request_context(request.headers) if hasattr(self, "request_context") else contextlib.nullcontext(): # Propagate invocation/session IDs as W3C baggage so downstream # services receive them automatically via the baggage header. + # Extract incoming baggage from request headers (only baggage, not traceparent) + # to preserve parent-child span relationships while inheriting caller's baggage entries. + _incoming_baggage_ctx = W3CBaggagePropagator().extract( + carrier={"baggage": request.headers.get("baggage", "")} + ) ctx = _otel_context.get_current() + for _bkey, _bval in _otel_baggage.get_all(context=_incoming_baggage_ctx).items(): + ctx = _otel_baggage.set_baggage(_bkey, _bval, context=ctx) ctx = _otel_baggage.set_baggage( "azure.ai.agentserver.invocation_id", invocation_id, context=ctx, ) @@ -385,11 +323,6 @@ async def _create_invocation_endpoint(self, request: Request) -> Response: response.headers[InvocationConstants.INVOCATION_ID_HEADER] = invocation_id response.headers[InvocationConstants.SESSION_ID_HEADER] = session_id except NotImplementedError as exc: - self._safe_set_attrs(otel_span, { - InvocationConstants.ATTR_SPAN_ERROR_CODE: "not_implemented", - InvocationConstants.ATTR_SPAN_ERROR_MESSAGE: str(exc), - }) - end_span(otel_span, exc=exc) logger.error("Invocation %s failed: %s", invocation_id, exc) return create_error_response( "not_implemented", @@ -401,11 +334,6 @@ async def _create_invocation_endpoint(self, request: Request) -> Response: }, ) except Exception as exc: # pylint: disable=broad-exception-caught - self._safe_set_attrs(otel_span, { - InvocationConstants.ATTR_SPAN_ERROR_CODE: "internal_error", - InvocationConstants.ATTR_SPAN_ERROR_MESSAGE: str(exc), - }) - end_span(otel_span, exc=exc) logger.error("Error processing invocation %s: %s", invocation_id, exc, exc_info=True) return create_error_response( "internal_error", @@ -424,10 +352,6 @@ async def _create_invocation_endpoint(self, request: Request) -> Response: except ValueError: pass - if isinstance(response, StreamingResponse): - return self._wrap_streaming_response(response, otel_span) - - end_span(otel_span) return response async def _traced_invocation_endpoint( @@ -443,14 +367,7 @@ async def _traced_invocation_endpoint( raw_session_id = request.query_params.get("agent_session_id", "") session_id = _sanitize_id(raw_session_id, "") if raw_session_id else "" - with self.request_span( - request.headers, invocation_id, span_operation, - operation_name=span_operation, session_id=session_id, - ) as _otel_span: - self._safe_set_attrs(_otel_span, { - InvocationConstants.ATTR_SPAN_INVOCATION_ID: invocation_id, - InvocationConstants.ATTR_SPAN_SESSION_ID: session_id, - }) + with self.request_context(request.headers) if hasattr(self, "request_context") else contextlib.nullcontext(): _ensure_log_filter() inv_token = _invocation_id_var.set(invocation_id) session_token = _session_id_var.set(session_id) @@ -459,11 +376,6 @@ async def _traced_invocation_endpoint( response.headers[InvocationConstants.INVOCATION_ID_HEADER] = invocation_id return response except Exception as exc: # pylint: disable=broad-exception-caught - self._safe_set_attrs(_otel_span, { - InvocationConstants.ATTR_SPAN_ERROR_CODE: "internal_error", - InvocationConstants.ATTR_SPAN_ERROR_MESSAGE: str(exc), - }) - record_error(_otel_span, exc) logger.error("Error in %s %s: %s", span_operation, invocation_id, exc, exc_info=True) return create_error_response( "internal_error", diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/cspell.json b/sdk/agentserver/azure-ai-agentserver-invocations/cspell.json index 5858cd8e195b..e2180fd922d2 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/cspell.json +++ b/sdk/agentserver/azure-ai-agentserver-invocations/cspell.json @@ -4,6 +4,8 @@ "appinsights", "ASGI", "autouse", + "bkey", + "bval", "caplog", "genai", "hypercorn", diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/conftest.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/conftest.py index 8a3deb55c72f..e944ca031e0c 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/tests/conftest.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/conftest.py @@ -5,6 +5,7 @@ import json import os from typing import Any +from unittest.mock import patch import pytest from httpx import ASGITransport, AsyncClient @@ -18,6 +19,18 @@ def pytest_configure(config): config.addinivalue_line("markers", "tracing_e2e: end-to-end tracing tests against live Application Insights") +@pytest.fixture(autouse=True, scope="session") +def _prevent_distro_setup(): + """Prevent microsoft-opentelemetry distro from contaminating global OTel + state during tests. Without this, CI environments that have the distro + installed and APPLICATIONINSIGHTS_CONNECTION_STRING set would trigger + ``use_microsoft_opentelemetry()`` on the first server construction, + installing a global TracerProvider that breaks later traceparent- + propagation tests.""" + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + yield + + # --------------------------------------------------------------------------- # E2E tracing fixtures # --------------------------------------------------------------------------- @@ -115,6 +128,7 @@ def logs_query_client(): def _make_echo_agent(**kwargs: Any) -> InvocationAgentServerHost: """Create an InvocationAgentServerHost whose invoke handler echoes the request body.""" + kwargs.setdefault("configure_observability", None) app = InvocationAgentServerHost(**kwargs) @app.invoke_handler @@ -127,6 +141,7 @@ async def handle(request: Request) -> Response: def _make_streaming_agent(**kwargs: Any) -> InvocationAgentServerHost: """Create an InvocationAgentServerHost whose invoke handler returns 3 JSON chunks.""" + kwargs.setdefault("configure_observability", None) app = InvocationAgentServerHost(**kwargs) @app.invoke_handler @@ -142,6 +157,7 @@ async def generate(): def _make_async_storage_agent(**kwargs: Any) -> InvocationAgentServerHost: """Create an InvocationAgentServerHost with get/cancel handlers and in-memory store.""" + kwargs.setdefault("configure_observability", None) app = InvocationAgentServerHost(**kwargs) store: dict[str, Any] = {} @@ -178,7 +194,7 @@ async def cancel_handler(request: Request) -> Response: def _make_validated_agent() -> InvocationAgentServerHost: """Create an InvocationAgentServerHost with OpenAPI spec.""" - app = InvocationAgentServerHost(openapi_spec=SAMPLE_OPENAPI_SPEC) + app = InvocationAgentServerHost(openapi_spec=SAMPLE_OPENAPI_SPEC, configure_observability=None) @app.invoke_handler async def handle(request: Request) -> Response: @@ -190,6 +206,7 @@ async def handle(request: Request) -> Response: def _make_failing_agent(**kwargs: Any) -> InvocationAgentServerHost: """Create an InvocationAgentServerHost whose handler raises ValueError.""" + kwargs.setdefault("configure_observability", None) app = InvocationAgentServerHost(**kwargs) @app.invoke_handler diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_span_parenting.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_span_parenting.py index 5c31f78b6a8a..42a0b64d708f 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_span_parenting.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_span_parenting.py @@ -1,8 +1,9 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -"""Tests that the invoke_agent span is set as the current span in context, -so that child spans created by framework handlers are correctly parented. +"""Tests that incoming W3C trace context is propagated correctly so that +child spans created by framework handlers are properly parented under the +caller's traceparent (no intermediate invoke_agent span). These tests call the endpoint handler directly (bypassing ASGI transport) because HTTPX's ASGITransport runs the app in a different async context, @@ -57,10 +58,6 @@ def _clear(): _EXPORTER.clear() -def _get_spans(): - return list(_EXPORTER.get_finished_spans()) if _EXPORTER else [] - - def _make_server_with_child_span(): """Server whose handler creates a child span (simulating a framework).""" with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): @@ -93,44 +90,147 @@ async def generate(): return app -def _assert_child_parented(spans, streaming: bool = False): - """Assert the framework span is a child of the invoke_agent span.""" - parent_spans = [s for s in spans if "invoke_agent" in s.name and s.name != "framework_invoke_agent"] - child_spans = [s for s in spans if s.name == "framework_invoke_agent"] +def test_framework_span_parented_under_incoming_traceparent(): + """A span created inside the handler should be parented under the incoming + traceparent — there is no intermediate invoke_agent span. - assert len(parent_spans) >= 1, f"Expected invoke_agent span, got: {[s.name for s in spans]}" - assert len(child_spans) == 1, f"Expected framework span, got: {[s.name for s in spans]}" + Uses a real OTel span + ``inject(headers)`` instead of a synthetic + traceparent string so that the trace context is always propagated + correctly regardless of which TracerProvider or auto-instrumentation + is active in the process (e.g. CI environments). + """ + from opentelemetry.propagate import inject - parent = parent_spans[0] - child = child_spans[0] + server = _make_server_with_child_span() + client = TestClient(server) - label = "streaming" if streaming else "non-streaming" - assert child.parent is not None, f"Framework span has no parent in {label} case" - assert child.parent.span_id == parent.context.span_id, ( - f"Framework span parent ({format(child.parent.span_id, '016x')}) " - f"!= invoke_agent span ({format(parent.context.span_id, '016x')}). " - f"Spans are siblings, not parent-child ({label})." - ) + caller_tracer = trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerOperation") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + headers: dict[str, str] = {} + inject(headers) -def test_framework_span_is_child_of_invoke_span(): - """A span created inside the handler should be a child of the - agentserver invoke_agent span, not a sibling.""" - server = _make_server_with_child_span() - # TestClient runs synchronously in the same thread context, - # so OTel ContextVar propagation works correctly. - client = TestClient(server) - resp = client.post("/invocations", content=b"test") + resp = client.post("/invocations", content=b"test", headers=headers) assert resp.status_code == 200 - _assert_child_parented(_get_spans(), streaming=False) + spans = _EXPORTER.get_finished_spans() + fw_spans = [s for s in spans if s.name == "framework_invoke_agent"] + assert len(fw_spans) == 1, f"Expected framework span, got: {[s.name for s in spans]}" + + fw = fw_spans[0] + # Framework span should share the same trace ID + assert format(fw.context.trace_id, "032x") == caller_trace_id + # Framework span should be parented directly under the caller span + assert fw.parent is not None, "Framework span has no parent" + assert format(fw.parent.span_id, "016x") == caller_span_id + + +def test_framework_span_parented_under_incoming_traceparent_streaming(): + """Same parent-child relationship holds for streaming responses. + Uses a real OTel span + ``inject(headers)`` instead of a synthetic + traceparent string for CI reliability. + """ + from opentelemetry.propagate import inject -def test_framework_span_is_child_streaming(): - """Same parent-child relationship holds for streaming responses.""" server = _make_streaming_server_with_child_span() client = TestClient(server) - resp = client.post("/invocations", content=b"test") + + caller_tracer = trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerStreamOp") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + + headers: dict[str, str] = {} + inject(headers) + + resp = client.post("/invocations", content=b"test", headers=headers) assert resp.status_code == 200 - _assert_child_parented(_get_spans(), streaming=True) + spans = _EXPORTER.get_finished_spans() + fw_spans = [s for s in spans if s.name == "framework_invoke_agent"] + assert len(fw_spans) == 1, f"Expected framework span, got: {[s.name for s in spans]}" + + fw = fw_spans[0] + assert format(fw.context.trace_id, "032x") == caller_trace_id + assert fw.parent is not None, "Framework span has no parent (streaming)" + assert format(fw.parent.span_id, "016x") == caller_span_id + + +def test_no_invoke_agent_span_created(): + """Verify no invoke_agent span is created by the server — only framework spans.""" + server = _make_server_with_child_span() + client = TestClient(server) + client.post("/invocations", content=b"test") + + spans = _EXPORTER.get_finished_spans() + # Only the framework span should exist, not an invoke_agent server span + invoke_spans = [s for s in spans if "invoke_agent" in s.name and s.name != "framework_invoke_agent"] + assert len(invoke_spans) == 0, f"Unexpected invoke_agent spans: {[s.name for s in invoke_spans]}" + + +def test_handler_span_is_child_of_real_caller_span(): + """End-to-end: create a real caller span, propagate its trace context via + traceparent header to /invocations, create a child span inside the handler, + and validate the handler span is a child of the caller span. + + This differs from the synthetic-traceparent tests above by using a real + OTel span as the caller, so both the caller and handler spans appear in + the in-memory exporter and can be validated together. + """ + from opentelemetry.propagate import inject + + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + app = InvocationAgentServerHost() + + handler_tracer = trace.get_tracer("test.handler") + + @app.invoke_handler + async def handle(request: Request) -> Response: + with handler_tracer.start_as_current_span("HandleInvocation"): + body = await request.body() + return Response(content=body, media_type="application/octet-stream") + + # 1. Create a real caller span to act as the external parent + caller_tracer = trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerOperation") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + + # 2. Inject the caller span's context into HTTP headers (traceparent) + headers: dict[str, str] = {} + inject(headers) + + # 3. Send the request with the caller's trace context + client = TestClient(app) + resp = client.post("/invocations", content=b"e2e-test", headers=headers) + assert resp.status_code == 200 + + # 4. Validate the span hierarchy + spans = _EXPORTER.get_finished_spans() + span_by_name = {s.name: s for s in spans} + + assert "CallerOperation" in span_by_name, ( + f"Caller span not found. Spans: {[s.name for s in spans]}" + ) + assert "HandleInvocation" in span_by_name, ( + f"Handler span not found. Spans: {[s.name for s in spans]}" + ) + + caller = span_by_name["CallerOperation"] + handler = span_by_name["HandleInvocation"] + + # Handler span must share the same trace ID as the caller + assert format(handler.context.trace_id, "032x") == caller_trace_id, ( + "Handler span has a different trace ID — trace context was not propagated" + ) + + # Handler span must be a child of the caller span + assert handler.parent is not None, "Handler span has no parent" + assert format(handler.parent.span_id, "016x") == caller_span_id, ( + f"Handler span parent {format(handler.parent.span_id, '016x')} " + f"!= caller span {caller_span_id} — span parenting is broken" + ) diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing.py index 082ad23549ed..67e91d040192 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing.py @@ -151,11 +151,11 @@ async def generate(): # --------------------------------------------------------------------------- def test_tracing_disabled_by_default(): - """Invoke spans are still created by the global tracer when tracing is not explicitly configured.""" + """No invoke_agent span is created — only framework/user spans appear.""" if _MODULE_EXPORTER: _MODULE_EXPORTER.clear() - app = InvocationAgentServerHost() + app = InvocationAgentServerHost(configure_observability=None) @app.invoke_handler async def handle(request: Request) -> Response: @@ -164,77 +164,61 @@ async def handle(request: Request) -> Response: client = TestClient(app) client.post("/invocations", content=b"test") - # With the function-based tracing design, spans are always created - # when OTel is installed (via the global tracer). The difference is - # whether exporters are configured. Verify a span IS created. + # No invoke_agent SERVER span is created (request_context only propagates context) spans = _get_spans() invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 + assert len(invoke_spans) == 0 # --------------------------------------------------------------------------- -# Tracing enabled creates invoke span with correct name +# Tracing enabled — no invoke_agent span created # --------------------------------------------------------------------------- -def test_tracing_enabled_creates_invoke_span(): - """Tracing enabled creates a span named 'invoke_agent'.""" +def test_tracing_enabled_no_invoke_span(): + """Tracing enabled does NOT create an invoke_agent span (context-only propagation).""" server = _make_tracing_server() client = TestClient(server) client.post("/invocations", content=b"test") spans = _get_spans() invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - assert invoke_spans[0].name.startswith("invoke_agent") + assert len(invoke_spans) == 0 # --------------------------------------------------------------------------- -# Invoke error records exception +# Invoke error returns 500 # --------------------------------------------------------------------------- -def test_invoke_error_records_exception(): - """When handler raises, the span records the exception.""" +def test_invoke_error_returns_500(): + """When handler raises, a 500 response is returned.""" server = _make_failing_tracing_server() client = TestClient(server) resp = client.post("/invocations", content=b"test") assert resp.status_code == 500 - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - span = invoke_spans[0] - # Should have error status - assert span.status.status_code.name == "ERROR" - # --------------------------------------------------------------------------- -# GET/cancel create spans +# GET/cancel endpoints still work # --------------------------------------------------------------------------- -def test_get_invocation_creates_span(): - """GET /invocations/{id} creates a span.""" +def test_get_invocation_returns_response(): + """GET /invocations/{id} returns the stored response.""" server = _make_tracing_server_with_get_cancel() client = TestClient(server) resp = client.post("/invocations", content=b"data") inv_id = resp.headers["x-agent-invocation-id"] - client.get(f"/invocations/{inv_id}") + get_resp = client.get(f"/invocations/{inv_id}") + assert get_resp.status_code == 200 - spans = _get_spans() - get_spans = [s for s in spans if "get_invocation" in s.name] - assert len(get_spans) >= 1 - -def test_cancel_invocation_creates_span(): - """POST /invocations/{id}/cancel creates a span.""" +def test_cancel_invocation_returns_response(): + """POST /invocations/{id}/cancel returns cancelled status.""" server = _make_tracing_server_with_get_cancel() client = TestClient(server) resp = client.post("/invocations", content=b"data") inv_id = resp.headers["x-agent-invocation-id"] - client.post(f"/invocations/{inv_id}/cancel") - - spans = _get_spans() - cancel_spans = [s for s in spans if "cancel_invocation" in s.name] - assert len(cancel_spans) >= 1 + cancel_resp = client.post(f"/invocations/{inv_id}/cancel") + assert cancel_resp.status_code == 200 # --------------------------------------------------------------------------- @@ -254,9 +238,10 @@ async def handle(request: Request) -> Response: client = TestClient(app) client.post("/invocations", content=b"test") + # No invoke_agent span (context-only propagation) spans = _get_spans() invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 + assert len(invoke_spans) == 0 # --------------------------------------------------------------------------- @@ -270,7 +255,7 @@ def test_no_tracing_when_no_endpoints(): env.pop("APPLICATIONINSIGHTS_CONNECTION_STRING", None) env.pop("OTEL_EXPORTER_OTLP_ENDPOINT", None) with patch.dict(os.environ, env, clear=True): - app = InvocationAgentServerHost() + app = InvocationAgentServerHost(configure_observability=None) @app.invoke_handler async def handle(request: Request) -> Response: @@ -282,174 +267,244 @@ async def handle(request: Request) -> Response: client = TestClient(app) client.post("/invocations", content=b"test") - # Spans are still created via the global tracer — the difference - # is no exporters are configured to send them anywhere. + # No invoke_agent span spans = _get_spans() invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 + assert len(invoke_spans) == 0 # --------------------------------------------------------------------------- -# Traceparent propagation +# Traceparent propagation — context is set even without a span # --------------------------------------------------------------------------- def test_traceparent_propagation(): - """Server propagates traceparent header into span context.""" - server = _make_tracing_server() + """Server propagates traceparent header into OTel context for framework spans. + + Uses a real OTel span + ``inject(headers)`` instead of a synthetic + traceparent string for CI reliability. + """ + from opentelemetry import trace as _trace + from opentelemetry.propagate import inject + + captured_trace_id = None + captured_parent_id = None - # Create a traceparent - trace_id_hex = uuid.uuid4().hex - span_id_hex = uuid.uuid4().hex[:16] - traceparent = f"00-{trace_id_hex}-{span_id_hex}-01" + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + server = InvocationAgentServerHost() + + @server.invoke_handler + async def handle(request: Request) -> Response: + nonlocal captured_trace_id, captured_parent_id + # Create a framework span — it should inherit the incoming traceparent + tracer = _trace.get_tracer("test-framework") + with tracer.start_as_current_span("framework_op") as span: + captured_trace_id = format(span.context.trace_id, "032x") + captured_parent_id = format(span.parent.span_id, "016x") if span.parent else None + return Response(content=b"ok") client = TestClient(server) - client.post( - "/invocations", - content=b"test", - headers={"traceparent": traceparent}, - ) - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - span = invoke_spans[0] - # The span should have the same trace ID as the traceparent - actual_trace_id = format(span.context.trace_id, "032x") - assert actual_trace_id == trace_id_hex + caller_tracer = _trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerOp") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + + headers: dict[str, str] = {} + inject(headers) + + client.post( + "/invocations", + content=b"test", + headers=headers, + ) + + assert captured_trace_id == caller_trace_id + assert captured_parent_id == caller_span_id # --------------------------------------------------------------------------- -# Streaming spans +# Streaming responses still work # --------------------------------------------------------------------------- -def test_streaming_creates_span(): - """Streaming response creates and completes a span.""" +def test_streaming_returns_response(): + """Streaming response is returned successfully.""" server = _make_streaming_tracing_server() client = TestClient(server) resp = client.post("/invocations", content=b"test") assert resp.status_code == 200 - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - # --------------------------------------------------------------------------- -# GenAI attributes on invoke span +# Incoming W3C baggage propagation # --------------------------------------------------------------------------- -def test_genai_attributes_on_invoke_span(): - """Invoke span has GenAI semantic convention attributes.""" - server = _make_tracing_server() +def test_incoming_baggage_merged_into_context(): + """Incoming W3C baggage header entries are merged into OTel context.""" + from opentelemetry import baggage as _otel_baggage + + captured_baggage = {} + + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + server = InvocationAgentServerHost() + + @server.invoke_handler + async def handle(request: Request) -> Response: + captured_baggage.update(_otel_baggage.get_all()) + return Response(content=b"ok") + client = TestClient(server) - client.post("/invocations", content=b"test") + client.post( + "/invocations", + content=b"test", + headers={"baggage": "user.id=test-user-123,custom.key=custom-value"}, + ) - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - attrs = dict(invoke_spans[0].attributes) + # Incoming baggage entries should be present + assert captured_baggage.get("user.id") == "test-user-123" + assert captured_baggage.get("custom.key") == "custom-value" - assert attrs.get("gen_ai.provider.name") == "AzureAI Hosted Agents" - assert attrs.get("gen_ai.system") == "azure.ai.agentserver" - assert attrs.get("service.name") == "azure.ai.agentserver" +def test_sdk_set_baggage_available_in_handler(): + """SDK-set baggage entries (invocation_id, session_id) are available in handler context.""" + from opentelemetry import baggage as _otel_baggage -# --------------------------------------------------------------------------- -# Session ID in microsoft.session.id -# --------------------------------------------------------------------------- + captured_baggage = {} + + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + server = InvocationAgentServerHost() + + @server.invoke_handler + async def handle(request: Request) -> Response: + captured_baggage.update(_otel_baggage.get_all()) + return Response(content=b"ok") -def test_session_id_in_conversation_id(): - """Session ID is set as microsoft.session.id on invoke span.""" - server = _make_tracing_server() client = TestClient(server) client.post( - "/invocations?agent_session_id=test-session", + "/invocations", content=b"test", + headers={ + "x-agent-invocation-id": "inv-test-42", + "baggage": "caller.key=caller-value", + }, ) - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - attrs = dict(invoke_spans[0].attributes) - assert attrs.get("microsoft.session.id") == "test-session" + # SDK-set baggage entries + assert captured_baggage.get("azure.ai.agentserver.invocation_id") == "inv-test-42" + assert "azure.ai.agentserver.session_id" in captured_baggage + # Incoming caller baggage is also preserved + assert captured_baggage.get("caller.key") == "caller-value" -# --------------------------------------------------------------------------- -# GenAI attributes on get_invocation span -# --------------------------------------------------------------------------- +def test_incoming_baggage_does_not_break_span_parenting(): + """Incoming baggage header does not break parent-child span relationships. + Framework spans created inside the handler should be parented under the + incoming traceparent (no intermediate invoke_agent span). -def test_genai_attributes_on_get_span(): - """GET invocation span has GenAI attributes.""" - server = _make_tracing_server_with_get_cancel() - client = TestClient(server) - resp = client.post("/invocations", content=b"data") - inv_id = resp.headers["x-agent-invocation-id"] - client.get(f"/invocations/{inv_id}") + Uses a real OTel span + ``inject(headers)`` for CI reliability. + """ + from opentelemetry import trace as _trace + from opentelemetry.propagate import inject - spans = _get_spans() - get_spans = [s for s in spans if "get_invocation" in s.name] - assert len(get_spans) >= 1 - attrs = dict(get_spans[0].attributes) - assert attrs.get("gen_ai.system") == "azure.ai.agentserver" - assert attrs.get("gen_ai.provider.name") == "AzureAI Hosted Agents" + captured_trace_id = None + captured_parent_id = None + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + server = InvocationAgentServerHost() -# --------------------------------------------------------------------------- -# Namespaced invocation_id attribute -# --------------------------------------------------------------------------- + @server.invoke_handler + async def handle(request: Request) -> Response: + nonlocal captured_trace_id, captured_parent_id + tracer = _trace.get_tracer("test-framework") + with tracer.start_as_current_span("framework_op") as span: + captured_trace_id = format(span.context.trace_id, "032x") + captured_parent_id = format(span.parent.span_id, "016x") if span.parent else None + return Response(content=b"ok") -def test_namespaced_invocation_id_attribute(): - """Invoke span has azure.ai.agentserver.invocations.invocation_id.""" - server = _make_tracing_server() client = TestClient(server) - resp = client.post("/invocations", content=b"test") - inv_id = resp.headers["x-agent-invocation-id"] - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - attrs = dict(invoke_spans[0].attributes) - assert attrs.get("azure.ai.agentserver.invocations.invocation_id") == inv_id + caller_tracer = _trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerBaggageOp") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + headers: dict[str, str] = {"baggage": "user.id=test-user-456"} + inject(headers) -# --------------------------------------------------------------------------- -# Agent name/version in span names -# --------------------------------------------------------------------------- + client.post( + "/invocations", + content=b"test", + headers=headers, + ) -def test_agent_name_in_span_name(): - """Agent name from env var appears in span name.""" - with patch.dict(os.environ, { - "FOUNDRY_AGENT_NAME": "my-agent", - "FOUNDRY_AGENT_VERSION": "2.0", - }): - server = _make_tracing_server() + # Framework span inherits trace ID and parents directly under incoming span + assert captured_trace_id == caller_trace_id + assert captured_parent_id == caller_span_id + +def test_incoming_baggage_empty_header(): + """Empty baggage header does not cause errors.""" + server = _make_tracing_server() client = TestClient(server) - client.post("/invocations", content=b"test") + resp = client.post( + "/invocations", + content=b"test", + headers={"baggage": ""}, + ) + assert resp.status_code == 200 - spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - assert "my-agent" in invoke_spans[0].name - assert "2.0" in invoke_spans[0].name +def test_incoming_baggage_stamped_on_handler_spans(): + """Incoming W3C baggage entries (including invocation_id) are stamped + as span attributes on spans created inside the handler via the + FoundryEnrichmentSpanProcessor. -def test_agent_name_only_in_span_name(): - """Agent name without version in span name.""" - env_override = {"FOUNDRY_AGENT_NAME": "solo-agent"} - env_copy = os.environ.copy() - env_copy.pop("FOUNDRY_AGENT_VERSION", None) - env_copy.update(env_override) - with patch.dict(os.environ, env_copy, clear=True): - server = _make_tracing_server() + Uses a real OTel span + ``inject(headers)`` for CI reliability. + """ + from opentelemetry import trace as _trace + from opentelemetry.propagate import inject + from azure.ai.agentserver.core._tracing import _FoundryEnrichmentSpanProcessor + + # Add the enrichment processor to the test provider so baggage → span attrs works + proc = _FoundryEnrichmentSpanProcessor() + _MODULE_PROVIDER.add_span_processor(proc) + + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=00000000-0000-0000-0000-000000000000"}): + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + server = InvocationAgentServerHost() + + @server.invoke_handler + async def handle(request: Request) -> Response: + tracer = _trace.get_tracer("test-handler") + with tracer.start_as_current_span("handler_work"): + body = await request.body() + return Response(content=body, media_type="application/octet-stream") client = TestClient(server) - client.post("/invocations", content=b"test") + + caller_tracer = _trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerStampOp") as caller_span: + headers: dict[str, str] = {"baggage": "user.id=test-user-789,custom.key=custom-value"} + inject(headers) + + client.post( + "/invocations", + content=b"test", + headers=headers, + ) spans = _get_spans() - invoke_spans = [s for s in spans if "invoke_agent" in s.name] - assert len(invoke_spans) >= 1 - assert "solo-agent" in invoke_spans[0].name + handler_spans = [s for s in spans if s.name == "handler_work"] + assert handler_spans, f"Expected handler_work span, found: {[s.name for s in spans]}" + + attrs = dict(handler_spans[0].attributes) + # invocation_id is set by the invocations package and stamped by the enricher + assert "azure.ai.agentserver.invocations.invocation_id" in attrs + # session_id is also set as baggage and stamped by the enricher + assert "microsoft.session.id" in attrs # --------------------------------------------------------------------------- diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing_e2e.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing_e2e.py index 359799ce90f3..487cda4a0e88 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing_e2e.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_tracing_e2e.py @@ -11,6 +11,7 @@ ``APPLICATIONINSIGHTS_CONNECTION_STRING`` is not set. """ import time +import uuid from datetime import timedelta import pytest @@ -27,9 +28,6 @@ _APPINSIGHTS_POLL_TIMEOUT = 300 _APPINSIGHTS_POLL_INTERVAL = 15 -# Attribute key that InvocationAgentServerHost stamps on each span. -_INVOCATION_ID_ATTR = "azure.ai.agentserver.invocations.invocation_id" - def _flush_provider(): """Force-flush the global TracerProvider so exporters send data.""" @@ -40,13 +38,20 @@ def _flush_provider(): def _poll_appinsights(logs_client, resource_id, query, *, timeout=_APPINSIGHTS_POLL_TIMEOUT): """Poll Application Insights until the KQL query returns >= 1 row or timeout.""" + from azure.core.exceptions import ServiceRequestError + deadline = time.monotonic() + timeout while time.monotonic() < deadline: - response = logs_client.query_resource( - resource_id, - query, - timespan=timedelta(minutes=30), - ) + try: + response = logs_client.query_resource( + resource_id, + query, + timespan=timedelta(minutes=30), + ) + except ServiceRequestError: + # Transient network issues (DNS, connection reset) — retry after interval + time.sleep(_APPINSIGHTS_POLL_INTERVAL) + continue if response.tables and response.tables[0].rows: return response.tables[0].rows time.sleep(_APPINSIGHTS_POLL_INTERVAL) @@ -58,21 +63,30 @@ def _poll_appinsights(logs_client, resource_id, query, *, timeout=_APPINSIGHTS_P # --------------------------------------------------------------------------- class TestInvocationTracingE2E: - """Verify InvocationAgentServerHost auto-creates traced spans that land in App Insights.""" + """Verify that user-created spans inside InvocationAgentServerHost handlers land in App Insights.""" @pytest.mark.asyncio - async def test_invocation_span_in_appinsights( + async def test_handler_span_in_appinsights( self, appinsights_connection_string, appinsights_resource_id, logs_query_client, ): - """POST to /invocations and verify the span appears in App Insights requests table.""" + """POST to /invocations with a handler that creates a span, verify it appears in App Insights. + + The InvocationAgentServerHost propagates W3C trace context but does not + create its own invoke_agent span. This test verifies that a user-created + span inside the handler is correctly exported to App Insights. + """ + handler_tracer = trace.get_tracer("test.invocation.handler") + unique_span_name = f"HandlerWork-{uuid.uuid4().hex[:8]}" + app = InvocationAgentServerHost() @app.invoke_handler async def handle(request: Request) -> Response: - body = await request.body() + with handler_tracer.start_as_current_span(unique_span_name): + body = await request.body() return Response(content=body, media_type="application/octet-stream") transport = ASGITransport(app=app) @@ -80,18 +94,107 @@ async def handle(request: Request) -> Response: resp = await client.post("/invocations", content=b"hello e2e") assert resp.status_code == 200 - invocation_id = resp.headers.get("x-agent-invocation-id") - assert invocation_id, "Expected x-agent-invocation-id in response headers" _flush_provider() query = ( - "requests " - f"| where tostring(customDimensions['{_INVOCATION_ID_ATTR}']) == '{invocation_id}' " - "| project name, timestamp, duration, success, customDimensions " + "dependencies " + f"| where name == '{unique_span_name}' " + "| project name, timestamp, duration, success, operation_Id " "| take 1" ) rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) assert len(rows) > 0, ( - f"invoke_agent span with invocation_id={invocation_id} not found in " - f"App Insights requests table after {_APPINSIGHTS_POLL_TIMEOUT}s" + f"Handler span '{unique_span_name}' not found in " + f"App Insights dependencies table after {_APPINSIGHTS_POLL_TIMEOUT}s" + ) + + +class TestSpanParentingE2E: + """Verify that a child span created inside the invocation handler is + correctly parented under an external caller span, with the full + parent-child relationship visible in Application Insights.""" + + @pytest.mark.asyncio + async def test_handler_child_span_parented_under_caller_in_appinsights( + self, + appinsights_connection_string, + appinsights_resource_id, + logs_query_client, + ): + """End-to-end: create a real caller span, propagate its trace context + via traceparent header to /invocations, create a child span inside the + handler, flush to App Insights, and validate the parent-child + relationship via KQL. + + Expected hierarchy in App Insights: + CallerOperation (dependencies) → HandleInvocation (dependencies) + Both share the same operation_Id (trace ID), and HandleInvocation's + operation_ParentId equals the caller span's id. + """ + from opentelemetry.propagate import inject + + app = InvocationAgentServerHost() + handler_tracer = trace.get_tracer("test.handler") + + @app.invoke_handler + async def handle(request: Request) -> Response: + with handler_tracer.start_as_current_span("HandleInvocation"): + body = await request.body() + return Response(content=body, media_type="application/octet-stream") + + # 1. Create a real caller span + caller_tracer = trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerOperation") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + + # 2. Inject the caller's trace context into HTTP headers + headers: dict[str, str] = {} + inject(headers) + + # 3. Send the request with the propagated trace context + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + resp = await client.post("/invocations", content=b"parenting e2e", headers=headers) + + assert resp.status_code == 200 + + _flush_provider() + + # 4. Query App Insights for both spans in this trace + query = ( + "dependencies " + f"| where operation_Id == '{caller_trace_id}' " + "| where name in ('CallerOperation', 'HandleInvocation') " + "| project name, id, operation_ParentId, operation_Id " + ) + rows = _poll_appinsights(logs_query_client, appinsights_resource_id, query) + assert len(rows) >= 2, ( + f"Expected at least 2 spans (CallerOperation + HandleInvocation) " + f"in trace {caller_trace_id}, but found {len(rows)} after " + f"{_APPINSIGHTS_POLL_TIMEOUT}s" + ) + + # Build a lookup by span name + columns = {name: idx for idx, name in enumerate(["name", "id", "operation_ParentId", "operation_Id"])} + span_by_name = {} + for row in rows: + span_name = row[columns["name"]] + span_by_name[span_name] = row + + assert "CallerOperation" in span_by_name, ( + f"CallerOperation span not found. Found: {[r[columns['name']] for r in rows]}" + ) + assert "HandleInvocation" in span_by_name, ( + f"HandleInvocation span not found. Found: {[r[columns['name']] for r in rows]}" + ) + + caller_row = span_by_name["CallerOperation"] + handler_row = span_by_name["HandleInvocation"] + + # HandleInvocation's parent must be the caller span + assert handler_row[columns["operation_ParentId"]] == caller_row[columns["id"]], ( + f"HandleInvocation parent ({handler_row[columns['operation_ParentId']]}) " + f"!= CallerOperation id ({caller_row[columns['id']]}). " + f"Span parenting is broken in App Insights." ) diff --git a/sdk/agentserver/azure-ai-agentserver-responses/azure/ai/agentserver/responses/hosting/_endpoint_handler.py b/sdk/agentserver/azure-ai-agentserver-responses/azure/ai/agentserver/responses/hosting/_endpoint_handler.py index 85dcc182c35b..dd87dede29ed 100644 --- a/sdk/agentserver/azure-ai-agentserver-responses/azure/ai/agentserver/responses/hosting/_endpoint_handler.py +++ b/sdk/agentserver/azure-ai-agentserver-responses/azure/ai/agentserver/responses/hosting/_endpoint_handler.py @@ -11,6 +11,7 @@ from __future__ import annotations import asyncio # pylint: disable=do-not-import-asyncio +import contextlib import contextvars import logging import threading @@ -18,15 +19,12 @@ from opentelemetry import baggage as _otel_baggage from opentelemetry import context as _otel_context +from opentelemetry.baggage.propagation import W3CBaggagePropagator from starlette.requests import Request from starlette.responses import JSONResponse, Response, StreamingResponse from azure.ai.agentserver.core import ( # pylint: disable=import-error,no-name-in-module - detach_context, - end_span, flush_spans, - set_current_span, - trace_stream, ) from azure.ai.agentserver.responses.models._generated import ( AgentReference, @@ -99,25 +97,6 @@ logger = logging.getLogger("azure.ai.agentserver") -# OTel span attribute keys for error tagging (§7.2) -_ATTR_ERROR_CODE = "azure.ai.agentserver.responses.error.code" -_ATTR_ERROR_MESSAGE = "azure.ai.agentserver.responses.error.message" - - -def _classify_error_code(exc: BaseException) -> str: - """Return an error code string for an exception, matching API error classification. - - :param exc: The exception to classify. - :type exc: BaseException - :return: An error code string. - :rtype: str - """ - if isinstance(exc, RequestValidationError): - return exc.code - if isinstance(exc, ValueError): - return "invalid_request" - return "internal_error" - def _extract_isolation(request: Request) -> IsolationContext: """Build an ``IsolationContext`` from platform-injected request headers. @@ -290,7 +269,7 @@ def __init__( :type response_headers: dict[str, str] :param sse_headers: SSE-specific headers (e.g. connection, cache-control). :type sse_headers: dict[str, str] - :param host: The ``ResponsesAgentServerHost`` instance (provides ``request_span``). + :param host: The ``ResponsesAgentServerHost`` instance (provides ``request_context``). :type host: ResponsesAgentServerHost :param provider: Persistence provider for response envelopes and input items. :type provider: ResponseProviderProtocol @@ -318,27 +297,6 @@ def __init__( ], ) - # ------------------------------------------------------------------ - # Span attribute helper - # ------------------------------------------------------------------ - - @staticmethod - def _safe_set_attrs(span: Any, attrs: dict[str, str]) -> None: - """Safely set attributes on an OTel span. - - :param span: The OTel span, or *None*. - :type span: Any - :param attrs: Key-value attributes to set. - :type attrs: dict[str, str] - """ - if span is None: - return - try: - for key, value in attrs.items(): - span.set_attribute(key, value) - except Exception: # pylint: disable=broad-exception-caught - logger.debug("Failed to set span attributes: %s", list(attrs.keys()), exc_info=True) - # ------------------------------------------------------------------ # §8: Session ID response header helper # ------------------------------------------------------------------ @@ -386,49 +344,6 @@ async def _monitor_disconnect(self, request: Request, cancellation_signal: async return await asyncio.sleep(0.5) - def _wrap_streaming_response( - self, - response: StreamingResponse, - otel_span: Any, - ) -> StreamingResponse: - """Wrap a streaming response's body iterator with span lifecycle and context. - - Two layers of wrapping are applied: - - 1. **Inner (tracing):** ``trace_stream`` wraps the body iterator so - the OTel span covers the full streaming duration and is ended - when iteration completes. - 2. **Outer (context):** A second async generator re-attaches the span - as the current context for the duration of streaming, so that - child spans created by user handler code (e.g. Agent Framework) - are correctly parented under this span. - - :param response: The ``StreamingResponse`` to wrap. - :type response: StreamingResponse - :param otel_span: The OTel span (or *None* when tracing is disabled). - :type otel_span: Any - :return: The same response object, with its body_iterator replaced. - :rtype: StreamingResponse - """ - if otel_span is None: - return response - - # Inner wrap: trace_stream ends the span when iteration completes. - traced = trace_stream(response.body_iterator, otel_span) - - # Outer wrap: re-attach span as current context during streaming - # so child spans are correctly parented. - async def _iter_with_context(): # type: ignore[return] - token = set_current_span(otel_span) - try: - async for chunk in traced: - yield chunk - finally: - detach_context(token) - - response.body_iterator = _iter_with_context() - return response - # ------------------------------------------------------------------ # ResponseContext factory # ------------------------------------------------------------------ @@ -704,19 +619,19 @@ async def handle_create(self, request: Request) -> Response: # pylint: disable= span.set_tags(build_create_span_tags(ctx, request_id=request_id, project_id=_project_id)) - # Start OTel request span using host's request_span context manager. - with self._host.request_span( - request.headers, - response_id, - "invoke_agent", - operation_name="invoke_agent", - session_id=agent_session_id or "", - end_on_exit=False, - ) as otel_span: - self._safe_set_attrs(otel_span, build_create_otel_attrs(ctx, request_id=request_id, project_id=_project_id)) - + # Attach incoming W3C trace context (no span created). + with self._host.request_context(request.headers) if hasattr(self._host, "request_context") else contextlib.nullcontext(): # Set W3C baggage per spec §7.3 + # Extract incoming baggage from request headers (only baggage, not traceparent) + # to preserve parent-child span relationships while inheriting caller's baggage entries. + _incoming_baggage_ctx = W3CBaggagePropagator().extract( + carrier={"baggage": request.headers.get("baggage", "")} + ) bag_ctx = _otel_context.get_current() + # Merge incoming baggage entries (e.g. user.id) onto current context + for _bkey, _bval in _otel_baggage.get_all(context=_incoming_baggage_ctx).items(): + bag_ctx = _otel_baggage.set_baggage(_bkey, _bval, context=bag_ctx) + bag_ctx = _otel_baggage.set_baggage("azure.ai.agentserver.response_id", response_id, context=bag_ctx) bag_ctx = _otel_baggage.set_baggage( "azure.ai.agentserver.conversation_id", ctx.conversation_id or "", context=bag_ctx @@ -759,8 +674,7 @@ async def _iter_with_cleanup(): # type: ignore[return] media_type="text/event-stream", headers={**self._sse_headers, **self._session_headers(agent_session_id)}, ) - wrapped = self._wrap_streaming_response(sse_response, otel_span) - return wrapped + return sse_response if not ctx.background: disconnect_task = asyncio.create_task(self._monitor_disconnect(request, ctx.cancellation_signal)) @@ -772,7 +686,6 @@ async def _iter_with_cleanup(): # type: ignore[return] snapshot.get("status"), len(snapshot.get("output", [])), ) - end_span(otel_span) return JSONResponse(snapshot, status_code=200, headers=self._session_headers(agent_session_id)) except _HandlerError as exc: logger.error( @@ -780,14 +693,6 @@ async def _iter_with_cleanup(): # type: ignore[return] ctx.response_id, exc_info=exc.original, ) - self._safe_set_attrs( - otel_span, - { - _ATTR_ERROR_CODE: _classify_error_code(exc.original), - _ATTR_ERROR_MESSAGE: str(exc.original), - }, - ) - end_span(otel_span, exc=exc.original) # Handler errors are server-side faults, not client errors err_body = { "error": { @@ -807,18 +712,9 @@ async def _iter_with_cleanup(): # type: ignore[return] ctx.response_id, snapshot.get("status"), ) - end_span(otel_span) return JSONResponse(snapshot, status_code=200, headers=self._session_headers(agent_session_id)) except _HandlerError as exc: logger.error("Handler error in create (response_id=%s)", ctx.response_id, exc_info=exc.original) - self._safe_set_attrs( - otel_span, - { - _ATTR_ERROR_CODE: _classify_error_code(exc.original), - _ATTR_ERROR_MESSAGE: str(exc.original), - }, - ) - end_span(otel_span, exc=exc) # Handler errors are server-side faults, not client errors err_body = { "error": { @@ -835,14 +731,6 @@ async def _iter_with_cleanup(): # type: ignore[return] ) except Exception as exc: # pylint: disable=broad-exception-caught logger.error("Unexpected error in create (response_id=%s)", ctx.response_id, exc_info=exc) - self._safe_set_attrs( - otel_span, - { - _ATTR_ERROR_CODE: _classify_error_code(exc), - _ATTR_ERROR_MESSAGE: str(exc), - }, - ) - end_span(otel_span, exc=exc) raise finally: _response_id_var.reset(rid_token) diff --git a/sdk/agentserver/azure-ai-agentserver-responses/cspell.json b/sdk/agentserver/azure-ai-agentserver-responses/cspell.json index 173bf9281425..69f59055e4b8 100644 --- a/sdk/agentserver/azure-ai-agentserver-responses/cspell.json +++ b/sdk/agentserver/azure-ai-agentserver-responses/cspell.json @@ -21,7 +21,9 @@ "JVBE", "hdrs", "myproj", - "myhost" + "myhost", + "bkey", + "bval" ], "ignorePaths": [ "*.csv", diff --git a/sdk/agentserver/azure-ai-agentserver-responses/tests/conftest.py b/sdk/agentserver/azure-ai-agentserver-responses/tests/conftest.py index 9d834c339b88..740d9bd03aa8 100644 --- a/sdk/agentserver/azure-ai-agentserver-responses/tests/conftest.py +++ b/sdk/agentserver/azure-ai-agentserver-responses/tests/conftest.py @@ -5,7 +5,22 @@ import sys from pathlib import Path +from unittest.mock import patch + +import pytest _PROJECT_ROOT = str(Path(__file__).resolve().parent.parent) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) + + +@pytest.fixture(autouse=True, scope="session") +def _prevent_distro_setup(): + """Prevent microsoft-opentelemetry distro from contaminating global OTel + state during tests. Without this, CI environments that have the distro + installed and APPLICATIONINSIGHTS_CONNECTION_STRING set would trigger + ``use_microsoft_opentelemetry()`` on the first server construction, + installing a global TracerProvider that breaks later traceparent- + propagation tests.""" + with patch("azure.ai.agentserver.core._tracing._setup_distro_export", create=True): + yield diff --git a/sdk/agentserver/azure-ai-agentserver-responses/tests/contract/test_tracing.py b/sdk/agentserver/azure-ai-agentserver-responses/tests/contract/test_tracing.py index 7a05437d118f..e17320cfe356 100644 --- a/sdk/agentserver/azure-ai-agentserver-responses/tests/contract/test_tracing.py +++ b/sdk/agentserver/azure-ai-agentserver-responses/tests/contract/test_tracing.py @@ -10,6 +10,7 @@ from typing import Any +import pytest from starlette.testclient import TestClient from azure.ai.agentserver.responses import ResponsesAgentServerHost, ResponsesServerOptions @@ -26,7 +27,7 @@ async def _events(): def _build_client(hook: InMemoryCreateSpanHook | None = None) -> TestClient: options = ResponsesServerOptions(create_span_hook=hook) - app = ResponsesAgentServerHost(options=options) + app = ResponsesAgentServerHost(options=options, configure_observability=None) app.response_handler(_noop_handler) return TestClient(app) @@ -215,3 +216,171 @@ def test_tracing__span_tags_omit_request_id_when_header_absent() -> None: ) assert "request.id" not in hook.spans[0].tags + + +# --------------------------------------------------------------------------- +# Incoming W3C baggage propagation +# --------------------------------------------------------------------------- + + +def test_tracing__incoming_baggage_merged_into_context() -> None: + """Incoming W3C baggage header entries are merged into OTel context.""" + try: + from opentelemetry import baggage as _otel_baggage + except ImportError: + pytest.skip("opentelemetry SDK not installed") + + captured_baggage: dict = {} + + def _baggage_capture_handler(request, context, cancellation_signal): + captured_baggage.update(_otel_baggage.get_all()) + + async def _events(): + if False: # pragma: no cover + yield None + + return _events() + + options = ResponsesServerOptions() + app = ResponsesAgentServerHost(options=options) + app.response_handler(_baggage_capture_handler) + client = TestClient(app) + + client.post( + "/responses", + json={"model": "gpt-4o-mini", "input": "hi", "stream": False}, + headers={"baggage": "user.id=test-user-789,custom.key=custom-value"}, + ) + + # Incoming baggage entries should be present + assert captured_baggage.get("user.id") == "test-user-789" + assert captured_baggage.get("custom.key") == "custom-value" + + +def test_tracing__framework_span_parented_under_incoming_traceparent() -> None: + """A span created inside the handler is parented directly under the + incoming traceparent — no intermediate invoke_agent span. + + Uses a real OTel span + ``inject(headers)`` instead of a synthetic + traceparent string so that the trace context is always propagated + correctly regardless of which TracerProvider or auto-instrumentation + is active in the process (e.g. CI environments with + microsoft-opentelemetry installed). + """ + try: + from opentelemetry import trace + from opentelemetry.propagate import inject + from opentelemetry.sdk.trace import TracerProvider as SdkTracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + except ImportError: + pytest.skip("opentelemetry SDK not installed") + + exporter = InMemorySpanExporter() + existing = trace.get_tracer_provider() + if hasattr(existing, "add_span_processor"): + existing.add_span_processor(SimpleSpanProcessor(exporter)) + else: + provider = SdkTracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + + captured_trace_id = None + captured_parent_id = None + + def _span_handler(request, context, cancellation_signal): + nonlocal captured_trace_id, captured_parent_id + tracer = trace.get_tracer("test.framework") + with tracer.start_as_current_span("framework_create_response") as span: + captured_trace_id = format(span.context.trace_id, "032x") + captured_parent_id = format(span.parent.span_id, "016x") if span.parent else None + + async def _events(): + if False: # pragma: no cover + yield None + + return _events() + + options = ResponsesServerOptions() + app = ResponsesAgentServerHost(options=options, configure_observability=None) + app.response_handler(_span_handler) + client = TestClient(app) + + # Create a real caller span and inject its trace context into headers. + caller_tracer = trace.get_tracer("test.caller") + with caller_tracer.start_as_current_span("CallerOperation") as caller_span: + caller_trace_id = format(caller_span.context.trace_id, "032x") + caller_span_id = format(caller_span.context.span_id, "016x") + + headers: dict[str, str] = {"baggage": "user.id=test-user-parenting"} + inject(headers) + + resp = client.post( + "/responses", + json={"model": "gpt-4o-mini", "input": "hi", "stream": False}, + headers=headers, + ) + assert resp.status_code == 200 + + # Framework span should share the same trace ID as the caller span + assert captured_trace_id == caller_trace_id + # Framework span should be parented directly under the caller span + assert captured_parent_id == caller_span_id + + # Verify via exporter as well + spans = exporter.get_finished_spans() + fw_spans = [s for s in spans if s.name == "framework_create_response"] + assert len(fw_spans) == 1 + fw = fw_spans[0] + assert format(fw.context.trace_id, "032x") == caller_trace_id + assert fw.parent is not None + assert format(fw.parent.span_id, "016x") == caller_span_id + + +def test_tracing__incoming_baggage_empty_header_no_error() -> None: + """Empty baggage header does not cause errors.""" + client = _build_client() + resp = client.post( + "/responses", + json={"model": "gpt-4o-mini", "input": "hi", "stream": False}, + headers={"baggage": ""}, + ) + assert resp.status_code == 200 + + +def test_tracing__sdk_set_baggage_available_in_handler() -> None: + """SDK-set baggage entries (response_id, conversation_id, streaming) + and incoming caller baggage are available inside the response handler.""" + try: + from opentelemetry import baggage as _otel_baggage + except ImportError: + pytest.skip("opentelemetry SDK not installed") + + captured_baggage: dict = {} + + def _baggage_capture_handler(request, context, cancellation_signal): + captured_baggage.update(_otel_baggage.get_all()) + + async def _events(): + if False: # pragma: no cover + yield None + + return _events() + + options = ResponsesServerOptions() + app = ResponsesAgentServerHost(options=options) + app.response_handler(_baggage_capture_handler) + client = TestClient(app) + + client.post( + "/responses", + json={"model": "gpt-4o-mini", "input": "hi", "stream": False}, + headers={"baggage": "caller.key=caller-value"}, + ) + + # SDK-set baggage entries + assert "azure.ai.agentserver.response_id" in captured_baggage + assert "azure.ai.agentserver.conversation_id" in captured_baggage + assert "azure.ai.agentserver.streaming" in captured_baggage + # Incoming caller baggage is also preserved + assert captured_baggage.get("caller.key") == "caller-value"