Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions netra/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ def init_instrumentations(
if CustomInstruments.ELEVENLABS in netra_custom_instruments:
init_elevenlabs_instrumentation()

# Initialize claude_agent_sdk instrumentation.
if CustomInstruments.CLAUDE_AGENT_SDK in netra_custom_instruments:
init_claude_agent_sdk_instrumentation()


def init_groq_instrumentation() -> bool:
"""Initialize Groq instrumentation."""
Expand Down Expand Up @@ -1343,3 +1347,23 @@ def init_elevenlabs_instrumentation() -> bool:
logging.error(f"Error initializing Elevenlabs instrumentor: {e}")
Telemetry().log_exception(e)
return False


def init_claude_agent_sdk_instrumentation() -> bool:
"""Initialize Claude Agent SDK instrumentation.

Returns:
bool: True if initialization was successful, False otherwise.
"""
try:
if is_package_installed("claude-agent-sdk"):
from netra.instrumentation.claude_agent_sdk import NetraClaudeAgentSDKInstrumentor

instrumentor = NetraClaudeAgentSDKInstrumentor()
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument()
return True
except Exception as e:
logging.error(f"Error initializing Claude Agent SDK instrumentor: {e}")
Telemetry().log_exception(e)
return False
84 changes: 84 additions & 0 deletions netra/instrumentation/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import wrapt
import logging
from opentelemetry.trace import Tracer, get_tracer
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor

from netra.instrumentation.claude_agent_sdk.version import __version__
from netra.instrumentation.claude_agent_sdk.wrappers import (
client_query_wrapper,
client_response_wrapper,
query_wrapper
)

logger = logging.getLogger(__name__)

_instruments = ("claude_agent_sdk >= 0.1.0", )

class NetraClaudeAgentSDKInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self):
return _instruments

def _instrument(self, **kwargs):
try:
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
except Exception as e:
logger.error(f"Failed to initialize tracer: {e}")
return
self._instrument_query(tracer)
self._instrument_client_query(tracer)
self._instrument_client_response(tracer)

def _uninstrument(self, **kwargs):
self._uninstrument_query()
self._uninstrument_client_query()
self._uninstrument_client_response()

def _instrument_query(self, tracer: Tracer):
try:
wrapt.wrap_function_wrapper(
"claude_agent_sdk._internal.client",
"InternalClient.process_query",
query_wrapper(tracer)
)
except Exception as e:
logger.error(f"Failed to instrument claude-agent-sdk query: {e}")

def _instrument_client_query(self, tracer: Tracer):
try:
wrapt.wrap_function_wrapper(
"claude_agent_sdk.client",
"ClaudeSDKClient.query",
client_query_wrapper()
)
except Exception as e:
logger.error(f"Failed to instrument claude-sdk-client query: {e}")

def _instrument_client_response(self, tracer: Tracer):
try:
wrapt.wrap_function_wrapper(
"claude_agent_sdk.client",
"ClaudeSDKClient.receive_messages",
client_response_wrapper(tracer)
)
except Exception as e:
logger.error(f"Failed to instrument claude-sdk-client response: {e}")

def _uninstrument_query(self):
try:
unwrap("claude_agent_sdk._internal.client", "InternalClient.process_query")
except (AttributeError, ModuleNotFoundError):
logger.error(f"Failed to uninstrument claude-agent-sdk query")

def _uninstrument_client_query(self):
try:
unwrap("claude_agent_sdk.client", "ClaudeSDKClient.query")
except (AttributeError, ModuleNotFoundError):
logger.error(f"Failed to uninstrument claude-sdk-client query")

def _uninstrument_client_response(self):
try:
unwrap("claude_agent_sdk.client", "ClaudeSDKClient.receive_messages")
except (AttributeError, ModuleNotFoundError):
logger.error(f"Failed to uninstrument claude-sdk-client response")
123 changes: 123 additions & 0 deletions netra/instrumentation/claude_agent_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import json
import logging
from typing import Any, Dict
from opentelemetry.trace import Span
from opentelemetry.semconv_ai import SpanAttributes
from claude_agent_sdk import (
ClaudeAgentOptions,
UserMessage,
AssistantMessage,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolUseBlock,
ToolResultBlock
)

from netra import Netra
from netra.session_manager import ConversationType

logger = logging.getLogger(__name__)

def set_request_attributes(span: Span, kwargs: Dict[str, Any], prompt_index: int) -> None:
try:
options = kwargs.get("options")
if options and isinstance(options, ClaudeAgentOptions):
if model := options.model:
span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model)
except Exception:
logger.error(f"Cannot extract model from request")

try:
prompt = kwargs.get("prompt", "")
if prompt:
prompt_index = _set_conversation(span, ConversationType.INPUT, "user", prompt, prompt_index)
except Exception:
logger.error(f"Cannot extract prompt from request")

return prompt_index

def set_response_message_attributes(span: Span, message: Any, prompt_index: int):
try:
if message.model:
span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, message.model)
except Exception as e:
pass

try:
if isinstance(message, AssistantMessage):
prompt_index = _set_assistant_message_attributes(span, message, prompt_index)

