fix one shot completion signalling; clean up error messages

This commit is contained in:
AI Christianson 2024-12-17 15:34:41 -05:00
parent 5878a53710
commit aee3ef9b86
6 changed files with 130 additions and 111 deletions

View File

@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
- Fix one shot completion signaling.
- Clean up error outputs.
## [0.6.2]
- Allow shell commands to be run in read-only mode.
- When asking for shell command approval, allow cowboy mode to be enabled.

View File

@ -1,5 +1,6 @@
import argparse
import sys
from typing import Optional
from rich.panel import Panel
from rich.console import Console
from langchain_core.messages import HumanMessage
@ -21,7 +22,6 @@ from ra_aid.prompts import (
PLANNING_PROMPT,
IMPLEMENTATION_PROMPT,
)
from ra_aid.exceptions import TaskCompletedException
import time
from anthropic import APIError, APITimeoutError, RateLimitError, InternalServerError
from ra_aid.llm import initialize_llm
@ -198,8 +198,20 @@ def is_stage_requested(stage: str) -> bool:
return len(_global_memory.get('implementation_requested', [])) > 0
return False
def run_agent_with_retry(agent, prompt: str, config: dict):
"""Run an agent with retry logic for internal server errors."""
def run_agent_with_retry(agent, prompt: str, config: dict) -> Optional[str]:
"""Run an agent with retry logic for internal server errors and task completion handling.
Args:
agent: The agent to run
prompt: The prompt to send to the agent
config: Configuration dictionary for the agent
Returns:
Optional[str]: The completion message if task was completed, None otherwise
Handles API errors with exponential backoff retry logic and checks for task
completion after each chunk of output.
"""
max_retries = 20
base_delay = 1 # Initial delay in seconds
@ -210,6 +222,17 @@ def run_agent_with_retry(agent, prompt: str, config: dict):
config
):
print_agent_output(chunk)
# Check for task completion after each chunk
if _global_memory.get('task_completed'):
completion_msg = _global_memory.get('completion_message', 'Task was completed successfully.')
print_stage_header("Task Completed")
console.print(Panel(
f"[green]{completion_msg}[/green]",
title="Task Completed",
style="green"
))
return completion_msg
break
except (InternalServerError, APITimeoutError, RateLimitError, APIError) as e:
if attempt == max_retries - 1:
@ -291,106 +314,100 @@ def run_research_subtasks(base_task: str, config: dict, model, expert_enabled: b
def main():
"""Main entry point for the ra-aid command line tool."""
try:
try:
args = parse_arguments()
expert_enabled, expert_missing = validate_environment(args) # Will exit if main env vars missing
if expert_missing:
console.print(Panel(
f"[yellow]Expert tools disabled due to missing configuration:[/yellow]\n" +
"\n".join(f"- {m}" for m in expert_missing) +
"\nSet the required environment variables or args to enable expert mode.",
title="Expert Tools Disabled",
style="yellow"
))
# Create the base model after validation
model = initialize_llm(args.provider, args.model)
args = parse_arguments()
expert_enabled, expert_missing = validate_environment(args) # Will exit if main env vars missing
if expert_missing:
console.print(Panel(
f"[yellow]Expert tools disabled due to missing configuration:[/yellow]\n" +
"\n".join(f"- {m}" for m in expert_missing) +
"\nSet the required environment variables or args to enable expert mode.",
title="Expert Tools Disabled",
style="yellow"
))
# Create the base model after validation
model = initialize_llm(args.provider, args.model)
# Validate message is provided
if not args.message:
print_error("--message is required")
sys.exit(1)
base_task = args.message
config = {
"configurable": {
"thread_id": "abc123"
},
"recursion_limit": 100,
"research_only": args.research_only,
"cowboy_mode": args.cowboy_mode
}
# Store config in global memory for access by is_informational_query
_global_memory['config'] = config
# Store expert provider and model in config
_global_memory['config']['expert_provider'] = args.expert_provider
_global_memory['config']['expert_model'] = args.expert_model
# Run research stage
print_stage_header("Research Stage")
# Create research agent
research_agent = create_react_agent(
model,
get_research_tools(research_only=_global_memory.get('config', {}).get('research_only', False), expert_enabled=expert_enabled),
checkpointer=research_memory
)
research_prompt = f"""User query: {base_task} --keep it simple
# Validate message is provided
if not args.message:
print_error("--message is required")
sys.exit(1)
base_task = args.message
config = {
"configurable": {
"thread_id": "abc123"
},
"recursion_limit": 100,
"research_only": args.research_only,
"cowboy_mode": args.cowboy_mode
}
# Store config in global memory for access by is_informational_query
_global_memory['config'] = config
# Store expert provider and model in config
_global_memory['config']['expert_provider'] = args.expert_provider
_global_memory['config']['expert_model'] = args.expert_model
# Run research stage
print_stage_header("Research Stage")
# Create research agent
research_agent = create_react_agent(
model,
get_research_tools(research_only=_global_memory.get('config', {}).get('research_only', False), expert_enabled=expert_enabled),
checkpointer=research_memory
)
research_prompt = f"""User query: {base_task} --keep it simple
{RESEARCH_PROMPT}
Be very thorough in your research and emit lots of snippets, key facts. If you take more than a few steps, be eager to emit research subtasks.{'' if args.research_only else ' Only request implementation if the user explicitly asked for changes to be made.'}"""
try:
run_agent_with_retry(research_agent, research_prompt, config)
except TaskCompletedException as e:
print_stage_header("Task Completed")
raise # Re-raise to be caught by outer handler
# Run research agent and check for one-shot completion
output = run_agent_with_retry(research_agent, research_prompt, config)
if isinstance(output, str) and "one_shot_completed" in str(output):
print_stage_header("Task Completed")
console.print(Panel(
"[green]Task was completed successfully as a one-shot operation.[/green]",
title="Task Completed",
style="green"
))
sys.exit(0)
# Run any research subtasks
run_research_subtasks(base_task, config, model, expert_enabled=expert_enabled)
# Proceed with planning and implementation if not an informational query
if not is_informational_query():
print_stage_header("Planning Stage")
# Create planning agent
planning_agent = create_react_agent(model, get_planning_tools(expert_enabled=expert_enabled), checkpointer=planning_memory)
planning_prompt = PLANNING_PROMPT.format(
research_notes=get_memory_value('research_notes'),
key_facts=get_memory_value('key_facts'),
key_snippets=get_memory_value('key_snippets'),
base_task=base_task,
related_files="\n".join(get_related_files())
)
# Run any research subtasks
run_research_subtasks(base_task, config, model, expert_enabled=expert_enabled)
# Proceed with planning and implementation if not an informational query
if not is_informational_query():
print_stage_header("Planning Stage")
# Create planning agent
planning_agent = create_react_agent(model, get_planning_tools(expert_enabled=expert_enabled), checkpointer=planning_memory)
planning_prompt = PLANNING_PROMPT.format(
research_notes=get_memory_value('research_notes'),
key_facts=get_memory_value('key_facts'),
key_snippets=get_memory_value('key_snippets'),
base_task=base_task,
related_files="\n".join(get_related_files())
)
# Run planning agent
run_agent_with_retry(planning_agent, planning_prompt, config)
# Run planning agent
run_agent_with_retry(planning_agent, planning_prompt, config)
# Run implementation stage with task-specific agents
run_implementation_stage(
base_task,
get_memory_value('tasks'),
get_memory_value('plan'),
get_related_files(),
model,
expert_enabled=expert_enabled
)
except TaskCompletedException:
console.print(Panel(
"[green]Task was completed successfully as a one-shot operation.[/green]",
title="Task Completed",
style="green"
))
sys.exit(0)
finally:
pass
# Run implementation stage with task-specific agents
run_implementation_stage(
base_task,
get_memory_value('tasks'),
get_memory_value('plan'),
get_related_files(),
model,
expert_enabled=expert_enabled
)
if __name__ == "__main__":
main()

View File

@ -24,8 +24,8 @@ def print_agent_output(chunk: Dict[str, Any]) -> None:
console.print(Panel(Markdown(content['text']), title="🤖 Assistant"))
else:
if msg.content.strip():
console.print(Panel(Markdown(msg.content), title="🤖 Assistant"))
console.print(Panel(Markdown(msg.content.strip()), title="🤖 Assistant"))
elif 'tools' in chunk and 'messages' in chunk['tools']:
for msg in chunk['tools']['messages']:
if msg.status == 'error' and msg.content:
console.print(Panel(Markdown(msg.content), title="❌ Tool Error", border_style="red bold"))
console.print(Panel(Markdown(msg.content.strip()), title="❌ Tool Error", border_style="red bold"))

View File

@ -1,3 +0,0 @@
class TaskCompletedException(Exception):
"""Raised when a one-shot task has been completed."""
pass

View File

@ -1,5 +1,4 @@
from typing import Dict, List, Any, Union, TypedDict, Optional, Sequence, Set
from ra_aid.exceptions import TaskCompletedException
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
@ -15,10 +14,12 @@ class SnippetInfo(TypedDict):
console = Console()
# Global memory store
_global_memory: Dict[str, Union[List[Any], Dict[int, str], Dict[int, SnippetInfo], int, Set[str]]] = {
_global_memory: Dict[str, Union[List[Any], Dict[int, str], Dict[int, SnippetInfo], int, Set[str], bool, str]] = {
'research_notes': [],
'plans': [],
'tasks': {}, # Dict[int, str] - ID to task mapping
'task_completed': False, # Flag indicating if task is complete
'completion_message': '', # Message explaining completion
'task_id_counter': 0, # Counter for generating unique task IDs
'research_subtasks': [],
'key_facts': {}, # Dict[int, str] - ID to fact mapping
@ -316,18 +317,18 @@ def one_shot_completed(message: str) -> str:
Args:
message: Completion message to display
Raises:
ValueError: If there are pending research subtasks or implementation requests
TaskCompletedException: When task is truly complete with no pending items
Returns:
Never returns, always raises exception
Original message if task can be completed, or error message if there are
pending subtasks or implementation requests
"""
if len(_global_memory['research_subtasks']) > 0:
raise ValueError("Cannot complete in one shot - research subtasks pending")
return "Cannot complete in one shot - research subtasks pending"
if len(_global_memory['implementation_requested']) > 0:
raise ValueError("Cannot complete in one shot - implementation was requested")
raise TaskCompletedException(message)
return "Cannot complete in one shot - implementation was requested"
_global_memory['task_completed'] = True
_global_memory['completion_message'] = message
return message
def get_related_files() -> Set[str]:
"""Get the current set of related files.

View File

@ -74,5 +74,4 @@ def read_file_tool(
except Exception as e:
elapsed = time.time() - start_time
logging.error(f"Error reading file {filepath} after {elapsed:.2f}s: {str(e)}")
raise