Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ Predicate agent examples.
- `predicate_browser_agent_minimal.py`: minimal `PredicateBrowserAgent` usage.
- `predicate_browser_agent_custom_prompt.py`: customize the compact prompt builder.
- `predicate_browser_agent_video_recording_playwright.py`: enable Playwright video recording via context options (recommended).
- `planner_executor_strict_fail_fast.py`: demonstrate `PlannerExecutorConfig(strict_fail_fast=True)` vs default retry/replan behavior.

154 changes: 154 additions & 0 deletions examples/agent/planner_executor_strict_fail_fast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""
Example: PlannerExecutorAgent strict fail-fast behavior.

This demo runs the same failing required step in two modes:
- default mode (allows recovery/replan policy)
- strict fail-fast mode (abort immediately on required-step failure)

Why this example is deterministic:
- We inject a fixed single-step plan.
- We inject a fixed failed step outcome.
- We count how many times the recovery/replan hooks are reached.

Usage:
python examples/agent/planner_executor_strict_fail_fast.py
"""

from __future__ import annotations

import asyncio

from predicate.agents import (
Plan,
PlanStep,
PlannerExecutorAgent,
PlannerExecutorConfig,
PredicateSpec,
RetryConfig,
StepOutcome,
StepStatus,
)
from predicate.llm_provider import LLMProvider, LLMResponse


class FixedProvider(LLMProvider):
    """Stub LLM provider that exists only so the agent can be constructed.

    Every ``generate`` call ignores its inputs and returns the same
    ``"{}"`` response tagged with the constant model name.
    """

    def __init__(self) -> None:
        super().__init__(model="fixed-provider")

    @property
    def model_name(self) -> str:
        """Return the constant model identifier reported by this stub."""
        return "fixed-provider"

    def supports_json_mode(self) -> bool:
        """Report that JSON mode is supported."""
        return True

    def generate(self, system_prompt: str, user_prompt: str, **kwargs) -> LLMResponse:
        """Return a fixed ``"{}"`` response regardless of the prompts."""
        del system_prompt, user_prompt, kwargs  # inputs are intentionally unused
        return LLMResponse(content="{}", model_name=self.model_name)


class DemoRuntime:
    """Tiny runtime for the fail-fast demo; it only remembers a URL."""

    def __init__(self, start_url: str = "https://shop.example.com/search") -> None:
        # The stored URL is the only state this runtime keeps.
        self._url = start_url

    async def get_url(self) -> str:
        """Return the URL most recently stored by ``__init__`` or ``goto``."""
        current = self._url
        return current

    async def goto(self, url: str) -> None:
        """Record ``url`` as the current URL; no navigation is performed."""
        self._url = url

    async def stabilize(self) -> None:
        """Do nothing and return ``None``."""


async def run_demo(strict_fail_fast: bool) -> None:
    """Run the agent once against a canned failing step and print a summary.

    The agent's ``plan``, ``_execute_step``, ``_attempt_recovery`` and
    ``replan`` attributes are replaced with local fakes so that the same
    single-step plan and the same failed outcome are returned every time.
    The recovery and replan fakes count their invocations.

    Args:
        strict_fail_fast: Value passed to
            ``PlannerExecutorConfig(strict_fail_fast=...)``.
    """
    agent_config = PlannerExecutorConfig(
        strict_fail_fast=strict_fail_fast,
        retry=RetryConfig(max_replans=1),
        auto_fallback_to_stepwise=False,
    )
    agent = PlannerExecutorAgent(
        planner=FixedProvider(),
        executor=FixedProvider(),
        config=agent_config,
    )
    demo_runtime = DemoRuntime()

    # Fixed one-step plan: a required CLICK verified by a URL predicate.
    product_step = PlanStep(
        id=1,
        goal="Click a product link",
        action="CLICK",
        intent="product link",
        verify=[PredicateSpec(predicate="url_contains", args=["/product/"])],
        required=True,
    )
    fixed_plan = Plan(task="Open a product details page", steps=[product_step])

    # Fixed outcome handed back for every step execution.
    canned_failure = StepOutcome(
        step_id=1,
        goal="Click a product link",
        status=StepStatus.FAILED,
        action_taken="CLICK(1)",
        verification_passed=False,
        error="verification_failed",
    )

    call_counts = {"recovery": 0, "replan": 0}

    async def fake_plan(*args, **kwargs) -> Plan:
        del args, kwargs
        return fixed_plan

    async def fake_execute_step(*args, **kwargs) -> StepOutcome:
        del args, kwargs
        return canned_failure

    async def fake_attempt_recovery(*args, **kwargs) -> bool:
        del args, kwargs
        call_counts["recovery"] += 1
        return False

    async def fake_replan(*args, **kwargs) -> Plan:
        del args, kwargs
        call_counts["replan"] += 1
        # Mirror internal replan accounting so the loop exits after one replan.
        agent._replans_used += 1  # type: ignore[attr-defined]
        return fixed_plan

    # Install the fakes on this agent instance.
    agent.plan = fake_plan  # type: ignore[method-assign]
    agent._execute_step = fake_execute_step  # type: ignore[method-assign]
    agent._attempt_recovery = fake_attempt_recovery  # type: ignore[method-assign]
    agent.replan = fake_replan  # type: ignore[method-assign]

    result = await agent.run(
        runtime=demo_runtime,
        task="Open a product details page",
        start_url="https://shop.example.com",
    )

    if strict_fail_fast:
        mode = "STRICT_FAIL_FAST"
    else:
        mode = "DEFAULT"

    print(f"\n=== {mode} ===")
    print(f"success={result.success}")
    print(f"error={result.error}")
    print(f"steps_completed={result.steps_completed}")
    print(f"replans_used={result.replans_used}")
    print(f"recovery_calls={call_counts['recovery']}")
    print(f"replan_calls={call_counts['replan']}")


async def main() -> None:
    """Print a banner, then run the demo in default mode and in strict mode."""
    print("PlannerExecutorAgent strict fail-fast demo")
    # Default mode first, strict fail-fast second.
    for strict in (False, True):
        await run_demo(strict_fail_fast=strict)


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
asyncio.run(main())
17 changes: 15 additions & 2 deletions predicate/agents/planner_executor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,11 @@ class PlannerExecutorConfig:
# Pre-step verification (skip step if predicates already pass)
pre_step_verification: bool = True

# Strict fail-fast mode:
# - required step failures abort the run immediately
# - disables recovery/replan and intra-step fallback recoveries
strict_fail_fast: bool = False

# Scroll-to-find: automatically scroll to find elements when not in viewport
scroll_to_find_enabled: bool = True
scroll_to_find_max_scrolls: int = 3 # Max scroll attempts per direction
Expand Down Expand Up @@ -4799,7 +4804,7 @@ async def _execute_step(
pass # Ignore snapshot errors

# If verification failed and we have optional substeps, try them
if not verification_passed and step.optional_substeps:
if not verification_passed and step.optional_substeps and not self.config.strict_fail_fast:
substep_outcomes = await self._execute_optional_substeps(
step.optional_substeps,
runtime,
Expand All @@ -4812,7 +4817,11 @@ async def _execute_step(
# Fallback: For navigation-causing actions, if URL changed significantly,
# consider the action successful even if predicate verification failed.
# This handles cases where local LLMs generate imprecise predicates.
if not verification_passed and original_action in ("TYPE_AND_SUBMIT", "CLICK"):
if (
not verification_passed
and original_action in ("TYPE_AND_SUBMIT", "CLICK")
and not self.config.strict_fail_fast
):
current_url = await runtime.get_url() if hasattr(runtime, "get_url") else None
if current_url and pre_url and current_url != pre_url:
# Check if this is a meaningful URL change (not just anchor change)
Expand Down Expand Up @@ -5233,6 +5242,10 @@ async def run(

# Handle failure
if outcome.status == StepStatus.FAILED and step.required:
if self.config.strict_fail_fast:
error = f"Step {step.id} failed: {outcome.error or 'verification_failed'}"
break

# Check if we've reached an authentication boundary
# This is a graceful terminal state - agent did all it could
if self.config.auth_boundary.enabled:
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/test_planner_executor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,81 @@ def test_page_context_max_chars_customizable(self) -> None:
config = PlannerExecutorConfig(use_page_context=True, page_context_max_chars=4000)
assert config.page_context_max_chars == 4000

def test_strict_fail_fast_default_disabled(self) -> None:
    """A default-constructed config has ``strict_fail_fast`` set to False."""
    assert PlannerExecutorConfig().strict_fail_fast is False

def test_strict_fail_fast_can_be_enabled(self) -> None:
    """Passing ``strict_fail_fast=True`` is reflected on the config."""
    strict_config = PlannerExecutorConfig(strict_fail_fast=True)
    assert strict_config.strict_fail_fast is True


class TestStrictFailFastBehavior:
    """Behavioral tests for strict fail-fast mode."""

    @pytest.mark.asyncio
    async def test_run_aborts_required_failure_without_recovery_or_replan(self) -> None:
        """A failed required step ends the run without recovery or replan calls."""
        from unittest.mock import AsyncMock, MagicMock

        from predicate.agents.planner_executor_agent import (
            PlannerExecutorAgent,
            StepOutcome,
            StepStatus,
        )

        agent = PlannerExecutorAgent(
            planner=MockLLMProvider(),
            executor=MockLLMProvider(),
            config=PlannerExecutorConfig(strict_fail_fast=True),
        )

        # Single required CLICK step whose execution is mocked to fail.
        required_step = PlanStep(
            id=1,
            goal="Click product result",
            action="CLICK",
            intent="product link",
            verify=[PredicateSpec(predicate="url_contains", args=["/product"])],
            required=True,
        )
        single_step_plan = Plan(task="Search for product", steps=[required_step])

        step_failure = StepOutcome(
            step_id=1,
            goal="Click product result",
            status=StepStatus.FAILED,
            action_taken="CLICK(1)",
            verification_passed=False,
            error="verification_failed",
        )

        fake_runtime = MagicMock()
        fake_runtime.get_url = AsyncMock(return_value="https://shop.example.com/search")
        fake_runtime.goto = AsyncMock()
        fake_runtime.read_markdown = AsyncMock(return_value=None)

        agent.plan = AsyncMock(return_value=single_step_plan)  # type: ignore[method-assign]
        agent._execute_step = AsyncMock(return_value=step_failure)  # type: ignore[method-assign]
        # Replanning raises and recovery would report success, so any use of
        # either path changes the result checked below.
        agent.replan = AsyncMock(side_effect=RuntimeError("should not replan"))  # type: ignore[method-assign]
        agent._attempt_recovery = AsyncMock(return_value=True)  # type: ignore[method-assign]

        run_result = await agent.run(
            fake_runtime,
            task="Search for product",
            start_url="https://shop.example.com",
        )

        assert run_result.success is False
        assert run_result.replans_used == 0
        assert run_result.error == "Step 1 failed: verification_failed"
        assert len(run_result.step_outcomes) == 1
        agent.replan.assert_not_awaited()  # type: ignore[attr-defined]
        agent._attempt_recovery.assert_not_awaited()  # type: ignore[attr-defined]


# ---------------------------------------------------------------------------
# Test PlanStep with optional_substeps
Expand Down
Loading