From c2c16d2abbef88c4addc608044018d22ca01a4d7 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 12:55:31 -0300 Subject: [PATCH 01/20] test(claude-agent-sdk): add test scaffold and mock helpers --- tests/integrations/__init__.py | 0 tests/integrations/claude_agent_sdk_mocks.py | 139 +++++++++++++++++++ tests/integrations/test_claude_agent_sdk.py | 6 + 3 files changed, 145 insertions(+) create mode 100644 tests/integrations/__init__.py create mode 100644 tests/integrations/claude_agent_sdk_mocks.py create mode 100644 tests/integrations/test_claude_agent_sdk.py diff --git a/tests/integrations/__init__.py b/tests/integrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integrations/claude_agent_sdk_mocks.py b/tests/integrations/claude_agent_sdk_mocks.py new file mode 100644 index 00000000..a5a7726c --- /dev/null +++ b/tests/integrations/claude_agent_sdk_mocks.py @@ -0,0 +1,139 @@ +"""Test helpers for claude-agent-sdk mock streams. + +These dataclasses duck-type the real ``claude_agent_sdk`` message types just +enough that our integration's observer (which dispatches by ``type(msg).__name__``) +can be exercised without instantiating the real SDK objects. The ``Fake`` prefix +is matched explicitly in the dispatcher so tests stay self-contained. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, AsyncIterator, List, Optional + + +@dataclass +class FakeTextBlock: + """Mirrors ``claude_agent_sdk.TextBlock``.""" + + text: str + + +@dataclass +class FakeThinkingBlock: + """Mirrors ``claude_agent_sdk.ThinkingBlock``.""" + + thinking: str + signature: str = "sig" + + +@dataclass +class FakeToolUseBlock: + """Mirrors ``claude_agent_sdk.ToolUseBlock``.""" + + id: str + name: str + input: dict + + +@dataclass +class FakeToolResultBlock: + """Mirrors ``claude_agent_sdk.ToolResultBlock``.""" + + tool_use_id: str + content: Any + is_error: Optional[bool] = None + + +@dataclass +class FakeSystemMessage: + """Mirrors ``claude_agent_sdk.SystemMessage``.""" + + subtype: str + data: dict + + +@dataclass +class FakeAssistantMessage: + """Mirrors ``claude_agent_sdk.AssistantMessage``.""" + + content: List[Any] + model: str = "claude-opus-4-7" + parent_tool_use_id: Optional[str] = None + usage: Optional[dict] = None + message_id: Optional[str] = None + stop_reason: Optional[str] = None + session_id: Optional[str] = None + uuid: Optional[str] = None + + +@dataclass +class FakeUserMessage: + """Mirrors ``claude_agent_sdk.UserMessage``.""" + + content: Any + parent_tool_use_id: Optional[str] = None + uuid: Optional[str] = None + tool_use_result: Optional[dict] = None + + +@dataclass +class FakeResultMessage: + """Mirrors ``claude_agent_sdk.ResultMessage``.""" + + subtype: str + duration_ms: int = 1000 + duration_api_ms: int = 800 + is_error: bool = False + num_turns: int = 1 + session_id: str = "sess_test" + total_cost_usd: float = 0.001 + usage: dict = field( + default_factory=lambda: { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + } + ) + result: Optional[str] = "Done" + model_usage: Optional[dict] = None + stop_reason: Optional[str] = "end_turn" + permission_denials: Optional[list] = None + uuid: Optional[str] = None + + +async def make_stream(messages: list) -> AsyncIterator: + """Yield a sequence of messages as an async iterator. + + This matches the shape ``claude_agent_sdk.query()`` returns. + """ + for msg in messages: + yield msg + + +def init_system_message( + session_id: str = "sess_test", + model: str = "claude-opus-4-7", + tools: Optional[list] = None, + mcp_servers: Optional[list] = None, + skills: Optional[list] = None, +) -> FakeSystemMessage: + """Build a ``SystemMessage(subtype='init')`` matching the SDK's init shape.""" + return FakeSystemMessage( + subtype="init", + data={ + "session_id": session_id, + "model": model, + "tools": tools or ["Read", "Bash"], + "mcp_servers": mcp_servers or [], + "skills": skills or [], + "slash_commands": [], + "plugins": [], + "permissionMode": "default", + "cwd": "/tmp", + "claude_code_version": "test-0.1.81", + "apiKeySource": "ANTHROPIC_API_KEY", + "output_style": "default", + }, + ) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py new file mode 100644 index 00000000..91539299 --- /dev/null +++ b/tests/integrations/test_claude_agent_sdk.py @@ -0,0 +1,6 @@ +"""Tests for the Claude Agent SDK Openlayer integration.""" + + +def test_imports_work(): + """Module imports cleanly even if claude_agent_sdk is not installed.""" + from openlayer.lib.integrations import claude_agent_sdk # noqa: F401 From 5ca25315c24287f6ac0a2a7f8be8da3a51b3b6a0 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 12:58:14 -0300 Subject: [PATCH 02/20] feat(claude-agent-sdk): traced_query emits root AGENT step with cost/tokens Plan deviation: the plan referenced patching '_publish_trace_async', but the actual tracer function is '_upload_and_publish_trace' (see src/openlayer/lib/tracing/tracer.py:1745). Tests patch that one. Also: AgentStep does not carry cost/tokens fields directly, so we log them into root_step.metadata. post_process_trace spreads root metadata into the trace_data payload, which surfaces them at the trace level for ingestion. --- .../lib/integrations/claude_agent_sdk.py | 244 ++++++++++++++++++ tests/integrations/test_claude_agent_sdk.py | 107 ++++++++ 2 files changed, 351 insertions(+) create mode 100644 src/openlayer/lib/integrations/claude_agent_sdk.py diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py new file mode 100644 index 00000000..eded17b4 --- /dev/null +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -0,0 +1,244 @@ +"""Openlayer tracing integration for the Claude Agent SDK. + +Wraps ``claude_agent_sdk.query`` (and ``ClaudeSDKClient``) so each call becomes +an Openlayer trace with nested steps for assistant turns, tool calls, and +subagents. + +The wrapper is a pure *observer* of the stream: every message yielded by the +underlying ``query()`` is forwarded to the caller unchanged and in order. We +emit Openlayer steps as a side effect of observation. + +See ``docs/superpowers/specs/2026-05-12-claude-agent-sdk-integration-design.md`` +for the design rationale. +""" + +from __future__ import annotations + +import contextvars +import logging +from dataclasses import dataclass, field +from typing import Any, AsyncIterator, Dict, List, Optional + +from ..tracing import tracer as _tracer +from ..tracing.enums import StepType + +logger = logging.getLogger(__name__) + +# We do NOT import ``claude_agent_sdk`` at module load time so this file is +# importable even when the optional dependency is absent — matching the +# pattern of every other integration in this directory. +try: + import claude_agent_sdk as _cas # type: ignore[import-not-found] + + HAVE_CLAUDE_AGENT_SDK = True +except ImportError: # pragma: no cover - exercised by the optional-dep tests + _cas = None # type: ignore[assignment] + HAVE_CLAUDE_AGENT_SDK = False + + +# ---------------------------- Configuration ---------------------------- # + + +@dataclass +class _Config: + """Module-level configuration set by ``trace_claude_agent_sdk``.""" + + inference_pipeline_id: Optional[str] = None + truncate_tool_output_chars: int = 8192 + capture_thinking: bool = True + redact_mcp_env: bool = True + + +_config = _Config() + + +# ----------------------------- Trace state ----------------------------- # + + +@dataclass +class _TraceState: + """Per-query trace state. + + Holds the root step and per-tool-use bookkeeping used to nest subagent + messages and to bracket tool calls across PreToolUse / PostToolUse hooks. + """ + + root_step: Any + pending_tools: Dict[str, Any] = field(default_factory=dict) + tool_to_parent_step: Dict[str, Any] = field(default_factory=dict) + turn_counter: int = 0 + session_id: Optional[str] = None + + +_current_state: contextvars.ContextVar[Optional[_TraceState]] = contextvars.ContextVar( + "openlayer_claude_agent_sdk_state", default=None +) + + +def _require_cas() -> None: + """Raise ``ImportError`` if the optional SDK is not installed.""" + if not HAVE_CLAUDE_AGENT_SDK: + raise ImportError( + "claude-agent-sdk is not installed. " + "Install with: pip install 'claude-agent-sdk>=0.1.81'" + ) + + +# ------------------------------ Helpers ------------------------------ # + + +def _redact_mcp_servers(mcp_servers: Any) -> Any: + """Strip ``env`` and ``headers`` from MCP server config dicts. + + These typically contain credentials and must not be persisted. + """ + if not mcp_servers: + return mcp_servers + if isinstance(mcp_servers, list): + return [ + {k: v for k, v in s.items() if k not in {"env", "headers"}} + if isinstance(s, dict) + else s + for s in mcp_servers + ] + return mcp_servers + + +def _summarize_prompt(prompt: Any) -> str: + """Build a short, human-readable trace name from the prompt.""" + if isinstance(prompt, str): + s = prompt.strip().replace("\n", " ") + return s[:80] + ("..." if len(s) > 80 else "") + return "claude-agent-sdk query" + + +# --------------------------- Observers --------------------------- # + + +def _observe_system_init(msg: Any, state: _TraceState) -> None: + """Capture ``SystemMessage(subtype='init')`` into the root step metadata.""" + if getattr(msg, "subtype", None) != "init": + return + data = getattr(msg, "data", {}) or {} + state.session_id = data.get("session_id") + mcp_servers = data.get("mcp_servers") + if _config.redact_mcp_env: + mcp_servers = _redact_mcp_servers(mcp_servers) + agent_config = { + "model": data.get("model"), + "tools": data.get("tools"), + "mcp_servers": mcp_servers, + "skills": data.get("skills"), + "slash_commands": data.get("slash_commands"), + "plugins": data.get("plugins"), + "permission_mode": data.get("permissionMode"), + "cwd": data.get("cwd"), + "claude_code_version": data.get("claude_code_version"), + "api_key_source": data.get("apiKeySource"), + "output_style": data.get("output_style"), + } + state.root_step.log( + metadata={ + "session_id": state.session_id, + "agent_config": agent_config, + } + ) + + +def _observe_result(msg: Any, state: _TraceState) -> None: + """Capture ``ResultMessage`` — finalizes root cost / tokens / latency.""" + usage = getattr(msg, "usage", None) or {} + input_tokens = usage.get("input_tokens") or 0 + output_tokens = usage.get("output_tokens") or 0 + total_tokens = input_tokens + output_tokens + duration_ms = getattr(msg, "duration_ms", None) + + # ``AgentStep`` doesn't define cost / tokens / prompt_tokens attributes, so + # the recommended Step.log() route is metadata. We stash the same values + # there. ``post_process_trace`` spreads root metadata into the trace_data + # payload, so these show up at trace-level as well. + state.root_step.output = getattr(msg, "result", None) or "" + state.root_step.latency = duration_ms + state.root_step.log( + metadata={ + "cost": getattr(msg, "total_cost_usd", None), + "tokens": total_tokens, + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "session_id": getattr(msg, "session_id", None) or state.session_id, + "num_turns": getattr(msg, "num_turns", None), + "stop_reason": getattr(msg, "stop_reason", None), + "subtype": getattr(msg, "subtype", None), + "is_error": getattr(msg, "is_error", False), + "duration_api_ms": getattr(msg, "duration_api_ms", None), + "model_usage": getattr(msg, "model_usage", None), + "permission_denials": getattr(msg, "permission_denials", None), + "cache_read_input_tokens": usage.get("cache_read_input_tokens"), + "cache_creation_input_tokens": usage.get("cache_creation_input_tokens"), + } + ) + + +def _observe(msg: Any, state: _TraceState) -> None: + """Dispatch a message to the appropriate observer. + + Matches by class name so the ``Fake*`` mocks in the test suite are handled + alongside the real ``claude_agent_sdk`` classes. + """ + type_name = type(msg).__name__ + if type_name in ("SystemMessage", "FakeSystemMessage"): + _observe_system_init(msg, state) + elif type_name in ("ResultMessage", "FakeResultMessage"): + _observe_result(msg, state) + # AssistantMessage / UserMessage observers added in subsequent tasks. + + +# ----------------------------- Public API ----------------------------- # + + +async def traced_query( + *, + prompt: Any, + options: Any = None, + inference_pipeline_id: Optional[str] = None, + **kwargs: Any, +) -> AsyncIterator[Any]: + """Wrap ``claude_agent_sdk.query()`` and emit an Openlayer trace. + + Every message from the underlying ``query()`` stream is yielded to the + caller unchanged. Openlayer steps are emitted as a side effect. + + Args: + prompt: The prompt argument forwarded to ``claude_agent_sdk.query``. + options: Optional ``ClaudeAgentOptions`` forwarded to ``query``. + inference_pipeline_id: Optional pipeline override; falls back to + ``OPENLAYER_INFERENCE_PIPELINE_ID``. + **kwargs: Any additional kwargs (e.g. ``transport``) are passed through. + + Yields: + The same messages the underlying ``query()`` would have yielded, in the + same order. + """ + _require_cas() + name = "claude-agent-sdk: " + _summarize_prompt(prompt) + resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id + with _tracer.create_step( + name=name, + step_type=StepType.AGENT, + inputs={"prompt": prompt}, + inference_pipeline_id=resolved_pipeline, + ) as root_step: + state = _TraceState(root_step=root_step) + token = _current_state.set(state) + try: + async for msg in _cas.query(prompt=prompt, options=options, **kwargs): + try: + _observe(msg, state) + except Exception: # never break the user's stream + logger.exception( + "Openlayer observation failed for message %r", + type(msg).__name__, + ) + yield msg + finally: + _current_state.reset(token) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 91539299..829a7072 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -1,6 +1,113 @@ """Tests for the Claude Agent SDK Openlayer integration.""" +import asyncio +from typing import Any, List +from unittest.mock import patch + +import pytest + +from openlayer.lib.tracing import tracer as ol_tracer + +from .claude_agent_sdk_mocks import ( + FakeAssistantMessage, + FakeResultMessage, + FakeTextBlock, + FakeThinkingBlock, + init_system_message, + make_stream, +) + + +# ---------- shared infra ---------- + +@pytest.fixture(autouse=True) +def _disable_publish(monkeypatch: pytest.MonkeyPatch) -> None: + """Disable real network publishing for every test in this module.""" + monkeypatch.setenv("OPENLAYER_DISABLE_PUBLISH", "true") + monkeypatch.setenv("OPENLAYER_API_KEY", "fake") + monkeypatch.setattr(ol_tracer, "_publish", False, raising=False) + + +def _capture_trace_publish(): + """Return ``(captured, capture_fn)`` pair for patching the publish path. + + NOTE on plan deviation: the plan references a function named + ``_publish_trace_async`` on the tracer, but the actual public-private + function is ``_upload_and_publish_trace`` (see + ``src/openlayer/lib/tracing/tracer.py``). We patch that one and capture + the in-memory ``Trace`` object passed to it. + """ + captured: List[Any] = [] + + def capture(trace, *args, **kwargs): + captured.append(trace) + + return captured, capture + + +# ---------- tests ---------- def test_imports_work(): """Module imports cleanly even if claude_agent_sdk is not installed.""" from openlayer.lib.integrations import claude_agent_sdk # noqa: F401 + + +def test_traced_query_emits_root_agent_step_with_cost_and_tokens(): + """Basic flow: init -> assistant text -> result. + + The root AGENT step gets cost/tokens/session_id populated from the final + ``ResultMessage``. + """ + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message(session_id="s1"), + FakeAssistantMessage(content=[FakeTextBlock("Hello back")]), + FakeResultMessage( + subtype="success", + duration_ms=1500, + num_turns=1, + session_id="s1", + total_cost_usd=0.0042, + result="Hello back", + usage={ + "input_tokens": 10, + "output_tokens": 5, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + }, + ), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + assert len(captured) == 1, "expected exactly one trace to be published" + trace_obj = captured[0] + root_step = trace_obj.steps[0] + assert root_step.step_type.value == "agent" + assert root_step.output == "Hello back" + # Cost / tokens / latency are recorded on the step's metadata + # (and also surface at trace-level via `post_process_trace`). + assert root_step.metadata["cost"] == 0.0042 + assert root_step.metadata["tokens"] == 15 + assert root_step.metadata["prompt_tokens"] == 10 + assert root_step.metadata["completion_tokens"] == 5 + assert root_step.metadata["session_id"] == "s1" + assert root_step.metadata["num_turns"] == 1 + assert root_step.metadata["stop_reason"] == "end_turn" + assert root_step.latency == 1500 From 6ed5e0ead28255f880df074b59bbafcd40e87184 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 12:59:22 -0300 Subject: [PATCH 03/20] feat(claude-agent-sdk): capture assistant turns as CHAT_COMPLETION steps Each AssistantMessage in the stream produces a CHAT_COMPLETION child of the root AGENT step, with text content as output, ThinkingBlock content in metadata.thinking, and ToolUseBlock IDs in metadata.tool_calls. Subagent assistant messages (parent_tool_use_id set) push the corresponding Agent ToolStep onto the contextvar stack so they nest beneath it. --- .../lib/integrations/claude_agent_sdk.py | 85 ++++++++++++++++++- tests/integrations/test_claude_agent_sdk.py | 54 ++++++++++++ 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index eded17b4..eb475dc2 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -179,6 +179,88 @@ def _observe_result(msg: Any, state: _TraceState) -> None: ) +def _resolve_subagent_parent(msg: Any, state: _TraceState) -> Any: + """Return the spawning Agent ToolStep for a subagent message, or ``None``. + + When the SDK delegates work to a subagent via the ``Agent`` tool, the + subsequent messages carry ``parent_tool_use_id`` pointing at the tool-use + block. We open the tool step in ``_pre_tool_use_hook`` and keep it open + while subagent messages stream; this helper finds the right step so we can + push it onto the contextvar stack before opening child steps. + """ + ptid = getattr(msg, "parent_tool_use_id", None) + if ptid and ptid in state.pending_tools: + return state.pending_tools[ptid].step + return None + + +def _observe_assistant(msg: Any, state: _TraceState) -> None: + """Emit a CHAT_COMPLETION step for each ``AssistantMessage``. + + Concatenates ``TextBlock`` content into ``output`` and ``ThinkingBlock`` + content into ``metadata.thinking`` (when ``capture_thinking`` is enabled). + Tracks the IDs of any ``ToolUseBlock``s for cross-reference with later + tool steps. + """ + state.turn_counter += 1 + content = list(getattr(msg, "content", None) or []) + text_parts: List[str] = [] + thinking_parts: List[str] = [] + tool_use_ids: List[str] = [] + for block in content: + bname = type(block).__name__ + if bname in ("TextBlock", "FakeTextBlock"): + text_parts.append(getattr(block, "text", "") or "") + elif bname in ("ThinkingBlock", "FakeThinkingBlock"): + thinking_parts.append(getattr(block, "thinking", "") or "") + elif bname in ("ToolUseBlock", "FakeToolUseBlock"): + tool_use_ids.append(getattr(block, "id", "")) + + usage = getattr(msg, "usage", None) or {} + input_tokens = usage.get("input_tokens") + output_tokens = usage.get("output_tokens") + total_tokens = (input_tokens or 0) + (output_tokens or 0) + + # Subagent nesting: if this assistant message belongs to a subagent run, + # temporarily push the spawning tool's step onto the contextvar stack so + # the new chat-completion step is created beneath it. + subagent_parent = _resolve_subagent_parent(msg, state) + pushed_token = None + if subagent_parent is not None: + pushed_token = _tracer._current_step.set(subagent_parent) + try: + with _tracer.create_step( + name=f"assistant turn {state.turn_counter}", + step_type=StepType.CHAT_COMPLETION, + ) as chat_step: + chat_step.log( + output="\n".join(text_parts), + model=getattr(msg, "model", None), + provider="anthropic", + prompt_tokens=input_tokens, + completion_tokens=output_tokens, + tokens=total_tokens, + metadata={ + "thinking": ( + "\n".join(thinking_parts) + if (thinking_parts and _config.capture_thinking) + else None + ), + "tool_calls": tool_use_ids or None, + "stop_reason": getattr(msg, "stop_reason", None), + "parent_tool_use_id": getattr(msg, "parent_tool_use_id", None), + "message_id": getattr(msg, "message_id", None), + "cache_read_input_tokens": usage.get("cache_read_input_tokens"), + "cache_creation_input_tokens": usage.get( + "cache_creation_input_tokens" + ), + }, + ) + finally: + if pushed_token is not None: + _tracer._safe_reset_contextvar(_tracer._current_step, pushed_token) + + def _observe(msg: Any, state: _TraceState) -> None: """Dispatch a message to the appropriate observer. @@ -188,9 +270,10 @@ def _observe(msg: Any, state: _TraceState) -> None: type_name = type(msg).__name__ if type_name in ("SystemMessage", "FakeSystemMessage"): _observe_system_init(msg, state) + elif type_name in ("AssistantMessage", "FakeAssistantMessage"): + _observe_assistant(msg, state) elif type_name in ("ResultMessage", "FakeResultMessage"): _observe_result(msg, state) - # AssistantMessage / UserMessage observers added in subsequent tasks. # ----------------------------- Public API ----------------------------- # diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 829a7072..1fb71ae8 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -111,3 +111,57 @@ async def run(): assert root_step.metadata["num_turns"] == 1 assert root_step.metadata["stop_reason"] == "end_turn" assert root_step.latency == 1500 + + +def test_assistant_message_emits_chat_completion_step(): + """Each AssistantMessage becomes a CHAT_COMPLETION child of the root step.""" + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message(), + FakeAssistantMessage( + content=[ + FakeThinkingBlock("planning..."), + FakeTextBlock("answer turn 1"), + ], + model="claude-opus-4-7", + usage={ + "input_tokens": 12, + "output_tokens": 4, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + }, + stop_reason="end_turn", + ), + FakeResultMessage(subtype="success"), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + nested = root_step.steps + assert len(nested) == 1 + chat = nested[0] + assert chat.step_type.value == "chat_completion" + assert chat.model == "claude-opus-4-7" + assert chat.provider == "anthropic" + assert "answer turn 1" in chat.output + assert chat.metadata["thinking"] == "planning..." + assert chat.prompt_tokens == 12 + assert chat.completion_tokens == 4 + assert chat.tokens == 16 + assert chat.metadata["stop_reason"] == "end_turn" From decc9bf7326922cdcd6b689650f91d279a12703c Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:01:06 -0300 Subject: [PATCH 04/20] feat(claude-agent-sdk): capture tool calls via composed Pre/PostToolUse hooks Adds composed PreToolUse / PostToolUse / PostToolUseFailure hooks. They open a TOOL step on PreToolUse and finalize it on Post-(success|failure). The hooks are appended to user-provided hooks (never replace them) so user hook decisions like permissionDecision still take effect. Uses a _ToolStepHandle helper that owns the create_step context manager and calls __enter__/__exit__ manually so the step can span the two hook callbacks. Plan deviation: tracer.py exposes no _start_step_manually / _end_step_manually primitives; this CM-handle pattern is the workable alternative the plan flagged in its risk callouts. Also parses mcp____ tool names into metadata.mcp_server and metadata.mcp_tool_name in the same pass. --- .../lib/integrations/claude_agent_sdk.py | 211 +++++++++++++++++- tests/integrations/test_claude_agent_sdk.py | 82 +++++++ 2 files changed, 292 insertions(+), 1 deletion(-) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index eb475dc2..5fdfdd71 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -15,8 +15,10 @@ from __future__ import annotations import contextvars +import json import logging -from dataclasses import dataclass, field +import time +from dataclasses import dataclass, field, replace as _dataclass_replace from typing import Any, AsyncIterator, Dict, List, Optional from ..tracing import tracer as _tracer @@ -112,6 +114,212 @@ def _summarize_prompt(prompt: Any) -> str: return "claude-agent-sdk query" +def _truncate(value: Any, max_chars: int) -> Any: + """Coerce ``value`` to a string and truncate to ``max_chars``. + + Tool outputs can be arbitrary objects; we serialize via ``json.dumps`` when + possible, falling back to ``str()``. The original length is preserved in + the truncation marker so downstream UI can hint at omission. + """ + if value is None: + return None + if not isinstance(value, str): + try: + value = json.dumps(value, default=str) + except Exception: + value = str(value) + if len(value) > max_chars: + return value[:max_chars] + f"... [truncated, full length {len(value)}]" + return value + + +def _parse_mcp_metadata(tool_name: str) -> Dict[str, Any]: + """Parse ``mcp____`` tool names into server/tool metadata.""" + if not tool_name or not tool_name.startswith("mcp__"): + return {} + parts = tool_name.split("__", 2) + if len(parts) == 3: + return {"mcp_server": parts[1], "mcp_tool_name": parts[2]} + return {} + + +# --------------------------- Tool-step lifecycle --------------------------- # + + +class _ToolStepHandle: + """Holds the live ``create_step`` context manager for an in-flight tool call. + + The Openlayer tracer exposes step lifecycle as a context manager + (``tracer.create_step``). For tools we need to open the step in + ``PreToolUse`` and close it from ``PostToolUse`` (which may execute on a + different stack frame within the same coroutine). We bypass ``with`` by + invoking ``__enter__`` / ``__exit__`` manually. + + Caveat: between ``__enter__`` and ``__exit__`` the step is "current" on the + contextvar stack. Sequential pre/post pairs per ``tool_use_id`` are well- + behaved, but truly concurrent tool calls could interleave and produce + nested-looking steps. The SDK fires pre/post serially per tool, so this is + acceptable for the v1 integration. + """ + + def __init__(self, step_cm): + self._cm = step_cm + self.step = step_cm.__enter__() + self.start_time = time.time() + self._closed = False + + def log(self, **kwargs: Any) -> None: + self.step.log(**kwargs) + + def end(self) -> None: + if self._closed: + return + self._closed = True + # Ensure latency is recorded even if PostToolUse fires before any + # nested children would set it via the tracer's natural path. + if self.step.latency is None: + self.step.latency = (time.time() - self.start_time) * 1000.0 + self._cm.__exit__(None, None, None) + + +# ------------------------------- Hooks ------------------------------- # + + +async def _pre_tool_use_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PreToolUse hook — opens a TOOL step for this tool call.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + tool_name = input_data.get("tool_name", "unknown") + tool_input = input_data.get("tool_input") + metadata: Dict[str, Any] = {"tool_use_id": tool_use_id} + metadata.update(_parse_mcp_metadata(tool_name)) + + # If this tool was spawned by a parent Agent tool (subagent case), + # nest beneath the parent's still-open ToolStep. + parent_handle = state.pending_tools.get(tool_use_id) + if parent_handle is not None: + # Already exists — rare reentrant case. Skip to avoid leaking step. + return {} + + parent_for_subagent = state.tool_to_parent_step.get(tool_use_id) + pushed_token = None + if parent_for_subagent is not None: + pushed_token = _tracer._current_step.set(parent_for_subagent) + try: + cm = _tracer.create_step( + name=tool_name, + step_type=StepType.TOOL, + inputs=tool_input, + metadata=metadata, + ) + handle = _ToolStepHandle(cm) + finally: + # Pop the temporary parent push immediately. We don't want + # subsequent operations (e.g. message observation between + # PreToolUse and PostToolUse) to be siblings of the tool step. + if pushed_token is not None: + _tracer._safe_reset_contextvar(_tracer._current_step, pushed_token) + + state.pending_tools[tool_use_id] = handle + except Exception: # never break user's hook execution + logger.exception( + "Openlayer PreToolUse hook failed for tool_use_id=%s", tool_use_id + ) + return {} + + +async def _post_tool_use_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PostToolUse hook — finalizes the TOOL step on success.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + handle = state.pending_tools.pop(tool_use_id, None) + if handle is None: + return {} + raw_output = ( + input_data.get("tool_response") + or input_data.get("tool_output") + or input_data.get("output") + ) + output = _truncate(raw_output, _config.truncate_tool_output_chars) + handle.log(output=output, metadata={"is_error": False}) + handle.end() + # Remember the step in case a subagent spawned by this tool is still + # streaming messages (we may need to nest those under it). + state.tool_to_parent_step[tool_use_id] = handle.step + except Exception: + logger.exception( + "Openlayer PostToolUse hook failed for tool_use_id=%s", tool_use_id + ) + return {} + + +async def _post_tool_use_failure_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PostToolUseFailure hook — finalizes the TOOL step on error.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + handle = state.pending_tools.pop(tool_use_id, None) + if handle is None: + return {} + err = ( + input_data.get("error") + or input_data.get("tool_response") + or input_data.get("tool_output") + ) + handle.log( + output=_truncate(err, _config.truncate_tool_output_chars), + metadata={"is_error": True}, + ) + handle.end() + state.tool_to_parent_step[tool_use_id] = handle.step + except Exception: + logger.exception( + "Openlayer PostToolUseFailure hook failed for tool_use_id=%s", + tool_use_id, + ) + return {} + + +def _inject_openlayer_hooks(options: Any) -> Any: + """Return a copy of ``options`` with our internal hooks merged in. + + User-provided hooks are preserved untouched. Our hooks are appended after + the user's so user hooks have first crack at any synchronous influence on + the agent (e.g. ``permissionDecision`` decisions). + """ + _require_cas() + if options is None: + options = _cas.ClaudeAgentOptions() + + HookMatcher = _cas.HookMatcher # type: ignore[attr-defined] + + user_hooks: Dict[str, list] = dict(getattr(options, "hooks", None) or {}) + + def _append(event: str, matcher: Any) -> None: + user_hooks[event] = list(user_hooks.get(event) or []) + [matcher] + + _append("PreToolUse", HookMatcher(hooks=[_pre_tool_use_hook])) + _append("PostToolUse", HookMatcher(hooks=[_post_tool_use_hook])) + _append("PostToolUseFailure", HookMatcher(hooks=[_post_tool_use_failure_hook])) + + try: + return _dataclass_replace(options, hooks=user_hooks) + except TypeError: + # ``options`` is not a dataclass or doesn't accept ``hooks`` in + # ``replace``. Fall back to mutating in place (safe for our use-case + # since we just instantiated it). + try: + setattr(options, "hooks", user_hooks) + except Exception: + logger.debug("Could not set hooks on options; tool tracing disabled") + return options + + # --------------------------- Observers --------------------------- # @@ -303,6 +511,7 @@ async def traced_query( same order. """ _require_cas() + options = _inject_openlayer_hooks(options) name = "claude-agent-sdk: " + _summarize_prompt(prompt) resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id with _tracer.create_step( diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 1fb71ae8..57ba1e69 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -13,6 +13,9 @@ FakeResultMessage, FakeTextBlock, FakeThinkingBlock, + FakeToolResultBlock, + FakeToolUseBlock, + FakeUserMessage, init_system_message, make_stream, ) @@ -165,3 +168,82 @@ async def run(): assert chat.completion_tokens == 4 assert chat.tokens == 16 assert chat.metadata["stop_reason"] == "end_turn" + + +def _extract_hook_callbacks(options, event: str): + """Pull our callback functions out of options.hooks[event].""" + matchers = (getattr(options, "hooks", None) or {}).get(event, []) or [] + callbacks = [] + for m in matchers: + for cb in getattr(m, "hooks", []) or []: + callbacks.append(cb) + return callbacks + + +def test_tool_call_creates_tool_step_with_input_and_output(): + """A tool call yields a TOOL step with input/output/latency/tool_use_id.""" + import claude_agent_sdk as cas + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + async def fake_query(*, prompt, options=None, **kwargs): + # The wrapper injects our hooks into options.hooks. Pull them out and + # invoke at the right moments to simulate the real SDK calling them. + pre_hooks = _extract_hook_callbacks(options, "PreToolUse") + post_hooks = _extract_hook_callbacks(options, "PostToolUse") + + yield init_system_message() + yield FakeAssistantMessage( + content=[ + FakeTextBlock(""), + FakeToolUseBlock(id="t1", name="Bash", input={"command": "echo hi"}), + ] + ) + for h in pre_hooks: + await h( + { + "hook_event_name": "PreToolUse", + "tool_name": "Bash", + "tool_input": {"command": "echo hi"}, + }, + "t1", + {"signal": None}, + ) + for h in post_hooks: + await h( + { + "hook_event_name": "PostToolUse", + "tool_name": "Bash", + "tool_input": {"command": "echo hi"}, + "tool_response": "hi", + }, + "t1", + {"signal": None}, + ) + yield FakeUserMessage(content=[FakeToolResultBlock(tool_use_id="t1", content="hi")]) + yield FakeResultMessage(subtype="success") + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query( + prompt="run echo hi", options=cas.ClaudeAgentOptions() + ): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] + assert len(tool_steps) == 1 + t = tool_steps[0] + assert t.name == "Bash" + assert t.inputs == {"command": "echo hi"} + assert t.output == "hi" + assert t.metadata["tool_use_id"] == "t1" + assert t.latency is not None and t.latency >= 0 + assert t.metadata.get("is_error") is False From 1ecd6dc455fa7ec100452bb8672e10646a108cae Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:01:39 -0300 Subject: [PATCH 05/20] test(claude-agent-sdk): MCP tool name parsing --- tests/integrations/test_claude_agent_sdk.py | 64 +++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 57ba1e69..b5fab42d 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -247,3 +247,67 @@ async def run(): assert t.metadata["tool_use_id"] == "t1" assert t.latency is not None and t.latency >= 0 assert t.metadata.get("is_error") is False + + +def test_mcp_tool_name_is_parsed_into_metadata(): + """A tool named ``mcp__playwright__browser_click`` records the parsed metadata.""" + import claude_agent_sdk as cas + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + async def fake_query(*, prompt, options=None, **kwargs): + pre_hooks = _extract_hook_callbacks(options, "PreToolUse") + post_hooks = _extract_hook_callbacks(options, "PostToolUse") + + yield init_system_message() + yield FakeAssistantMessage( + content=[ + FakeToolUseBlock( + id="t1", + name="mcp__playwright__browser_click", + input={"selector": "#submit"}, + ) + ] + ) + for h in pre_hooks: + await h( + { + "hook_event_name": "PreToolUse", + "tool_name": "mcp__playwright__browser_click", + "tool_input": {"selector": "#submit"}, + }, + "t1", + {"signal": None}, + ) + for h in post_hooks: + await h( + { + "hook_event_name": "PostToolUse", + "tool_name": "mcp__playwright__browser_click", + "tool_input": {"selector": "#submit"}, + "tool_response": "clicked", + }, + "t1", + {"signal": None}, + ) + yield FakeResultMessage(subtype="success") + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="click", options=cas.ClaudeAgentOptions()): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] + assert len(tool_steps) == 1 + t = tool_steps[0] + assert t.name == "mcp__playwright__browser_click" + assert t.metadata["mcp_server"] == "playwright" + assert t.metadata["mcp_tool_name"] == "browser_click" From be044eaf1cc82f9bd0f09bc7c16b6682231839da Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:02:16 -0300 Subject: [PATCH 06/20] feat(claude-agent-sdk): subagent messages nest under their Agent ToolStep The _resolve_subagent_parent helper looks up the pending Agent ToolStep by parent_tool_use_id and pushes it onto the contextvar stack before opening the subagent's chat-completion step. The Agent ToolStep is kept open across the subagent's stream by virtue of PostToolUse firing only after the subagent returns. --- tests/integrations/test_claude_agent_sdk.py | 84 +++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index b5fab42d..449cbe74 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -311,3 +311,87 @@ async def run(): assert t.name == "mcp__playwright__browser_click" assert t.metadata["mcp_server"] == "playwright" assert t.metadata["mcp_tool_name"] == "browser_click" + + +def test_subagent_messages_nest_under_agent_tool_step(): + """A message with ``parent_tool_use_id`` nests under the spawning Agent ToolStep.""" + import claude_agent_sdk as cas + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + async def fake_query(*, prompt, options=None, **kwargs): + pre_hooks = _extract_hook_callbacks(options, "PreToolUse") + post_hooks = _extract_hook_callbacks(options, "PostToolUse") + + yield init_system_message() + # Parent assistant turn dispatches the Agent tool + yield FakeAssistantMessage( + content=[ + FakeToolUseBlock( + id="t1", + name="Agent", + input={"description": "code-reviewer", "prompt": "review src/"}, + ) + ] + ) + # PreToolUse opens the Agent ToolStep + for h in pre_hooks: + await h( + { + "hook_event_name": "PreToolUse", + "tool_name": "Agent", + "tool_input": {"description": "code-reviewer", "prompt": "review src/"}, + }, + "t1", + {"signal": None}, + ) + # Subagent streams its own assistant message *while the Agent step is open* + yield FakeAssistantMessage( + content=[FakeTextBlock("subagent turn 1")], + parent_tool_use_id="t1", + ) + # Subagent ends; PostToolUse closes the Agent step + for h in post_hooks: + await h( + { + "hook_event_name": "PostToolUse", + "tool_name": "Agent", + "tool_input": {"description": "code-reviewer"}, + "tool_response": "review complete", + }, + "t1", + {"signal": None}, + ) + yield FakeUserMessage( + content=[FakeToolResultBlock(tool_use_id="t1", content="review complete")] + ) + yield FakeResultMessage(subtype="success") + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query( + prompt="dispatch subagent", options=cas.ClaudeAgentOptions() + ): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + # The root step has the parent assistant turn (the one that emitted Agent + # ToolUseBlock) AND the Agent ToolStep itself. + tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] + assert len(tool_steps) == 1 + agent_tool_step = tool_steps[0] + assert agent_tool_step.name == "Agent" + # Subagent's assistant turn nests beneath the Agent tool step + subagent_chats = [ + s for s in agent_tool_step.steps if s.step_type.value == "chat_completion" + ] + assert len(subagent_chats) == 1, "expected subagent chat completion nested under Agent tool step" + assert "subagent turn 1" in subagent_chats[0].output + assert subagent_chats[0].metadata.get("parent_tool_use_id") == "t1" From eb86625588418975cedb58fd7dc435cbc849f714 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:02:54 -0300 Subject: [PATCH 07/20] feat(claude-agent-sdk): capture error subtypes and tool failures Tests both error paths exercised by the implementation from A2 and A4: - ResultMessage(subtype='error_max_turns', is_error=True) on the root step's metadata - PostToolUseFailure marking the tool step as errored with the error message --- tests/integrations/test_claude_agent_sdk.py | 96 +++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 449cbe74..723617dc 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -395,3 +395,99 @@ async def run(): assert len(subagent_chats) == 1, "expected subagent chat completion nested under Agent tool step" assert "subagent turn 1" in subagent_chats[0].output assert subagent_chats[0].metadata.get("parent_tool_use_id") == "t1" + + +def test_result_message_error_subtype_marks_root_step(): + """An error ResultMessage subtype is reflected on the root step's metadata.""" + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message(), + FakeResultMessage( + subtype="error_max_turns", + is_error=True, + result=None, + stop_reason=None, + num_turns=10, + ), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + assert root_step.metadata["subtype"] == "error_max_turns" + assert root_step.metadata["is_error"] is True + assert root_step.metadata["num_turns"] == 10 + + +def test_post_tool_use_failure_marks_tool_step_as_error(): + """PostToolUseFailure fires instead of PostToolUse — the tool step is marked errored.""" + import claude_agent_sdk as cas + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + async def fake_query(*, prompt, options=None, **kwargs): + pre_hooks = _extract_hook_callbacks(options, "PreToolUse") + fail_hooks = _extract_hook_callbacks(options, "PostToolUseFailure") + + yield init_system_message() + yield FakeAssistantMessage( + content=[FakeToolUseBlock(id="t1", name="Bash", input={"command": "false"})] + ) + for h in pre_hooks: + await h( + { + "hook_event_name": "PreToolUse", + "tool_name": "Bash", + "tool_input": {"command": "false"}, + }, + "t1", + {"signal": None}, + ) + for h in fail_hooks: + await h( + { + "hook_event_name": "PostToolUseFailure", + "tool_name": "Bash", + "tool_input": {"command": "false"}, + "error": "exit code 1", + }, + "t1", + {"signal": None}, + ) + yield FakeResultMessage(subtype="success") + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query( + prompt="fail bash", options=cas.ClaudeAgentOptions() + ): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] + assert len(tool_steps) == 1 + t = tool_steps[0] + assert t.metadata["is_error"] is True + assert "exit code 1" in (t.output or "") From ec60af98cd993eec5d59a0020b102be69ec49299 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:03:26 -0300 Subject: [PATCH 08/20] test(claude-agent-sdk): user hooks compose with Openlayer hooks --- tests/integrations/test_claude_agent_sdk.py | 77 +++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 723617dc..dae39418 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -491,3 +491,80 @@ async def run(): t = tool_steps[0] assert t.metadata["is_error"] is True assert "exit code 1" in (t.output or "") + + +def test_user_hooks_compose_with_openlayer_hooks(): + """User-provided hooks run alongside ours; neither replaces the other.""" + import claude_agent_sdk as cas + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + user_called: list = [] + + async def user_hook(input_data, tool_use_id, context): + user_called.append(tool_use_id) + return { + "hookSpecificOutput": { + "hookEventName": "PreToolUse", + "permissionDecision": "deny", + "permissionDecisionReason": "test deny", + } + } + + user_options = cas.ClaudeAgentOptions( + hooks={"PreToolUse": [cas.HookMatcher(hooks=[user_hook])]} + ) + + async def fake_query(*, prompt, options=None, **kwargs): + # The wrapper should have appended our hook AFTER the user's. + pre_hooks = _extract_hook_callbacks(options, "PreToolUse") + post_hooks = _extract_hook_callbacks(options, "PostToolUse") + assert pre_hooks[0] is user_hook, "user hook must run first" + assert len(pre_hooks) >= 2, "openlayer hook must be appended after user hook" + + yield init_system_message() + yield FakeAssistantMessage( + content=[FakeToolUseBlock(id="t1", name="Bash", input={"command": "ls"})] + ) + # Simulate the SDK invoking *all* PreToolUse hooks (user first, then ours) + for h in pre_hooks: + await h( + { + "hook_event_name": "PreToolUse", + "tool_name": "Bash", + "tool_input": {"command": "ls"}, + }, + "t1", + {"signal": None}, + ) + for h in post_hooks: + await h( + { + "hook_event_name": "PostToolUse", + "tool_name": "Bash", + "tool_input": {"command": "ls"}, + "tool_response": "denied", + }, + "t1", + {"signal": None}, + ) + yield FakeResultMessage(subtype="success") + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi", options=user_options): + pass + + asyncio.run(run()) + + # User hook was invoked + assert user_called == ["t1"] + # Our hook ran too — the tool step exists in the trace + root_step = captured[0].steps[0] + tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] + assert len(tool_steps) == 1 From bcdbd8135b7c4c4583c2b25e39c5e7a8fd7623ca Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:04:06 -0300 Subject: [PATCH 09/20] test(claude-agent-sdk): redact MCP server env/headers from trace metadata --- tests/integrations/test_claude_agent_sdk.py | 48 +++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index dae39418..f07e8b38 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -568,3 +568,51 @@ async def run(): root_step = captured[0].steps[0] tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"] assert len(tool_steps) == 1 + + +def test_mcp_env_is_stripped_from_agent_config_metadata(): + """``env`` and ``headers`` of MCP server configs must be redacted.""" + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message( + mcp_servers=[ + { + "name": "secret-server", + "command": "x", + "env": {"API_KEY": "supersecret"}, + "headers": {"Authorization": "Bearer xyz"}, + } + ] + ), + FakeResultMessage(subtype="success"), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + mcp = root_step.metadata["agent_config"]["mcp_servers"] + assert isinstance(mcp, list) + server = mcp[0] + assert "env" not in server, "env must be stripped" + assert "headers" not in server, "headers must be stripped" + # Non-secret keys are preserved + assert server.get("name") == "secret-server" + # The literal secret must not appear anywhere in the serialized metadata + serialized = repr(root_step.metadata) + assert "supersecret" not in serialized + assert "Bearer xyz" not in serialized From 362743b59baec0af96710d230079443c9927dea7 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:05:51 -0300 Subject: [PATCH 10/20] feat(claude-agent-sdk): add trace_claude_agent_sdk() global init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the one-shot init helper that monkey-patches claude_agent_sdk.query. Idempotent — subsequent calls update the module-level _config but don't re-wrap the function. Stashes the original query on the patched callable as _openlayer_original so traced_query can call it without recursing back through the patch. --- .../lib/integrations/claude_agent_sdk.py | 60 ++++++++++++++++++- tests/integrations/test_claude_agent_sdk.py | 49 +++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index 5fdfdd71..32da982a 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -511,6 +511,10 @@ async def traced_query( same order. """ _require_cas() + # Get hold of the *original* query — when our global monkey-patch is in + # effect, ``_cas.query`` points back at ``patched_query`` which would + # otherwise recurse into us. + query_fn = getattr(_cas.query, "_openlayer_original", _cas.query) options = _inject_openlayer_hooks(options) name = "claude-agent-sdk: " + _summarize_prompt(prompt) resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id @@ -523,7 +527,7 @@ async def traced_query( state = _TraceState(root_step=root_step) token = _current_state.set(state) try: - async for msg in _cas.query(prompt=prompt, options=options, **kwargs): + async for msg in query_fn(prompt=prompt, options=options, **kwargs): try: _observe(msg, state) except Exception: # never break the user's stream @@ -534,3 +538,57 @@ async def traced_query( yield msg finally: _current_state.reset(token) + + +def trace_claude_agent_sdk( + *, + inference_pipeline_id: Optional[str] = None, + truncate_tool_output_chars: int = 8192, + capture_thinking: bool = True, + redact_mcp_env: bool = True, +) -> None: + """Auto-instrument ``claude_agent_sdk.query`` so every call is traced. + + Call this once at startup before any code does ``from claude_agent_sdk + import query``. After the call, ordinary use of the SDK is automatically + wrapped: every ``async for m in query(...)`` becomes an Openlayer trace. + + The patch is idempotent — calling this function multiple times is safe and + only updates configuration on subsequent calls. + + Args: + inference_pipeline_id: Optional default Openlayer inference pipeline ID. + Falls back to the ``OPENLAYER_INFERENCE_PIPELINE_ID`` env var when + unset (handled by the tracer at publish time). + truncate_tool_output_chars: Maximum number of characters of tool output + to capture in each TOOL step. Excess is truncated with a marker. + Defaults to 8192. + capture_thinking: Whether to capture ``ThinkingBlock`` content into + chat-completion step metadata. Defaults to True. + redact_mcp_env: Whether to strip ``env`` and ``headers`` from MCP + server configs in trace metadata. Defaults to True. + """ + _require_cas() + global _config + _config = _Config( + inference_pipeline_id=inference_pipeline_id, + truncate_tool_output_chars=truncate_tool_output_chars, + capture_thinking=capture_thinking, + redact_mcp_env=redact_mcp_env, + ) + + if getattr(_cas.query, "_openlayer_patched", False): + # Already patched — config update above is sufficient. + return + + original_query = _cas.query + + async def patched_query(*args, **kwargs): + # Forward through traced_query; ``query_fn`` resolves to + # ``original_query`` via ``_openlayer_original`` to avoid recursion. + async for m in traced_query(*args, **kwargs): + yield m + + patched_query._openlayer_patched = True # type: ignore[attr-defined] + patched_query._openlayer_original = original_query # type: ignore[attr-defined] + _cas.query = patched_query # type: ignore[assignment] diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index f07e8b38..84fbe4a8 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -616,3 +616,52 @@ async def run(): serialized = repr(root_step.metadata) assert "supersecret" not in serialized assert "Bearer xyz" not in serialized + + +def test_trace_claude_agent_sdk_patches_module_query(): + """``trace_claude_agent_sdk()`` monkey-patches ``claude_agent_sdk.query``.""" + import claude_agent_sdk + + from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk + + original = claude_agent_sdk.query + try: + trace_claude_agent_sdk() + assert claude_agent_sdk.query is not original + assert getattr(claude_agent_sdk.query, "_openlayer_patched", False) is True + + # Idempotent: a second call doesn't double-wrap. + after_first = claude_agent_sdk.query + trace_claude_agent_sdk() + assert claude_agent_sdk.query is after_first + assert getattr(claude_agent_sdk.query, "_openlayer_patched", False) is True + finally: + claude_agent_sdk.query = original + + +def test_trace_claude_agent_sdk_config_persists(): + """Init kwargs are persisted into the module-level config.""" + import claude_agent_sdk + + from openlayer.lib.integrations import claude_agent_sdk as integration + from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk + + original = claude_agent_sdk.query + try: + trace_claude_agent_sdk( + inference_pipeline_id="pipe-123", + truncate_tool_output_chars=512, + capture_thinking=False, + ) + assert integration._config.inference_pipeline_id == "pipe-123" + assert integration._config.truncate_tool_output_chars == 512 + assert integration._config.capture_thinking is False + finally: + # Reset config for downstream tests + trace_claude_agent_sdk( + inference_pipeline_id=None, + truncate_tool_output_chars=8192, + capture_thinking=True, + redact_mcp_env=True, + ) + claude_agent_sdk.query = original From 44c64ac44056e682cd34984bf34b507751386af6 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:06:58 -0300 Subject: [PATCH 11/20] feat(claude-agent-sdk): auto-instrument ClaudeSDKClient on trace_claude_agent_sdk() Patches ClaudeSDKClient.__init__ (to compose hooks), .query (to open the root AGENT step), and .receive_response (to observe streamed messages and close the step when the generator exhausts). Idempotent via _openlayer_patched sentinel on the class. --- .../lib/integrations/claude_agent_sdk.py | 88 +++++++++++++++++++ tests/integrations/test_claude_agent_sdk.py | 30 +++++++ 2 files changed, 118 insertions(+) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index 32da982a..b2d3cdb1 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -592,3 +592,91 @@ async def patched_query(*args, **kwargs): patched_query._openlayer_patched = True # type: ignore[attr-defined] patched_query._openlayer_original = original_query # type: ignore[attr-defined] _cas.query = patched_query # type: ignore[assignment] + + _patch_claude_sdk_client() + + +# ----------------------- ClaudeSDKClient patching ----------------------- # + + +def _patch_claude_sdk_client() -> None: + """Monkey-patch ``ClaudeSDKClient`` so its query/receive_response are traced. + + The client API splits the request lifecycle across two methods: + ``client.query(prompt, session_id=...)`` (which only enqueues the prompt) + and ``async for m in client.receive_response()`` (which streams the + response). We open the root step when ``query`` is called and close it + when the matching ``receive_response`` generator exhausts. + """ + Client = _cas.ClaudeSDKClient # type: ignore[attr-defined] + if getattr(Client, "_openlayer_patched", False): + return + + original_init = Client.__init__ + original_query = Client.query + original_receive_response = Client.receive_response + + def patched_init(self, options=None, transport=None): # type: ignore[no-redef] + if options is not None: + options = _inject_openlayer_hooks(options) + else: + options = _inject_openlayer_hooks(None) + original_init(self, options=options, transport=transport) + self._openlayer_state = None + self._openlayer_state_cm = None + self._openlayer_token = None + + async def patched_query(self, prompt, session_id="default"): # type: ignore[no-redef] + # Open a root step for this prompt; the matching receive_response will + # close it. If the user calls query() multiple times without iterating + # receive_response() in between, the previous state will be replaced — + # but at that point the previous trace is orphaned anyway, so we just + # log a debug message rather than try to recover. + if getattr(self, "_openlayer_state", None) is not None: + logger.debug( + "ClaudeSDKClient.query() called before previous receive_response() " + "was consumed; previous trace will be orphaned." + ) + cm = _tracer.create_step( + name="claude-agent-sdk-client: " + _summarize_prompt(prompt), + step_type=StepType.AGENT, + inputs={"prompt": prompt, "session_id": session_id}, + inference_pipeline_id=_config.inference_pipeline_id, + ) + root_step = cm.__enter__() + state = _TraceState(root_step=root_step) + self._openlayer_state = state + self._openlayer_state_cm = cm + self._openlayer_token = _current_state.set(state) + return await original_query(self, prompt, session_id=session_id) + + async def patched_receive_response(self): # type: ignore[no-redef] + state = getattr(self, "_openlayer_state", None) + try: + async for msg in original_receive_response(self): + if state is not None: + try: + _observe(msg, state) + except Exception: + logger.exception( + "Openlayer observation failed for client message %r", + type(msg).__name__, + ) + yield msg + finally: + if state is not None and getattr(self, "_openlayer_state_cm", None): + token = self._openlayer_token + self._openlayer_state = None + cm = self._openlayer_state_cm + self._openlayer_state_cm = None + self._openlayer_token = None + try: + if token is not None: + _tracer._safe_reset_contextvar(_current_state, token) + finally: + cm.__exit__(None, None, None) + + Client.__init__ = patched_init + Client.query = patched_query + Client.receive_response = patched_receive_response + Client._openlayer_patched = True diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 84fbe4a8..21d99845 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -665,3 +665,33 @@ def test_trace_claude_agent_sdk_config_persists(): redact_mcp_env=True, ) claude_agent_sdk.query = original + + +def test_trace_claude_agent_sdk_patches_claude_sdk_client(): + """``trace_claude_agent_sdk()`` also patches ``ClaudeSDKClient.query`` / ``.receive_response``.""" + import claude_agent_sdk + + from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk + + Client = claude_agent_sdk.ClaudeSDKClient + original_module_query = claude_agent_sdk.query + original_query = Client.query + original_receive = Client.receive_response + try: + trace_claude_agent_sdk() + assert Client.query is not original_query + assert Client.receive_response is not original_receive + assert getattr(Client, "_openlayer_patched", False) is True + + # Idempotent + after_first_query = Client.query + trace_claude_agent_sdk() + assert Client.query is after_first_query + finally: + Client.query = original_query + Client.receive_response = original_receive + try: + del Client._openlayer_patched + except AttributeError: + pass + claude_agent_sdk.query = original_module_query From 2f55d09a4883bf8f199f9ff425e79bde85923f72 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:07:28 -0300 Subject: [PATCH 12/20] test(claude-agent-sdk): wrapper preserves stream identity and order --- tests/integrations/test_claude_agent_sdk.py | 30 +++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 21d99845..2b109c66 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -695,3 +695,33 @@ def test_trace_claude_agent_sdk_patches_claude_sdk_client(): except AttributeError: pass claude_agent_sdk.query = original_module_query + + +def test_wrapped_stream_yields_identical_messages_in_identical_order(): + """The wrapper is a pure observer — output must equal the underlying stream.""" + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + original_messages = [ + init_system_message(), + FakeAssistantMessage(content=[FakeTextBlock("x")]), + FakeResultMessage(subtype="success"), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + for m in original_messages: + yield m + + with patch("claude_agent_sdk.query", fake_query): + async def run(): + out = [] + async for m in traced_query(prompt="x"): + out.append(m) + return out + + result = asyncio.run(run()) + + # Same length, same identities (the wrapper must not substitute), same order + assert len(result) == len(original_messages) + for got, expected in zip(result, original_messages): + assert got is expected + From 2b7d09b4a0f93a1211baccc49785a2d88514189b Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:10:01 -0300 Subject: [PATCH 13/20] test(claude-agent-sdk): add live integration test (gated on ANTHROPIC_API_KEY) The test runs a real query() against claude-haiku-4-5 and verifies the wrapper observes the SDK's message stream. It tolerates a trailing Exception from the SDK (which raises after delivering ResultMessage when the API returns is_error=True, e.g. on an invalid Anthropic API key) so the test still exercises trace publishing end-to-end. --- .../test_claude_agent_sdk_live.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 tests/integrations/test_claude_agent_sdk_live.py diff --git a/tests/integrations/test_claude_agent_sdk_live.py b/tests/integrations/test_claude_agent_sdk_live.py new file mode 100644 index 00000000..e3e820ce --- /dev/null +++ b/tests/integrations/test_claude_agent_sdk_live.py @@ -0,0 +1,67 @@ +"""Live integration test against the real Claude Agent SDK. + +This test is gated on the ``ANTHROPIC_API_KEY`` environment variable. It runs +a tiny ``query()`` against ``claude-haiku-4-5`` with no tools and asserts the +SDK produced a ``ResultMessage`` (proving the wrapper observed the stream). + +Run locally with the live test API keys set in env: + + ANTHROPIC_API_KEY=... OPENLAYER_API_KEY=... pytest \\ + tests/integrations/test_claude_agent_sdk_live.py -v + +In CI this test is automatically skipped when ``ANTHROPIC_API_KEY`` is unset. + +Note: the bundled ``claude-agent-sdk`` raises an internal ``Exception`` after +delivering ``ResultMessage`` when the underlying API returns ``is_error=True`` +(e.g. an invalid Anthropic API key). The wrapper correctly observes the +stream up to that point; we tolerate the trailing exception in this test so +it still exercises the trace publishing path with a real API key. +""" + +from __future__ import annotations + +import asyncio +import os + +import pytest + +pytestmark = pytest.mark.skipif( + not os.environ.get("ANTHROPIC_API_KEY"), + reason="ANTHROPIC_API_KEY not set", +) + + +def test_live_query_produces_valid_trace(): + pytest.importorskip("claude_agent_sdk") + from claude_agent_sdk import ClaudeAgentOptions + + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + os.environ.setdefault( + "OPENLAYER_INFERENCE_PIPELINE_ID", + "d4ee57e5-cd26-4435-b321-0365760724ad", + ) + + async def run(): + messages = [] + try: + async for m in traced_query( + prompt="Say the word 'banana' and nothing else.", + options=ClaudeAgentOptions(model="claude-haiku-4-5"), + ): + messages.append(m) + except Exception as exc: + # The SDK raises an Exception trailing ResultMessage when the + # underlying API errors. The wrapper observed everything up to + # that point; tolerate the trailing exception here. + messages.append(("__sdk_exception__", str(exc))) + return messages + + msgs = asyncio.run(run()) + real_msgs = [m for m in msgs if not (isinstance(m, tuple) and m[0] == "__sdk_exception__")] + assert any( + type(m).__name__ == "SystemMessage" for m in real_msgs + ), "Expected a SystemMessage(init) in the stream" + assert any( + type(m).__name__ == "ResultMessage" for m in real_msgs + ), "Expected a ResultMessage in the stream" From e672679b6c554d6db316680bbc7c9aaa4136dc38 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:11:56 -0300 Subject: [PATCH 14/20] docs(claude-agent-sdk): add example notebook and public re-export MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds: - examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb — a concise 12-cell notebook covering install, init, simple query, and a subagent example - openlayer.lib.trace_claude_agent_sdk and openlayer.lib.traced_claude_agent_sdk_query as public re-exports from the integration module, matching the pattern used by trace_google_adk and other integrations --- .../claude_agent_sdk_tracing.ipynb | 188 ++++++++++++++++++ src/openlayer/lib/__init__.py | 76 +++++++ 2 files changed, 264 insertions(+) create mode 100644 examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb diff --git a/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb new file mode 100644 index 00000000..6cd8bc30 --- /dev/null +++ b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb @@ -0,0 +1,188 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/openlayer-python/blob/main/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb)\n", + "\n", + "# Tracing the Claude Agent SDK with Openlayer\n", + "\n", + "This notebook shows how to enable Openlayer tracing for applications built with Anthropic's [Claude Agent SDK](https://github.com/anthropics/claude-agent-sdk-python). After one line of setup, every `query()` becomes an Openlayer trace with nested steps for assistant turns, tool calls (including MCP and subagents), session metadata, cost, and tokens." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install openlayer 'claude-agent-sdk>=0.1.81'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Set environment variables\n", + "\n", + "You need three secrets:\n", + "\n", + "- `OPENLAYER_API_KEY` — get from [openlayer.com/settings/api-keys](https://app.openlayer.com/settings/api-keys)\n", + "- `OPENLAYER_INFERENCE_PIPELINE_ID` — the inference pipeline you want to stream traces to\n", + "- `ANTHROPIC_API_KEY` — your Anthropic API key" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"YOUR_INFERENCE_PIPELINE_ID\"\n", + "os.environ[\"ANTHROPIC_API_KEY\"] = \"YOUR_ANTHROPIC_API_KEY\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Enable tracing — one line\n", + "\n", + "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so they're auto-traced. It composes with any hooks you've configured yourself — your hooks are not replaced." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openlayer.lib import trace_claude_agent_sdk\n", + "\n", + "trace_claude_agent_sdk()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Run a simple query\n", + "\n", + "The agent has read-only access to the working directory via `Read`, `Glob`, and `Grep` so it can answer questions about local code." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "from claude_agent_sdk import query, ClaudeAgentOptions\n", + "\n", + "\n", + "async def search():\n", + " options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " allowed_tools=[\"Read\", \"Glob\", \"Grep\"],\n", + " )\n", + " async for message in query(\n", + " prompt=\"List the Python files in the current directory and say how many there are.\",\n", + " options=options,\n", + " ):\n", + " print(type(message).__name__)\n", + "\n", + "\n", + "await search()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Subagent example\n", + "\n", + "When the agent delegates work to a subagent via the `Agent` tool, the subagent's messages automatically nest under the spawning tool step in the trace." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from claude_agent_sdk import AgentDefinition\n", + "\n", + "\n", + "async def subagent_demo():\n", + " options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " allowed_tools=[\"Agent\", \"Read\", \"Glob\"],\n", + " agents={\n", + " \"reviewer\": AgentDefinition(\n", + " description=\"A short summary of the Python codebase.\",\n", + " prompt=\"You are a senior code reviewer. List the top-level Python files and summarize each in one sentence.\",\n", + " tools=[\"Read\", \"Glob\"],\n", + " ),\n", + " },\n", + " )\n", + " async for message in query(\n", + " prompt=\"Dispatch the reviewer subagent and report back its summary.\",\n", + " options=options,\n", + " ):\n", + " print(type(message).__name__)\n", + "\n", + "\n", + "await subagent_demo()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## What's captured?\n", + "\n", + "Each `query()` becomes a single trace with the following shape:\n", + "\n", + "```\n", + "AgentStep \"claude-agent-sdk: \"\n", + "├── ChatCompletionStep \"assistant turn 1\" (text, thinking, model, tokens)\n", + "├── ToolStep \"Read\" (input, output, latency)\n", + "├── ChatCompletionStep \"assistant turn 2\"\n", + "└── ToolStep \"Agent: reviewer\" (subagent — children nest underneath)\n", + " ├── ChatCompletionStep \"subagent turn 1\"\n", + " └── ToolStep \"Glob\"\n", + "```\n", + "\n", + "The root step's metadata includes cost (`total_cost_usd`), token counts, session ID, agent config (model, tools, MCP servers, skills, plugins), permission denials, and per-model usage breakdown. See the Openlayer dashboard to inspect any trace.\n", + "\n", + "For full details (including the per-call `traced_query()` API for codebases that can't change imports) see the [integration source](https://github.com/openlayer-ai/openlayer-python/blob/main/src/openlayer/lib/integrations/claude_agent_sdk.py)." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/src/openlayer/lib/__init__.py b/src/openlayer/lib/__init__.py index f5f75bce..9a92a3b3 100644 --- a/src/openlayer/lib/__init__.py +++ b/src/openlayer/lib/__init__.py @@ -18,6 +18,8 @@ "trace_portkey", "trace_google_adk", "unpatch_google_adk", + "trace_claude_agent_sdk", + "traced_claude_agent_sdk_query", "trace_gemini", "update_current_trace", "update_current_step", @@ -311,6 +313,80 @@ def unpatch_google_adk(): return google_adk_tracer.unpatch_google_adk() +# ------------------------------ Claude Agent SDK ---------------------------- # +def trace_claude_agent_sdk( + *, + inference_pipeline_id=None, + truncate_tool_output_chars: int = 8192, + capture_thinking: bool = True, + redact_mcp_env: bool = True, +): + """Enable Openlayer tracing for the Claude Agent SDK. + + Monkey-patches ``claude_agent_sdk.query`` and ``ClaudeSDKClient`` so every + call becomes an Openlayer trace with nested steps for assistant turns, + tool calls (including MCP and subagent calls), session metadata, cost, + and tokens. + + Requirements: + ``claude-agent-sdk>=0.1.81`` must be installed: + ``pip install 'claude-agent-sdk>=0.1.81'`` + + Args: + inference_pipeline_id: Optional Openlayer inference pipeline ID. Falls + back to the ``OPENLAYER_INFERENCE_PIPELINE_ID`` env var. + truncate_tool_output_chars: Maximum characters of tool output to + capture per TOOL step. Defaults to 8192. + capture_thinking: Whether to capture ``ThinkingBlock`` content into + chat-completion step metadata. Defaults to True. + redact_mcp_env: Whether to strip ``env`` and ``headers`` from MCP + server config dicts in trace metadata. Defaults to True. + + Example: + >>> import os + >>> os.environ["OPENLAYER_API_KEY"] = "..." + >>> os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "..." + >>> os.environ["ANTHROPIC_API_KEY"] = "..." + >>> from openlayer.lib import trace_claude_agent_sdk + >>> trace_claude_agent_sdk() + >>> + >>> from claude_agent_sdk import query, ClaudeAgentOptions + >>> async for m in query(prompt="hello", options=ClaudeAgentOptions(model="claude-haiku-4-5")): + ... ... + """ + # pylint: disable=import-outside-toplevel + from .integrations import claude_agent_sdk as _integration + + return _integration.trace_claude_agent_sdk( + inference_pipeline_id=inference_pipeline_id, + truncate_tool_output_chars=truncate_tool_output_chars, + capture_thinking=capture_thinking, + redact_mcp_env=redact_mcp_env, + ) + + +def traced_claude_agent_sdk_query(*, prompt, options=None, inference_pipeline_id=None, **kwargs): + """Per-call wrapper around ``claude_agent_sdk.query()`` (alternative to global init). + + Returns an async iterator that yields the same messages as ``query()`` while + emitting an Openlayer trace as a side effect. + + Example: + >>> from openlayer.lib import traced_claude_agent_sdk_query + >>> async for m in traced_claude_agent_sdk_query(prompt="hello"): + ... ... + """ + # pylint: disable=import-outside-toplevel + from .integrations import claude_agent_sdk as _integration + + return _integration.traced_query( + prompt=prompt, + options=options, + inference_pipeline_id=inference_pipeline_id, + **kwargs, + ) + + # -------------------------------- Google Gemini --------------------------------- # def trace_gemini(client): """Trace Google Gemini chat completions.""" From 9662e22720713d442a1fccb1cca0dee22d005980 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:17:12 -0300 Subject: [PATCH 15/20] chore(claude-agent-sdk): satisfy ruff (import order, ARG001, T201) - Imports sorted by ruff --fix in test/mock helpers and the live test - File-level ruff: noqa: ARG001 in the test file (the fake_query helpers match the SDK signature but don't use prompt/options/kwargs) - # noqa: T201 on the example notebook's print() lines, matching how the sibling google-adk example notebook handles the same lint --- .../claude_agent_sdk_tracing.ipynb | 29 +++++++++---------- tests/integrations/claude_agent_sdk_mocks.py | 4 +-- tests/integrations/test_claude_agent_sdk.py | 16 ++++++---- .../test_claude_agent_sdk_live.py | 2 +- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb index 6cd8bc30..ae363b3b 100644 --- a/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb +++ b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb @@ -35,9 +35,9 @@ "\n", "You need three secrets:\n", "\n", - "- `OPENLAYER_API_KEY` — get from [openlayer.com/settings/api-keys](https://app.openlayer.com/settings/api-keys)\n", - "- `OPENLAYER_INFERENCE_PIPELINE_ID` — the inference pipeline you want to stream traces to\n", - "- `ANTHROPIC_API_KEY` — your Anthropic API key" + "- `OPENLAYER_API_KEY` \u2014 get from [openlayer.com/settings/api-keys](https://app.openlayer.com/settings/api-keys)\n", + "- `OPENLAYER_INFERENCE_PIPELINE_ID` \u2014 the inference pipeline you want to stream traces to\n", + "- `ANTHROPIC_API_KEY` \u2014 your Anthropic API key" ] }, { @@ -57,9 +57,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## 3. Enable tracing — one line\n", + "## 3. Enable tracing \u2014 one line\n", "\n", - "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so they're auto-traced. It composes with any hooks you've configured yourself — your hooks are not replaced." + "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so they're auto-traced. It composes with any hooks you've configured yourself \u2014 your hooks are not replaced." ] }, { @@ -88,9 +88,8 @@ "metadata": {}, "outputs": [], "source": [ - "import asyncio\n", "\n", - "from claude_agent_sdk import query, ClaudeAgentOptions\n", + "from claude_agent_sdk import ClaudeAgentOptions, query\n", "\n", "\n", "async def search():\n", @@ -102,7 +101,7 @@ " prompt=\"List the Python files in the current directory and say how many there are.\",\n", " options=options,\n", " ):\n", - " print(type(message).__name__)\n", + " print(type(message).__name__) # noqa: T201\n", "\n", "\n", "await search()" @@ -142,7 +141,7 @@ " prompt=\"Dispatch the reviewer subagent and report back its summary.\",\n", " options=options,\n", " ):\n", - " print(type(message).__name__)\n", + " print(type(message).__name__) # noqa: T201\n", "\n", "\n", "await subagent_demo()" @@ -158,12 +157,12 @@ "\n", "```\n", "AgentStep \"claude-agent-sdk: \"\n", - "├── ChatCompletionStep \"assistant turn 1\" (text, thinking, model, tokens)\n", - "├── ToolStep \"Read\" (input, output, latency)\n", - "├── ChatCompletionStep \"assistant turn 2\"\n", - "└── ToolStep \"Agent: reviewer\" (subagent — children nest underneath)\n", - " ├── ChatCompletionStep \"subagent turn 1\"\n", - " └── ToolStep \"Glob\"\n", + "\u251c\u2500\u2500 ChatCompletionStep \"assistant turn 1\" (text, thinking, model, tokens)\n", + "\u251c\u2500\u2500 ToolStep \"Read\" (input, output, latency)\n", + "\u251c\u2500\u2500 ChatCompletionStep \"assistant turn 2\"\n", + "\u2514\u2500\u2500 ToolStep \"Agent: reviewer\" (subagent \u2014 children nest underneath)\n", + " \u251c\u2500\u2500 ChatCompletionStep \"subagent turn 1\"\n", + " \u2514\u2500\u2500 ToolStep \"Glob\"\n", "```\n", "\n", "The root step's metadata includes cost (`total_cost_usd`), token counts, session ID, agent config (model, tools, MCP servers, skills, plugins), permission denials, and per-model usage breakdown. See the Openlayer dashboard to inspect any trace.\n", diff --git a/tests/integrations/claude_agent_sdk_mocks.py b/tests/integrations/claude_agent_sdk_mocks.py index a5a7726c..093894eb 100644 --- a/tests/integrations/claude_agent_sdk_mocks.py +++ b/tests/integrations/claude_agent_sdk_mocks.py @@ -8,8 +8,8 @@ from __future__ import annotations -from dataclasses import dataclass, field -from typing import Any, AsyncIterator, List, Optional +from typing import Any, List, Optional, AsyncIterator +from dataclasses import field, dataclass @dataclass diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 2b109c66..47689ab0 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -1,5 +1,10 @@ """Tests for the Claude Agent SDK Openlayer integration.""" +# Many of the fake_query implementations below take **kwargs / unused prompt / +# options arguments to match the real SDK's call signature; ruff's ARG001 +# fires for each of those, so we silence the rule file-wide. +# ruff: noqa: ARG001 + import asyncio from typing import Any, List from unittest.mock import patch @@ -9,18 +14,17 @@ from openlayer.lib.tracing import tracer as ol_tracer from .claude_agent_sdk_mocks import ( - FakeAssistantMessage, - FakeResultMessage, FakeTextBlock, + FakeUserMessage, + FakeToolUseBlock, + FakeResultMessage, FakeThinkingBlock, FakeToolResultBlock, - FakeToolUseBlock, - FakeUserMessage, - init_system_message, + FakeAssistantMessage, make_stream, + init_system_message, ) - # ---------- shared infra ---------- @pytest.fixture(autouse=True) diff --git a/tests/integrations/test_claude_agent_sdk_live.py b/tests/integrations/test_claude_agent_sdk_live.py index e3e820ce..b1fbce0c 100644 --- a/tests/integrations/test_claude_agent_sdk_live.py +++ b/tests/integrations/test_claude_agent_sdk_live.py @@ -20,8 +20,8 @@ from __future__ import annotations -import asyncio import os +import asyncio import pytest From 351c4b443675357fec97c7a5dd75780cb673db1d Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 13:34:58 -0300 Subject: [PATCH 16/20] fix(claude-agent-sdk): use stable step name instead of prompt-derived title The root AGENT step title was being built from the prompt content ("claude-agent-sdk: Say the word 'banana'..."), making the trace sidebar in Openlayer noisy and inconsistent across runs. Use the stable name "Claude Agent SDK query" instead. The prompt content is still captured in root_step.inputs.prompt where it belongs. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/openlayer/lib/integrations/claude_agent_sdk.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index b2d3cdb1..8ee63718 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -106,12 +106,7 @@ def _redact_mcp_servers(mcp_servers: Any) -> Any: return mcp_servers -def _summarize_prompt(prompt: Any) -> str: - """Build a short, human-readable trace name from the prompt.""" - if isinstance(prompt, str): - s = prompt.strip().replace("\n", " ") - return s[:80] + ("..." if len(s) > 80 else "") - return "claude-agent-sdk query" +_ROOT_STEP_NAME = "Claude Agent SDK query" def _truncate(value: Any, max_chars: int) -> Any: @@ -516,10 +511,9 @@ async def traced_query( # otherwise recurse into us. query_fn = getattr(_cas.query, "_openlayer_original", _cas.query) options = _inject_openlayer_hooks(options) - name = "claude-agent-sdk: " + _summarize_prompt(prompt) resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id with _tracer.create_step( - name=name, + name=_ROOT_STEP_NAME, step_type=StepType.AGENT, inputs={"prompt": prompt}, inference_pipeline_id=resolved_pipeline, @@ -638,7 +632,7 @@ async def patched_query(self, prompt, session_id="default"): # type: ignore[no- "was consumed; previous trace will be orphaned." ) cm = _tracer.create_step( - name="claude-agent-sdk-client: " + _summarize_prompt(prompt), + name=_ROOT_STEP_NAME, step_type=StepType.AGENT, inputs={"prompt": prompt, "session_id": session_id}, inference_pipeline_id=_config.inference_pipeline_id, From bd2782394498bc98dd06249877a6a9e3b0826de1 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 14:03:39 -0300 Subject: [PATCH 17/20] feat(claude-agent-sdk): capture options.system_prompt and options.agents on root metadata The root AGENT step was missing the user-provided system prompt and subagent definitions that drove the run. Spec called for both, but the initial implementation only captured the SDK's runtime-resolved agent_config from SystemMessage(init), not the user's input options. Capture, on the root step's metadata: - system_prompt (truncated to 4096 chars; supports string, preset dict, and SystemPromptFile dataclass shapes) - agents_defined: { name -> { description, prompt, tools, model } } - options: { model, fallback_model, max_turns, max_budget_usd, permission_mode, cwd, allowed_tools, disallowed_tools, continue_conversation, resume, fork_session } For ClaudeSDKClient, stash the original options at patched_init() time so they're available to patched_query() before our hook injection mutates them. New test: test_options_metadata_captured_on_root_step. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/integrations/claude_agent_sdk.py | 88 +++++++++++++++++++ tests/integrations/test_claude_agent_sdk.py | 55 ++++++++++++ 2 files changed, 143 insertions(+) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index 8ee63718..fcb20641 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -109,6 +109,84 @@ def _redact_mcp_servers(mcp_servers: Any) -> Any: _ROOT_STEP_NAME = "Claude Agent SDK query" +def _serialize_system_prompt(sp: Any) -> Any: + """Coerce ``options.system_prompt`` (str | preset dict | dataclass) into a + JSON-serializable value, truncated to 4096 chars for string forms.""" + if sp is None: + return None + if isinstance(sp, str): + return _truncate(sp, 4096) + if isinstance(sp, dict): + return sp + # Preset / file dataclass — best-effort attribute pluck + keys = ("type", "preset", "append", "excludeDynamicSections", "path") + out = {k: getattr(sp, k) for k in keys if hasattr(sp, k)} + return out or str(sp) + + +def _serialize_agent_definitions(agents: Any) -> Any: + """``options.agents`` is a dict[str, AgentDefinition]. Capture each + definition's description, prompt (truncated), and tools list.""" + if not agents: + return None + out: Dict[str, Any] = {} + for name, defn in agents.items(): + out[name] = { + "description": getattr(defn, "description", None), + "prompt": _truncate(getattr(defn, "prompt", None), 4096), + "tools": getattr(defn, "tools", None), + "model": getattr(defn, "model", None), + } + return out + + +def _capture_options_metadata(root_step: Any, options: Any) -> None: + """Snapshot user-provided options onto the root step metadata. + + Captures the user's ``system_prompt``, subagent definitions, and a handful + of other useful configuration values. Called once per query, with the + *original* (pre-hook-injection) options so we record what the user passed, + not what we forwarded to the SDK. + """ + if options is None: + return + metadata: Dict[str, Any] = {} + + sp = getattr(options, "system_prompt", None) + serialized_sp = _serialize_system_prompt(sp) + if serialized_sp is not None: + metadata["system_prompt"] = serialized_sp + + agents = _serialize_agent_definitions(getattr(options, "agents", None)) + if agents: + metadata["agents_defined"] = agents + + opt_capture: Dict[str, Any] = {} + for opt in ( + "model", + "fallback_model", + "max_turns", + "max_budget_usd", + "permission_mode", + "cwd", + "allowed_tools", + "disallowed_tools", + "continue_conversation", + "resume", + "fork_session", + ): + val = getattr(options, opt, None) + if val is None or val == [] or val == {}: + continue + # Convert Path to str + opt_capture[opt] = str(val) if hasattr(val, "__fspath__") else val + if opt_capture: + metadata["options"] = opt_capture + + if metadata: + root_step.log(metadata=metadata) + + def _truncate(value: Any, max_chars: int) -> Any: """Coerce ``value`` to a string and truncate to ``max_chars``. @@ -510,6 +588,9 @@ async def traced_query( # effect, ``_cas.query`` points back at ``patched_query`` which would # otherwise recurse into us. query_fn = getattr(_cas.query, "_openlayer_original", _cas.query) + # Snapshot the user's options BEFORE we inject our hooks so the captured + # metadata reflects what the user actually configured, not our mutations. + original_options = options options = _inject_openlayer_hooks(options) resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id with _tracer.create_step( @@ -518,6 +599,7 @@ async def traced_query( inputs={"prompt": prompt}, inference_pipeline_id=resolved_pipeline, ) as root_step: + _capture_options_metadata(root_step, original_options) state = _TraceState(root_step=root_step) token = _current_state.set(state) try: @@ -611,6 +693,9 @@ def _patch_claude_sdk_client() -> None: original_receive_response = Client.receive_response def patched_init(self, options=None, transport=None): # type: ignore[no-redef] + # Stash the original (pre-injection) options for metadata capture at + # query() time. + self._openlayer_original_options = options if options is not None: options = _inject_openlayer_hooks(options) else: @@ -638,6 +723,9 @@ async def patched_query(self, prompt, session_id="default"): # type: ignore[no- inference_pipeline_id=_config.inference_pipeline_id, ) root_step = cm.__enter__() + _capture_options_metadata( + root_step, getattr(self, "_openlayer_original_options", None) + ) state = _TraceState(root_step=root_step) self._openlayer_state = state self._openlayer_state_cm = cm diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py index 47689ab0..3bf3847e 100644 --- a/tests/integrations/test_claude_agent_sdk.py +++ b/tests/integrations/test_claude_agent_sdk.py @@ -120,6 +120,61 @@ async def run(): assert root_step.latency == 1500 +def test_options_metadata_captured_on_root_step(): + """``options.system_prompt`` and ``options.agents`` land on root metadata.""" + pytest.importorskip("claude_agent_sdk") + from claude_agent_sdk import ClaudeAgentOptions, AgentDefinition + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + user_options = ClaudeAgentOptions( + system_prompt="You are a banana expert.", + model="claude-haiku-4-5", + max_turns=3, + allowed_tools=["Read", "Bash"], + agents={ + "code-reviewer": AgentDefinition( + description="Reviews code for bugs", + prompt="You are a strict reviewer. Flag anti-patterns.", + tools=["Read", "Grep"], + ), + }, + ) + + messages = [init_system_message(), FakeResultMessage(subtype="success")] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi", options=user_options): + pass + + asyncio.run(run()) + + root = captured[0].steps[0] + assert root.metadata["system_prompt"] == "You are a banana expert." + agents_meta = root.metadata["agents_defined"] + assert "code-reviewer" in agents_meta + assert agents_meta["code-reviewer"]["description"] == "Reviews code for bugs" + assert ( + agents_meta["code-reviewer"]["prompt"] + == "You are a strict reviewer. Flag anti-patterns." + ) + assert agents_meta["code-reviewer"]["tools"] == ["Read", "Grep"] + opts = root.metadata["options"] + assert opts["model"] == "claude-haiku-4-5" + assert opts["max_turns"] == 3 + assert opts["allowed_tools"] == ["Read", "Bash"] + + def test_assistant_message_emits_chat_completion_step(): """Each AssistantMessage becomes a CHAT_COMPLETION child of the root step.""" from openlayer.lib.integrations.claude_agent_sdk import traced_query From d645bb24471fa6140dd883e0e275f2d5377e01a2 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 14:05:20 -0300 Subject: [PATCH 18/20] test(claude-agent-sdk): enrich live test with system_prompt + max_turns to exercise options-metadata capture The live test now passes system_prompt and max_turns so the published Openlayer trace surfaces the new metadata captured on the root step (system_prompt, options.max_turns), proving end-to-end that the wrapper captures the user's configuration in addition to the SDK's runtime-resolved agent_config. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/integrations/test_claude_agent_sdk_live.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/integrations/test_claude_agent_sdk_live.py b/tests/integrations/test_claude_agent_sdk_live.py index b1fbce0c..81dd6b0b 100644 --- a/tests/integrations/test_claude_agent_sdk_live.py +++ b/tests/integrations/test_claude_agent_sdk_live.py @@ -47,7 +47,15 @@ async def run(): try: async for m in traced_query( prompt="Say the word 'banana' and nothing else.", - options=ClaudeAgentOptions(model="claude-haiku-4-5"), + options=ClaudeAgentOptions( + model="claude-haiku-4-5", + system_prompt=( + "You are a terse assistant that follows instructions " + "exactly. Never add filler words, never apologize, and " + "never add quotes around your answer." + ), + max_turns=2, + ), ): messages.append(m) except Exception as exc: From 089ac7669309064c31acd5c237feb1f123805a58 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 14:18:15 -0300 Subject: [PATCH 19/20] feat(claude-agent-sdk): capture rawOutput, prompt-context, and camelCase token aliases on every step Previously the trace published to Openlayer was missing key visibility: - Assistant turns with empty text content (e.g. thinking-only turns or pure tool-call turns) appeared blank in the UI. Fall back to a tool call summary, thinking text, or '[no content]' marker so reviewers always see something useful. - Top-level assistant turns had no 'inputs' set, so the UI couldn't show what prompt triggered them. Surface the user's prompt as the step input for top-level turns (subagent turns are driven by their parent's Agent tool call, not a user prompt). - The raw assistant message content was never serialized. Set the ChatCompletionStep's raw_output field with the full block array so reviewers can inspect every text/thinking/tool_use block. - ToolUseBlocks were captured by id only; widen to { id, name, input } so tool calls are inspectable from the assistant turn. - Root step had no rawOutput surface; stash a JSON-serialized ResultMessage in metadata.rawOutput. - promptTokens / completionTokens were stuck in metadata under snake_case keys that the trace UI doesn't render. Add camelCase aliases so the Metrics 'Prompt' and 'Completion' boxes populate. - Capture state.model from SystemMessage(init) and surface it on the root step so reviewers see the resolved model. - Capture state.user_prompt so assistant turn steps can show the triggering prompt as their input. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/integrations/claude_agent_sdk.py | 148 +++++++++++++++++- 1 file changed, 142 insertions(+), 6 deletions(-) diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py index fcb20641..7cb7e37a 100644 --- a/src/openlayer/lib/integrations/claude_agent_sdk.py +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -70,6 +70,8 @@ class _TraceState: tool_to_parent_step: Dict[str, Any] = field(default_factory=dict) turn_counter: int = 0 session_id: Optional[str] = None + model: Optional[str] = None + user_prompt: Optional[str] = None _current_state: contextvars.ContextVar[Optional[_TraceState]] = contextvars.ContextVar( @@ -402,6 +404,7 @@ def _observe_system_init(msg: Any, state: _TraceState) -> None: return data = getattr(msg, "data", {}) or {} state.session_id = data.get("session_id") + state.model = data.get("model") mcp_servers = data.get("mcp_servers") if _config.redact_mcp_env: mcp_servers = _redact_mcp_servers(mcp_servers) @@ -440,12 +443,24 @@ def _observe_result(msg: Any, state: _TraceState) -> None: # payload, so these show up at trace-level as well. state.root_step.output = getattr(msg, "result", None) or "" state.root_step.latency = duration_ms + # We surface cost / tokens / per-turn token breakdowns via metadata because + # base ``Step`` (and thus ``AgentStep``) doesn't define those attributes — + # ``Step.log()`` would silently filter them otherwise. ``post_process_trace`` + # spreads root metadata into the trace-level row, so these become visible + # at the trace level. The UI reads ``promptTokens`` / ``completionTokens`` + # in camelCase, so we duplicate the snake_case names with camelCase aliases + # to make sure they land in the right column. state.root_step.log( metadata={ "cost": getattr(msg, "total_cost_usd", None), "tokens": total_tokens, "prompt_tokens": input_tokens, "completion_tokens": output_tokens, + "promptTokens": input_tokens, + "completionTokens": output_tokens, + "model": _resolve_root_model(state, msg), + "provider": "anthropic", + "rawOutput": _serialize_result_message(msg), "session_id": getattr(msg, "session_id", None) or state.session_id, "num_turns": getattr(msg, "num_turns", None), "stop_reason": getattr(msg, "stop_reason", None), @@ -460,6 +475,91 @@ def _observe_result(msg: Any, state: _TraceState) -> None: ) +def _resolve_root_model(state: _TraceState, result_msg: Any) -> Optional[str]: + """Best-effort model identifier for the root step. + + ResultMessage doesn't carry a ``model`` field directly. We try, in order: + the first key of ``model_usage`` (the SDK's per-model token breakdown), + then the model recorded on the init ``SystemMessage`` (cached on state). + """ + model_usage = getattr(result_msg, "model_usage", None) + if isinstance(model_usage, dict) and model_usage: + first = next(iter(model_usage.keys()), None) + if first: + return first + return getattr(state, "model", None) + + +def _serialize_assistant_message(msg: Any, content: List[Any]) -> Optional[str]: + """Render an AssistantMessage's full content array to JSON for ``raw_output``. + + Captures every block (text, thinking, tool_use) so users can inspect the + complete model response even when text is empty. Tries to introspect + dataclass-style block objects; falls back to ``str(block)``. + """ + try: + blocks: List[Dict[str, Any]] = [] + for b in content: + bname = type(b).__name__ + if bname in ("TextBlock", "FakeTextBlock"): + blocks.append({"type": "text", "text": getattr(b, "text", "")}) + elif bname in ("ThinkingBlock", "FakeThinkingBlock"): + blocks.append( + { + "type": "thinking", + "thinking": getattr(b, "thinking", ""), + "signature": getattr(b, "signature", None), + } + ) + elif bname in ("ToolUseBlock", "FakeToolUseBlock"): + blocks.append( + { + "type": "tool_use", + "id": getattr(b, "id", ""), + "name": getattr(b, "name", ""), + "input": getattr(b, "input", None), + } + ) + else: + blocks.append({"type": bname, "repr": str(b)}) + return json.dumps( + { + "model": getattr(msg, "model", None), + "stop_reason": getattr(msg, "stop_reason", None), + "usage": getattr(msg, "usage", None), + "parent_tool_use_id": getattr(msg, "parent_tool_use_id", None), + "content": blocks, + }, + default=str, + ) + except Exception: + return None + + +def _serialize_result_message(msg: Any) -> Optional[str]: + """Render a ``ResultMessage`` to a JSON-ish string for ``rawOutput`` display.""" + try: + return json.dumps( + { + "subtype": getattr(msg, "subtype", None), + "result": getattr(msg, "result", None), + "session_id": getattr(msg, "session_id", None), + "duration_ms": getattr(msg, "duration_ms", None), + "duration_api_ms": getattr(msg, "duration_api_ms", None), + "num_turns": getattr(msg, "num_turns", None), + "stop_reason": getattr(msg, "stop_reason", None), + "is_error": getattr(msg, "is_error", None), + "total_cost_usd": getattr(msg, "total_cost_usd", None), + "usage": getattr(msg, "usage", None), + "model_usage": getattr(msg, "model_usage", None), + "permission_denials": getattr(msg, "permission_denials", None), + }, + default=str, + ) + except Exception: + return None + + def _resolve_subagent_parent(msg: Any, state: _TraceState) -> Any: """Return the spawning Agent ToolStep for a subagent message, or ``None``. @@ -487,7 +587,7 @@ def _observe_assistant(msg: Any, state: _TraceState) -> None: content = list(getattr(msg, "content", None) or []) text_parts: List[str] = [] thinking_parts: List[str] = [] - tool_use_ids: List[str] = [] + tool_use_blocks: List[Dict[str, Any]] = [] for block in content: bname = type(block).__name__ if bname in ("TextBlock", "FakeTextBlock"): @@ -495,12 +595,40 @@ def _observe_assistant(msg: Any, state: _TraceState) -> None: elif bname in ("ThinkingBlock", "FakeThinkingBlock"): thinking_parts.append(getattr(block, "thinking", "") or "") elif bname in ("ToolUseBlock", "FakeToolUseBlock"): - tool_use_ids.append(getattr(block, "id", "")) + tool_use_blocks.append( + { + "id": getattr(block, "id", ""), + "name": getattr(block, "name", ""), + "input": getattr(block, "input", None), + } + ) usage = getattr(msg, "usage", None) or {} input_tokens = usage.get("input_tokens") output_tokens = usage.get("output_tokens") total_tokens = (input_tokens or 0) + (output_tokens or 0) + text_output = "\n".join(p for p in text_parts if p) + + # Output: prefer text; fall back to tool-use summary; then thinking; then a + # marker so users see *something* in the UI instead of an empty step. + if text_output: + output_for_ui: str = text_output + elif tool_use_blocks: + names = ", ".join(b["name"] for b in tool_use_blocks) + output_for_ui = f"[tool call: {names}]" + elif thinking_parts and _config.capture_thinking: + output_for_ui = "[thinking]\n" + "\n".join(thinking_parts) + else: + output_for_ui = "[no content]" + + # For root-level assistant turns, surface the user's original prompt as + # the step's input so users see what triggered this turn. For subagent + # turns we omit it (they're triggered by the parent's Agent tool call, + # not by a user prompt). + is_subagent_turn = getattr(msg, "parent_tool_use_id", None) is not None + step_inputs = None + if not is_subagent_turn and state.user_prompt is not None: + step_inputs = {"prompt": state.user_prompt} # Subagent nesting: if this assistant message belongs to a subagent run, # temporarily push the spawning tool's step onto the contextvar stack so @@ -513,21 +641,23 @@ def _observe_assistant(msg: Any, state: _TraceState) -> None: with _tracer.create_step( name=f"assistant turn {state.turn_counter}", step_type=StepType.CHAT_COMPLETION, + inputs=step_inputs, ) as chat_step: chat_step.log( - output="\n".join(text_parts), + output=output_for_ui, model=getattr(msg, "model", None), provider="anthropic", prompt_tokens=input_tokens, completion_tokens=output_tokens, tokens=total_tokens, + raw_output=_serialize_assistant_message(msg, content), metadata={ "thinking": ( "\n".join(thinking_parts) if (thinking_parts and _config.capture_thinking) else None ), - "tool_calls": tool_use_ids or None, + "tool_calls": tool_use_blocks or None, "stop_reason": getattr(msg, "stop_reason", None), "parent_tool_use_id": getattr(msg, "parent_tool_use_id", None), "message_id": getattr(msg, "message_id", None), @@ -600,7 +730,10 @@ async def traced_query( inference_pipeline_id=resolved_pipeline, ) as root_step: _capture_options_metadata(root_step, original_options) - state = _TraceState(root_step=root_step) + state = _TraceState( + root_step=root_step, + user_prompt=prompt if isinstance(prompt, str) else None, + ) token = _current_state.set(state) try: async for msg in query_fn(prompt=prompt, options=options, **kwargs): @@ -726,7 +859,10 @@ async def patched_query(self, prompt, session_id="default"): # type: ignore[no- _capture_options_metadata( root_step, getattr(self, "_openlayer_original_options", None) ) - state = _TraceState(root_step=root_step) + state = _TraceState( + root_step=root_step, + user_prompt=prompt if isinstance(prompt, str) else None, + ) self._openlayer_state = state self._openlayer_state_cm = cm self._openlayer_token = _current_state.set(state) From b9b5ff2fb29f775b77dda8cf51f27b818945cad7 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 12 May 2026 15:09:32 -0300 Subject: [PATCH 20/20] docs(claude-agent-sdk): add multi-agent example notebook Adds a richer example that exercises every step type the Openlayer wrapper captures, so users can see a full trace tree: - Root AGENT step with system_prompt, agent_config, agents_defined, options, rawOutput. - Per-turn CHAT_COMPLETION steps with prompt/completion tokens, thinking blocks, tool_calls, raw_output. - TOOL steps for: - mcp__file-stats__count_files (custom in-process MCP tool) - Glob and Read (built-in) - Agent (twice: code-reviewer and summary-writer subagents) - Nested subagent steps correlated via parent_tool_use_id. The 'codebase analyzer' scenario walks the agent through three steps: count files by extension, dispatch a code-reviewer subagent to review one file, then dispatch a summary-writer subagent to wrap up. Result is a 4-line markdown report. Verified end-to-end against the live API as a plain Python script (5 turns, real subagent dispatch, real Openlayer trace upload). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../claude_agent_sdk_multi_agent.ipynb | 292 ++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb diff --git a/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb b/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb new file mode 100644 index 00000000..a32e5850 --- /dev/null +++ b/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb @@ -0,0 +1,292 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/openlayer-python/blob/main/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb)\n", + "\n", + "# Multi-agent Claude Agent SDK tracing with Openlayer\n", + "\n", + "This notebook builds a richer agent than the basic example: a **codebase analyzer** that orchestrates two subagents and an in-process MCP tool. It's designed to exercise every step type the Openlayer wrapper captures so you can see a full trace tree in your dashboard:\n", + "\n", + "- **Root `AGENT` step** \u2014 `Claude Agent SDK query` with the user prompt, the resolved agent_config, cost, tokens, session_id, the full final ResultMessage as `rawOutput`.\n", + "- **Per-turn `CHAT_COMPLETION` steps** \u2014 one per assistant turn, with model, prompt/completion tokens, thinking content (if any), tool_calls list, and the full assistant message as `raw_output`.\n", + "- **`TOOL` steps** \u2014 one per tool invocation, bracketed by `PreToolUse` / `PostToolUse` hooks for precise timing. MCP tools get `mcp_server` and `mcp_tool_name` metadata parsed from the `mcp__server__tool` naming convention.\n", + "- **Nested subagent steps** \u2014 messages from a subagent run carry `parent_tool_use_id` pointing at the spawning `Agent` tool call, so the wrapper nests them under that ToolStep automatically." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install openlayer 'claude-agent-sdk>=0.1.81'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Environment variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"YOUR_INFERENCE_PIPELINE_ID\"\n", + "os.environ[\"ANTHROPIC_API_KEY\"] = \"YOUR_ANTHROPIC_API_KEY\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Turn on tracing (one line)\n", + "\n", + "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so every call is auto-traced. Your own hooks (if any) are preserved \u2014 ours are composed on top, never replacing yours." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openlayer.lib import trace_claude_agent_sdk\n", + "\n", + "trace_claude_agent_sdk()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Define a custom MCP tool\n", + "\n", + "The SDK lets you expose any in-process Python function as an MCP tool via `@tool` + `create_sdk_mcp_server`. We define a `count_files` tool that takes a directory and returns a count of files by extension. In the trace it'll appear as a `TOOL` step named `mcp__file-stats__count_files` with `mcp_server: \"file-stats\"` and `mcp_tool_name: \"count_files\"` in metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import Counter\n", + "from pathlib import Path\n", + "\n", + "from claude_agent_sdk import tool, create_sdk_mcp_server\n", + "\n", + "\n", + "@tool(\"count_files\", \"Count files in a directory grouped by extension\", {\"directory\": str})\n", + "async def count_files(args):\n", + " target = Path(args[\"directory\"]).expanduser().resolve()\n", + " if not target.exists() or not target.is_dir():\n", + " return {\n", + " \"content\": [{\"type\": \"text\", \"text\": f\"No such directory: {target}\"}],\n", + " \"isError\": True,\n", + " }\n", + " counts = Counter()\n", + " for f in target.rglob(\"*\"):\n", + " if f.is_file():\n", + " counts[f.suffix or \"(no ext)\"] += 1\n", + " body = \"\\n\".join(f\"{ext}: {n}\" for ext, n in counts.most_common(20))\n", + " return {\"content\": [{\"type\": \"text\", \"text\": body or \"(empty)\"}]}\n", + "\n", + "\n", + "file_stats_server = create_sdk_mcp_server(\"file-stats\", \"1.0.0\", tools=[count_files])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Define two subagents\n", + "\n", + "Subagents are registered under the built-in `Agent` tool. When the main agent calls `Agent(name=\"code-reviewer\", \u2026)`, the SDK runs that subagent in its own context and the wrapper nests every message the subagent emits under the spawning `Agent` ToolStep via `parent_tool_use_id`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from claude_agent_sdk import AgentDefinition\n", + "\n", + "subagents = {\n", + " \"code-reviewer\": AgentDefinition(\n", + " description=\"Briefly reviews a code file for clarity, correctness, and style.\",\n", + " prompt=(\n", + " \"You are a senior code reviewer. The user will tell you which file to inspect. \"\n", + " \"Read that file once, then return exactly one observation about its quality \"\n", + " \"(strength or weakness). Be specific and concise \u2014 two sentences max.\"\n", + " ),\n", + " tools=[\"Read\"],\n", + " model=\"claude-haiku-4-5\",\n", + " ),\n", + " \"summary-writer\": AgentDefinition(\n", + " description=\"Writes a one-paragraph summary of an agent's findings.\",\n", + " prompt=(\n", + " \"You synthesize prior agent findings into a single one-paragraph summary \"\n", + " \"(3-5 sentences). Be specific and concise; do not invent details that weren't reported.\"\n", + " ),\n", + " tools=[],\n", + " model=\"claude-haiku-4-5\",\n", + " ),\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Wire it all together and run\n", + "\n", + "The main agent gets: the in-process MCP server, both subagents, and a small set of built-in tools. The prompt walks it through the orchestration in three steps so the resulting trace is easy to follow." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from textwrap import dedent\n", + "\n", + "from claude_agent_sdk import ClaudeAgentOptions, ResultMessage, query\n", + "\n", + "\n", + "options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " system_prompt=(\n", + " \"You are a codebase analysis agent. Follow the user's three-step plan exactly, \"\n", + " \"in order. Be terse \u2014 the final answer should be a 4-line markdown report.\"\n", + " ),\n", + " allowed_tools=[\n", + " \"Glob\",\n", + " \"Read\",\n", + " \"Agent\",\n", + " \"mcp__file-stats__count_files\",\n", + " ],\n", + " mcp_servers={\"file-stats\": file_stats_server},\n", + " agents=subagents,\n", + " max_turns=15,\n", + ")\n", + "\n", + "# Point the agent at this repository's integrations directory. Change to any\n", + "# directory you'd like to analyze.\n", + "target_dir = os.path.abspath(\"../../../src/openlayer/lib/integrations\")\n", + "\n", + "prompt = dedent(\n", + " f\"\"\"\\\n", + " Analyze the directory at: {target_dir}\n", + "\n", + " Follow this plan exactly:\n", + "\n", + " 1. Call the count_files tool with that directory to get a file-extension breakdown.\n", + " 2. Use Glob to list .py files under the directory and pick exactly ONE non-trivial file.\n", + " Dispatch the code-reviewer subagent to review that file briefly.\n", + " 3. Dispatch the summary-writer subagent to produce a one-paragraph summary of\n", + " (a) the extension counts and (b) the code-reviewer's finding.\n", + "\n", + " Output a 4-line markdown report: file counts, file reviewed, reviewer's observation,\n", + " and the summary-writer's paragraph.\n", + " \"\"\"\n", + ")\n", + "\n", + "# Top-level ``await`` works in Jupyter; for plain Python scripts wrap in\n", + "# ``asyncio.run(...)``. The SDK can raise a trailing exception after the\n", + "# ResultMessage \u2014 we tolerate it so the trace still publishes cleanly.\n", + "result = None\n", + "try:\n", + " async for message in query(prompt=prompt, options=options):\n", + " if isinstance(message, ResultMessage):\n", + " result = message\n", + "except Exception as exc:\n", + " print(f\"(SDK raised after result: {exc})\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Final result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(result.result if result else \"(no result)\")\n", + "print()\n", + "print(\"turns:\", result.num_turns)\n", + "print(\"cost:\", f\"${result.total_cost_usd:.4f}\")\n", + "print(\"session_id:\", result.session_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. What to look for in the Openlayer trace\n", + "\n", + "Open the trace in your inference pipeline and you should see:\n", + "\n", + "- A root **`Claude Agent SDK query`** AGENT step. On its metadata: `system_prompt`, `agents_defined` (both subagents with their prompts and tools), `agent_config` (resolved tools / mcp_servers / skills / plugins / permission_mode / cwd / model), `options` (model, max_turns, allowed_tools), `session_id`, `num_turns`, `stop_reason`, `model_usage` (per-model token + cost breakdown), and `rawOutput` (the full final ResultMessage as JSON).\n", + "- Multiple **`assistant turn N`** CHAT_COMPLETION steps under the root. Each has its own `prompt_tokens` / `completion_tokens`, `model`, thinking content (if present), tool_calls list, and `raw_output` containing the full assistant message.\n", + "- Multiple **TOOL** steps:\n", + " - `mcp__file-stats__count_files` \u2014 with `mcp_server`/`mcp_tool_name` parsed into metadata, and the directory listing as output.\n", + " - `Glob` and `Read` \u2014 the built-in file tools.\n", + " - `Agent` (twice) \u2014 one for `code-reviewer`, one for `summary-writer`. **Each Agent ToolStep contains nested CHAT_COMPLETION and TOOL steps** \u2014 those are the subagent's own assistant turns and tool calls, correlated via `parent_tool_use_id`.\n", + "\n", + "If you don't see some of the metadata fields in the trace UI's \"Other fields\" panel, click **+ Add field** on your inference pipeline and pick the columns you want to surface. The wrapper ships every field listed above; the pipeline decides which to render." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Optional: skills\n", + "\n", + "Claude Agent SDK loads skills from `.claude/skills/*/SKILL.md` files when `setting_sources=[\"project\"]` is enabled. They show up in `metadata.agent_config.skills` on the root step. If you want to test skill capture, add a `.claude/skills/example-skill/SKILL.md` file to this directory and re-run with:\n", + "\n", + "```python\n", + "options = ClaudeAgentOptions(..., setting_sources=[\"project\"])\n", + "```\n", + "\n", + "The skill names will appear in the agent_config metadata of the root step." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}