elif isinstance(message, UserMessage):
prompt_index = _set_user_message_attributes(span, message, prompt_index)

elif isinstance(message, ResultMessage):
_set_result_message_attributes(span, message)

except Exception as e:
logger.error(f"Cannot extract data from message", e)

return prompt_index

def _set_conversation(
span: Span,
conversationType: ConversationType,
role: str,
content: str,
prompt_index: int = 0
):
if content and role:
Netra.add_conversation(conversationType, role, content)
span.set_attribute(f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role", role)
span.set_attribute(f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content", content)
prompt_index += 1
return prompt_index

def _set_user_message_attributes(span: Span, message: UserMessage, prompt_index: int):
for block in message.content:
if isinstance(block, ToolResultBlock):
prompt_index = _set_conversation(span, ConversationType.INPUT, "tool_result", block.content, prompt_index)
return prompt_index

def _set_assistant_message_attributes(span: Span, message: AssistantMessage, prompt_index: int):
role, content = None, None
for block in message.content:
if isinstance(block, TextBlock):
role = "assistant"
content = block.text
elif isinstance(block, ThinkingBlock):
role = "assistant"
content = block.thinking
elif isinstance(block, ToolUseBlock):
role = "tool"
content = f"Tool `{block.name}` invoked using attributes\n{json.dumps(block.input, indent=2)}"
prompt_index = _set_conversation(span, ConversationType.OUTPUT, role, content, prompt_index)

return prompt_index

def _set_result_message_attributes(span: Span, message: ResultMessage):
if not message.usage:
return

usage = message.usage

if (input_tokens := usage.get("input_tokens")) is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens)

if (output_tokens := usage.get("output_tokens")) is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, output_tokens)

if (total_tokens := usage.get("total_tokens")) is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens)

if (cache_creation_input_tokens := usage.get("cache_creation_input_tokens")) is not None:
span.set_attribute(
SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS,
cache_creation_input_tokens,
)

if (cache_read_input_tokens := usage.get("cache_read_input_tokens")) is not None:
span.set_attribute(
SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS,
cache_read_input_tokens,
)
1 change: 1 addition & 0 deletions netra/instrumentation/claude_agent_sdk/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "1.0.0"
107 changes: 107 additions & 0 deletions netra/instrumentation/claude_agent_sdk/wrappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import json
import logging
from typing import Any, Callable, Dict, Tuple
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.trace.status import Status, StatusCode

from netra.instrumentation.claude_agent_sdk.utils import (
set_request_attributes,
set_response_message_attributes
)

logger = logging.getLogger(__name__)

QUERY_SPAN_NAME = "claude-agent-sdk.query"
SDK_CLIENT_SPAN_NAME = "claude-agent-sdk.sdk-client"

def query_wrapper(tracer: Tracer):
async def wrapper(
wrapped: Callable[..., Any],
instance: Any,
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> Any:
prompt_index = 0
span = tracer.start_span(
QUERY_SPAN_NAME,
kind=SpanKind.CLIENT
)
try:
with trace.use_span(span, end_on_exit=False):
prompt_index = set_request_attributes(span, kwargs, prompt_index)
aiterator = aiter(wrapped(*args, **kwargs))

while True:
with trace.use_span(span, end_on_exit=False):
try:
message = await anext(aiterator)
except StopAsyncIteration:
break

prompt_index = set_response_message_attributes(span, message, prompt_index)
yield message

except GeneratorExit:
raise
except Exception as e:
logger.error("netra.instrumentation.claude-agent-sdk: %s", e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
finally:
span.end()

return wrapper

def client_query_wrapper():
async def wrapper(
wrapped: Callable[..., Any],
instance: Any,
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> Any:
# Add prompt to the client object to map to the corresponding response
# in recieve function
prompt = args[0] if len(args) > 0 else kwargs.get("prompt")
instance._prompt_data = { "prompt": prompt }
return await wrapped(*args, **kwargs)
return wrapper


def client_response_wrapper(tracer: Tracer):
async def wrapper(
wrapped: Callable[..., Any],
instance: Any,
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> Any:
span = tracer.start_span(
SDK_CLIENT_SPAN_NAME,
kind=SpanKind.CLIENT
)
prompt_index = 0
try:
with trace.use_span(span, end_on_exit=False):
if hasattr(instance, "_prompt_data"):
prompt_index = set_request_attributes(span, instance._prompt_data, prompt_index)

with trace.use_span(span, end_on_exit=False):
aiterator = aiter(wrapped(*args, **kwargs))

while True:
with trace.use_span(span, end_on_exit=False):
try:
message = await anext(aiterator)
prompt_index = set_response_message_attributes(span, message, prompt_index)
except StopAsyncIteration:
break
yield message

except Exception as e:
logger.error("netra.instrumentation.claude-agent-sdk: %s", e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
finally:
span.end()

return wrapper
1 change: 1 addition & 0 deletions netra/instrumentation/instruments.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class CustomInstruments(Enum):
DEEPGRAM = "deepgram"
CARTESIA = "cartesia"
ELEVENLABS = "elevenlabs"
CLAUDE_AGENT_SDK = "claude_agent_sdk"


class NetraInstruments(Enum):
Expand Down