From 8d985f09a1d94e12631f735b8844edcc707bcd4d Mon Sep 17 00:00:00 2001 From: "itarun.p" Date: Mon, 4 May 2026 10:07:47 +0700 Subject: [PATCH 1/2] feat(compiler): add cache_control breakpoints for Anthropic prompt caching Compiler reuses base context A (system + doc) across N+M+2 LLM calls per document. Without cache_control markers, every call rebills the full document content as input tokens. Adds two breakpoints: - end of doc_msg: caches (system + doc) for summary, plan, every concept - end of assistant summary: caches (system + doc + summary) for plan and every concept generation call For non-Anthropic providers, the list-of-blocks payload is a valid OpenAI-compatible shape; LiteLLM normalizes cache_control away. Side fix: _llm_call_async now forwards **kwargs for parity with _llm_call (memory observation #82886). Refs #37 --- openkb/agent/compiler.py | 67 ++++++++++++++++----- tests/test_compiler.py | 125 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 16 deletions(-) diff --git a/openkb/agent/compiler.py b/openkb/agent/compiler.py index d202fc4e..bf15a1c3 100644 --- a/openkb/agent/compiler.py +++ b/openkb/agent/compiler.py @@ -6,6 +6,13 @@ Step 3: A + summary → concepts plan (create/update/related). Step 4: Concurrent LLM calls (A cached) → generate new + rewrite updated concepts. Step 5: Code adds cross-ref links to related concepts, updates index. + +Anthropic prompt caching is enabled via ``cache_control`` markers at two +breakpoints: end of the document message (caches system + doc across all +N+M+2 calls) and end of the assistant summary message (caches the additional +summary prefix across N+M concept-generation calls). Providers that do not +support cache_control receive a normalized list-of-blocks content payload, +which LiteLLM passes through cleanly. """ from __future__ import annotations @@ -131,6 +138,17 @@ # LLM helpers # --------------------------------------------------------------------------- +def _cached_text(text: str) -> list[dict]: + """Wrap a text payload into a content-block list with an Anthropic + ephemeral cache_control marker. + + LiteLLM passes the marker through to Anthropic (and OpenRouter → + Anthropic). For providers that ignore cache_control, the list-of-blocks + payload remains a valid OpenAI-compatible content shape. + """ + return [{"type": "text", "text": text, "cache_control": {"type": "ephemeral"}}] + + class _Spinner: """Animated dots spinner that runs in a background thread.""" @@ -168,15 +186,23 @@ def _format_usage(elapsed: float, usage) -> str: def _fmt_messages(messages: list[dict], max_content: int = 200) -> str: - """Format messages for debug output, truncating long content.""" + """Format messages for debug output, truncating long content. + + Accepts both plain-string content and the list-of-blocks shape used by + cache_control-tagged messages (joins all text blocks for preview). + """ parts = [] for msg in messages: role = msg["role"] - content = msg["content"] - if len(content) > max_content: - preview = content[:max_content] + f"... ({len(content)} chars)" + raw = msg["content"] + if isinstance(raw, list): + text = "".join(b.get("text", "") for b in raw if isinstance(b, dict)) else: - preview = content + text = raw + if len(text) > max_content: + preview = text[:max_content] + f"... 
({len(text)} chars)" + else: + preview = text parts.append(f" [{role}] {preview}") return "\n".join(parts) @@ -199,13 +225,15 @@ def _llm_call(model: str, messages: list[dict], step_name: str, **kwargs) -> str return content.strip() -async def _llm_call_async(model: str, messages: list[dict], step_name: str) -> str: +async def _llm_call_async(model: str, messages: list[dict], step_name: str, **kwargs) -> str: """Async LLM call with timing output and debug logging.""" logger.debug("LLM request [%s]:\n%s", step_name, _fmt_messages(messages)) + if kwargs: + logger.debug("LLM kwargs [%s]: %s", step_name, kwargs) t0 = time.time() - response = await litellm.acompletion(model=model, messages=messages) + response = await litellm.acompletion(model=model, messages=messages, **kwargs) content = response.choices[0].message.content or "" elapsed = time.time() - t0 @@ -587,10 +615,14 @@ async def _compile_concepts( # --- Step 2: Get concepts plan (A cached) --- concept_briefs = _read_concept_briefs(wiki_dir) + # Second cache breakpoint: end of the assistant summary message. Covers + # (system + doc + summary) for the plan call and every concept call. + summary_msg = {"role": "assistant", "content": _cached_text(summary)} + plan_raw = _llm_call(model, [ system_msg, doc_msg, - {"role": "assistant", "content": summary}, + summary_msg, {"role": "user", "content": _CONCEPTS_PLAN_USER.format( concept_briefs=concept_briefs, )}, @@ -632,7 +664,7 @@ async def _gen_create(concept: dict) -> tuple[str, str, bool, str]: raw = await _llm_call_async(model, [ system_msg, doc_msg, - {"role": "assistant", "content": summary}, + summary_msg, {"role": "user", "content": _CONCEPT_PAGE_USER.format( title=title, doc_name=doc_name, update_instruction="", @@ -663,7 +695,7 @@ async def _gen_update(concept: dict) -> tuple[str, str, bool, str]: raw = await _llm_call_async(model, [ system_msg, doc_msg, - {"role": "assistant", "content": summary}, + summary_msg, {"role": "user", "content": _CONCEPT_UPDATE_USER.format( title=title, doc_name=doc_name, existing_content=existing_content, @@ -741,13 +773,15 @@ async def compile_short_doc( schema_md = get_agents_md(wiki_dir) content = source_path.read_text(encoding="utf-8") - # Base context A: system + document + # Base context A: system + document. cache_control marker on the doc + # message creates a cache breakpoint that covers (system + doc) for + # every downstream call (summary, concepts-plan, every concept page). system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format( schema_md=schema_md, language=language, )} - doc_msg = {"role": "user", "content": _SUMMARY_USER.format( + doc_msg = {"role": "user", "content": _cached_text(_SUMMARY_USER.format( doc_name=doc_name, content=content, - )} + ))} # --- Step 1: Generate summary --- summary_raw = _llm_call(model, [system_msg, doc_msg], "summary") @@ -792,13 +826,14 @@ async def compile_long_doc( schema_md = get_agents_md(wiki_dir) summary_content = summary_path.read_text(encoding="utf-8") - # Base context A + # Base context A. cache_control marker on the doc message creates a + # cache breakpoint covering (system + doc) for every concept call. 
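+    # (Anthropic ephemeral cache entries expire after ~5 minutes of
+    # inactivity; the concurrent concept calls below are assumed to land
+    # within that window and reuse the cached prefix.)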
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format( schema_md=schema_md, language=language, )} - doc_msg = {"role": "user", "content": _LONG_DOC_SUMMARY_USER.format( + doc_msg = {"role": "user", "content": _cached_text(_LONG_DOC_SUMMARY_USER.format( doc_name=doc_name, doc_id=doc_id, content=summary_content, - )} + ))} # --- Step 1: Generate overview --- overview = _llm_call(model, [system_msg, doc_msg], "overview") diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 2a2e82dc..cb02efc0 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -651,6 +651,131 @@ async def test_handles_bad_json(self, tmp_path): assert (wiki / "summaries" / "doc.md").exists() +class TestCacheControl: + """Verify cache_control breakpoints are emitted on the right messages + so Anthropic prompt caching can hit on every reuse of the base context. + """ + + @staticmethod + def _has_cache_breakpoint(message: dict) -> bool: + content = message.get("content") + if not isinstance(content, list): + return False + return any( + isinstance(b, dict) and b.get("cache_control", {}).get("type") == "ephemeral" + for b in content + ) + + @pytest.mark.asyncio + async def test_short_doc_marks_doc_and_summary(self, tmp_path): + wiki = tmp_path / "wiki" + (wiki / "sources").mkdir(parents=True) + (wiki / "summaries").mkdir(parents=True) + (wiki / "concepts").mkdir(parents=True) + (wiki / "index.md").write_text( + "# Index\n\n## Documents\n\n## Concepts\n", encoding="utf-8", + ) + src = wiki / "sources" / "doc.md" + src.write_text("Body text about caching.", encoding="utf-8") + (tmp_path / ".openkb").mkdir() + + summary_response = json.dumps({"brief": "B", "content": "summary body"}) + plan_response = json.dumps({ + "create": [{"name": "topic", "title": "Topic"}], + "update": [], "related": [], + }) + concept_response = json.dumps({"brief": "C", "content": "page body"}) + + captured_sync_calls: list[list[dict]] = [] + captured_async_calls: list[list[dict]] = [] + + sync_responses = [summary_response, plan_response] + + def sync_side_effect(*args, **kwargs): + captured_sync_calls.append(kwargs["messages"]) + idx = min(len(captured_sync_calls) - 1, len(sync_responses) - 1) + mock_resp = MagicMock() + mock_resp.choices = [MagicMock()] + mock_resp.choices[0].message.content = sync_responses[idx] + mock_resp.usage = MagicMock(prompt_tokens=1, completion_tokens=1) + mock_resp.usage.prompt_tokens_details = None + return mock_resp + + async def async_side_effect(*args, **kwargs): + captured_async_calls.append(kwargs["messages"]) + mock_resp = MagicMock() + mock_resp.choices = [MagicMock()] + mock_resp.choices[0].message.content = concept_response + mock_resp.usage = MagicMock(prompt_tokens=1, completion_tokens=1) + mock_resp.usage.prompt_tokens_details = None + return mock_resp + + with patch("openkb.agent.compiler.litellm") as mock_litellm: + mock_litellm.completion = MagicMock(side_effect=sync_side_effect) + mock_litellm.acompletion = AsyncMock(side_effect=async_side_effect) + await compile_short_doc("doc", src, tmp_path, "anthropic/claude-sonnet-4-5") + + # Step 1 (summary): doc_msg carries the breakpoint. + summary_call = captured_sync_calls[0] + assert summary_call[0]["role"] == "system" + assert summary_call[1]["role"] == "user" + assert self._has_cache_breakpoint(summary_call[1]), ( + "doc_msg in summary call must carry an ephemeral cache_control marker" + ) + + # Step 2 (plan): doc_msg AND assistant summary both carry breakpoints. 
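+        # (Anthropic allows up to four cache_control breakpoints per request;
+        # the compiler emits two, so there is headroom for more.)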
+ plan_call = captured_sync_calls[1] + assert self._has_cache_breakpoint(plan_call[1]) + assert plan_call[2]["role"] == "assistant" + assert self._has_cache_breakpoint(plan_call[2]), ( + "assistant summary in plan call must carry a cache_control marker" + ) + + # Step 3 (concept generation): same two breakpoints reused. + assert captured_async_calls, "expected at least one async concept call" + concept_call = captured_async_calls[0] + assert self._has_cache_breakpoint(concept_call[1]) + assert self._has_cache_breakpoint(concept_call[2]) + + @pytest.mark.asyncio + async def test_long_doc_marks_doc_message(self, tmp_path): + wiki = tmp_path / "wiki" + (wiki / "summaries").mkdir(parents=True) + (wiki / "concepts").mkdir(parents=True) + (wiki / "index.md").write_text( + "# Index\n\n## Documents\n\n## Concepts\n", encoding="utf-8", + ) + sp = wiki / "summaries" / "big.md" + sp.write_text("PageIndex tree summary.", encoding="utf-8") + (tmp_path / ".openkb").mkdir() + + captured: list[list[dict]] = [] + plan_response = json.dumps({"create": [], "update": [], "related": []}) + + def sync_side_effect(*args, **kwargs): + captured.append(kwargs["messages"]) + mock_resp = MagicMock() + mock_resp.choices = [MagicMock()] + # First call: overview (plain text); second: plan (JSON). + mock_resp.choices[0].message.content = ( + "Overview text" if len(captured) == 1 else plan_response + ) + mock_resp.usage = MagicMock(prompt_tokens=1, completion_tokens=1) + mock_resp.usage.prompt_tokens_details = None + return mock_resp + + with patch("openkb.agent.compiler.litellm") as mock_litellm: + mock_litellm.completion = MagicMock(side_effect=sync_side_effect) + mock_litellm.acompletion = AsyncMock() + await compile_long_doc( + "big", sp, "doc-id-1", tmp_path, "anthropic/claude-sonnet-4-5", + ) + + overview_call = captured[0] + assert overview_call[1]["role"] == "user" + assert self._has_cache_breakpoint(overview_call[1]) + + class TestCompileLongDoc: @pytest.mark.asyncio async def test_full_pipeline(self, tmp_path): From 25e56ab7294569a1a888382773845919f8d42a8e Mon Sep 17 00:00:00 2001 From: "itarun.p" Date: Mon, 4 May 2026 11:39:20 +0700 Subject: [PATCH 2/2] feat(compiler): opt-in OpenRouter Response Caching for compiler LLM calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When response_cache is enabled in the per-KB config and the active model is routed via openrouter/, compile_short_doc and compile_long_doc forward extra_headers={"X-OpenRouter-Cache": "true", optional X-OpenRouter-Cache-TTL} on every LiteLLM call. OpenRouter then returns a cached response in 80-300ms with zero token billing on identical follow-up requests, which benefits the compile-retry path and repeated lint runs. Default OFF — opt-in only. Response caching stores responses on OpenRouter, which conflicts with strict zero-data-retention postures. Skips header emission when the model is not openrouter/-routed, so direct Anthropic/OpenAI/etc. calls remain byte-identical to before. Scope is intentionally limited to compiler.py (the only direct LiteLLM caller). query/chat/linter route through the OpenAI Agents SDK; threading custom headers there is a separate change. 
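
Example opt-in, per KB (this is the exact shape the new integration
test writes; the TTL line is optional):

    # .openkb/config.yaml
    response_cache: true
    response_cache_ttl: 600
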
Refs #39 Depends on #38 --- openkb/agent/compiler.py | 56 +++++++++++++-- openkb/config.py | 9 +++ tests/test_compiler.py | 149 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 7 deletions(-) diff --git a/openkb/agent/compiler.py b/openkb/agent/compiler.py index bf15a1c3..70964d1b 100644 --- a/openkb/agent/compiler.py +++ b/openkb/agent/compiler.py @@ -149,6 +149,39 @@ def _cached_text(text: str) -> list[dict]: return [{"type": "text", "text": text, "cache_control": {"type": "ephemeral"}}] +def _response_cache_headers(config: dict, model: str) -> dict: + """Build OpenRouter Response Caching headers from config. + + Returns an empty dict when the feature is disabled or the active model + is not routed through OpenRouter (the headers would have no effect on + direct provider calls). When enabled, emits ``X-OpenRouter-Cache: true`` + and, if a TTL is configured, ``X-OpenRouter-Cache-TTL: ``. + """ + if not config.get("response_cache", False): + return {} + if not model.startswith("openrouter/"): + return {} + headers = {"X-OpenRouter-Cache": "true"} + ttl = config.get("response_cache_ttl") + if ttl is not None: + headers["X-OpenRouter-Cache-TTL"] = str(int(ttl)) + return headers + + +def _build_llm_kwargs(config: dict, model: str) -> dict: + """Compose extra LiteLLM kwargs derived from config (e.g. response cache). + + Currently only emits an ``extra_headers`` entry when OpenRouter Response + Caching is enabled. Returns an empty dict when no extras apply, so the + caller can splat with ``**`` and fall back to existing behaviour. + """ + extras: dict = {} + cache_headers = _response_cache_headers(config, model) + if cache_headers: + extras["extra_headers"] = cache_headers + return extras + + class _Spinner: """Animated dots spinner that runs in a background thread.""" @@ -604,13 +637,18 @@ async def _compile_concepts( max_concurrency: int, doc_brief: str = "", doc_type: str = "short", + extra_kwargs: dict | None = None, ) -> None: """Shared Steps 2-4: concepts plan → generate/update → index. Uses ``_CONCEPTS_PLAN_USER`` to get a plan with create/update/related actions, then executes each action type accordingly. + + ``extra_kwargs`` is forwarded to every LiteLLM call (e.g. response-cache + headers). Defaults to no extras. 
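+
+    A typical non-empty value, as built by ``_build_llm_kwargs``, is
+    ``{"extra_headers": {"X-OpenRouter-Cache": "true"}}``.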
""" source_file = f"summaries/{doc_name}.md" + extra_kwargs = extra_kwargs or {} # --- Step 2: Get concepts plan (A cached) --- concept_briefs = _read_concept_briefs(wiki_dir) @@ -626,7 +664,7 @@ async def _compile_concepts( {"role": "user", "content": _CONCEPTS_PLAN_USER.format( concept_briefs=concept_briefs, )}, - ], "concepts-plan", max_tokens=1024) + ], "concepts-plan", max_tokens=1024, **extra_kwargs) try: parsed = _parse_json(plan_raw) @@ -669,7 +707,7 @@ async def _gen_create(concept: dict) -> tuple[str, str, bool, str]: title=title, doc_name=doc_name, update_instruction="", )}, - ], f"concept: {name}") + ], f"concept: {name}", **extra_kwargs) try: parsed = _parse_json(raw) brief = parsed.get("brief", "") @@ -700,7 +738,7 @@ async def _gen_update(concept: dict) -> tuple[str, str, bool, str]: title=title, doc_name=doc_name, existing_content=existing_content, )}, - ], f"update: {name}") + ], f"update: {name}", **extra_kwargs) try: parsed = _parse_json(raw) brief = parsed.get("brief", "") @@ -783,8 +821,10 @@ async def compile_short_doc( doc_name=doc_name, content=content, ))} + extra_kwargs = _build_llm_kwargs(config, model) + # --- Step 1: Generate summary --- - summary_raw = _llm_call(model, [system_msg, doc_msg], "summary") + summary_raw = _llm_call(model, [system_msg, doc_msg], "summary", **extra_kwargs) try: summary_parsed = _parse_json(summary_raw) doc_brief = summary_parsed.get("brief", "") @@ -798,7 +838,7 @@ async def compile_short_doc( await _compile_concepts( wiki_dir, kb_dir, model, system_msg, doc_msg, summary, doc_name, max_concurrency, doc_brief=doc_brief, - doc_type="short", + doc_type="short", extra_kwargs=extra_kwargs, ) @@ -835,12 +875,14 @@ async def compile_long_doc( doc_name=doc_name, doc_id=doc_id, content=summary_content, ))} + extra_kwargs = _build_llm_kwargs(config, model) + # --- Step 1: Generate overview --- - overview = _llm_call(model, [system_msg, doc_msg], "overview") + overview = _llm_call(model, [system_msg, doc_msg], "overview", **extra_kwargs) # --- Steps 2-4: Concept plan → generate/update → index --- await _compile_concepts( wiki_dir, kb_dir, model, system_msg, doc_msg, overview, doc_name, max_concurrency, doc_brief=doc_description, - doc_type="pageindex", + doc_type="pageindex", extra_kwargs=extra_kwargs, ) diff --git a/openkb/config.py b/openkb/config.py index b83e1346..e8dc1e06 100644 --- a/openkb/config.py +++ b/openkb/config.py @@ -9,6 +9,15 @@ "model": "gpt-5.4-mini", "language": "en", "pageindex_threshold": 20, + # Opt-in OpenRouter Response Caching for compiler LLM calls. + # When enabled and the active model is routed via openrouter/, identical + # requests (same model, messages, params) return a cached response with + # zero token billing. Default off because responses are stored on + # OpenRouter — conflicts with strict zero-data-retention postures. + "response_cache": False, + # Optional TTL override in seconds (1..86400). When None, OpenRouter's + # default of 300s applies. 
+ "response_cache_ttl": None, } GLOBAL_CONFIG_DIR = Path.home() / ".config" / "openkb" diff --git a/tests/test_compiler.py b/tests/test_compiler.py index cb02efc0..f8cb2a7e 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -21,6 +21,8 @@ _add_related_link, _backlink_summary, _backlink_concepts, + _response_cache_headers, + _build_llm_kwargs, ) @@ -776,6 +778,153 @@ def sync_side_effect(*args, **kwargs): assert self._has_cache_breakpoint(overview_call[1]) +class TestResponseCacheHeaders: + """Pure-function unit tests for the OpenRouter response-cache helper.""" + + def test_disabled_returns_empty(self): + assert _response_cache_headers( + {"response_cache": False}, "openrouter/anthropic/claude-sonnet-4.5", + ) == {} + + def test_missing_key_treated_as_disabled(self): + assert _response_cache_headers({}, "openrouter/anthropic/claude-sonnet-4.5") == {} + + def test_enabled_but_non_openrouter_model_returns_empty(self): + assert _response_cache_headers( + {"response_cache": True}, "anthropic/claude-sonnet-4.5", + ) == {} + assert _response_cache_headers( + {"response_cache": True}, "gpt-4o-mini", + ) == {} + + def test_enabled_openrouter_returns_cache_header(self): + headers = _response_cache_headers( + {"response_cache": True}, "openrouter/anthropic/claude-sonnet-4.5", + ) + assert headers == {"X-OpenRouter-Cache": "true"} + + def test_ttl_emits_ttl_header(self): + headers = _response_cache_headers( + {"response_cache": True, "response_cache_ttl": 600}, + "openrouter/anthropic/claude-sonnet-4.5", + ) + assert headers == { + "X-OpenRouter-Cache": "true", + "X-OpenRouter-Cache-TTL": "600", + } + + def test_ttl_none_omits_ttl_header(self): + headers = _response_cache_headers( + {"response_cache": True, "response_cache_ttl": None}, + "openrouter/anthropic/claude-sonnet-4.5", + ) + assert "X-OpenRouter-Cache-TTL" not in headers + + def test_build_llm_kwargs_packs_headers(self): + kw = _build_llm_kwargs( + {"response_cache": True, "response_cache_ttl": 60}, + "openrouter/anthropic/claude-sonnet-4.5", + ) + assert kw == {"extra_headers": { + "X-OpenRouter-Cache": "true", + "X-OpenRouter-Cache-TTL": "60", + }} + + def test_build_llm_kwargs_empty_when_disabled(self): + assert _build_llm_kwargs({"response_cache": False}, "openrouter/x") == {} + + +class TestResponseCacheIntegration: + """End-to-end check that compile_short_doc forwards extra_headers when the + response_cache flag is on, and does not when it is off (regression). + """ + + @pytest.mark.asyncio + async def test_flag_on_forwards_extra_headers(self, tmp_path): + wiki = tmp_path / "wiki" + (wiki / "sources").mkdir(parents=True) + (wiki / "summaries").mkdir(parents=True) + (wiki / "concepts").mkdir(parents=True) + (wiki / "index.md").write_text( + "# Index\n\n## Documents\n\n## Concepts\n", encoding="utf-8", + ) + src = wiki / "sources" / "doc.md" + src.write_text("Body.", encoding="utf-8") + openkb_dir = tmp_path / ".openkb" + openkb_dir.mkdir() + # Per-KB config opts in to response cache. 
+ (openkb_dir / "config.yaml").write_text( + "response_cache: true\nresponse_cache_ttl: 600\n", encoding="utf-8", + ) + + summary_resp = json.dumps({"brief": "B", "content": "summary body"}) + plan_resp = json.dumps({"create": [], "update": [], "related": []}) + sync_responses = [summary_resp, plan_resp] + captured_kwargs: list[dict] = [] + + def sync_side_effect(*args, **kwargs): + captured_kwargs.append(kwargs) + idx = min(len(captured_kwargs) - 1, len(sync_responses) - 1) + mock_resp = MagicMock() + mock_resp.choices = [MagicMock()] + mock_resp.choices[0].message.content = sync_responses[idx] + mock_resp.usage = MagicMock(prompt_tokens=1, completion_tokens=1) + mock_resp.usage.prompt_tokens_details = None + return mock_resp + + with patch("openkb.agent.compiler.litellm") as mock_litellm: + mock_litellm.completion = MagicMock(side_effect=sync_side_effect) + mock_litellm.acompletion = AsyncMock() + await compile_short_doc( + "doc", src, tmp_path, "openrouter/anthropic/claude-sonnet-4.5", + ) + + assert captured_kwargs, "expected at least one sync LLM call" + # Every sync call must carry extra_headers with the cache markers. + for kw in captured_kwargs: + assert "extra_headers" in kw + assert kw["extra_headers"].get("X-OpenRouter-Cache") == "true" + assert kw["extra_headers"].get("X-OpenRouter-Cache-TTL") == "600" + + @pytest.mark.asyncio + async def test_flag_off_no_extra_headers(self, tmp_path): + wiki = tmp_path / "wiki" + (wiki / "sources").mkdir(parents=True) + (wiki / "summaries").mkdir(parents=True) + (wiki / "concepts").mkdir(parents=True) + (wiki / "index.md").write_text( + "# Index\n\n## Documents\n\n## Concepts\n", encoding="utf-8", + ) + src = wiki / "sources" / "doc.md" + src.write_text("Body.", encoding="utf-8") + (tmp_path / ".openkb").mkdir() # no config.yaml → defaults (off) + + summary_resp = json.dumps({"brief": "B", "content": "summary body"}) + plan_resp = json.dumps({"create": [], "update": [], "related": []}) + sync_responses = [summary_resp, plan_resp] + captured_kwargs: list[dict] = [] + + def sync_side_effect(*args, **kwargs): + captured_kwargs.append(kwargs) + idx = min(len(captured_kwargs) - 1, len(sync_responses) - 1) + mock_resp = MagicMock() + mock_resp.choices = [MagicMock()] + mock_resp.choices[0].message.content = sync_responses[idx] + mock_resp.usage = MagicMock(prompt_tokens=1, completion_tokens=1) + mock_resp.usage.prompt_tokens_details = None + return mock_resp + + with patch("openkb.agent.compiler.litellm") as mock_litellm: + mock_litellm.completion = MagicMock(side_effect=sync_side_effect) + mock_litellm.acompletion = AsyncMock() + await compile_short_doc( + "doc", src, tmp_path, "openrouter/anthropic/claude-sonnet-4.5", + ) + + for kw in captured_kwargs: + assert "extra_headers" not in kw, "no headers should leak when flag is off" + + class TestCompileLongDoc: @pytest.mark.asyncio async def test_full_pipeline(self, tmp_path):