diff --git a/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb b/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb new file mode 100644 index 00000000..a32e5850 --- /dev/null +++ b/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb @@ -0,0 +1,292 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/openlayer-python/blob/main/examples/tracing/claude_agent_sdk/claude_agent_sdk_multi_agent.ipynb)\n", + "\n", + "# Multi-agent Claude Agent SDK tracing with Openlayer\n", + "\n", + "This notebook builds a richer agent than the basic example: a **codebase analyzer** that orchestrates two subagents and an in-process MCP tool. It's designed to exercise every step type the Openlayer wrapper captures so you can see a full trace tree in your dashboard:\n", + "\n", + "- **Root `AGENT` step** \u2014 `Claude Agent SDK query` with the user prompt, the resolved agent_config, cost, tokens, session_id, the full final ResultMessage as `rawOutput`.\n", + "- **Per-turn `CHAT_COMPLETION` steps** \u2014 one per assistant turn, with model, prompt/completion tokens, thinking content (if any), tool_calls list, and the full assistant message as `raw_output`.\n", + "- **`TOOL` steps** \u2014 one per tool invocation, bracketed by `PreToolUse` / `PostToolUse` hooks for precise timing. MCP tools get `mcp_server` and `mcp_tool_name` metadata parsed from the `mcp__server__tool` naming convention.\n", + "- **Nested subagent steps** \u2014 messages from a subagent run carry `parent_tool_use_id` pointing at the spawning `Agent` tool call, so the wrapper nests them under that ToolStep automatically." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. 
Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install openlayer 'claude-agent-sdk>=0.1.81'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Environment variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"YOUR_INFERENCE_PIPELINE_ID\"\n", + "os.environ[\"ANTHROPIC_API_KEY\"] = \"YOUR_ANTHROPIC_API_KEY\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Turn on tracing (one line)\n", + "\n", + "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so every call is auto-traced. Your own hooks (if any) are preserved \u2014 ours are composed on top, never replacing yours." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openlayer.lib import trace_claude_agent_sdk\n", + "\n", + "trace_claude_agent_sdk()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Define a custom MCP tool\n", + "\n", + "The SDK lets you expose any in-process Python function as an MCP tool via `@tool` + `create_sdk_mcp_server`. We define a `count_files` tool that takes a directory and returns a count of files by extension. In the trace it'll appear as a `TOOL` step named `mcp__file-stats__count_files` with `mcp_server: \"file-stats\"` and `mcp_tool_name: \"count_files\"` in metadata." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import Counter\n", + "from pathlib import Path\n", + "\n", + "from claude_agent_sdk import tool, create_sdk_mcp_server\n", + "\n", + "\n", + "@tool(\"count_files\", \"Count files in a directory grouped by extension\", {\"directory\": str})\n", + "async def count_files(args):\n", + " target = Path(args[\"directory\"]).expanduser().resolve()\n", + " if not target.exists() or not target.is_dir():\n", + " return {\n", + " \"content\": [{\"type\": \"text\", \"text\": f\"No such directory: {target}\"}],\n", + " \"isError\": True,\n", + " }\n", + " counts = Counter()\n", + " for f in target.rglob(\"*\"):\n", + " if f.is_file():\n", + " counts[f.suffix or \"(no ext)\"] += 1\n", + " body = \"\\n\".join(f\"{ext}: {n}\" for ext, n in counts.most_common(20))\n", + " return {\"content\": [{\"type\": \"text\", \"text\": body or \"(empty)\"}]}\n", + "\n", + "\n", + "file_stats_server = create_sdk_mcp_server(\"file-stats\", \"1.0.0\", tools=[count_files])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Define two subagents\n", + "\n", + "Subagents are registered under the built-in `Agent` tool. When the main agent calls `Agent(name=\"code-reviewer\", \u2026)`, the SDK runs that subagent in its own context and the wrapper nests every message the subagent emits under the spawning `Agent` ToolStep via `parent_tool_use_id`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from claude_agent_sdk import AgentDefinition\n", + "\n", + "subagents = {\n", + " \"code-reviewer\": AgentDefinition(\n", + " description=\"Briefly reviews a code file for clarity, correctness, and style.\",\n", + " prompt=(\n", + " \"You are a senior code reviewer. The user will tell you which file to inspect. 
\"\n", + " \"Read that file once, then return exactly one observation about its quality \"\n", + " \"(strength or weakness). Be specific and concise \u2014 two sentences max.\"\n", + " ),\n", + " tools=[\"Read\"],\n", + " model=\"claude-haiku-4-5\",\n", + " ),\n", + " \"summary-writer\": AgentDefinition(\n", + " description=\"Writes a one-paragraph summary of an agent's findings.\",\n", + " prompt=(\n", + " \"You synthesize prior agent findings into a single one-paragraph summary \"\n", + " \"(3-5 sentences). Be specific and concise; do not invent details that weren't reported.\"\n", + " ),\n", + " tools=[],\n", + " model=\"claude-haiku-4-5\",\n", + " ),\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Wire it all together and run\n", + "\n", + "The main agent gets: the in-process MCP server, both subagents, and a small set of built-in tools. The prompt walks it through the orchestration in three steps so the resulting trace is easy to follow." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from textwrap import dedent\n", + "\n", + "from claude_agent_sdk import ClaudeAgentOptions, ResultMessage, query\n", + "\n", + "\n", + "options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " system_prompt=(\n", + " \"You are a codebase analysis agent. Follow the user's three-step plan exactly, \"\n", + " \"in order. Be terse \u2014 the final answer should be a 4-line markdown report.\"\n", + " ),\n", + " allowed_tools=[\n", + " \"Glob\",\n", + " \"Read\",\n", + " \"Agent\",\n", + " \"mcp__file-stats__count_files\",\n", + " ],\n", + " mcp_servers={\"file-stats\": file_stats_server},\n", + " agents=subagents,\n", + " max_turns=15,\n", + ")\n", + "\n", + "# Point the agent at this repository's integrations directory. 
Change to any\n", + "# directory you'd like to analyze.\n", + "target_dir = os.path.abspath(\"../../../src/openlayer/lib/integrations\")\n", + "\n", + "prompt = dedent(\n", + " f\"\"\"\\\n", + " Analyze the directory at: {target_dir}\n", + "\n", + " Follow this plan exactly:\n", + "\n", + " 1. Call the count_files tool with that directory to get a file-extension breakdown.\n", + " 2. Use Glob to list .py files under the directory and pick exactly ONE non-trivial file.\n", + " Dispatch the code-reviewer subagent to review that file briefly.\n", + " 3. Dispatch the summary-writer subagent to produce a one-paragraph summary of\n", + " (a) the extension counts and (b) the code-reviewer's finding.\n", + "\n", + " Output a 4-line markdown report: file counts, file reviewed, reviewer's observation,\n", + " and the summary-writer's paragraph.\n", + " \"\"\"\n", + ")\n", + "\n", + "# Top-level ``await`` works in Jupyter; for plain Python scripts wrap in\n", + "# ``asyncio.run(...)``. The SDK can raise a trailing exception after the\n", + "# ResultMessage \u2014 we tolerate it so the trace still publishes cleanly.\n", + "result = None\n", + "try:\n", + " async for message in query(prompt=prompt, options=options):\n", + " if isinstance(message, ResultMessage):\n", + " result = message\n", + "except Exception as exc:\n", + " print(f\"(SDK raised after result: {exc})\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Final result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(result.result if result else \"(no result)\")\n", + "print()\n", + "print(\"turns:\", result.num_turns)\n", + "print(\"cost:\", f\"${result.total_cost_usd:.4f}\")\n", + "print(\"session_id:\", result.session_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. 
What to look for in the Openlayer trace\n", + "\n", + "Open the trace in your inference pipeline and you should see:\n", + "\n", + "- A root **`Claude Agent SDK query`** AGENT step. On its metadata: `system_prompt`, `agents_defined` (both subagents with their prompts and tools), `agent_config` (resolved tools / mcp_servers / skills / plugins / permission_mode / cwd / model), `options` (model, max_turns, allowed_tools), `session_id`, `num_turns`, `stop_reason`, `model_usage` (per-model token + cost breakdown), and `rawOutput` (the full final ResultMessage as JSON).\n", + "- Multiple **`assistant turn N`** CHAT_COMPLETION steps under the root. Each has its own `prompt_tokens` / `completion_tokens`, `model`, thinking content (if present), tool_calls list, and `raw_output` containing the full assistant message.\n", + "- Multiple **TOOL** steps:\n", + " - `mcp__file-stats__count_files` \u2014 with `mcp_server`/`mcp_tool_name` parsed into metadata, and the directory listing as output.\n", + " - `Glob` and `Read` \u2014 the built-in file tools.\n", + " - `Agent` (twice) \u2014 one for `code-reviewer`, one for `summary-writer`. **Each Agent ToolStep contains nested CHAT_COMPLETION and TOOL steps** \u2014 those are the subagent's own assistant turns and tool calls, correlated via `parent_tool_use_id`.\n", + "\n", + "If you don't see some of the metadata fields in the trace UI's \"Other fields\" panel, click **+ Add field** on your inference pipeline and pick the columns you want to surface. The wrapper ships every field listed above; the pipeline decides which to render." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Optional: skills\n", + "\n", + "Claude Agent SDK loads skills from `.claude/skills/*/SKILL.md` files when `setting_sources=[\"project\"]` is enabled. They show up in `metadata.agent_config.skills` on the root step. 
If you want to test skill capture, add a `.claude/skills/example-skill/SKILL.md` file to this directory and re-run with:\n", + "\n", + "```python\n", + "options = ClaudeAgentOptions(..., setting_sources=[\"project\"])\n", + "```\n", + "\n", + "The skill names will appear in the agent_config metadata of the root step." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb new file mode 100644 index 00000000..ae363b3b --- /dev/null +++ b/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb @@ -0,0 +1,187 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/openlayer-python/blob/main/examples/tracing/claude_agent_sdk/claude_agent_sdk_tracing.ipynb)\n", + "\n", + "# Tracing the Claude Agent SDK with Openlayer\n", + "\n", + "This notebook shows how to enable Openlayer tracing for applications built with Anthropic's [Claude Agent SDK](https://github.com/anthropics/claude-agent-sdk-python). After one line of setup, every `query()` becomes an Openlayer trace with nested steps for assistant turns, tool calls (including MCP and subagents), session metadata, cost, and tokens." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install openlayer 'claude-agent-sdk>=0.1.81'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. 
Set environment variables\n", + "\n", + "You need three secrets:\n", + "\n", + "- `OPENLAYER_API_KEY` \u2014 get from [openlayer.com/settings/api-keys](https://app.openlayer.com/settings/api-keys)\n", + "- `OPENLAYER_INFERENCE_PIPELINE_ID` \u2014 the inference pipeline you want to stream traces to\n", + "- `ANTHROPIC_API_KEY` \u2014 your Anthropic API key" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"YOUR_INFERENCE_PIPELINE_ID\"\n", + "os.environ[\"ANTHROPIC_API_KEY\"] = \"YOUR_ANTHROPIC_API_KEY\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Enable tracing \u2014 one line\n", + "\n", + "`trace_claude_agent_sdk()` monkey-patches `claude_agent_sdk.query` and `ClaudeSDKClient` so they're auto-traced. It composes with any hooks you've configured yourself \u2014 your hooks are not replaced." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openlayer.lib import trace_claude_agent_sdk\n", + "\n", + "trace_claude_agent_sdk()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Run a simple query\n", + "\n", + "The agent has read-only access to the working directory via `Read`, `Glob`, and `Grep` so it can answer questions about local code." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "from claude_agent_sdk import ClaudeAgentOptions, query\n", + "\n", + "\n", + "async def search():\n", + " options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " allowed_tools=[\"Read\", \"Glob\", \"Grep\"],\n", + " )\n", + " async for message in query(\n", + " prompt=\"List the Python files in the current directory and say how many there are.\",\n", + " options=options,\n", + " ):\n", + " print(type(message).__name__) # noqa: T201\n", + "\n", + "\n", + "await search()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Subagent example\n", + "\n", + "When the agent delegates work to a subagent via the `Agent` tool, the subagent's messages automatically nest under the spawning tool step in the trace." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from claude_agent_sdk import AgentDefinition\n", + "\n", + "\n", + "async def subagent_demo():\n", + " options = ClaudeAgentOptions(\n", + " model=\"claude-haiku-4-5\",\n", + " allowed_tools=[\"Agent\", \"Read\", \"Glob\"],\n", + " agents={\n", + " \"reviewer\": AgentDefinition(\n", + " description=\"A short summary of the Python codebase.\",\n", + " prompt=\"You are a senior code reviewer. 
List the top-level Python files and summarize each in one sentence.\",\n",
+ "                tools=[\"Read\", \"Glob\"],\n",
+ "            ),\n",
+ "        },\n",
+ "    )\n",
+ "    async for message in query(\n",
+ "        prompt=\"Dispatch the reviewer subagent and report back its summary.\",\n",
+ "        options=options,\n",
+ "    ):\n",
+ "        print(type(message).__name__)  # noqa: T201\n",
+ "\n",
+ "\n",
+ "await subagent_demo()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## What's captured?\n",
+ "\n",
+ "Each `query()` becomes a single trace with the following shape:\n",
+ "\n",
+ "```\n",
+ "AgentStep \"Claude Agent SDK query\"\n",
+ "\u251c\u2500\u2500 ChatCompletionStep \"assistant turn 1\" (text, thinking, model, tokens)\n",
+ "\u251c\u2500\u2500 ToolStep \"Read\" (input, output, latency)\n",
+ "\u251c\u2500\u2500 ChatCompletionStep \"assistant turn 2\"\n",
+ "\u2514\u2500\u2500 ToolStep \"Agent: reviewer\" (subagent \u2014 children nest underneath)\n",
+ "    \u251c\u2500\u2500 ChatCompletionStep \"subagent turn 1\"\n",
+ "    \u2514\u2500\u2500 ToolStep \"Glob\"\n",
+ "```\n",
+ "\n",
+ "The root step's metadata includes cost (`total_cost_usd`), token counts, session ID, agent config (model, tools, MCP servers, skills, plugins), permission denials, and per-model usage breakdown. See the Openlayer dashboard to inspect any trace.\n",
+ "\n",
+ "For full details (including the per-call `traced_query()` API for codebases that can't change imports) see the [integration source](https://github.com/openlayer-ai/openlayer-python/blob/main/src/openlayer/lib/integrations/claude_agent_sdk.py)."
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/src/openlayer/lib/__init__.py b/src/openlayer/lib/__init__.py index f5f75bce..9a92a3b3 100644 --- a/src/openlayer/lib/__init__.py +++ b/src/openlayer/lib/__init__.py @@ -18,6 +18,8 @@ "trace_portkey", "trace_google_adk", "unpatch_google_adk", + "trace_claude_agent_sdk", + "traced_claude_agent_sdk_query", "trace_gemini", "update_current_trace", "update_current_step", @@ -311,6 +313,80 @@ def unpatch_google_adk(): return google_adk_tracer.unpatch_google_adk() +# ------------------------------ Claude Agent SDK ---------------------------- # +def trace_claude_agent_sdk( + *, + inference_pipeline_id=None, + truncate_tool_output_chars: int = 8192, + capture_thinking: bool = True, + redact_mcp_env: bool = True, +): + """Enable Openlayer tracing for the Claude Agent SDK. + + Monkey-patches ``claude_agent_sdk.query`` and ``ClaudeSDKClient`` so every + call becomes an Openlayer trace with nested steps for assistant turns, + tool calls (including MCP and subagent calls), session metadata, cost, + and tokens. + + Requirements: + ``claude-agent-sdk>=0.1.81`` must be installed: + ``pip install 'claude-agent-sdk>=0.1.81'`` + + Args: + inference_pipeline_id: Optional Openlayer inference pipeline ID. Falls + back to the ``OPENLAYER_INFERENCE_PIPELINE_ID`` env var. + truncate_tool_output_chars: Maximum characters of tool output to + capture per TOOL step. Defaults to 8192. + capture_thinking: Whether to capture ``ThinkingBlock`` content into + chat-completion step metadata. Defaults to True. + redact_mcp_env: Whether to strip ``env`` and ``headers`` from MCP + server config dicts in trace metadata. Defaults to True. + + Example: + >>> import os + >>> os.environ["OPENLAYER_API_KEY"] = "..." 
+ >>> os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "..." + >>> os.environ["ANTHROPIC_API_KEY"] = "..." + >>> from openlayer.lib import trace_claude_agent_sdk + >>> trace_claude_agent_sdk() + >>> + >>> from claude_agent_sdk import query, ClaudeAgentOptions + >>> async for m in query(prompt="hello", options=ClaudeAgentOptions(model="claude-haiku-4-5")): + ... ... + """ + # pylint: disable=import-outside-toplevel + from .integrations import claude_agent_sdk as _integration + + return _integration.trace_claude_agent_sdk( + inference_pipeline_id=inference_pipeline_id, + truncate_tool_output_chars=truncate_tool_output_chars, + capture_thinking=capture_thinking, + redact_mcp_env=redact_mcp_env, + ) + + +def traced_claude_agent_sdk_query(*, prompt, options=None, inference_pipeline_id=None, **kwargs): + """Per-call wrapper around ``claude_agent_sdk.query()`` (alternative to global init). + + Returns an async iterator that yields the same messages as ``query()`` while + emitting an Openlayer trace as a side effect. + + Example: + >>> from openlayer.lib import traced_claude_agent_sdk_query + >>> async for m in traced_claude_agent_sdk_query(prompt="hello"): + ... ... + """ + # pylint: disable=import-outside-toplevel + from .integrations import claude_agent_sdk as _integration + + return _integration.traced_query( + prompt=prompt, + options=options, + inference_pipeline_id=inference_pipeline_id, + **kwargs, + ) + + # -------------------------------- Google Gemini --------------------------------- # def trace_gemini(client): """Trace Google Gemini chat completions.""" diff --git a/src/openlayer/lib/integrations/claude_agent_sdk.py b/src/openlayer/lib/integrations/claude_agent_sdk.py new file mode 100644 index 00000000..7cb7e37a --- /dev/null +++ b/src/openlayer/lib/integrations/claude_agent_sdk.py @@ -0,0 +1,900 @@ +"""Openlayer tracing integration for the Claude Agent SDK. 
+ +Wraps ``claude_agent_sdk.query`` (and ``ClaudeSDKClient``) so each call becomes +an Openlayer trace with nested steps for assistant turns, tool calls, and +subagents. + +The wrapper is a pure *observer* of the stream: every message yielded by the +underlying ``query()`` is forwarded to the caller unchanged and in order. We +emit Openlayer steps as a side effect of observation. + +See ``docs/superpowers/specs/2026-05-12-claude-agent-sdk-integration-design.md`` +for the design rationale. +""" + +from __future__ import annotations + +import contextvars +import json +import logging +import time +from dataclasses import dataclass, field, replace as _dataclass_replace +from typing import Any, AsyncIterator, Dict, List, Optional + +from ..tracing import tracer as _tracer +from ..tracing.enums import StepType + +logger = logging.getLogger(__name__) + +# We do NOT import ``claude_agent_sdk`` at module load time so this file is +# importable even when the optional dependency is absent — matching the +# pattern of every other integration in this directory. +try: + import claude_agent_sdk as _cas # type: ignore[import-not-found] + + HAVE_CLAUDE_AGENT_SDK = True +except ImportError: # pragma: no cover - exercised by the optional-dep tests + _cas = None # type: ignore[assignment] + HAVE_CLAUDE_AGENT_SDK = False + + +# ---------------------------- Configuration ---------------------------- # + + +@dataclass +class _Config: + """Module-level configuration set by ``trace_claude_agent_sdk``.""" + + inference_pipeline_id: Optional[str] = None + truncate_tool_output_chars: int = 8192 + capture_thinking: bool = True + redact_mcp_env: bool = True + + +_config = _Config() + + +# ----------------------------- Trace state ----------------------------- # + + +@dataclass +class _TraceState: + """Per-query trace state. + + Holds the root step and per-tool-use bookkeeping used to nest subagent + messages and to bracket tool calls across PreToolUse / PostToolUse hooks. 
+ """ + + root_step: Any + pending_tools: Dict[str, Any] = field(default_factory=dict) + tool_to_parent_step: Dict[str, Any] = field(default_factory=dict) + turn_counter: int = 0 + session_id: Optional[str] = None + model: Optional[str] = None + user_prompt: Optional[str] = None + + +_current_state: contextvars.ContextVar[Optional[_TraceState]] = contextvars.ContextVar( + "openlayer_claude_agent_sdk_state", default=None +) + + +def _require_cas() -> None: + """Raise ``ImportError`` if the optional SDK is not installed.""" + if not HAVE_CLAUDE_AGENT_SDK: + raise ImportError( + "claude-agent-sdk is not installed. " + "Install with: pip install 'claude-agent-sdk>=0.1.81'" + ) + + +# ------------------------------ Helpers ------------------------------ # + + +def _redact_mcp_servers(mcp_servers: Any) -> Any: + """Strip ``env`` and ``headers`` from MCP server config dicts. + + These typically contain credentials and must not be persisted. + """ + if not mcp_servers: + return mcp_servers + if isinstance(mcp_servers, list): + return [ + {k: v for k, v in s.items() if k not in {"env", "headers"}} + if isinstance(s, dict) + else s + for s in mcp_servers + ] + return mcp_servers + + +_ROOT_STEP_NAME = "Claude Agent SDK query" + + +def _serialize_system_prompt(sp: Any) -> Any: + """Coerce ``options.system_prompt`` (str | preset dict | dataclass) into a + JSON-serializable value, truncated to 4096 chars for string forms.""" + if sp is None: + return None + if isinstance(sp, str): + return _truncate(sp, 4096) + if isinstance(sp, dict): + return sp + # Preset / file dataclass — best-effort attribute pluck + keys = ("type", "preset", "append", "excludeDynamicSections", "path") + out = {k: getattr(sp, k) for k in keys if hasattr(sp, k)} + return out or str(sp) + + +def _serialize_agent_definitions(agents: Any) -> Any: + """``options.agents`` is a dict[str, AgentDefinition]. 
Capture each + definition's description, prompt (truncated), and tools list.""" + if not agents: + return None + out: Dict[str, Any] = {} + for name, defn in agents.items(): + out[name] = { + "description": getattr(defn, "description", None), + "prompt": _truncate(getattr(defn, "prompt", None), 4096), + "tools": getattr(defn, "tools", None), + "model": getattr(defn, "model", None), + } + return out + + +def _capture_options_metadata(root_step: Any, options: Any) -> None: + """Snapshot user-provided options onto the root step metadata. + + Captures the user's ``system_prompt``, subagent definitions, and a handful + of other useful configuration values. Called once per query, with the + *original* (pre-hook-injection) options so we record what the user passed, + not what we forwarded to the SDK. + """ + if options is None: + return + metadata: Dict[str, Any] = {} + + sp = getattr(options, "system_prompt", None) + serialized_sp = _serialize_system_prompt(sp) + if serialized_sp is not None: + metadata["system_prompt"] = serialized_sp + + agents = _serialize_agent_definitions(getattr(options, "agents", None)) + if agents: + metadata["agents_defined"] = agents + + opt_capture: Dict[str, Any] = {} + for opt in ( + "model", + "fallback_model", + "max_turns", + "max_budget_usd", + "permission_mode", + "cwd", + "allowed_tools", + "disallowed_tools", + "continue_conversation", + "resume", + "fork_session", + ): + val = getattr(options, opt, None) + if val is None or val == [] or val == {}: + continue + # Convert Path to str + opt_capture[opt] = str(val) if hasattr(val, "__fspath__") else val + if opt_capture: + metadata["options"] = opt_capture + + if metadata: + root_step.log(metadata=metadata) + + +def _truncate(value: Any, max_chars: int) -> Any: + """Coerce ``value`` to a string and truncate to ``max_chars``. + + Tool outputs can be arbitrary objects; we serialize via ``json.dumps`` when + possible, falling back to ``str()``. 
The original length is preserved in
+    the truncation marker so downstream UI can hint at omission.
+    """
+    if value is None:
+        return None
+    if not isinstance(value, str):
+        try:
+            value = json.dumps(value, default=str)
+        except Exception:
+            value = str(value)
+    if len(value) > max_chars:
+        return value[:max_chars] + f"... [truncated, full length {len(value)}]"
+    return value
+
+
+def _parse_mcp_metadata(tool_name: str) -> Dict[str, Any]:
+    """Parse ``mcp__<server>__<tool>`` tool names into server/tool metadata."""
+    if not tool_name or not tool_name.startswith("mcp__"):
+        return {}
+    parts = tool_name.split("__", 2)
+    if len(parts) == 3:
+        return {"mcp_server": parts[1], "mcp_tool_name": parts[2]}
+    return {}
+
+
+# --------------------------- Tool-step lifecycle --------------------------- #
+
+
+class _ToolStepHandle:
+    """Holds the live ``create_step`` context manager for an in-flight tool call.
+
+    The Openlayer tracer exposes step lifecycle as a context manager
+    (``tracer.create_step``). For tools we need to open the step in
+    ``PreToolUse`` and close it from ``PostToolUse`` (which may execute on a
+    different stack frame within the same coroutine). We bypass ``with`` by
+    invoking ``__enter__`` / ``__exit__`` manually.
+
+    Caveat: between ``__enter__`` and ``__exit__`` the step is "current" on the
+    contextvar stack. Sequential pre/post pairs per ``tool_use_id`` are
+    well-behaved, but truly concurrent tool calls could interleave and produce
+    nested-looking steps. The SDK fires pre/post serially per tool, so this is
+    acceptable for the v1 integration.
+ """ + + def __init__(self, step_cm): + self._cm = step_cm + self.step = step_cm.__enter__() + self.start_time = time.time() + self._closed = False + + def log(self, **kwargs: Any) -> None: + self.step.log(**kwargs) + + def end(self) -> None: + if self._closed: + return + self._closed = True + # Ensure latency is recorded even if PostToolUse fires before any + # nested children would set it via the tracer's natural path. + if self.step.latency is None: + self.step.latency = (time.time() - self.start_time) * 1000.0 + self._cm.__exit__(None, None, None) + + +# ------------------------------- Hooks ------------------------------- # + + +async def _pre_tool_use_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PreToolUse hook — opens a TOOL step for this tool call.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + tool_name = input_data.get("tool_name", "unknown") + tool_input = input_data.get("tool_input") + metadata: Dict[str, Any] = {"tool_use_id": tool_use_id} + metadata.update(_parse_mcp_metadata(tool_name)) + + # If this tool was spawned by a parent Agent tool (subagent case), + # nest beneath the parent's still-open ToolStep. + parent_handle = state.pending_tools.get(tool_use_id) + if parent_handle is not None: + # Already exists — rare reentrant case. Skip to avoid leaking step. + return {} + + parent_for_subagent = state.tool_to_parent_step.get(tool_use_id) + pushed_token = None + if parent_for_subagent is not None: + pushed_token = _tracer._current_step.set(parent_for_subagent) + try: + cm = _tracer.create_step( + name=tool_name, + step_type=StepType.TOOL, + inputs=tool_input, + metadata=metadata, + ) + handle = _ToolStepHandle(cm) + finally: + # Pop the temporary parent push immediately. We don't want + # subsequent operations (e.g. message observation between + # PreToolUse and PostToolUse) to be siblings of the tool step. 
+ if pushed_token is not None: + _tracer._safe_reset_contextvar(_tracer._current_step, pushed_token) + + state.pending_tools[tool_use_id] = handle + except Exception: # never break user's hook execution + logger.exception( + "Openlayer PreToolUse hook failed for tool_use_id=%s", tool_use_id + ) + return {} + + +async def _post_tool_use_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PostToolUse hook — finalizes the TOOL step on success.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + handle = state.pending_tools.pop(tool_use_id, None) + if handle is None: + return {} + raw_output = ( + input_data.get("tool_response") + or input_data.get("tool_output") + or input_data.get("output") + ) + output = _truncate(raw_output, _config.truncate_tool_output_chars) + handle.log(output=output, metadata={"is_error": False}) + handle.end() + # Remember the step in case a subagent spawned by this tool is still + # streaming messages (we may need to nest those under it). 
+ state.tool_to_parent_step[tool_use_id] = handle.step + except Exception: + logger.exception( + "Openlayer PostToolUse hook failed for tool_use_id=%s", tool_use_id + ) + return {} + + +async def _post_tool_use_failure_hook(input_data, tool_use_id, context): # noqa: D401 + """Composed PostToolUseFailure hook — finalizes the TOOL step on error.""" + state = _current_state.get() + if state is None or tool_use_id is None: + return {} + try: + handle = state.pending_tools.pop(tool_use_id, None) + if handle is None: + return {} + err = ( + input_data.get("error") + or input_data.get("tool_response") + or input_data.get("tool_output") + ) + handle.log( + output=_truncate(err, _config.truncate_tool_output_chars), + metadata={"is_error": True}, + ) + handle.end() + state.tool_to_parent_step[tool_use_id] = handle.step + except Exception: + logger.exception( + "Openlayer PostToolUseFailure hook failed for tool_use_id=%s", + tool_use_id, + ) + return {} + + +def _inject_openlayer_hooks(options: Any) -> Any: + """Return a copy of ``options`` with our internal hooks merged in. + + User-provided hooks are preserved untouched. Our hooks are appended after + the user's so user hooks have first crack at any synchronous influence on + the agent (e.g. ``permissionDecision`` decisions). 
+ """ + _require_cas() + if options is None: + options = _cas.ClaudeAgentOptions() + + HookMatcher = _cas.HookMatcher # type: ignore[attr-defined] + + user_hooks: Dict[str, list] = dict(getattr(options, "hooks", None) or {}) + + def _append(event: str, matcher: Any) -> None: + user_hooks[event] = list(user_hooks.get(event) or []) + [matcher] + + _append("PreToolUse", HookMatcher(hooks=[_pre_tool_use_hook])) + _append("PostToolUse", HookMatcher(hooks=[_post_tool_use_hook])) + _append("PostToolUseFailure", HookMatcher(hooks=[_post_tool_use_failure_hook])) + + try: + return _dataclass_replace(options, hooks=user_hooks) + except TypeError: + # ``options`` is not a dataclass or doesn't accept ``hooks`` in + # ``replace``. Fall back to mutating in place (safe for our use-case + # since we just instantiated it). + try: + setattr(options, "hooks", user_hooks) + except Exception: + logger.debug("Could not set hooks on options; tool tracing disabled") + return options + + +# --------------------------- Observers --------------------------- # + + +def _observe_system_init(msg: Any, state: _TraceState) -> None: + """Capture ``SystemMessage(subtype='init')`` into the root step metadata.""" + if getattr(msg, "subtype", None) != "init": + return + data = getattr(msg, "data", {}) or {} + state.session_id = data.get("session_id") + state.model = data.get("model") + mcp_servers = data.get("mcp_servers") + if _config.redact_mcp_env: + mcp_servers = _redact_mcp_servers(mcp_servers) + agent_config = { + "model": data.get("model"), + "tools": data.get("tools"), + "mcp_servers": mcp_servers, + "skills": data.get("skills"), + "slash_commands": data.get("slash_commands"), + "plugins": data.get("plugins"), + "permission_mode": data.get("permissionMode"), + "cwd": data.get("cwd"), + "claude_code_version": data.get("claude_code_version"), + "api_key_source": data.get("apiKeySource"), + "output_style": data.get("output_style"), + } + state.root_step.log( + metadata={ + "session_id": 
state.session_id,
+            "agent_config": agent_config,
+        }
+    )
+
+
+def _observe_result(msg: Any, state: _TraceState) -> None:
+    """Capture ``ResultMessage`` — finalizes root cost / tokens / latency."""
+    usage = getattr(msg, "usage", None) or {}
+    input_tokens = usage.get("input_tokens") or 0
+    output_tokens = usage.get("output_tokens") or 0
+    total_tokens = input_tokens + output_tokens
+    duration_ms = getattr(msg, "duration_ms", None)
+
+    state.root_step.output = getattr(msg, "result", None) or ""
+    state.root_step.latency = duration_ms
+    # We surface cost / tokens / per-turn token breakdowns via metadata because
+    # base ``Step`` (and thus ``AgentStep``) doesn't define those attributes —
+    # ``Step.log()`` would silently filter them otherwise. ``post_process_trace``
+    # spreads root metadata into the trace-level row, so these become visible
+    # at the trace level. The UI reads ``promptTokens`` / ``completionTokens``
+    # in camelCase, so we duplicate the snake_case names with camelCase aliases
+    # to make sure they land in the right column.
+ state.root_step.log( + metadata={ + "cost": getattr(msg, "total_cost_usd", None), + "tokens": total_tokens, + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "promptTokens": input_tokens, + "completionTokens": output_tokens, + "model": _resolve_root_model(state, msg), + "provider": "anthropic", + "rawOutput": _serialize_result_message(msg), + "session_id": getattr(msg, "session_id", None) or state.session_id, + "num_turns": getattr(msg, "num_turns", None), + "stop_reason": getattr(msg, "stop_reason", None), + "subtype": getattr(msg, "subtype", None), + "is_error": getattr(msg, "is_error", False), + "duration_api_ms": getattr(msg, "duration_api_ms", None), + "model_usage": getattr(msg, "model_usage", None), + "permission_denials": getattr(msg, "permission_denials", None), + "cache_read_input_tokens": usage.get("cache_read_input_tokens"), + "cache_creation_input_tokens": usage.get("cache_creation_input_tokens"), + } + ) + + +def _resolve_root_model(state: _TraceState, result_msg: Any) -> Optional[str]: + """Best-effort model identifier for the root step. + + ResultMessage doesn't carry a ``model`` field directly. We try, in order: + the first key of ``model_usage`` (the SDK's per-model token breakdown), + then the model recorded on the init ``SystemMessage`` (cached on state). + """ + model_usage = getattr(result_msg, "model_usage", None) + if isinstance(model_usage, dict) and model_usage: + first = next(iter(model_usage.keys()), None) + if first: + return first + return getattr(state, "model", None) + + +def _serialize_assistant_message(msg: Any, content: List[Any]) -> Optional[str]: + """Render an AssistantMessage's full content array to JSON for ``raw_output``. + + Captures every block (text, thinking, tool_use) so users can inspect the + complete model response even when text is empty. Tries to introspect + dataclass-style block objects; falls back to ``str(block)``. 
+ """ + try: + blocks: List[Dict[str, Any]] = [] + for b in content: + bname = type(b).__name__ + if bname in ("TextBlock", "FakeTextBlock"): + blocks.append({"type": "text", "text": getattr(b, "text", "")}) + elif bname in ("ThinkingBlock", "FakeThinkingBlock"): + blocks.append( + { + "type": "thinking", + "thinking": getattr(b, "thinking", ""), + "signature": getattr(b, "signature", None), + } + ) + elif bname in ("ToolUseBlock", "FakeToolUseBlock"): + blocks.append( + { + "type": "tool_use", + "id": getattr(b, "id", ""), + "name": getattr(b, "name", ""), + "input": getattr(b, "input", None), + } + ) + else: + blocks.append({"type": bname, "repr": str(b)}) + return json.dumps( + { + "model": getattr(msg, "model", None), + "stop_reason": getattr(msg, "stop_reason", None), + "usage": getattr(msg, "usage", None), + "parent_tool_use_id": getattr(msg, "parent_tool_use_id", None), + "content": blocks, + }, + default=str, + ) + except Exception: + return None + + +def _serialize_result_message(msg: Any) -> Optional[str]: + """Render a ``ResultMessage`` to a JSON-ish string for ``rawOutput`` display.""" + try: + return json.dumps( + { + "subtype": getattr(msg, "subtype", None), + "result": getattr(msg, "result", None), + "session_id": getattr(msg, "session_id", None), + "duration_ms": getattr(msg, "duration_ms", None), + "duration_api_ms": getattr(msg, "duration_api_ms", None), + "num_turns": getattr(msg, "num_turns", None), + "stop_reason": getattr(msg, "stop_reason", None), + "is_error": getattr(msg, "is_error", None), + "total_cost_usd": getattr(msg, "total_cost_usd", None), + "usage": getattr(msg, "usage", None), + "model_usage": getattr(msg, "model_usage", None), + "permission_denials": getattr(msg, "permission_denials", None), + }, + default=str, + ) + except Exception: + return None + + +def _resolve_subagent_parent(msg: Any, state: _TraceState) -> Any: + """Return the spawning Agent ToolStep for a subagent message, or ``None``. 
+ + When the SDK delegates work to a subagent via the ``Agent`` tool, the + subsequent messages carry ``parent_tool_use_id`` pointing at the tool-use + block. We open the tool step in ``_pre_tool_use_hook`` and keep it open + while subagent messages stream; this helper finds the right step so we can + push it onto the contextvar stack before opening child steps. + """ + ptid = getattr(msg, "parent_tool_use_id", None) + if ptid and ptid in state.pending_tools: + return state.pending_tools[ptid].step + return None + + +def _observe_assistant(msg: Any, state: _TraceState) -> None: + """Emit a CHAT_COMPLETION step for each ``AssistantMessage``. + + Concatenates ``TextBlock`` content into ``output`` and ``ThinkingBlock`` + content into ``metadata.thinking`` (when ``capture_thinking`` is enabled). + Tracks the IDs of any ``ToolUseBlock``s for cross-reference with later + tool steps. + """ + state.turn_counter += 1 + content = list(getattr(msg, "content", None) or []) + text_parts: List[str] = [] + thinking_parts: List[str] = [] + tool_use_blocks: List[Dict[str, Any]] = [] + for block in content: + bname = type(block).__name__ + if bname in ("TextBlock", "FakeTextBlock"): + text_parts.append(getattr(block, "text", "") or "") + elif bname in ("ThinkingBlock", "FakeThinkingBlock"): + thinking_parts.append(getattr(block, "thinking", "") or "") + elif bname in ("ToolUseBlock", "FakeToolUseBlock"): + tool_use_blocks.append( + { + "id": getattr(block, "id", ""), + "name": getattr(block, "name", ""), + "input": getattr(block, "input", None), + } + ) + + usage = getattr(msg, "usage", None) or {} + input_tokens = usage.get("input_tokens") + output_tokens = usage.get("output_tokens") + total_tokens = (input_tokens or 0) + (output_tokens or 0) + text_output = "\n".join(p for p in text_parts if p) + + # Output: prefer text; fall back to tool-use summary; then thinking; then a + # marker so users see *something* in the UI instead of an empty step. 
+ if text_output: + output_for_ui: str = text_output + elif tool_use_blocks: + names = ", ".join(b["name"] for b in tool_use_blocks) + output_for_ui = f"[tool call: {names}]" + elif thinking_parts and _config.capture_thinking: + output_for_ui = "[thinking]\n" + "\n".join(thinking_parts) + else: + output_for_ui = "[no content]" + + # For root-level assistant turns, surface the user's original prompt as + # the step's input so users see what triggered this turn. For subagent + # turns we omit it (they're triggered by the parent's Agent tool call, + # not by a user prompt). + is_subagent_turn = getattr(msg, "parent_tool_use_id", None) is not None + step_inputs = None + if not is_subagent_turn and state.user_prompt is not None: + step_inputs = {"prompt": state.user_prompt} + + # Subagent nesting: if this assistant message belongs to a subagent run, + # temporarily push the spawning tool's step onto the contextvar stack so + # the new chat-completion step is created beneath it. + subagent_parent = _resolve_subagent_parent(msg, state) + pushed_token = None + if subagent_parent is not None: + pushed_token = _tracer._current_step.set(subagent_parent) + try: + with _tracer.create_step( + name=f"assistant turn {state.turn_counter}", + step_type=StepType.CHAT_COMPLETION, + inputs=step_inputs, + ) as chat_step: + chat_step.log( + output=output_for_ui, + model=getattr(msg, "model", None), + provider="anthropic", + prompt_tokens=input_tokens, + completion_tokens=output_tokens, + tokens=total_tokens, + raw_output=_serialize_assistant_message(msg, content), + metadata={ + "thinking": ( + "\n".join(thinking_parts) + if (thinking_parts and _config.capture_thinking) + else None + ), + "tool_calls": tool_use_blocks or None, + "stop_reason": getattr(msg, "stop_reason", None), + "parent_tool_use_id": getattr(msg, "parent_tool_use_id", None), + "message_id": getattr(msg, "message_id", None), + "cache_read_input_tokens": usage.get("cache_read_input_tokens"), + 
"cache_creation_input_tokens": usage.get( + "cache_creation_input_tokens" + ), + }, + ) + finally: + if pushed_token is not None: + _tracer._safe_reset_contextvar(_tracer._current_step, pushed_token) + + +def _observe(msg: Any, state: _TraceState) -> None: + """Dispatch a message to the appropriate observer. + + Matches by class name so the ``Fake*`` mocks in the test suite are handled + alongside the real ``claude_agent_sdk`` classes. + """ + type_name = type(msg).__name__ + if type_name in ("SystemMessage", "FakeSystemMessage"): + _observe_system_init(msg, state) + elif type_name in ("AssistantMessage", "FakeAssistantMessage"): + _observe_assistant(msg, state) + elif type_name in ("ResultMessage", "FakeResultMessage"): + _observe_result(msg, state) + + +# ----------------------------- Public API ----------------------------- # + + +async def traced_query( + *, + prompt: Any, + options: Any = None, + inference_pipeline_id: Optional[str] = None, + **kwargs: Any, +) -> AsyncIterator[Any]: + """Wrap ``claude_agent_sdk.query()`` and emit an Openlayer trace. + + Every message from the underlying ``query()`` stream is yielded to the + caller unchanged. Openlayer steps are emitted as a side effect. + + Args: + prompt: The prompt argument forwarded to ``claude_agent_sdk.query``. + options: Optional ``ClaudeAgentOptions`` forwarded to ``query``. + inference_pipeline_id: Optional pipeline override; falls back to + ``OPENLAYER_INFERENCE_PIPELINE_ID``. + **kwargs: Any additional kwargs (e.g. ``transport``) are passed through. + + Yields: + The same messages the underlying ``query()`` would have yielded, in the + same order. + """ + _require_cas() + # Get hold of the *original* query — when our global monkey-patch is in + # effect, ``_cas.query`` points back at ``patched_query`` which would + # otherwise recurse into us. 
+ query_fn = getattr(_cas.query, "_openlayer_original", _cas.query) + # Snapshot the user's options BEFORE we inject our hooks so the captured + # metadata reflects what the user actually configured, not our mutations. + original_options = options + options = _inject_openlayer_hooks(options) + resolved_pipeline = inference_pipeline_id or _config.inference_pipeline_id + with _tracer.create_step( + name=_ROOT_STEP_NAME, + step_type=StepType.AGENT, + inputs={"prompt": prompt}, + inference_pipeline_id=resolved_pipeline, + ) as root_step: + _capture_options_metadata(root_step, original_options) + state = _TraceState( + root_step=root_step, + user_prompt=prompt if isinstance(prompt, str) else None, + ) + token = _current_state.set(state) + try: + async for msg in query_fn(prompt=prompt, options=options, **kwargs): + try: + _observe(msg, state) + except Exception: # never break the user's stream + logger.exception( + "Openlayer observation failed for message %r", + type(msg).__name__, + ) + yield msg + finally: + _current_state.reset(token) + + +def trace_claude_agent_sdk( + *, + inference_pipeline_id: Optional[str] = None, + truncate_tool_output_chars: int = 8192, + capture_thinking: bool = True, + redact_mcp_env: bool = True, +) -> None: + """Auto-instrument ``claude_agent_sdk.query`` so every call is traced. + + Call this once at startup before any code does ``from claude_agent_sdk + import query``. After the call, ordinary use of the SDK is automatically + wrapped: every ``async for m in query(...)`` becomes an Openlayer trace. + + The patch is idempotent — calling this function multiple times is safe and + only updates configuration on subsequent calls. + + Args: + inference_pipeline_id: Optional default Openlayer inference pipeline ID. + Falls back to the ``OPENLAYER_INFERENCE_PIPELINE_ID`` env var when + unset (handled by the tracer at publish time). + truncate_tool_output_chars: Maximum number of characters of tool output + to capture in each TOOL step. 
Excess is truncated with a marker. + Defaults to 8192. + capture_thinking: Whether to capture ``ThinkingBlock`` content into + chat-completion step metadata. Defaults to True. + redact_mcp_env: Whether to strip ``env`` and ``headers`` from MCP + server configs in trace metadata. Defaults to True. + """ + _require_cas() + global _config + _config = _Config( + inference_pipeline_id=inference_pipeline_id, + truncate_tool_output_chars=truncate_tool_output_chars, + capture_thinking=capture_thinking, + redact_mcp_env=redact_mcp_env, + ) + + if getattr(_cas.query, "_openlayer_patched", False): + # Already patched — config update above is sufficient. + return + + original_query = _cas.query + + async def patched_query(*args, **kwargs): + # Forward through traced_query; ``query_fn`` resolves to + # ``original_query`` via ``_openlayer_original`` to avoid recursion. + async for m in traced_query(*args, **kwargs): + yield m + + patched_query._openlayer_patched = True # type: ignore[attr-defined] + patched_query._openlayer_original = original_query # type: ignore[attr-defined] + _cas.query = patched_query # type: ignore[assignment] + + _patch_claude_sdk_client() + + +# ----------------------- ClaudeSDKClient patching ----------------------- # + + +def _patch_claude_sdk_client() -> None: + """Monkey-patch ``ClaudeSDKClient`` so its query/receive_response are traced. + + The client API splits the request lifecycle across two methods: + ``client.query(prompt, session_id=...)`` (which only enqueues the prompt) + and ``async for m in client.receive_response()`` (which streams the + response). We open the root step when ``query`` is called and close it + when the matching ``receive_response`` generator exhausts. 
+ """ + Client = _cas.ClaudeSDKClient # type: ignore[attr-defined] + if getattr(Client, "_openlayer_patched", False): + return + + original_init = Client.__init__ + original_query = Client.query + original_receive_response = Client.receive_response + + def patched_init(self, options=None, transport=None): # type: ignore[no-redef] + # Stash the original (pre-injection) options for metadata capture at + # query() time. + self._openlayer_original_options = options + if options is not None: + options = _inject_openlayer_hooks(options) + else: + options = _inject_openlayer_hooks(None) + original_init(self, options=options, transport=transport) + self._openlayer_state = None + self._openlayer_state_cm = None + self._openlayer_token = None + + async def patched_query(self, prompt, session_id="default"): # type: ignore[no-redef] + # Open a root step for this prompt; the matching receive_response will + # close it. If the user calls query() multiple times without iterating + # receive_response() in between, the previous state will be replaced — + # but at that point the previous trace is orphaned anyway, so we just + # log a debug message rather than try to recover. + if getattr(self, "_openlayer_state", None) is not None: + logger.debug( + "ClaudeSDKClient.query() called before previous receive_response() " + "was consumed; previous trace will be orphaned." 
+ ) + cm = _tracer.create_step( + name=_ROOT_STEP_NAME, + step_type=StepType.AGENT, + inputs={"prompt": prompt, "session_id": session_id}, + inference_pipeline_id=_config.inference_pipeline_id, + ) + root_step = cm.__enter__() + _capture_options_metadata( + root_step, getattr(self, "_openlayer_original_options", None) + ) + state = _TraceState( + root_step=root_step, + user_prompt=prompt if isinstance(prompt, str) else None, + ) + self._openlayer_state = state + self._openlayer_state_cm = cm + self._openlayer_token = _current_state.set(state) + return await original_query(self, prompt, session_id=session_id) + + async def patched_receive_response(self): # type: ignore[no-redef] + state = getattr(self, "_openlayer_state", None) + try: + async for msg in original_receive_response(self): + if state is not None: + try: + _observe(msg, state) + except Exception: + logger.exception( + "Openlayer observation failed for client message %r", + type(msg).__name__, + ) + yield msg + finally: + if state is not None and getattr(self, "_openlayer_state_cm", None): + token = self._openlayer_token + self._openlayer_state = None + cm = self._openlayer_state_cm + self._openlayer_state_cm = None + self._openlayer_token = None + try: + if token is not None: + _tracer._safe_reset_contextvar(_current_state, token) + finally: + cm.__exit__(None, None, None) + + Client.__init__ = patched_init + Client.query = patched_query + Client.receive_response = patched_receive_response + Client._openlayer_patched = True diff --git a/tests/integrations/__init__.py b/tests/integrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integrations/claude_agent_sdk_mocks.py b/tests/integrations/claude_agent_sdk_mocks.py new file mode 100644 index 00000000..093894eb --- /dev/null +++ b/tests/integrations/claude_agent_sdk_mocks.py @@ -0,0 +1,139 @@ +"""Test helpers for claude-agent-sdk mock streams. 
+ +These dataclasses duck-type the real ``claude_agent_sdk`` message types just +enough that our integration's observer (which dispatches by ``type(msg).__name__``) +can be exercised without instantiating the real SDK objects. The ``Fake`` prefix +is matched explicitly in the dispatcher so tests stay self-contained. +""" + +from __future__ import annotations + +from typing import Any, List, Optional, AsyncIterator +from dataclasses import field, dataclass + + +@dataclass +class FakeTextBlock: + """Mirrors ``claude_agent_sdk.TextBlock``.""" + + text: str + + +@dataclass +class FakeThinkingBlock: + """Mirrors ``claude_agent_sdk.ThinkingBlock``.""" + + thinking: str + signature: str = "sig" + + +@dataclass +class FakeToolUseBlock: + """Mirrors ``claude_agent_sdk.ToolUseBlock``.""" + + id: str + name: str + input: dict + + +@dataclass +class FakeToolResultBlock: + """Mirrors ``claude_agent_sdk.ToolResultBlock``.""" + + tool_use_id: str + content: Any + is_error: Optional[bool] = None + + +@dataclass +class FakeSystemMessage: + """Mirrors ``claude_agent_sdk.SystemMessage``.""" + + subtype: str + data: dict + + +@dataclass +class FakeAssistantMessage: + """Mirrors ``claude_agent_sdk.AssistantMessage``.""" + + content: List[Any] + model: str = "claude-opus-4-7" + parent_tool_use_id: Optional[str] = None + usage: Optional[dict] = None + message_id: Optional[str] = None + stop_reason: Optional[str] = None + session_id: Optional[str] = None + uuid: Optional[str] = None + + +@dataclass +class FakeUserMessage: + """Mirrors ``claude_agent_sdk.UserMessage``.""" + + content: Any + parent_tool_use_id: Optional[str] = None + uuid: Optional[str] = None + tool_use_result: Optional[dict] = None + + +@dataclass +class FakeResultMessage: + """Mirrors ``claude_agent_sdk.ResultMessage``.""" + + subtype: str + duration_ms: int = 1000 + duration_api_ms: int = 800 + is_error: bool = False + num_turns: int = 1 + session_id: str = "sess_test" + total_cost_usd: float = 0.001 + usage: dict = 
field( + default_factory=lambda: { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + } + ) + result: Optional[str] = "Done" + model_usage: Optional[dict] = None + stop_reason: Optional[str] = "end_turn" + permission_denials: Optional[list] = None + uuid: Optional[str] = None + + +async def make_stream(messages: list) -> AsyncIterator: + """Yield a sequence of messages as an async iterator. + + This matches the shape ``claude_agent_sdk.query()`` returns. + """ + for msg in messages: + yield msg + + +def init_system_message( + session_id: str = "sess_test", + model: str = "claude-opus-4-7", + tools: Optional[list] = None, + mcp_servers: Optional[list] = None, + skills: Optional[list] = None, +) -> FakeSystemMessage: + """Build a ``SystemMessage(subtype='init')`` matching the SDK's init shape.""" + return FakeSystemMessage( + subtype="init", + data={ + "session_id": session_id, + "model": model, + "tools": tools or ["Read", "Bash"], + "mcp_servers": mcp_servers or [], + "skills": skills or [], + "slash_commands": [], + "plugins": [], + "permissionMode": "default", + "cwd": "/tmp", + "claude_code_version": "test-0.1.81", + "apiKeySource": "ANTHROPIC_API_KEY", + "output_style": "default", + }, + ) diff --git a/tests/integrations/test_claude_agent_sdk.py b/tests/integrations/test_claude_agent_sdk.py new file mode 100644 index 00000000..3bf3847e --- /dev/null +++ b/tests/integrations/test_claude_agent_sdk.py @@ -0,0 +1,786 @@ +"""Tests for the Claude Agent SDK Openlayer integration.""" + +# Many of the fake_query implementations below take **kwargs / unused prompt / +# options arguments to match the real SDK's call signature; ruff's ARG001 +# fires for each of those, so we silence the rule file-wide. 
+# ruff: noqa: ARG001
+
+import asyncio
+from typing import Any, List
+from unittest.mock import patch
+
+import pytest
+
+from openlayer.lib.tracing import tracer as ol_tracer
+
+from .claude_agent_sdk_mocks import (
+    FakeTextBlock,
+    FakeUserMessage,
+    FakeToolUseBlock,
+    FakeResultMessage,
+    FakeThinkingBlock,
+    FakeToolResultBlock,
+    FakeAssistantMessage,
+    make_stream,
+    init_system_message,
+)
+
+# ---------- shared infra ----------
+
+@pytest.fixture(autouse=True)
+def _disable_publish(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Disable real network publishing for every test in this module."""
+    monkeypatch.setenv("OPENLAYER_DISABLE_PUBLISH", "true")
+    monkeypatch.setenv("OPENLAYER_API_KEY", "fake")
+    monkeypatch.setattr(ol_tracer, "_publish", False, raising=False)
+
+
+def _capture_trace_publish():
+    """Return a ``(captured, capture_fn)`` pair for patching the publish path.
+
+    The tracer's publish entry point is ``_upload_and_publish_trace`` (see
+    ``src/openlayer/lib/tracing/tracer.py``); we patch that function and
+    capture the in-memory ``Trace`` object passed to it.
+    """
+    captured: List[Any] = []
+
+    def capture(trace, *args, **kwargs):
+        captured.append(trace)
+
+    return captured, capture
+
+
+# ---------- tests ----------
+
+def test_imports_work():
+    """Module imports cleanly even if claude_agent_sdk is not installed."""
+    from openlayer.lib.integrations import claude_agent_sdk  # noqa: F401
+
+
+def test_traced_query_emits_root_agent_step_with_cost_and_tokens():
+    """Basic flow: init -> assistant text -> result.
+
+    The root AGENT step gets cost/tokens/session_id populated from the final
+    ``ResultMessage``.
+ """ + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message(session_id="s1"), + FakeAssistantMessage(content=[FakeTextBlock("Hello back")]), + FakeResultMessage( + subtype="success", + duration_ms=1500, + num_turns=1, + session_id="s1", + total_cost_usd=0.0042, + result="Hello back", + usage={ + "input_tokens": 10, + "output_tokens": 5, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + }, + ), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + assert len(captured) == 1, "expected exactly one trace to be published" + trace_obj = captured[0] + root_step = trace_obj.steps[0] + assert root_step.step_type.value == "agent" + assert root_step.output == "Hello back" + # Cost / tokens / latency are recorded on the step's metadata + # (and also surface at trace-level via `post_process_trace`). 
+ assert root_step.metadata["cost"] == 0.0042 + assert root_step.metadata["tokens"] == 15 + assert root_step.metadata["prompt_tokens"] == 10 + assert root_step.metadata["completion_tokens"] == 5 + assert root_step.metadata["session_id"] == "s1" + assert root_step.metadata["num_turns"] == 1 + assert root_step.metadata["stop_reason"] == "end_turn" + assert root_step.latency == 1500 + + +def test_options_metadata_captured_on_root_step(): + """``options.system_prompt`` and ``options.agents`` land on root metadata.""" + pytest.importorskip("claude_agent_sdk") + from claude_agent_sdk import ClaudeAgentOptions, AgentDefinition + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + user_options = ClaudeAgentOptions( + system_prompt="You are a banana expert.", + model="claude-haiku-4-5", + max_turns=3, + allowed_tools=["Read", "Bash"], + agents={ + "code-reviewer": AgentDefinition( + description="Reviews code for bugs", + prompt="You are a strict reviewer. Flag anti-patterns.", + tools=["Read", "Grep"], + ), + }, + ) + + messages = [init_system_message(), FakeResultMessage(subtype="success")] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi", options=user_options): + pass + + asyncio.run(run()) + + root = captured[0].steps[0] + assert root.metadata["system_prompt"] == "You are a banana expert." + agents_meta = root.metadata["agents_defined"] + assert "code-reviewer" in agents_meta + assert agents_meta["code-reviewer"]["description"] == "Reviews code for bugs" + assert ( + agents_meta["code-reviewer"]["prompt"] + == "You are a strict reviewer. Flag anti-patterns." 
+ ) + assert agents_meta["code-reviewer"]["tools"] == ["Read", "Grep"] + opts = root.metadata["options"] + assert opts["model"] == "claude-haiku-4-5" + assert opts["max_turns"] == 3 + assert opts["allowed_tools"] == ["Read", "Bash"] + + +def test_assistant_message_emits_chat_completion_step(): + """Each AssistantMessage becomes a CHAT_COMPLETION child of the root step.""" + from openlayer.lib.integrations.claude_agent_sdk import traced_query + + messages = [ + init_system_message(), + FakeAssistantMessage( + content=[ + FakeThinkingBlock("planning..."), + FakeTextBlock("answer turn 1"), + ], + model="claude-opus-4-7", + usage={ + "input_tokens": 12, + "output_tokens": 4, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + }, + stop_reason="end_turn", + ), + FakeResultMessage(subtype="success"), + ] + + async def fake_query(*, prompt, options=None, **kwargs): + async for m in make_stream(messages): + yield m + + captured, capture_fn = _capture_trace_publish() + with patch("claude_agent_sdk.query", fake_query), patch.object( + ol_tracer, "_publish", True + ), patch.object( + ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn + ): + async def run(): + async for _ in traced_query(prompt="hi"): + pass + + asyncio.run(run()) + + root_step = captured[0].steps[0] + nested = root_step.steps + assert len(nested) == 1 + chat = nested[0] + assert chat.step_type.value == "chat_completion" + assert chat.model == "claude-opus-4-7" + assert chat.provider == "anthropic" + assert "answer turn 1" in chat.output + assert chat.metadata["thinking"] == "planning..." 
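As a side note, the per-turn aggregation these assertions exercise — performed by `_observe_assistant` for each `AssistantMessage` — can be sketched roughly as follows. `summarize_turn` is an illustrative name, not part of the integration:

```python
# Rough sketch of the per-turn aggregation _observe_assistant performs before
# logging a CHAT_COMPLETION step. `summarize_turn` is illustrative only.
def summarize_turn(usage, thinking_parts, capture_thinking=True):
    usage = usage or {}
    input_tokens = usage.get("input_tokens")
    output_tokens = usage.get("output_tokens")
    return {
        "prompt_tokens": input_tokens,
        "completion_tokens": output_tokens,
        # Total tokens treat missing counts as zero.
        "tokens": (input_tokens or 0) + (output_tokens or 0),
        # Thinking content is surfaced only when capture is enabled.
        "thinking": (
            "\n".join(thinking_parts)
            if (thinking_parts and capture_thinking)
            else None
        ),
    }
```

With the usage from the mock above (`input_tokens=12`, `output_tokens=4`) this yields a total of 16 tokens, matching the assertions on the chat step.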
+    assert chat.prompt_tokens == 12
+    assert chat.completion_tokens == 4
+    assert chat.tokens == 16
+    assert chat.metadata["stop_reason"] == "end_turn"
+
+
+def _extract_hook_callbacks(options, event: str):
+    """Pull our callback functions out of options.hooks[event]."""
+    matchers = (getattr(options, "hooks", None) or {}).get(event, []) or []
+    callbacks = []
+    for m in matchers:
+        for cb in getattr(m, "hooks", []) or []:
+            callbacks.append(cb)
+    return callbacks
+
+
+def test_tool_call_creates_tool_step_with_input_and_output():
+    """A tool call yields a TOOL step with input/output/latency/tool_use_id."""
+    import claude_agent_sdk as cas
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        # The wrapper injects our hooks into options.hooks. Pull them out and
+        # invoke at the right moments to simulate the real SDK calling them.
+        pre_hooks = _extract_hook_callbacks(options, "PreToolUse")
+        post_hooks = _extract_hook_callbacks(options, "PostToolUse")
+
+        yield init_system_message()
+        yield FakeAssistantMessage(
+            content=[
+                FakeTextBlock(""),
+                FakeToolUseBlock(id="t1", name="Bash", input={"command": "echo hi"}),
+            ]
+        )
+        for h in pre_hooks:
+            await h(
+                {
+                    "hook_event_name": "PreToolUse",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "echo hi"},
+                },
+                "t1",
+                {"signal": None},
+            )
+        for h in post_hooks:
+            await h(
+                {
+                    "hook_event_name": "PostToolUse",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "echo hi"},
+                    "tool_response": "hi",
+                },
+                "t1",
+                {"signal": None},
+            )
+        yield FakeUserMessage(content=[FakeToolResultBlock(tool_use_id="t1", content="hi")])
+        yield FakeResultMessage(subtype="success")
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(
+                prompt="run echo hi", options=cas.ClaudeAgentOptions()
+            ):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"]
+    assert len(tool_steps) == 1
+    t = tool_steps[0]
+    assert t.name == "Bash"
+    assert t.inputs == {"command": "echo hi"}
+    assert t.output == "hi"
+    assert t.metadata["tool_use_id"] == "t1"
+    assert t.latency is not None and t.latency >= 0
+    assert t.metadata.get("is_error") is False
+
+
+def test_mcp_tool_name_is_parsed_into_metadata():
+    """A tool named ``mcp__playwright__browser_click`` records the parsed metadata."""
+    import claude_agent_sdk as cas
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        pre_hooks = _extract_hook_callbacks(options, "PreToolUse")
+        post_hooks = _extract_hook_callbacks(options, "PostToolUse")
+
+        yield init_system_message()
+        yield FakeAssistantMessage(
+            content=[
+                FakeToolUseBlock(
+                    id="t1",
+                    name="mcp__playwright__browser_click",
+                    input={"selector": "#submit"},
+                )
+            ]
+        )
+        for h in pre_hooks:
+            await h(
+                {
+                    "hook_event_name": "PreToolUse",
+                    "tool_name": "mcp__playwright__browser_click",
+                    "tool_input": {"selector": "#submit"},
+                },
+                "t1",
+                {"signal": None},
+            )
+        for h in post_hooks:
+            await h(
+                {
+                    "hook_event_name": "PostToolUse",
+                    "tool_name": "mcp__playwright__browser_click",
+                    "tool_input": {"selector": "#submit"},
+                    "tool_response": "clicked",
+                },
+                "t1",
+                {"signal": None},
+            )
+        yield FakeResultMessage(subtype="success")
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(prompt="click", options=cas.ClaudeAgentOptions()):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"]
+    assert len(tool_steps) == 1
+    t = tool_steps[0]
+    assert t.name == "mcp__playwright__browser_click"
+    assert t.metadata["mcp_server"] == "playwright"
+    assert t.metadata["mcp_tool_name"] == "browser_click"
+
+
+def test_subagent_messages_nest_under_agent_tool_step():
+    """A message with ``parent_tool_use_id`` nests under the spawning Agent ToolStep."""
+    import claude_agent_sdk as cas
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        pre_hooks = _extract_hook_callbacks(options, "PreToolUse")
+        post_hooks = _extract_hook_callbacks(options, "PostToolUse")
+
+        yield init_system_message()
+        # Parent assistant turn dispatches the Agent tool
+        yield FakeAssistantMessage(
+            content=[
+                FakeToolUseBlock(
+                    id="t1",
+                    name="Agent",
+                    input={"description": "code-reviewer", "prompt": "review src/"},
+                )
+            ]
+        )
+        # PreToolUse opens the Agent ToolStep
+        for h in pre_hooks:
+            await h(
+                {
+                    "hook_event_name": "PreToolUse",
+                    "tool_name": "Agent",
+                    "tool_input": {"description": "code-reviewer", "prompt": "review src/"},
+                },
+                "t1",
+                {"signal": None},
+            )
+        # Subagent streams its own assistant message *while the Agent step is open*
+        yield FakeAssistantMessage(
+            content=[FakeTextBlock("subagent turn 1")],
+            parent_tool_use_id="t1",
+        )
+        # Subagent ends; PostToolUse closes the Agent step
+        for h in post_hooks:
+            await h(
+                {
+                    "hook_event_name": "PostToolUse",
+                    "tool_name": "Agent",
+                    "tool_input": {"description": "code-reviewer"},
+                    "tool_response": "review complete",
+                },
+                "t1",
+                {"signal": None},
+            )
+        yield FakeUserMessage(
+            content=[FakeToolResultBlock(tool_use_id="t1", content="review complete")]
+        )
+        yield FakeResultMessage(subtype="success")
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(
+                prompt="dispatch subagent", options=cas.ClaudeAgentOptions()
+            ):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    # The root step has the parent assistant turn (the one that emitted Agent
+    # ToolUseBlock) AND the Agent ToolStep itself.
+    tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"]
+    assert len(tool_steps) == 1
+    agent_tool_step = tool_steps[0]
+    assert agent_tool_step.name == "Agent"
+    # Subagent's assistant turn nests beneath the Agent tool step
+    subagent_chats = [
+        s for s in agent_tool_step.steps if s.step_type.value == "chat_completion"
+    ]
+    assert len(subagent_chats) == 1, "expected subagent chat completion nested under Agent tool step"
+    assert "subagent turn 1" in subagent_chats[0].output
+    assert subagent_chats[0].metadata.get("parent_tool_use_id") == "t1"
+
+
+def test_result_message_error_subtype_marks_root_step():
+    """An error ResultMessage subtype is reflected on the root step's metadata."""
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    messages = [
+        init_system_message(),
+        FakeResultMessage(
+            subtype="error_max_turns",
+            is_error=True,
+            result=None,
+            stop_reason=None,
+            num_turns=10,
+        ),
+    ]
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        async for m in make_stream(messages):
+            yield m
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(prompt="hi"):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    assert root_step.metadata["subtype"] == "error_max_turns"
+    assert root_step.metadata["is_error"] is True
+    assert root_step.metadata["num_turns"] == 10
+
+
+def test_post_tool_use_failure_marks_tool_step_as_error():
+    """PostToolUseFailure fires instead of PostToolUse — the tool step is marked errored."""
+    import claude_agent_sdk as cas
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        pre_hooks = _extract_hook_callbacks(options, "PreToolUse")
+        fail_hooks = _extract_hook_callbacks(options, "PostToolUseFailure")
+
+        yield init_system_message()
+        yield FakeAssistantMessage(
+            content=[FakeToolUseBlock(id="t1", name="Bash", input={"command": "false"})]
+        )
+        for h in pre_hooks:
+            await h(
+                {
+                    "hook_event_name": "PreToolUse",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "false"},
+                },
+                "t1",
+                {"signal": None},
+            )
+        for h in fail_hooks:
+            await h(
+                {
+                    "hook_event_name": "PostToolUseFailure",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "false"},
+                    "error": "exit code 1",
+                },
+                "t1",
+                {"signal": None},
+            )
+        yield FakeResultMessage(subtype="success")
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(
+                prompt="fail bash", options=cas.ClaudeAgentOptions()
+            ):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"]
+    assert len(tool_steps) == 1
+    t = tool_steps[0]
+    assert t.metadata["is_error"] is True
+    assert "exit code 1" in (t.output or "")
+
+
+def test_user_hooks_compose_with_openlayer_hooks():
+    """User-provided hooks run alongside ours; neither replaces the other."""
+    import claude_agent_sdk as cas
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    user_called: list = []
+
+    async def user_hook(input_data, tool_use_id, context):
+        user_called.append(tool_use_id)
+        return {
+            "hookSpecificOutput": {
+                "hookEventName": "PreToolUse",
+                "permissionDecision": "deny",
+                "permissionDecisionReason": "test deny",
+            }
+        }
+
+    user_options = cas.ClaudeAgentOptions(
+        hooks={"PreToolUse": [cas.HookMatcher(hooks=[user_hook])]}
+    )
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        # The wrapper should have appended our hook AFTER the user's.
+        pre_hooks = _extract_hook_callbacks(options, "PreToolUse")
+        post_hooks = _extract_hook_callbacks(options, "PostToolUse")
+        assert pre_hooks[0] is user_hook, "user hook must run first"
+        assert len(pre_hooks) >= 2, "openlayer hook must be appended after user hook"
+
+        yield init_system_message()
+        yield FakeAssistantMessage(
+            content=[FakeToolUseBlock(id="t1", name="Bash", input={"command": "ls"})]
+        )
+        # Simulate the SDK invoking *all* PreToolUse hooks (user first, then ours)
+        for h in pre_hooks:
+            await h(
+                {
+                    "hook_event_name": "PreToolUse",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "ls"},
+                },
+                "t1",
+                {"signal": None},
+            )
+        for h in post_hooks:
+            await h(
+                {
+                    "hook_event_name": "PostToolUse",
+                    "tool_name": "Bash",
+                    "tool_input": {"command": "ls"},
+                    "tool_response": "denied",
+                },
+                "t1",
+                {"signal": None},
+            )
+        yield FakeResultMessage(subtype="success")
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(prompt="hi", options=user_options):
+                pass
+
+        asyncio.run(run())
+
+    # User hook was invoked
+    assert user_called == ["t1"]
+    # Our hook ran too — the tool step exists in the trace
+    root_step = captured[0].steps[0]
+    tool_steps = [s for s in root_step.steps if s.step_type.value == "tool"]
+    assert len(tool_steps) == 1
+
+
+def test_mcp_env_is_stripped_from_agent_config_metadata():
+    """``env`` and ``headers`` of MCP server configs must be redacted."""
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    messages = [
+        init_system_message(
+            mcp_servers=[
+                {
+                    "name": "secret-server",
+                    "command": "x",
+                    "env": {"API_KEY": "supersecret"},
+                    "headers": {"Authorization": "Bearer xyz"},
+                }
+            ]
+        ),
+        FakeResultMessage(subtype="success"),
+    ]
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        async for m in make_stream(messages):
+            yield m
+
+    captured, capture_fn = _capture_trace_publish()
+    with patch("claude_agent_sdk.query", fake_query), patch.object(
+        ol_tracer, "_publish", True
+    ), patch.object(
+        ol_tracer, "_upload_and_publish_trace", side_effect=capture_fn
+    ):
+        async def run():
+            async for _ in traced_query(prompt="hi"):
+                pass
+
+        asyncio.run(run())
+
+    root_step = captured[0].steps[0]
+    mcp = root_step.metadata["agent_config"]["mcp_servers"]
+    assert isinstance(mcp, list)
+    server = mcp[0]
+    assert "env" not in server, "env must be stripped"
+    assert "headers" not in server, "headers must be stripped"
+    # Non-secret keys are preserved
+    assert server.get("name") == "secret-server"
+    # The literal secret must not appear anywhere in the serialized metadata
+    serialized = repr(root_step.metadata)
+    assert "supersecret" not in serialized
+    assert "Bearer xyz" not in serialized
+
+
+def test_trace_claude_agent_sdk_patches_module_query():
+    """``trace_claude_agent_sdk()`` monkey-patches ``claude_agent_sdk.query``."""
+    import claude_agent_sdk
+
+    from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk
+
+    original = claude_agent_sdk.query
+    try:
+        trace_claude_agent_sdk()
+        assert claude_agent_sdk.query is not original
+        assert getattr(claude_agent_sdk.query, "_openlayer_patched", False) is True
+
+        # Idempotent: a second call doesn't double-wrap.
+        after_first = claude_agent_sdk.query
+        trace_claude_agent_sdk()
+        assert claude_agent_sdk.query is after_first
+        assert getattr(claude_agent_sdk.query, "_openlayer_patched", False) is True
+    finally:
+        claude_agent_sdk.query = original
+
+
+def test_trace_claude_agent_sdk_config_persists():
+    """Init kwargs are persisted into the module-level config."""
+    import claude_agent_sdk
+
+    from openlayer.lib.integrations import claude_agent_sdk as integration
+    from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk
+
+    original = claude_agent_sdk.query
+    try:
+        trace_claude_agent_sdk(
+            inference_pipeline_id="pipe-123",
+            truncate_tool_output_chars=512,
+            capture_thinking=False,
+        )
+        assert integration._config.inference_pipeline_id == "pipe-123"
+        assert integration._config.truncate_tool_output_chars == 512
+        assert integration._config.capture_thinking is False
+    finally:
+        # Reset config for downstream tests
+        trace_claude_agent_sdk(
+            inference_pipeline_id=None,
+            truncate_tool_output_chars=8192,
+            capture_thinking=True,
+            redact_mcp_env=True,
+        )
+        claude_agent_sdk.query = original
+
+
+def test_trace_claude_agent_sdk_patches_claude_sdk_client():
+    """``trace_claude_agent_sdk()`` also patches ``ClaudeSDKClient.query`` / ``.receive_response``."""
+    import claude_agent_sdk
+
+    from openlayer.lib.integrations.claude_agent_sdk import trace_claude_agent_sdk
+
+    Client = claude_agent_sdk.ClaudeSDKClient
+    original_module_query = claude_agent_sdk.query
+    original_query = Client.query
+    original_receive = Client.receive_response
+    try:
+        trace_claude_agent_sdk()
+        assert Client.query is not original_query
+        assert Client.receive_response is not original_receive
+        assert getattr(Client, "_openlayer_patched", False) is True
+
+        # Idempotent
+        after_first_query = Client.query
+        trace_claude_agent_sdk()
+        assert Client.query is after_first_query
+    finally:
+        Client.query = original_query
+        Client.receive_response = original_receive
+        try:
+            del Client._openlayer_patched
+        except AttributeError:
+            pass
+        claude_agent_sdk.query = original_module_query
+
+
+def test_wrapped_stream_yields_identical_messages_in_identical_order():
+    """The wrapper is a pure observer — output must equal the underlying stream."""
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    original_messages = [
+        init_system_message(),
+        FakeAssistantMessage(content=[FakeTextBlock("x")]),
+        FakeResultMessage(subtype="success"),
+    ]
+
+    async def fake_query(*, prompt, options=None, **kwargs):
+        for m in original_messages:
+            yield m
+
+    with patch("claude_agent_sdk.query", fake_query):
+        async def run():
+            out = []
+            async for m in traced_query(prompt="x"):
+                out.append(m)
+            return out
+
+        result = asyncio.run(run())
+
+    # Same length, same identities (the wrapper must not substitute), same order
+    assert len(result) == len(original_messages)
+    for got, expected in zip(result, original_messages):
+        assert got is expected
+
diff --git a/tests/integrations/test_claude_agent_sdk_live.py b/tests/integrations/test_claude_agent_sdk_live.py
new file mode 100644
index 00000000..81dd6b0b
--- /dev/null
+++ b/tests/integrations/test_claude_agent_sdk_live.py
@@ -0,0 +1,75 @@
+"""Live integration test against the real Claude Agent SDK.
+
+This test is gated on the ``ANTHROPIC_API_KEY`` environment variable. It runs
+a tiny ``query()`` against ``claude-haiku-4-5`` with no tools and asserts the
+SDK produced a ``ResultMessage`` (proving the wrapper observed the stream).
+
+Run locally with the live-test API keys set in the environment:
+
+    ANTHROPIC_API_KEY=... OPENLAYER_API_KEY=... pytest \\
+        tests/integrations/test_claude_agent_sdk_live.py -v
+
+In CI this test is automatically skipped when ``ANTHROPIC_API_KEY`` is unset.
+
+Note: the bundled ``claude-agent-sdk`` raises an internal ``Exception`` after
+delivering ``ResultMessage`` when the underlying API returns ``is_error=True``
+(e.g. an invalid Anthropic API key). The wrapper correctly observes the
+stream up to that point; we tolerate the trailing exception in this test so
+it still exercises the trace publishing path with a real API key.
+"""
+
+from __future__ import annotations
+
+import os
+import asyncio
+
+import pytest
+
+pytestmark = pytest.mark.skipif(
+    not os.environ.get("ANTHROPIC_API_KEY"),
+    reason="ANTHROPIC_API_KEY not set",
+)
+
+
+def test_live_query_produces_valid_trace():
+    pytest.importorskip("claude_agent_sdk")
+    from claude_agent_sdk import ClaudeAgentOptions
+
+    from openlayer.lib.integrations.claude_agent_sdk import traced_query
+
+    os.environ.setdefault(
+        "OPENLAYER_INFERENCE_PIPELINE_ID",
+        "d4ee57e5-cd26-4435-b321-0365760724ad",
+    )
+
+    async def run():
+        messages = []
+        try:
+            async for m in traced_query(
+                prompt="Say the word 'banana' and nothing else.",
+                options=ClaudeAgentOptions(
+                    model="claude-haiku-4-5",
+                    system_prompt=(
+                        "You are a terse assistant that follows instructions "
+                        "exactly. Never add filler words, never apologize, and "
+                        "never add quotes around your answer."
+                    ),
+                    max_turns=2,
+                ),
+            ):
+                messages.append(m)
+        except Exception as exc:
+            # The SDK raises an Exception trailing ResultMessage when the
+            # underlying API errors. The wrapper observed everything up to
+            # that point; tolerate the trailing exception here.
+            messages.append(("__sdk_exception__", str(exc)))
+        return messages
+
+    msgs = asyncio.run(run())
+    real_msgs = [m for m in msgs if not (isinstance(m, tuple) and m[0] == "__sdk_exception__")]
+    assert any(
+        type(m).__name__ == "SystemMessage" for m in real_msgs
+    ), "Expected a SystemMessage(init) in the stream"
+    assert any(
+        type(m).__name__ == "ResultMessage" for m in real_msgs
+    ), "Expected a ResultMessage in the stream"