From 07b28e7daa19bea3c8d9f7582ad34e4def575bc7 Mon Sep 17 00:00:00 2001 From: Nithish-KV Date: Fri, 13 Mar 2026 14:01:59 +0530 Subject: [PATCH] [NET-371] feat: Add Claude Agent SDK instrumentation --- netra/instrumentation/__init__.py | 24 ++++ .../claude_agent_sdk/__init__.py | 84 ++++++++++++ .../instrumentation/claude_agent_sdk/utils.py | 123 ++++++++++++++++++ .../claude_agent_sdk/version.py | 1 + .../claude_agent_sdk/wrappers.py | 107 +++++++++++++++ netra/instrumentation/instruments.py | 1 + 6 files changed, 340 insertions(+) create mode 100644 netra/instrumentation/claude_agent_sdk/__init__.py create mode 100644 netra/instrumentation/claude_agent_sdk/utils.py create mode 100644 netra/instrumentation/claude_agent_sdk/version.py create mode 100644 netra/instrumentation/claude_agent_sdk/wrappers.py diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index 49f27ae..f3f61c9 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -313,6 +313,10 @@ def init_instrumentations( if CustomInstruments.ELEVENLABS in netra_custom_instruments: init_elevenlabs_instrumentation() + # Initialize claude_agent_sdk instrumentation. + if CustomInstruments.CLAUDE_AGENT_SDK in netra_custom_instruments: + init_claude_agent_sdk_instrumentation() + def init_groq_instrumentation() -> bool: """Initialize Groq instrumentation.""" @@ -1343,3 +1347,23 @@ def init_elevenlabs_instrumentation() -> bool: logging.error(f"Error initializing Elevenlabs instrumentor: {e}") Telemetry().log_exception(e) return False + + +def init_claude_agent_sdk_instrumentation() -> bool: + """Initialize Claude Agent SDK instrumentation. + + Returns: + bool: True if initialization was successful, False otherwise. + """ + try: + if is_package_installed("claude-agent-sdk"): + from netra.instrumentation.claude_agent_sdk import NetraClaudeAgentSDKInstrumentor + + instrumentor = NetraClaudeAgentSDKInstrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument() + return True + except Exception as e: + logging.error(f"Error initializing Claude Agent SDK instrumentor: {e}") + Telemetry().log_exception(e) + return False \ No newline at end of file diff --git a/netra/instrumentation/claude_agent_sdk/__init__.py b/netra/instrumentation/claude_agent_sdk/__init__.py new file mode 100644 index 0000000..7065b85 --- /dev/null +++ b/netra/instrumentation/claude_agent_sdk/__init__.py @@ -0,0 +1,84 @@ +import wrapt +import logging +from opentelemetry.trace import Tracer, get_tracer +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + +from netra.instrumentation.claude_agent_sdk.version import __version__ +from netra.instrumentation.claude_agent_sdk.wrappers import ( + client_query_wrapper, + client_response_wrapper, + query_wrapper +) + +logger = logging.getLogger(__name__) + +_instruments = ("claude_agent_sdk >= 0.1.0", ) + +class NetraClaudeAgentSDKInstrumentor(BaseInstrumentor): + def instrumentation_dependencies(self): + return _instruments + + def _instrument(self, **kwargs): + try: + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + except Exception as e: + logger.error(f"Failed to initialize tracer: {e}") + return + self._instrument_query(tracer) + self._instrument_client_query(tracer) + self._instrument_client_response(tracer) + + def _uninstrument(self, **kwargs): + self._uninstrument_query() + self._uninstrument_client_query() + self._uninstrument_client_response() + + def _instrument_query(self, tracer: Tracer): + try: + wrapt.wrap_function_wrapper( + "claude_agent_sdk._internal.client", + "InternalClient.process_query", + query_wrapper(tracer) + ) + except Exception as e: + logger.error(f"Failed to instrument claude-agent-sdk query: {e}") + + def _instrument_client_query(self, tracer: Tracer): + try: + wrapt.wrap_function_wrapper( + "claude_agent_sdk.client", + "ClaudeSDKClient.query", + client_query_wrapper() + ) + except Exception as e: + logger.error(f"Failed to instrument claude-sdk-client query: {e}") + + def _instrument_client_response(self, tracer: Tracer): + try: + wrapt.wrap_function_wrapper( + "claude_agent_sdk.client", + "ClaudeSDKClient.receive_messages", + client_response_wrapper(tracer) + ) + except Exception as e: + logger.error(f"Failed to instrument claude-sdk-client response: {e}") + + def _uninstrument_query(self): + try: + unwrap("claude_agent_sdk._internal.client", "InternalClient.process_query") + except (AttributeError, ModuleNotFoundError): + logger.error(f"Failed to uninstrument claude-agent-sdk query") + + def _uninstrument_client_query(self): + try: + unwrap("claude_agent_sdk.client", "ClaudeSDKClient.query") + except (AttributeError, ModuleNotFoundError): + logger.error(f"Failed to uninstrument claude-sdk-client query") + + def _uninstrument_client_response(self): + try: + unwrap("claude_agent_sdk.client", "ClaudeSDKClient.receive_messages") + except (AttributeError, ModuleNotFoundError): + logger.error(f"Failed to uninstrument claude-sdk-client response") \ No newline at end of file diff --git a/netra/instrumentation/claude_agent_sdk/utils.py b/netra/instrumentation/claude_agent_sdk/utils.py new file mode 100644 index 0000000..2a1bba2 --- /dev/null +++ b/netra/instrumentation/claude_agent_sdk/utils.py @@ -0,0 +1,123 @@ +import json +import logging +from typing import Any, Dict +from opentelemetry.trace import Span +from opentelemetry.semconv_ai import SpanAttributes +from claude_agent_sdk import ( + ClaudeAgentOptions, + UserMessage, + AssistantMessage, + ResultMessage, + TextBlock, + ThinkingBlock, + ToolUseBlock, + ToolResultBlock +) + +from netra import Netra +from netra.session_manager import ConversationType + +logger = logging.getLogger(__name__) + +def set_request_attributes(span: Span, kwargs: Dict[str, Any], prompt_index: int) -> None: + try: + options = kwargs.get("options") + if options and isinstance(options, ClaudeAgentOptions): + if model := options.model: + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model) + except Exception: + logger.error(f"Cannot extract model from request") + + try: + prompt = kwargs.get("prompt", "") + if prompt: + prompt_index = _set_conversation(span, ConversationType.INPUT, "user", prompt, prompt_index) + except Exception: + logger.error(f"Cannot extract prompt from request") + + return prompt_index + +def set_response_message_attributes(span: Span, message: Any, prompt_index: int): + try: + if message.model: + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, message.model) + except Exception as e: + pass + + try: + if isinstance(message, AssistantMessage): + prompt_index = _set_assistant_message_attributes(span, message, prompt_index) + + elif isinstance(message, UserMessage): + prompt_index = _set_user_message_attributes(span, message, prompt_index) + + elif isinstance(message, ResultMessage): + _set_result_message_attributes(span, message) + + except Exception as e: + logger.error(f"Cannot extract data from message", e) + + return prompt_index + +def _set_conversation( + span: Span, + conversationType: ConversationType, + role: str, + content: str, + prompt_index: int = 0 +): + if content and role: + Netra.add_conversation(conversationType, role, content) + span.set_attribute(f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role", role) + span.set_attribute(f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content", content) + prompt_index += 1 + return prompt_index + +def _set_user_message_attributes(span: Span, message: UserMessage, prompt_index: int): + for block in message.content: + if isinstance(block, ToolResultBlock): + prompt_index = _set_conversation(span, ConversationType.INPUT, "tool_result", block.content, prompt_index) + return prompt_index + +def _set_assistant_message_attributes(span: Span, message: AssistantMessage, prompt_index: int): + role, content = None, None + for block in message.content: + if isinstance(block, TextBlock): + role = "assistant" + content = block.text + elif isinstance(block, ThinkingBlock): + role = "assistant" + content = block.thinking + elif isinstance(block, ToolUseBlock): + role = "tool" + content = f"Tool `{block.name}` invoked using attributes\n{json.dumps(block.input, indent=2)}" + prompt_index = _set_conversation(span, ConversationType.OUTPUT, role, content, prompt_index) + + return prompt_index + +def _set_result_message_attributes(span: Span, message: ResultMessage): + if not message.usage: + return + + usage = message.usage + + if (input_tokens := usage.get("input_tokens")) is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens) + + if (output_tokens := usage.get("output_tokens")) is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, output_tokens) + + if (total_tokens := usage.get("total_tokens")) is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens) + + if (cache_creation_input_tokens := usage.get("cache_creation_input_tokens")) is not None: + span.set_attribute( + SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, + cache_creation_input_tokens, + ) + + if (cache_read_input_tokens := usage.get("cache_read_input_tokens")) is not None: + span.set_attribute( + SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, + cache_read_input_tokens, + ) diff --git a/netra/instrumentation/claude_agent_sdk/version.py b/netra/instrumentation/claude_agent_sdk/version.py new file mode 100644 index 0000000..d538f87 --- /dev/null +++ b/netra/instrumentation/claude_agent_sdk/version.py @@ -0,0 +1 @@ +__version__ = "1.0.0" \ No newline at end of file diff --git a/netra/instrumentation/claude_agent_sdk/wrappers.py b/netra/instrumentation/claude_agent_sdk/wrappers.py new file mode 100644 index 0000000..77ae84d --- /dev/null +++ b/netra/instrumentation/claude_agent_sdk/wrappers.py @@ -0,0 +1,107 @@ +import json +import logging +from typing import Any, Callable, Dict, Tuple +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.trace.status import Status, StatusCode + +from netra.instrumentation.claude_agent_sdk.utils import ( + set_request_attributes, + set_response_message_attributes +) + +logger = logging.getLogger(__name__) + +QUERY_SPAN_NAME = "claude-agent-sdk.query" +SDK_CLIENT_SPAN_NAME = "claude-agent-sdk.sdk-client" + +def query_wrapper(tracer: Tracer): + async def wrapper( + wrapped: Callable[..., Any], + instance: Any, + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> Any: + prompt_index = 0 + span = tracer.start_span( + QUERY_SPAN_NAME, + kind=SpanKind.CLIENT + ) + try: + with trace.use_span(span, end_on_exit=False): + prompt_index = set_request_attributes(span, kwargs, prompt_index) + aiterator = aiter(wrapped(*args, **kwargs)) + + while True: + with trace.use_span(span, end_on_exit=False): + try: + message = await anext(aiterator) + except StopAsyncIteration: + break + + prompt_index = set_response_message_attributes(span, message, prompt_index) + yield message + + except GeneratorExit: + raise + except Exception as e: + logger.error("netra.instrumentation.claude-agent-sdk: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + finally: + span.end() + + return wrapper + +def client_query_wrapper(): + async def wrapper( + wrapped: Callable[..., Any], + instance: Any, + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> Any: + # Add prompt to the client object to map to the corresponding response + # in recieve function + prompt = args[0] if len(args) > 0 else kwargs.get("prompt") + instance._prompt_data = { "prompt": prompt } + return await wrapped(*args, **kwargs) + return wrapper + + +def client_response_wrapper(tracer: Tracer): + async def wrapper( + wrapped: Callable[..., Any], + instance: Any, + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> Any: + span = tracer.start_span( + SDK_CLIENT_SPAN_NAME, + kind=SpanKind.CLIENT + ) + prompt_index = 0 + try: + with trace.use_span(span, end_on_exit=False): + if hasattr(instance, "_prompt_data"): + prompt_index = set_request_attributes(span, instance._prompt_data, prompt_index) + + with trace.use_span(span, end_on_exit=False): + aiterator = aiter(wrapped(*args, **kwargs)) + + while True: + with trace.use_span(span, end_on_exit=False): + try: + message = await anext(aiterator) + prompt_index = set_response_message_attributes(span, message, prompt_index) + except StopAsyncIteration: + break + yield message + + except Exception as e: + logger.error("netra.instrumentation.claude-agent-sdk: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + finally: + span.end() + + return wrapper \ No newline at end of file diff --git a/netra/instrumentation/instruments.py b/netra/instrumentation/instruments.py index 1db7ea6..9a694f7 100644 --- a/netra/instrumentation/instruments.py +++ b/netra/instrumentation/instruments.py @@ -71,6 +71,7 @@ class CustomInstruments(Enum): DEEPGRAM = "deepgram" CARTESIA = "cartesia" ELEVENLABS = "elevenlabs" + CLAUDE_AGENT_SDK = "claude_agent_sdk" class NetraInstruments(Enum):