feat: implement trajectory model and repository for tracking agent actions

This commit introduces a new `Trajectory` model to the database, which tracks the sequence of actions taken by agents, including tool executions and their results. The addition of the `TrajectoryRepository` allows for storing and retrieving these trajectories, enabling better analysis of agent behavior and debugging of issues.

Additionally, the commit refactors existing code to utilize the new repository and model, improving the overall architecture and maintainability of the codebase. This change is essential for enhancing the capabilities of the agent system and providing a more robust framework for future development.
This commit is contained in:
Ariel Frischer 2025-03-10 17:05:34 -07:00
commit 89e4556e7b
24 changed files with 1915 additions and 64 deletions

View File

@ -34,9 +34,9 @@ from ra_aid.version_check import check_for_newer_version
from ra_aid.agent_utils import (
create_agent,
run_agent_with_retry,
run_planning_agent,
run_research_agent,
)
from ra_aid.agents.research_agent import run_research_agent
from ra_aid.agents import run_planning_agent
from ra_aid.config import (
DEFAULT_MAX_TEST_CMD_RETRIES,
DEFAULT_RECURSION_LIMIT,
@ -53,6 +53,9 @@ from ra_aid.database.repositories.human_input_repository import (
from ra_aid.database.repositories.research_note_repository import (
ResearchNoteRepositoryManager, get_research_note_repository
)
from ra_aid.database.repositories.trajectory_repository import (
TrajectoryRepositoryManager, get_trajectory_repository
)
from ra_aid.database.repositories.related_files_repository import (
RelatedFilesRepositoryManager
)
@ -528,6 +531,7 @@ def main():
HumanInputRepositoryManager(db) as human_input_repo, \
ResearchNoteRepositoryManager(db) as research_note_repo, \
RelatedFilesRepositoryManager() as related_files_repo, \
TrajectoryRepositoryManager(db) as trajectory_repo, \
WorkLogRepositoryManager() as work_log_repo, \
ConfigRepositoryManager(config) as config_repo, \
EnvInvManager(env_data) as env_inv:
@ -537,6 +541,7 @@ def main():
logger.debug("Initialized HumanInputRepository")
logger.debug("Initialized ResearchNoteRepository")
logger.debug("Initialized RelatedFilesRepository")
logger.debug("Initialized TrajectoryRepository")
logger.debug("Initialized WorkLogRepository")
logger.debug("Initialized ConfigRepository")
logger.debug("Initialized Environment Inventory")

View File

@ -360,6 +360,7 @@ def create_agent(
agent_kwargs = build_agent_kwargs(checkpointer, max_input_tokens)
return create_react_agent(
model, tools, interrupt_after=["tools"], **agent_kwargs
<<<<<<< HEAD
)
@ -1414,6 +1415,13 @@ def run_task_implementation_agent(
except Exception as e:
logger.error("Implementation agent failed: %s", str(e), exc_info=True)
raise
=======
)
from ra_aid.agents.research_agent import run_research_agent, run_web_research_agent
from ra_aid.agents.implementation_agent import run_task_implementation_agent
>>>>>>> @{-1}
_CONTEXT_STACK = []

View File

@ -3,16 +3,30 @@ Agent package for various specialized agents.
This package contains agents responsible for specific tasks such as
cleaning up key facts and key snippets in the database when they
exceed certain thresholds.
exceed certain thresholds, as well as performing research tasks,
planning implementation, and implementing specific tasks.
Includes agents for:
- Key facts garbage collection
- Key snippets garbage collection
- Implementation tasks
- Planning tasks
- Research tasks
"""
from typing import Optional
from ra_aid.agents.implementation_agent import run_task_implementation_agent
from ra_aid.agents.key_facts_gc_agent import run_key_facts_gc_agent
from ra_aid.agents.key_snippets_gc_agent import run_key_snippets_gc_agent
from ra_aid.agents.planning_agent import run_planning_agent
from ra_aid.agents.research_agent import run_research_agent, run_web_research_agent
__all__ = ["run_key_facts_gc_agent", "run_key_snippets_gc_agent"]
__all__ = [
"run_key_facts_gc_agent",
"run_key_snippets_gc_agent",
"run_planning_agent",
"run_research_agent",
"run_task_implementation_agent",
"run_web_research_agent"
]

View File

@ -0,0 +1,317 @@
"""
Implementation agent for executing specific implementation tasks.
This module provides functionality for running a task implementation agent
to execute specific tasks based on a plan. The agent can be configured with
expert guidance and web research options.
"""
import inspect
import os
import uuid
from datetime import datetime
from typing import Any, Optional, List
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from ra_aid.agent_context import agent_context, is_completed, reset_completion_flags, should_exit
# Import agent_utils functions at runtime to avoid circular imports
from ra_aid import agent_utils
from ra_aid.database.repositories.key_fact_repository import get_key_fact_repository
from ra_aid.database.repositories.key_snippet_repository import get_key_snippet_repository
from ra_aid.database.repositories.human_input_repository import get_human_input_repository
from ra_aid.database.repositories.research_note_repository import get_research_note_repository
from ra_aid.database.repositories.config_repository import get_config_repository
from ra_aid.database.repositories.work_log_repository import get_work_log_repository
from ra_aid.env_inv_context import get_env_inv
from ra_aid.exceptions import AgentInterrupt
from ra_aid.llm import initialize_expert_llm
from ra_aid.logging_config import get_logger
from ra_aid.model_formatters import format_key_facts_dict
from ra_aid.model_formatters.key_snippets_formatter import format_key_snippets_dict
from ra_aid.model_formatters.research_notes_formatter import format_research_notes_dict
from ra_aid.models_params import models_params, DEFAULT_TOKEN_LIMIT
from ra_aid.project_info import format_project_info, get_project_info
from ra_aid.prompts.expert_prompts import EXPERT_PROMPT_SECTION_IMPLEMENTATION
from ra_aid.prompts.human_prompts import HUMAN_PROMPT_SECTION_IMPLEMENTATION
from ra_aid.prompts.implementation_prompts import IMPLEMENTATION_PROMPT
from ra_aid.prompts.reasoning_assist_prompt import REASONING_ASSIST_PROMPT_IMPLEMENTATION
from ra_aid.prompts.web_research_prompts import WEB_RESEARCH_PROMPT_SECTION_CHAT
from ra_aid.tool_configs import get_implementation_tools
from ra_aid.tools.memory import get_related_files, log_work_event
from ra_aid.text.processing import process_thinking_content
logger = get_logger(__name__)
console = Console()
def run_task_implementation_agent(
    base_task: str,
    tasks: list,
    task: str,
    plan: str,
    related_files: list,
    model,
    *,
    expert_enabled: bool = False,
    web_research_enabled: bool = False,
    memory: Optional[Any] = None,
    thread_id: Optional[str] = None,
) -> Optional[str]:
    """Run an implementation agent for a specific task.

    Args:
        base_task: The main task being implemented
        tasks: List of tasks to implement
        task: The current task to implement
        plan: The implementation plan
        related_files: List of related files
        model: The LLM model to use
        expert_enabled: Whether expert mode is enabled
        web_research_enabled: Whether web research is enabled
        memory: Optional memory instance to use
        thread_id: Optional thread ID (defaults to new UUID)

    Returns:
        Optional[str]: The completion message if task completed successfully
    """
    # Normalize thread_id once; a later None-check would be dead code.
    thread_id = thread_id or str(uuid.uuid4())

    logger.debug("Starting implementation agent with thread_id=%s", thread_id)
    logger.debug(
        "Implementation configuration: expert=%s, web=%s",
        expert_enabled,
        web_research_enabled,
    )
    logger.debug("Task details: base_task=%s, current_task=%s", base_task, task)
    logger.debug("Related files: %s", related_files)

    if memory is None:
        # Imported lazily so the checkpointer dependency is only pulled in when needed.
        from langgraph.checkpoint.memory import MemorySaver

        memory = MemorySaver()

    # NOTE(review): the web_research_enabled *parameter* is ignored here; the
    # config repository value is used instead — confirm this is intentional.
    tools = get_implementation_tools(
        expert_enabled=expert_enabled,
        web_research_enabled=get_config_repository().get("web_research_enabled", False),
    )

    # NOTE(review): agent_type is "planner" even though this is the
    # implementation agent — confirm this is intentional.
    agent = agent_utils.create_agent(
        model, tools, checkpointer=memory, agent_type="planner"
    )

    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    working_directory = os.getcwd()

    # Make sure key_facts is defined before using it
    try:
        key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
    except RuntimeError as e:
        logger.error(f"Failed to access key fact repository: {str(e)}")
        key_facts = ""

    # Get formatted research notes using repository
    try:
        repository = get_research_note_repository()
        notes_dict = repository.get_notes_dict()
        formatted_research_notes = format_research_notes_dict(notes_dict)
    except RuntimeError as e:
        logger.error(f"Failed to access research note repository: {str(e)}")
        formatted_research_notes = ""

    # Get latest project info
    try:
        project_info = get_project_info(".")
        formatted_project_info = format_project_info(project_info)
    except Exception as e:
        logger.warning("Failed to get project info: %s", str(e))
        formatted_project_info = "Project info unavailable"

    # Get environment inventory information
    env_inv = get_env_inv()

    # Get model configuration to check for reasoning_assist_default
    provider = get_config_repository().get("expert_provider", "")
    model_name = get_config_repository().get("expert_model", "")
    logger.debug("Checking for reasoning_assist_default on %s/%s", provider, model_name)

    model_config = {}
    provider_models = models_params.get(provider, {})
    if provider_models and model_name in provider_models:
        model_config = provider_models[model_name]

    # Check if reasoning assist is explicitly enabled/disabled
    force_assistance = get_config_repository().get("force_reasoning_assistance", False)
    disable_assistance = get_config_repository().get(
        "disable_reasoning_assistance", False
    )
    if force_assistance:
        reasoning_assist_enabled = True
    elif disable_assistance:
        reasoning_assist_enabled = False
    else:
        # Fall back to model default
        reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)

    logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)

    # Initialize implementation guidance section
    implementation_guidance_section = ""

    # If reasoning assist is enabled, make a one-off call to the expert model
    if reasoning_assist_enabled:
        try:
            logger.info(
                "Reasoning assist enabled for model %s, getting implementation guidance",
                model_name,
            )

            # Collect tool descriptions
            tool_metadata = []
            from ra_aid.tools.reflection import get_function_info as get_tool_info

            for tool in tools:
                try:
                    # Introspection call kept for its validation side effect: a
                    # tool that cannot be inspected is skipped via the except.
                    get_tool_info(tool.func)
                    name = tool.func.__name__
                    description = inspect.getdoc(tool.func)
                    tool_metadata.append(
                        f"Tool: {name}\nDescription: {description}\n"
                    )
                except Exception as e:
                    logger.warning(f"Error getting tool info for {tool}: {e}")

            # Format tool metadata
            formatted_tool_metadata = "\n".join(tool_metadata)

            # Initialize expert model
            expert_model = initialize_expert_llm(provider, model_name)

            # Format the reasoning assist prompt for implementation
            reasoning_assist_prompt = REASONING_ASSIST_PROMPT_IMPLEMENTATION.format(
                current_date=current_date,
                working_directory=working_directory,
                task=task,
                key_facts=key_facts,
                key_snippets=format_key_snippets_dict(
                    get_key_snippet_repository().get_snippets_dict()
                ),
                research_notes=formatted_research_notes,
                related_files="\n".join(related_files),
                env_inv=env_inv,
                tool_metadata=formatted_tool_metadata,
                project_info=formatted_project_info,
            )

            # Show the reasoning assist query in a panel
            console.print(
                Panel(
                    Markdown(
                        "Consulting with the reasoning model on the best implementation approach."
                    ),
                    title="📝 Thinking about implementation...",
                    border_style="yellow",
                )
            )

            logger.debug("Invoking expert model for implementation reasoning assist")
            # Make the call to the expert model
            response = expert_model.invoke(reasoning_assist_prompt)

            # Check if the model supports think tags
            supports_think_tag = model_config.get("supports_think_tag", False)
            supports_thinking = model_config.get("supports_thinking", False)

            # Process response content
            if hasattr(response, "content"):
                content = response.content
            else:
                # Fallback if content attribute is missing
                content = str(response)

            # Process the response content using the centralized function;
            # the extracted thinking text is displayed by the helper, not used here.
            content, _ = process_thinking_content(
                content=content,
                supports_think_tag=supports_think_tag,
                supports_thinking=supports_thinking,
                panel_title="💭 Implementation Thinking",
                panel_style="yellow",
                logger=logger,
            )

            # Display the implementation guidance in a panel
            console.print(
                Panel(
                    Markdown(content),
                    title="Implementation Guidance",
                    border_style="blue",
                )
            )

            # Format the implementation guidance section for the prompt
            implementation_guidance_section = f"""<implementation guidance>
{content}
</implementation guidance>"""

            logger.info("Received implementation guidance")
        except Exception as e:
            # Best-effort: guidance failures must not block implementation.
            logger.error("Error getting implementation guidance: %s", e)
            implementation_guidance_section = ""

    prompt = IMPLEMENTATION_PROMPT.format(
        current_date=current_date,
        working_directory=working_directory,
        base_task=base_task,
        task=task,
        tasks=tasks,
        plan=plan,
        related_files=related_files,
        key_facts=key_facts,
        key_snippets=format_key_snippets_dict(
            get_key_snippet_repository().get_snippets_dict()
        ),
        research_notes=formatted_research_notes,
        work_log=get_work_log_repository().format_work_log(),
        expert_section=EXPERT_PROMPT_SECTION_IMPLEMENTATION if expert_enabled else "",
        human_section=(
            HUMAN_PROMPT_SECTION_IMPLEMENTATION
            if get_config_repository().get("hil", False)
            else ""
        ),
        web_research_section=(
            WEB_RESEARCH_PROMPT_SECTION_CHAT
            if get_config_repository().get("web_research_enabled", False)
            else ""
        ),
        env_inv=env_inv,
        project_info=formatted_project_info,
        implementation_guidance_section=implementation_guidance_section,
    )

    config_values = get_config_repository().get_all()
    recursion_limit = get_config_repository().get("recursion_limit", 100)
    # NOTE(review): run_config is built but never passed to the agent run
    # below — confirm whether run_agent_with_retry should receive it.
    run_config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": recursion_limit,
    }
    run_config.update(config_values)

    try:
        none_or_fallback_handler = agent_utils.init_fallback_handler(agent, tools)
        _result = agent_utils.run_agent_with_retry(
            agent, prompt, none_or_fallback_handler
        )
        if _result:
            # Log task implementation completion (moved here so the success
            # message is only emitted after the agent actually ran).
            log_work_event(f"Completed implementation of task: {task}")
            logger.debug("Implementation agent completed successfully")
            return _result
    except (KeyboardInterrupt, AgentInterrupt):
        raise
    except Exception as e:
        logger.error("Implementation agent failed: %s", str(e), exc_info=True)
        raise

View File

@ -17,7 +17,8 @@ from rich.panel import Panel
logger = logging.getLogger(__name__)
from ra_aid.agent_context import mark_should_exit
from ra_aid.agent_utils import create_agent, run_agent_with_retry
# Import agent_utils functions at runtime to avoid circular imports
from ra_aid import agent_utils
from ra_aid.database.repositories.key_fact_repository import get_key_fact_repository
from ra_aid.database.repositories.human_input_repository import get_human_input_repository
from ra_aid.database.repositories.config_repository import get_config_repository
@ -47,9 +48,7 @@ def delete_key_facts(fact_ids: List[int]) -> str:
# Try to get the current human input to protect its facts
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")
@ -132,9 +131,7 @@ def run_key_facts_gc_agent() -> None:
# Try to get the current human input ID to exclude its facts
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")
@ -164,7 +161,7 @@ def run_key_facts_gc_agent() -> None:
)
# Create the agent with the delete_key_facts tool
agent = create_agent(model, [delete_key_facts])
agent = agent_utils.create_agent(model, [delete_key_facts])
# Format the prompt with the eligible facts
prompt = KEY_FACTS_GC_PROMPT.format(key_facts=formatted_facts)
@ -175,7 +172,7 @@ def run_key_facts_gc_agent() -> None:
}
# Run the agent
run_agent_with_retry(agent, prompt, agent_config)
agent_utils.run_agent_with_retry(agent, prompt, agent_config)
# Get updated count
try:

View File

@ -13,7 +13,8 @@ from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from ra_aid.agent_utils import create_agent, run_agent_with_retry
# Import agent_utils functions at runtime to avoid circular imports
from ra_aid import agent_utils
from ra_aid.database.repositories.key_snippet_repository import get_key_snippet_repository
from ra_aid.database.repositories.human_input_repository import get_human_input_repository
from ra_aid.database.repositories.config_repository import get_config_repository
@ -45,9 +46,7 @@ def delete_key_snippets(snippet_ids: List[int]) -> str:
# Try to get the current human input to protect its snippets
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")
@ -124,9 +123,7 @@ def run_key_snippets_gc_agent() -> None:
# Try to get the current human input ID to exclude its snippets
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")
@ -168,7 +165,7 @@ def run_key_snippets_gc_agent() -> None:
)
# Create the agent with the delete_key_snippets tool
agent = create_agent(model, [delete_key_snippets])
agent = agent_utils.create_agent(model, [delete_key_snippets])
# Format the prompt with the eligible snippets
prompt = KEY_SNIPPETS_GC_PROMPT.format(key_snippets=formatted_snippets)
@ -179,7 +176,7 @@ def run_key_snippets_gc_agent() -> None:
}
# Run the agent
run_agent_with_retry(agent, prompt, agent_config)
agent_utils.run_agent_with_retry(agent, prompt, agent_config)
# Get updated count
updated_snippets = get_key_snippet_repository().get_all()

View File

@ -0,0 +1,362 @@
"""
Planning agent implementation.
This module provides functionality for running a planning agent to create implementation
plans. The agent can be configured with expert guidance and human-in-the-loop options.
"""
import inspect
import os
import uuid
from datetime import datetime
from typing import Any, Optional
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from ra_aid.agent_context import agent_context, is_completed, reset_completion_flags, should_exit
# Import agent_utils functions at runtime to avoid circular imports
from ra_aid import agent_utils
from ra_aid.console.formatting import print_stage_header
from ra_aid.database.repositories.key_fact_repository import get_key_fact_repository
from ra_aid.database.repositories.key_snippet_repository import get_key_snippet_repository
from ra_aid.database.repositories.research_note_repository import get_research_note_repository
from ra_aid.database.repositories.config_repository import get_config_repository
from ra_aid.database.repositories.work_log_repository import get_work_log_repository
from ra_aid.env_inv_context import get_env_inv
from ra_aid.exceptions import AgentInterrupt
from ra_aid.llm import initialize_expert_llm
from ra_aid.logging_config import get_logger
from ra_aid.model_formatters import format_key_facts_dict
from ra_aid.model_formatters.key_snippets_formatter import format_key_snippets_dict
from ra_aid.model_formatters.research_notes_formatter import format_research_notes_dict
from ra_aid.models_params import models_params
from ra_aid.project_info import format_project_info, get_project_info
from ra_aid.prompts.expert_prompts import EXPERT_PROMPT_SECTION_PLANNING
from ra_aid.prompts.human_prompts import HUMAN_PROMPT_SECTION_PLANNING
from ra_aid.prompts.planning_prompts import PLANNING_PROMPT
from ra_aid.prompts.reasoning_assist_prompt import REASONING_ASSIST_PROMPT_PLANNING
from ra_aid.prompts.web_research_prompts import WEB_RESEARCH_PROMPT_SECTION_PLANNING
from ra_aid.tool_configs import get_planning_tools
from ra_aid.tools.memory import get_related_files, log_work_event
logger = get_logger(__name__)
console = Console()
def run_planning_agent(
    base_task: str,
    model,
    *,
    expert_enabled: bool = False,
    hil: bool = False,
    memory: Optional[Any] = None,
    thread_id: Optional[str] = None,
) -> Optional[str]:
    """Run a planning agent to create implementation plans.

    Args:
        base_task: The main task to plan implementation for
        model: The LLM model to use
        expert_enabled: Whether expert mode is enabled
        hil: Whether human-in-the-loop mode is enabled
        memory: Optional memory instance to use
        thread_id: Optional thread ID (defaults to new UUID)

    Returns:
        Optional[str]: The completion message if planning completed successfully
    """
    # Normalize thread_id once; a later None-check would be dead code.
    thread_id = thread_id or str(uuid.uuid4())

    logger.debug("Starting planning agent with thread_id=%s", thread_id)
    logger.debug("Planning configuration: expert=%s, hil=%s", expert_enabled, hil)

    if memory is None:
        # Imported lazily so the checkpointer dependency is only pulled in when needed.
        from langgraph.checkpoint.memory import MemorySaver

        memory = MemorySaver()

    # Get latest project info
    try:
        project_info = get_project_info(".")
        formatted_project_info = format_project_info(project_info)
    except Exception as e:
        logger.warning("Failed to get project info: %s", str(e))
        formatted_project_info = "Project info unavailable"

    tools = get_planning_tools(
        expert_enabled=expert_enabled,
        web_research_enabled=get_config_repository().get("web_research_enabled", False),
    )

    # Get model configuration
    provider = get_config_repository().get("expert_provider", "")
    model_name = get_config_repository().get("expert_model", "")
    logger.debug("Checking for reasoning_assist_default on %s/%s", provider, model_name)

    # Get model configuration to check for reasoning_assist_default
    model_config = {}
    provider_models = models_params.get(provider, {})
    if provider_models and model_name in provider_models:
        model_config = provider_models[model_name]

    # Check if reasoning assist is explicitly enabled/disabled
    force_assistance = get_config_repository().get("force_reasoning_assistance", False)
    disable_assistance = get_config_repository().get(
        "disable_reasoning_assistance", False
    )
    if force_assistance:
        reasoning_assist_enabled = True
    elif disable_assistance:
        reasoning_assist_enabled = False
    else:
        # Fall back to model default
        reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)

    logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)

    # Gather context used both for normal planning and reasoning assist.
    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    working_directory = os.getcwd()

    # Make sure key_facts is defined before using it
    try:
        key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
    except RuntimeError as e:
        logger.error(f"Failed to access key fact repository: {str(e)}")
        key_facts = ""

    # Make sure key_snippets is defined before using it
    try:
        key_snippets = format_key_snippets_dict(
            get_key_snippet_repository().get_snippets_dict()
        )
    except RuntimeError as e:
        logger.error(f"Failed to access key snippet repository: {str(e)}")
        key_snippets = ""

    # Get formatted research notes using repository
    try:
        repository = get_research_note_repository()
        notes_dict = repository.get_notes_dict()
        formatted_research_notes = format_research_notes_dict(notes_dict)
    except RuntimeError as e:
        logger.error(f"Failed to access research note repository: {str(e)}")
        formatted_research_notes = ""

    # Get related files
    related_files = "\n".join(get_related_files())

    # Get environment inventory information
    env_inv = get_env_inv()

    # Display the planning stage header before any reasoning assistance
    print_stage_header("Planning Stage")

    # Initialize expert guidance section
    expert_guidance = ""

    # If reasoning assist is enabled, make a one-off call to the expert model
    if reasoning_assist_enabled:
        try:
            logger.info(
                "Reasoning assist enabled for model %s, getting expert guidance",
                model_name,
            )

            # Collect tool descriptions
            tool_metadata = []
            from ra_aid.tools.reflection import get_function_info as get_tool_info

            for tool in tools:
                try:
                    # Introspection call kept for its validation side effect: a
                    # tool that cannot be inspected is skipped via the except.
                    get_tool_info(tool.func)
                    name = tool.func.__name__
                    description = inspect.getdoc(tool.func)
                    tool_metadata.append(f"Tool: {name}\nDescription: {description}\n")
                except Exception as e:
                    logger.warning(f"Error getting tool info for {tool}: {e}")

            # Format tool metadata
            formatted_tool_metadata = "\n".join(tool_metadata)

            # Initialize expert model
            expert_model = initialize_expert_llm(provider, model_name)

            # Format the reasoning assist prompt
            reasoning_assist_prompt = REASONING_ASSIST_PROMPT_PLANNING.format(
                current_date=current_date,
                working_directory=working_directory,
                base_task=base_task,
                key_facts=key_facts,
                key_snippets=key_snippets,
                research_notes=formatted_research_notes,
                related_files=related_files,
                env_inv=env_inv,
                tool_metadata=formatted_tool_metadata,
                project_info=formatted_project_info,
            )

            # Show the reasoning assist query in a panel
            console.print(
                Panel(
                    Markdown(
                        "Consulting with the reasoning model on the best way to do this."
                    ),
                    title="📝 Thinking about the plan...",
                    border_style="yellow",
                )
            )

            logger.debug("Invoking expert model for reasoning assist")
            # Make the call to the expert model
            response = expert_model.invoke(reasoning_assist_prompt)

            # Check if the model supports think tags
            supports_think_tag = model_config.get("supports_think_tag", False)
            supports_thinking = model_config.get("supports_thinking", False)

            # Get response content, handling if it's a list (for Claude thinking mode)
            if hasattr(response, "content"):
                content = response.content
            else:
                # Fallback if content attribute is missing
                content = str(response)

            # Process content based on its type
            if isinstance(content, list):
                # Handle structured thinking mode (e.g., Claude 3.7)
                thinking_content = None
                response_text = None

                # Process each item in the list
                for item in content:
                    if isinstance(item, dict):
                        # Extract thinking content
                        if item.get("type") == "thinking" and "thinking" in item:
                            thinking_content = item["thinking"]
                            logger.debug("Found structured thinking content")
                        # Extract response text
                        elif item.get("type") == "text" and "text" in item:
                            response_text = item["text"]
                            logger.debug("Found structured response text")

                # Display thinking content in a separate panel if available
                if thinking_content and get_config_repository().get(
                    "show_thoughts", False
                ):
                    logger.debug(
                        f"Displaying structured thinking content ({len(thinking_content)} chars)"
                    )
                    console.print(
                        Panel(
                            Markdown(thinking_content),
                            title="💭 Expert Thinking",
                            border_style="yellow",
                        )
                    )

                # Use response_text if available, otherwise fall back to joining
                if response_text:
                    content = response_text
                else:
                    # Fallback: join list items if structured extraction failed
                    logger.debug(
                        "No structured response text found, joining list items"
                    )
                    content = "\n".join(str(item) for item in content)
            elif supports_think_tag or supports_thinking:
                # Process thinking content using the centralized function
                content, _ = agent_utils.process_thinking_content(
                    content=content,
                    supports_think_tag=supports_think_tag,
                    supports_thinking=supports_thinking,
                    panel_title="💭 Expert Thinking",
                    panel_style="yellow",
                    logger=logger,
                )

            # Display the expert guidance in a panel
            console.print(
                Panel(
                    Markdown(content), title="Reasoning Guidance", border_style="blue"
                )
            )

            # Use the content as expert guidance
            expert_guidance = (
                content + "\n\nCONSULT WITH THE EXPERT FREQUENTLY ON THIS TASK"
            )

            logger.info("Received expert guidance for planning")
        except Exception as e:
            # Best-effort: guidance failures must not block planning.
            logger.error("Error getting expert guidance for planning: %s", e)
            expert_guidance = ""

    agent = agent_utils.create_agent(
        model, tools, checkpointer=memory, agent_type="planner"
    )

    expert_section = EXPERT_PROMPT_SECTION_PLANNING if expert_enabled else ""
    human_section = HUMAN_PROMPT_SECTION_PLANNING if hil else ""
    web_research_section = (
        WEB_RESEARCH_PROMPT_SECTION_PLANNING
        if get_config_repository().get("web_research_enabled", False)
        else ""
    )

    # Prepare expert guidance section if expert guidance is available
    expert_guidance_section = ""
    if expert_guidance:
        expert_guidance_section = f"""<expert guidance>
{expert_guidance}
</expert guidance>"""

    planning_prompt = PLANNING_PROMPT.format(
        current_date=current_date,
        working_directory=working_directory,
        expert_section=expert_section,
        human_section=human_section,
        web_research_section=web_research_section,
        base_task=base_task,
        project_info=formatted_project_info,
        research_notes=formatted_research_notes,
        related_files=related_files,
        key_facts=key_facts,
        key_snippets=key_snippets,
        work_log=get_work_log_repository().format_work_log(),
        research_only_note=(
            ""
            if get_config_repository().get("research_only", False)
            else " Only request implementation if the user explicitly asked for changes to be made."
        ),
        env_inv=env_inv,
        expert_guidance_section=expert_guidance_section,
    )

    config_values = get_config_repository().get_all()
    recursion_limit = get_config_repository().get("recursion_limit", 100)
    # NOTE(review): run_config is built but never passed to the agent run
    # below — confirm whether run_agent_with_retry should receive it.
    run_config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": recursion_limit,
    }
    run_config.update(config_values)

    try:
        none_or_fallback_handler = agent_utils.init_fallback_handler(agent, tools)
        _result = agent_utils.run_agent_with_retry(
            agent, planning_prompt, none_or_fallback_handler
        )
        if _result:
            # Log planning completion (moved here so the success message is
            # only emitted after the agent actually ran).
            log_work_event(f"Completed planning phase for: {base_task}")
            logger.debug("Planning agent completed successfully")
            return _result
    except (KeyboardInterrupt, AgentInterrupt):
        raise
    except Exception as e:
        logger.error("Planning agent failed: %s", str(e), exc_info=True)
        raise

View File

@ -0,0 +1,528 @@
"""
Research agent implementation.
This module provides functionality for running a research agent to investigate tasks
and queries. The agent can perform both general research and web-specific research
tasks, with options for expert guidance and human-in-the-loop collaboration.
"""
import inspect
import os
import uuid
from datetime import datetime
from typing import Any, Optional
from langchain_core.messages import SystemMessage
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from ra_aid.agent_context import agent_context, is_completed, reset_completion_flags, should_exit
# Import agent_utils functions at runtime to avoid circular imports
from ra_aid import agent_utils
from ra_aid.console.formatting import print_error
from ra_aid.database.repositories.key_fact_repository import get_key_fact_repository
from ra_aid.database.repositories.key_snippet_repository import get_key_snippet_repository
from ra_aid.database.repositories.human_input_repository import get_human_input_repository
from ra_aid.database.repositories.research_note_repository import get_research_note_repository
from ra_aid.database.repositories.config_repository import get_config_repository
from ra_aid.database.repositories.work_log_repository import get_work_log_repository
from ra_aid.env_inv_context import get_env_inv
from ra_aid.exceptions import AgentInterrupt
from ra_aid.llm import initialize_expert_llm
from ra_aid.logging_config import get_logger
from ra_aid.model_formatters import format_key_facts_dict
from ra_aid.model_formatters.key_snippets_formatter import format_key_snippets_dict
from ra_aid.model_formatters.research_notes_formatter import format_research_notes_dict
from ra_aid.models_params import models_params
from ra_aid.project_info import display_project_status, format_project_info, get_project_info
from ra_aid.prompts.expert_prompts import EXPERT_PROMPT_SECTION_RESEARCH
from ra_aid.prompts.human_prompts import HUMAN_PROMPT_SECTION_RESEARCH
from ra_aid.prompts.research_prompts import RESEARCH_ONLY_PROMPT, RESEARCH_PROMPT
from ra_aid.prompts.reasoning_assist_prompt import REASONING_ASSIST_PROMPT_RESEARCH
from ra_aid.prompts.web_research_prompts import (
WEB_RESEARCH_PROMPT,
WEB_RESEARCH_PROMPT_SECTION_RESEARCH,
)
from ra_aid.prompts.common_prompts import NEW_PROJECT_HINTS
from ra_aid.tool_configs import get_research_tools, get_web_research_tools
from ra_aid.tools.memory import get_related_files, log_work_event
# Module-level logger and rich console shared by both research agents below.
logger = get_logger(__name__)
console = Console()
def run_research_agent(
    base_task_or_query: str,
    model,
    *,
    expert_enabled: bool = False,
    research_only: bool = False,
    hil: bool = False,
    web_research_enabled: bool = False,
    memory: Optional[Any] = None,
    thread_id: Optional[str] = None,
    console_message: Optional[str] = None,
) -> Optional[str]:
    """Run a research agent with the given configuration.

    Args:
        base_task_or_query: The main task or query for research
        model: The LLM model to use; when None, falls back to running the
            web research agent directly
        expert_enabled: Whether expert mode is enabled
        research_only: Whether this is a research-only task
        hil: Whether human-in-the-loop mode is enabled
        web_research_enabled: Whether web research is enabled
        memory: Optional memory instance to use (defaults to a new MemorySaver)
        thread_id: Optional thread ID (defaults to new UUID)
        console_message: Optional message to display before running

    Returns:
        Optional[str]: The completion message if task completed successfully

    Raises:
        Propagates any exception raised by agent execution (after logging),
        including KeyboardInterrupt/AgentInterrupt.

    Example:
        result = run_research_agent(
            "Research Python async patterns",
            model,
            expert_enabled=True,
            research_only=True
        )
    """
    thread_id = thread_id or str(uuid.uuid4())
    logger.debug("Starting research agent with thread_id=%s", thread_id)
    logger.debug(
        "Research configuration: expert=%s, research_only=%s, hil=%s, web=%s",
        expert_enabled,
        research_only,
        hil,
        web_research_enabled,
    )

    if memory is None:
        # Imported lazily to avoid pulling in langgraph at module import time.
        from langgraph.checkpoint.memory import MemorySaver
        memory = MemorySaver()

    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    working_directory = os.getcwd()

    # Prepend the last human input (if any, and if different from the current
    # query) so the agent has conversational context.
    base_task = base_task_or_query
    try:
        human_input_repository = get_human_input_repository()
        most_recent_id = human_input_repository.get_most_recent_id()
        if most_recent_id is not None:
            recent_input = human_input_repository.get(most_recent_id)
            if recent_input and recent_input.content != base_task_or_query:
                last_human_input = recent_input.content
                base_task = (
                    f"<last human input>{last_human_input}</last human input>\n{base_task}"
                )
    except RuntimeError as e:
        logger.error(f"Failed to access human input repository: {str(e)}")
        # Continue without appending last human input

    try:
        key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
    except RuntimeError as e:
        logger.error(f"Failed to access key fact repository: {str(e)}")
        key_facts = ""

    # Guarded the same way as in run_web_research_agent for consistency:
    # a missing repository degrades to an empty section instead of crashing.
    try:
        key_snippets = format_key_snippets_dict(
            get_key_snippet_repository().get_snippets_dict()
        )
    except RuntimeError as e:
        logger.error(f"Failed to access key snippet repository: {str(e)}")
        key_snippets = ""

    related_files = get_related_files()

    # Default project_info to None so references further down (new-project
    # hints, status display) are safe even if project inspection fails.
    project_info = None
    try:
        project_info = get_project_info(".", file_limit=2000)
        formatted_project_info = format_project_info(project_info)
    except Exception as e:
        logger.warning(f"Failed to get project info: {e}")
        formatted_project_info = ""

    tools = get_research_tools(
        research_only=research_only,
        expert_enabled=expert_enabled,
        human_interaction=hil,
        web_research_enabled=get_config_repository().get("web_research_enabled", False),
    )

    # Get model info for reasoning assistance configuration
    provider = get_config_repository().get("expert_provider", "")
    model_name = get_config_repository().get("expert_model", "")

    # Get model configuration to check for reasoning_assist_default
    model_config = {}
    provider_models = models_params.get(provider, {})
    if provider_models and model_name in provider_models:
        model_config = provider_models[model_name]

    # Explicit force/disable flags override the model's default.
    force_assistance = get_config_repository().get("force_reasoning_assistance", False)
    disable_assistance = get_config_repository().get(
        "disable_reasoning_assistance", False
    )
    if force_assistance:
        reasoning_assist_enabled = True
    elif disable_assistance:
        reasoning_assist_enabled = False
    else:
        # Fall back to model default
        reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)
    logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)

    expert_guidance = ""

    # Get research note information for reasoning assistance
    try:
        research_notes = format_research_notes_dict(
            get_research_note_repository().get_notes_dict()
        )
    except Exception as e:
        logger.warning(f"Failed to get research notes: {e}")
        research_notes = ""

    # If reasoning assist is enabled, make a one-off call to the expert model
    # to obtain strategy guidance before creating the main agent.
    if reasoning_assist_enabled:
        try:
            logger.info(
                "Reasoning assist enabled for model %s, getting expert guidance",
                model_name,
            )

            # Collect tool descriptions for the reasoning prompt
            tool_metadata = []
            from ra_aid.tools.reflection import get_function_info as get_tool_info

            for tool in tools:
                try:
                    tool_info = get_tool_info(tool.func)
                    description = inspect.getdoc(tool.func)
                    tool_metadata.append(f"Tool: {tool_info}\nDescription: {description}\n")
                except Exception as e:
                    logger.warning(f"Error getting tool info for {tool}: {e}")

            # Format tool metadata
            formatted_tool_metadata = "\n".join(tool_metadata)

            # Initialize expert model
            expert_model = initialize_expert_llm(provider, model_name)

            # Format the reasoning assist prompt
            reasoning_assist_prompt = REASONING_ASSIST_PROMPT_RESEARCH.format(
                current_date=current_date,
                working_directory=working_directory,
                base_task=base_task,
                key_facts=key_facts,
                key_snippets=key_snippets,
                research_notes=research_notes,
                related_files=related_files,
                env_inv=get_env_inv(),
                tool_metadata=formatted_tool_metadata,
                project_info=formatted_project_info,
            )

            # Show the reasoning assist query in a panel
            console.print(
                Panel(
                    Markdown(
                        "Consulting with the reasoning model on the best research approach."
                    ),
                    title="📝 Thinking about research strategy...",
                    border_style="yellow",
                )
            )

            logger.debug("Invoking expert model for reasoning assist")
            # Make the call to the expert model
            response = expert_model.invoke(reasoning_assist_prompt)

            # Check if the model supports think tags
            supports_think_tag = model_config.get("supports_think_tag", False)
            supports_thinking = model_config.get("supports_thinking", False)

            # Get response content, handling if it's a list (for Claude thinking mode)
            content = None
            if hasattr(response, "content"):
                content = response.content
            else:
                # Fallback if content attribute is missing
                content = str(response)

            # Process content based on its type
            if isinstance(content, list):
                # Handle structured thinking mode (e.g., Claude 3.7)
                thinking_content = None
                response_text = None

                # Process each item in the list
                for item in content:
                    if isinstance(item, dict):
                        # Extract thinking content
                        if item.get("type") == "thinking" and "thinking" in item:
                            thinking_content = item["thinking"]
                            logger.debug("Found structured thinking content")
                        # Extract response text
                        elif item.get("type") == "text" and "text" in item:
                            response_text = item["text"]
                            logger.debug("Found structured response text")

                # Display thinking content in a separate panel if available
                if thinking_content and get_config_repository().get(
                    "show_thoughts", False
                ):
                    logger.debug(
                        f"Displaying structured thinking content ({len(thinking_content)} chars)"
                    )
                    console.print(
                        Panel(
                            Markdown(thinking_content),
                            title="💭 Expert Thinking",
                            border_style="yellow",
                        )
                    )

                # Use response_text if available, otherwise fall back to joining
                if response_text:
                    content = response_text
                else:
                    # Fallback: join list items if structured extraction failed
                    logger.debug(
                        "No structured response text found, joining list items"
                    )
                    content = "\n".join(str(item) for item in content)
            elif supports_think_tag or supports_thinking:
                # Process thinking content using the centralized function
                content, _ = agent_utils.process_thinking_content(
                    content=content,
                    supports_think_tag=supports_think_tag,
                    supports_thinking=supports_thinking,
                    panel_title="💭 Expert Thinking",
                    panel_style="yellow",
                    logger=logger,
                )

            # Display the expert guidance in a panel
            console.print(
                Panel(
                    Markdown(content),
                    title="Research Strategy Guidance",
                    border_style="blue",
                )
            )

            # Use the content as expert guidance
            expert_guidance = (
                content + "\n\nCONSULT WITH THE EXPERT FREQUENTLY DURING RESEARCH"
            )

            logger.info("Received expert guidance for research")
        except Exception as e:
            # Reasoning assist is best-effort; fall back to no guidance.
            logger.error("Error getting expert guidance for research: %s", e)
            expert_guidance = ""

    agent = agent_utils.create_agent(model, tools, checkpointer=memory, agent_type="research")

    expert_section = EXPERT_PROMPT_SECTION_RESEARCH if expert_enabled else ""
    human_section = HUMAN_PROMPT_SECTION_RESEARCH if hil else ""
    web_research_section = (
        WEB_RESEARCH_PROMPT_SECTION_RESEARCH
        if get_config_repository().get("web_research_enabled")
        else ""
    )

    # Prepare expert guidance section if expert guidance is available
    expert_guidance_section = ""
    if expert_guidance:
        expert_guidance_section = f"""<expert guidance>
{expert_guidance}
</expert guidance>
YOU MUST FOLLOW THE EXPERT'S GUIDANCE OR ELSE BE TERMINATED!
"""

    prompt = (RESEARCH_ONLY_PROMPT if research_only else RESEARCH_PROMPT).format(
        current_date=current_date,
        working_directory=working_directory,
        base_task=base_task,
        research_only_note=(
            ""
            if research_only
            else " Only request implementation if the user explicitly asked for changes to be made."
        ),
        expert_section=expert_section,
        human_section=human_section,
        web_research_section=web_research_section,
        key_facts=key_facts,
        work_log=get_work_log_repository().format_work_log(),
        key_snippets=key_snippets,
        related_files=related_files,
        project_info=formatted_project_info,
        # Guard: project_info may be None if get_project_info failed above.
        new_project_hints=NEW_PROJECT_HINTS if (project_info and project_info.is_new) else "",
        env_inv=get_env_inv(),
        expert_guidance_section=expert_guidance_section,
    )

    config = get_config_repository().get_all()
    recursion_limit = config.get("recursion_limit", 100)
    # NOTE(review): run_config is constructed but never passed to
    # run_agent_with_retry below — confirm whether it should be.
    run_config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": recursion_limit,
    }
    run_config.update(config)

    try:
        if console_message:
            console.print(
                Panel(Markdown(console_message), title="🔬 Looking into it...")
            )

        if project_info:
            display_project_status(project_info)

        if agent is not None:
            logger.debug("Research agent created successfully")
            none_or_fallback_handler = agent_utils.init_fallback_handler(agent, tools)
            _result = agent_utils.run_agent_with_retry(agent, prompt, none_or_fallback_handler)
            if _result:
                # Log research completion
                log_work_event(f"Completed research phase for: {base_task_or_query}")
            return _result
        else:
            logger.debug("No model provided, running web research tools directly")
            return run_web_research_agent(
                base_task_or_query,
                model=None,
                expert_enabled=expert_enabled,
                hil=hil,
                web_research_enabled=web_research_enabled,
                memory=memory,
                thread_id=thread_id,
                console_message=console_message,
            )
    except (KeyboardInterrupt, AgentInterrupt):
        raise
    except Exception as e:
        logger.error("Research agent failed: %s", str(e), exc_info=True)
        raise
def run_web_research_agent(
    query: str,
    model,
    *,
    expert_enabled: bool = False,
    hil: bool = False,
    web_research_enabled: bool = False,
    memory: Optional[Any] = None,
    thread_id: Optional[str] = None,
    console_message: Optional[str] = None,
) -> Optional[str]:
    """Run a web research agent with the given configuration.

    Args:
        query: The main query for web research
        model: The LLM model to use
        expert_enabled: Whether expert mode is enabled
        hil: Whether human-in-the-loop mode is enabled
        web_research_enabled: Whether web research is enabled
        memory: Optional memory instance to use (defaults to a new MemorySaver)
        thread_id: Optional thread ID (defaults to new UUID)
        console_message: Optional message to display before running

    Returns:
        Optional[str]: The completion message if task completed successfully

    Raises:
        Propagates any exception raised by agent execution (after logging),
        including KeyboardInterrupt/AgentInterrupt.

    Example:
        result = run_web_research_agent(
            "Research latest Python async patterns",
            model,
            expert_enabled=True
        )
    """
    # thread_id is defaulted once here; no further None-check is needed.
    thread_id = thread_id or str(uuid.uuid4())
    logger.debug("Starting web research agent with thread_id=%s", thread_id)
    logger.debug(
        "Web research configuration: expert=%s, hil=%s, web=%s",
        expert_enabled,
        hil,
        web_research_enabled,
    )

    if memory is None:
        # Imported lazily to avoid pulling in langgraph at module import time.
        from langgraph.checkpoint.memory import MemorySaver
        memory = MemorySaver()

    tools = get_web_research_tools(expert_enabled=expert_enabled)

    agent = agent_utils.create_agent(model, tools, checkpointer=memory, agent_type="research")

    expert_section = EXPERT_PROMPT_SECTION_RESEARCH if expert_enabled else ""
    human_section = HUMAN_PROMPT_SECTION_RESEARCH if hil else ""

    # Repository access is best-effort: a missing repository degrades to an
    # empty prompt section instead of aborting the run.
    try:
        key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
    except RuntimeError as e:
        logger.error(f"Failed to access key fact repository: {str(e)}")
        key_facts = ""
    try:
        key_snippets = format_key_snippets_dict(
            get_key_snippet_repository().get_snippets_dict()
        )
    except RuntimeError as e:
        logger.error(f"Failed to access key snippet repository: {str(e)}")
        key_snippets = ""

    related_files = get_related_files()
    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    working_directory = os.getcwd()

    prompt = WEB_RESEARCH_PROMPT.format(
        current_date=current_date,
        working_directory=working_directory,
        web_research_query=query,
        expert_section=expert_section,
        human_section=human_section,
        key_facts=key_facts,
        work_log=get_work_log_repository().format_work_log(),
        key_snippets=key_snippets,
        related_files=related_files,
        env_inv=get_env_inv(),
    )

    config = get_config_repository().get_all()
    recursion_limit = config.get("recursion_limit", 100)
    # NOTE(review): run_config is constructed but never passed to
    # run_agent_with_retry below — confirm whether it should be.
    run_config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": recursion_limit,
    }
    if config:
        run_config.update(config)

    try:
        if console_message:
            console.print(Panel(Markdown(console_message), title="🔬 Researching..."))

        logger.debug("Web research agent created successfully")
        none_or_fallback_handler = agent_utils.init_fallback_handler(agent, tools)
        _result = agent_utils.run_agent_with_retry(agent, prompt, none_or_fallback_handler)
        if _result:
            # Log web research completion
            log_work_event(f"Completed web research phase for: {query}")
        return _result
    except (KeyboardInterrupt, AgentInterrupt):
        raise
    except Exception as e:
        logger.error("Web research agent failed: %s", str(e), exc_info=True)
        raise

View File

@ -48,9 +48,7 @@ def delete_research_notes(note_ids: List[int]) -> str:
# Try to get the current human input to protect its notes
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")
@ -138,9 +136,7 @@ def run_research_notes_gc_agent(threshold: int = 30) -> None:
# Try to get the current human input ID to exclude its notes
current_human_input_id = None
try:
recent_inputs = get_human_input_repository().get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
current_human_input_id = recent_inputs[0].id
current_human_input_id = get_human_input_repository().get_most_recent_id()
except Exception as e:
console.print(f"Warning: Could not retrieve current human input: {str(e)}")

View File

@ -42,8 +42,8 @@ def initialize_database():
# to avoid circular imports
# Note: This import needs to be here, not at the top level
try:
from ra_aid.database.models import KeyFact, KeySnippet, HumanInput, ResearchNote
db.create_tables([KeyFact, KeySnippet, HumanInput, ResearchNote], safe=True)
from ra_aid.database.models import KeyFact, KeySnippet, HumanInput, ResearchNote, Trajectory
db.create_tables([KeyFact, KeySnippet, HumanInput, ResearchNote, Trajectory], safe=True)
logger.debug("Ensured database tables exist")
except Exception as e:
logger.error(f"Error creating tables: {str(e)}")
@ -163,3 +163,37 @@ class ResearchNote(BaseModel):
class Meta:
table_name = "research_note"
class Trajectory(BaseModel):
    """
    Model representing an agent trajectory stored in the database.

    Trajectories track the sequence of actions taken by agents, including
    tool executions and their results. This enables analysis of agent behavior,
    debugging of issues, and reconstruction of the decision-making process.

    Each trajectory record captures details about a single tool execution:
    - Which tool was used
    - What parameters were passed to the tool
    - What result was returned by the tool
    - UI rendering data for displaying the tool execution
    - Cost and token usage metrics (placeholders for future implementation)
    - Error information (when a tool execution fails)
    """
    # Optional link to the HumanInput that triggered this action; nullable so
    # trajectories can exist without an associated input.
    human_input = peewee.ForeignKeyField(HumanInput, backref='trajectories', null=True)
    tool_name = peewee.TextField()
    tool_parameters = peewee.TextField()  # JSON-encoded parameters
    tool_result = peewee.TextField()  # JSON-encoded result
    step_data = peewee.TextField()  # JSON-encoded UI rendering data
    record_type = peewee.TextField()  # Type of trajectory record (e.g. "tool_execution")
    cost = peewee.FloatField(null=True)  # Placeholder for cost tracking
    tokens = peewee.IntegerField(null=True)  # Placeholder for token usage tracking
    is_error = peewee.BooleanField(default=False)  # Flag indicating if this record represents an error
    error_message = peewee.TextField(null=True)  # The error message
    error_type = peewee.TextField(null=True)  # The type/class of the error
    error_details = peewee.TextField(null=True)  # Additional error details like stack traces or context

    # created_at and updated_at are inherited from BaseModel
    class Meta:
        table_name = "trajectory"

View File

@ -258,6 +258,25 @@ class HumanInputRepository:
logger.error(f"Failed to fetch recent human inputs: {str(e)}")
raise
def get_most_recent_id(self) -> Optional[int]:
"""
Get the ID of the most recent human input record.
Returns:
Optional[int]: The ID of the most recent human input, or None if no records exist
Raises:
peewee.DatabaseError: If there's an error accessing the database
"""
try:
recent_inputs = self.get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
return recent_inputs[0].id
return None
except peewee.DatabaseError as e:
logger.error(f"Failed to fetch most recent human input ID: {str(e)}")
raise
def get_by_source(self, source: str) -> List[HumanInput]:
"""
Retrieve human input records by source.

View File

@ -0,0 +1,417 @@
"""
Trajectory repository implementation for database access.
This module provides a repository implementation for the Trajectory model,
following the repository pattern for data access abstraction. It handles
operations for storing and retrieving agent action trajectories.
"""
from typing import Dict, List, Optional, Any, Union
import contextvars
import json
import logging
import peewee
from ra_aid.database.models import Trajectory, HumanInput
from ra_aid.logging_config import get_logger
logger = get_logger(__name__)

# Create contextvar to hold the TrajectoryRepository instance for the current
# context; set by TrajectoryRepositoryManager and read by
# get_trajectory_repository().
trajectory_repo_var: contextvars.ContextVar = contextvars.ContextVar("trajectory_repo", default=None)
class TrajectoryRepositoryManager:
    """
    Context manager for TrajectoryRepository.

    This class provides a context manager interface for TrajectoryRepository,
    using the contextvars approach for thread safety.

    Example:
        with DatabaseManager() as db:
            with TrajectoryRepositoryManager(db) as repo:
                # Use the repository
                trajectory = repo.create(
                    tool_name="ripgrep_search",
                    tool_parameters={"pattern": "example"}
                )
                all_trajectories = repo.get_all()
    """

    def __init__(self, db):
        """
        Initialize the TrajectoryRepositoryManager.

        Args:
            db: Database connection to use (required)
        """
        self.db = db

    def __enter__(self) -> 'TrajectoryRepository':
        """
        Initialize the TrajectoryRepository, publish it to the contextvar,
        and return it.

        Returns:
            TrajectoryRepository: The initialized repository
        """
        repo = TrajectoryRepository(self.db)
        trajectory_repo_var.set(repo)
        return repo

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[Exception],
        exc_tb: Optional[object],
    ) -> bool:
        """
        Reset the repository contextvar when exiting the context.

        Args:
            exc_type: The exception type if an exception was raised
            exc_val: The exception value if an exception was raised
            exc_tb: The traceback if an exception was raised

        Returns:
            bool: Always False so any exception raised in the body propagates
        """
        # Reset the contextvar to None
        trajectory_repo_var.set(None)
        # Don't suppress exceptions
        return False
def get_trajectory_repository() -> 'TrajectoryRepository':
    """
    Return the TrajectoryRepository bound to the current context.

    Returns:
        TrajectoryRepository: The repository published by the active
        TrajectoryRepositoryManager.

    Raises:
        RuntimeError: If no TrajectoryRepositoryManager is currently active.
    """
    current_repo = trajectory_repo_var.get()
    if current_repo is not None:
        return current_repo
    raise RuntimeError(
        "No TrajectoryRepository available. "
        "Make sure to initialize one with TrajectoryRepositoryManager first."
    )
class TrajectoryRepository:
    """
    Repository for managing Trajectory database operations.

    This class provides methods for performing CRUD operations on the Trajectory model,
    abstracting the database access details from the business logic. It handles
    serialization and deserialization of JSON fields for tool parameters, results,
    and UI rendering data.

    Example:
        with DatabaseManager() as db:
            with TrajectoryRepositoryManager(db) as repo:
                trajectory = repo.create(
                    tool_name="ripgrep_search",
                    tool_parameters={"pattern": "example"}
                )
                all_trajectories = repo.get_all()
    """

    def __init__(self, db):
        """
        Initialize the repository with a database connection.

        Args:
            db: Database connection to use (required)
        """
        if db is None:
            raise ValueError("Database connection is required for TrajectoryRepository")
        self.db = db

    def create(
        self,
        tool_name: str,
        tool_parameters: Dict[str, Any],
        tool_result: Optional[Dict[str, Any]] = None,
        step_data: Optional[Dict[str, Any]] = None,
        record_type: str = "tool_execution",
        human_input_id: Optional[int] = None,
        cost: Optional[float] = None,
        tokens: Optional[int] = None,
        is_error: bool = False,
        error_message: Optional[str] = None,
        error_type: Optional[str] = None,
        error_details: Optional[str] = None
    ) -> Trajectory:
        """
        Create a new trajectory record in the database.

        Args:
            tool_name: Name of the tool that was executed
            tool_parameters: Parameters passed to the tool (will be JSON encoded)
            tool_result: Result returned by the tool (will be JSON encoded)
            step_data: UI rendering data (will be JSON encoded)
            record_type: Type of trajectory record
            human_input_id: Optional ID of the associated human input
            cost: Optional cost of the operation (placeholder)
            tokens: Optional token usage (placeholder)
            is_error: Flag indicating if this record represents an error (default: False)
            error_message: The error message (if is_error is True)
            error_type: The type/class of the error (if is_error is True)
            error_details: Additional error details like stack traces (if is_error is True)

        Returns:
            Trajectory: The newly created trajectory instance

        Raises:
            peewee.DatabaseError: If there's an error creating the record
        """
        try:
            # Serialize JSON fields
            # NOTE(review): assumes tool_parameters/tool_result/step_data are
            # JSON-serializable — json.dumps raises TypeError otherwise.
            tool_parameters_json = json.dumps(tool_parameters)
            tool_result_json = json.dumps(tool_result) if tool_result is not None else None
            step_data_json = json.dumps(step_data) if step_data is not None else None

            # Create human input reference if provided; a missing referent is
            # tolerated (warning only) and the record is created without it.
            human_input = None
            if human_input_id is not None:
                try:
                    human_input = HumanInput.get_by_id(human_input_id)
                except peewee.DoesNotExist:
                    logger.warning(f"Human input with ID {human_input_id} not found")

            # Create the trajectory record
            trajectory = Trajectory.create(
                human_input=human_input,
                tool_name=tool_name,
                tool_parameters=tool_parameters_json,
                tool_result=tool_result_json,
                step_data=step_data_json,
                record_type=record_type,
                cost=cost,
                tokens=tokens,
                is_error=is_error,
                error_message=error_message,
                error_type=error_type,
                error_details=error_details
            )
            logger.debug(f"Created trajectory record ID {trajectory.id} for tool: {tool_name}")
            return trajectory
        except peewee.DatabaseError as e:
            logger.error(f"Failed to create trajectory record: {str(e)}")
            raise

    def get(self, trajectory_id: int) -> Optional[Trajectory]:
        """
        Retrieve a trajectory record by its ID.

        Args:
            trajectory_id: The ID of the trajectory record to retrieve

        Returns:
            Optional[Trajectory]: The trajectory instance if found, None otherwise

        Raises:
            peewee.DatabaseError: If there's an error accessing the database
        """
        try:
            return Trajectory.get_or_none(Trajectory.id == trajectory_id)
        except peewee.DatabaseError as e:
            logger.error(f"Failed to fetch trajectory {trajectory_id}: {str(e)}")
            raise

    def update(
        self,
        trajectory_id: int,
        tool_result: Optional[Dict[str, Any]] = None,
        step_data: Optional[Dict[str, Any]] = None,
        cost: Optional[float] = None,
        tokens: Optional[int] = None,
        is_error: Optional[bool] = None,
        error_message: Optional[str] = None,
        error_type: Optional[str] = None,
        error_details: Optional[str] = None
    ) -> Optional[Trajectory]:
        """
        Update an existing trajectory record.

        This is typically used to update the result or metrics after tool execution completes.
        Only fields passed as non-None are written; passing None leaves a field unchanged
        (so a field cannot be reset to NULL through this method).

        Args:
            trajectory_id: The ID of the trajectory record to update
            tool_result: Updated tool result (will be JSON encoded)
            step_data: Updated UI rendering data (will be JSON encoded)
            cost: Updated cost information
            tokens: Updated token usage information
            is_error: Flag indicating if this record represents an error
            error_message: The error message
            error_type: The type/class of the error
            error_details: Additional error details like stack traces

        Returns:
            Optional[Trajectory]: The updated trajectory if found, None otherwise

        Raises:
            peewee.DatabaseError: If there's an error updating the record
        """
        try:
            # First check if the trajectory exists
            trajectory = self.get(trajectory_id)
            if not trajectory:
                logger.warning(f"Attempted to update non-existent trajectory {trajectory_id}")
                return None

            # Update the fields if provided
            update_data = {}
            if tool_result is not None:
                update_data["tool_result"] = json.dumps(tool_result)
            if step_data is not None:
                update_data["step_data"] = json.dumps(step_data)
            if cost is not None:
                update_data["cost"] = cost
            if tokens is not None:
                update_data["tokens"] = tokens
            if is_error is not None:
                update_data["is_error"] = is_error
            if error_message is not None:
                update_data["error_message"] = error_message
            if error_type is not None:
                update_data["error_type"] = error_type
            if error_details is not None:
                update_data["error_details"] = error_details

            if update_data:
                query = Trajectory.update(**update_data).where(Trajectory.id == trajectory_id)
                query.execute()
                logger.debug(f"Updated trajectory record ID {trajectory_id}")
                # Re-fetch so the returned instance reflects the update
                return self.get(trajectory_id)
            return trajectory
        except peewee.DatabaseError as e:
            logger.error(f"Failed to update trajectory {trajectory_id}: {str(e)}")
            raise

    def delete(self, trajectory_id: int) -> bool:
        """
        Delete a trajectory record by its ID.

        Args:
            trajectory_id: The ID of the trajectory record to delete

        Returns:
            bool: True if the record was deleted, False if it wasn't found

        Raises:
            peewee.DatabaseError: If there's an error deleting the record
        """
        try:
            # First check if the trajectory exists
            trajectory = self.get(trajectory_id)
            if not trajectory:
                logger.warning(f"Attempted to delete non-existent trajectory {trajectory_id}")
                return False

            # Delete the trajectory
            trajectory.delete_instance()
            logger.debug(f"Deleted trajectory record ID {trajectory_id}")
            return True
        except peewee.DatabaseError as e:
            logger.error(f"Failed to delete trajectory {trajectory_id}: {str(e)}")
            raise

    def get_all(self) -> Dict[int, Trajectory]:
        """
        Retrieve all trajectory records from the database, ordered by ID.

        Returns:
            Dict[int, Trajectory]: Dictionary mapping trajectory IDs to trajectory instances

        Raises:
            peewee.DatabaseError: If there's an error accessing the database
        """
        try:
            return {trajectory.id: trajectory for trajectory in Trajectory.select().order_by(Trajectory.id)}
        except peewee.DatabaseError as e:
            logger.error(f"Failed to fetch all trajectories: {str(e)}")
            raise

    def get_trajectories_by_human_input(self, human_input_id: int) -> List[Trajectory]:
        """
        Retrieve all trajectory records associated with a specific human input.

        Args:
            human_input_id: The ID of the human input to get trajectories for

        Returns:
            List[Trajectory]: List of trajectory instances associated with the human input,
            ordered by ID

        Raises:
            peewee.DatabaseError: If there's an error accessing the database
        """
        try:
            return list(Trajectory.select().where(Trajectory.human_input == human_input_id).order_by(Trajectory.id))
        except peewee.DatabaseError as e:
            logger.error(f"Failed to fetch trajectories for human input {human_input_id}: {str(e)}")
            raise

    def parse_json_field(self, json_str: Optional[str]) -> Optional[Dict[str, Any]]:
        """
        Parse a JSON string into a Python dictionary.

        Args:
            json_str: JSON string to parse

        Returns:
            Optional[Dict[str, Any]]: Parsed dictionary or None if input is None, empty,
            or invalid JSON (a parse failure is logged, not raised)
        """
        if not json_str:
            return None
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON field: {str(e)}")
            return None

    def get_parsed_trajectory(self, trajectory_id: int) -> Optional[Dict[str, Any]]:
        """
        Get a trajectory record with JSON fields parsed into dictionaries.

        Args:
            trajectory_id: ID of the trajectory to retrieve

        Returns:
            Optional[Dict[str, Any]]: Dictionary with trajectory data and parsed JSON fields,
            or None if not found
        """
        trajectory = self.get(trajectory_id)
        if trajectory is None:
            return None

        # NOTE(review): accessing trajectory.human_input resolves the foreign
        # key, which may issue an extra query — confirm acceptable here.
        return {
            "id": trajectory.id,
            "created_at": trajectory.created_at,
            "updated_at": trajectory.updated_at,
            "tool_name": trajectory.tool_name,
            "tool_parameters": self.parse_json_field(trajectory.tool_parameters),
            "tool_result": self.parse_json_field(trajectory.tool_result),
            "step_data": self.parse_json_field(trajectory.step_data),
            "record_type": trajectory.record_type,
            "cost": trajectory.cost,
            "tokens": trajectory.tokens,
            "human_input_id": trajectory.human_input.id if trajectory.human_input else None,
            "is_error": trajectory.is_error,
            "error_message": trajectory.error_message,
            "error_type": trajectory.error_type,
            "error_details": trajectory.error_details,
        }

View File

@ -0,0 +1,104 @@
"""Peewee migrations -- 007_20250310_184046_add_trajectory_model.py.
Some examples (model - class or model name)::
> Model = migrator.orm['table_name'] # Return model in current state by name
> Model = migrator.ModelClass # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.run(func, *args, **kwargs) # Run python function with the given args
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.add_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
> migrator.add_constraint(model, name, sql)
> migrator.drop_index(model, *col_names)
> migrator.drop_not_null(model, *field_names)
> migrator.drop_constraints(model, *constraints)
"""
from contextlib import suppress
import peewee as pw
from peewee_migrate import Migrator
with suppress(ImportError):
import playhouse.postgres_ext as pw_pext
def migrate(migrator: Migrator, database: pw.Database, *, fake=False):
    """Create the trajectory table for storing agent action trajectories.

    Idempotent: if the table (or the human_input_id column) already exists,
    the corresponding step is skipped rather than failing.
    """
    # Check if the table already exists by probing it with a cheap SELECT;
    # pw.OperationalError means the table is absent and safe to create.
    # NOTE(review): on some backends a missing table raises a different
    # error class (e.g. ProgrammingError) — confirm for non-SQLite targets.
    try:
        database.execute_sql("SELECT id FROM trajectory LIMIT 1")
        # If we reach here, the table exists
        return
    except pw.OperationalError:
        # Table doesn't exist, safe to create
        pass

    @migrator.create_model
    class Trajectory(pw.Model):
        id = pw.AutoField()
        created_at = pw.DateTimeField()
        updated_at = pw.DateTimeField()
        tool_name = pw.TextField()
        tool_parameters = pw.TextField()  # JSON-encoded parameters
        tool_result = pw.TextField()  # JSON-encoded result
        step_data = pw.TextField()  # JSON-encoded UI rendering data
        record_type = pw.TextField()  # Type of trajectory record
        cost = pw.FloatField(null=True)  # Placeholder for cost tracking
        tokens = pw.IntegerField(null=True)  # Placeholder for token usage tracking
        is_error = pw.BooleanField(default=False)  # Flag indicating if this record represents an error
        error_message = pw.TextField(null=True)  # The error message
        error_type = pw.TextField(null=True)  # The type/class of the error
        error_details = pw.TextField(null=True)  # Additional error details like stack traces or context
        # We'll add the human_input foreign key in a separate step for safety

        class Meta:
            table_name = "trajectory"

    # Check if HumanInput model exists before adding the foreign key
    try:
        HumanInput = migrator.orm['human_input']
        # Only add the foreign key if the human_input_id column doesn't already exist
        try:
            database.execute_sql("SELECT human_input_id FROM trajectory LIMIT 1")
        except pw.OperationalError:
            # Column doesn't exist, safe to add
            migrator.add_fields(
                'trajectory',
                human_input=pw.ForeignKeyField(
                    HumanInput,
                    null=True,
                    field='id',
                    on_delete='SET NULL'
                )
            )
    except KeyError:
        # HumanInput doesn't exist, we'll skip adding the foreign key
        pass
def rollback(migrator: Migrator, database: pw.Database, *, fake=False):
    """Drop the trajectory table created by :func:`migrate`.

    The ``human_input`` foreign key column is detached before the model is
    removed; a missing column is tolerated because the forward migration may
    have skipped adding it (e.g. when the human_input table did not exist).

    Args:
        migrator: peewee-migrate migrator used to record schema operations.
        database: Active database connection (unused directly).
        fake: Standard peewee-migrate flag; unused here.
    """
    # Detach the optional FK first; its absence raises OperationalError,
    # which we deliberately swallow.
    with suppress(pw.OperationalError):
        migrator.remove_fields('trajectory', 'human_input')

    # With the FK gone, the model itself can be dropped.
    migrator.remove_model('trajectory')

View File

@ -139,6 +139,8 @@ put_complete_file_contents("/path/to/file.py", '''def example_function():
</example good output>
{last_result_section}
ANSWER QUICKLY AND CONFIDENTLY WITH A FUNCTION CALL. IF YOU ARE UNSURE, JUST YEET THE BEST FUNCTION CALL YOU CAN.
"""
# Prompt to send when the model gives no tool call

View File

@ -45,8 +45,6 @@ YOU MUST **EXPLICITLY** INCLUDE ANY PATHS FROM THE ABOVE INFO IF NEEDED. IT IS N
READ AND STUDY ACTUAL LIBRARY HEADERS/CODE FROM THE ENVIRONMENT, IF AVAILABLE AND RELEVANT.
{implementation_guidance_section}
Important Notes:
- Focus solely on the given task and implement it as described.
- Scale the complexity of your solution to the complexity of the request. For simple requests, keep it straightforward and minimal. For complex requests, maintain the previously planned depth.
@ -95,4 +93,6 @@ IF YOU CAN SEE THE CODE WRITTEN/CHANGED BY THE PROGRAMMER, TRUST IT. YOU DO NOT
YOU MUST READ FILES BEFORE WRITING OR CHANGING THEM.
NEVER ANNOUNCE WHAT YOU ARE DOING, JUST DO IT!
{implementation_guidance_section}
"""

View File

@ -48,8 +48,6 @@ Work done so far:
{work_log}
</work log>
{expert_guidance_section}
Guidelines:
If you need additional input or assistance from the expert (if expert is available), especially for debugging, deeper logic analysis, or correctness checks, use emit_expert_context to provide all relevant context and wait for the expert's response.
@ -102,4 +100,6 @@ DO NOT USE run_shell_command TO WRITE ANY FILE CONTENTS! USE request_task_implem
WORK AND TEST INCREMENTALLY, AND RUN MULTIPLE IMPLEMENTATION TASKS WHERE APPROPRIATE.
NEVER ANNOUNCE WHAT YOU ARE DOING, JUST DO IT!
{expert_guidance_section}
"""

View File

@ -23,6 +23,10 @@ Working Directory: {working_directory}
{related_files}
</related files>
<project info>
{project_info}
</project info>
<environment information>
{env_inv}
</environment information>
@ -46,8 +50,24 @@ WE DO NOT WANT TO EXCESSIVELY EMIT TINY KEY SNIPPETS --THEY SHOULD BE "paragraph
Given the available information, tools, and base task, write a couple paragraphs about how an agentic system might use the available tools to plan the base task, break it down into tasks, and request implementation of those tasks. The agent will not be writing any code at this point, so we should keep it to high level tasks and keep the focus on project planning.
The agent has a tendency to do the same work/function calls over and over again.
The agent is so dumb it needs you to explicitly say how to use the parameters to the tools as well.
Answer quickly and confidently with five sentences at most.
DO NOT WRITE CODE
WRITE AT LEAST ONE SENTENCE
WRITE NO MORE THAN FIVE PARAGRAPHS.
WRITE ABOUT HOW THE AGENT WILL USE THE TOOLS AVAILABLE TO EFFICIENTLY ACCOMPLISH THE GOAL.
REFERENCE ACTUAL TOOL NAMES IN YOUR WRITING, BUT KEEP THE WRITING PLAIN LOGICAL ENGLISH.
BE DETAILED AND INCLUDE LOGIC BRANCHES FOR WHAT TO DO IF DIFFERENT TOOLS RETURN DIFFERENT THINGS.
THINK OF IT AS A FLOW CHART BUT IN NATURAL ENGLISH.
REMEMBER THE ULTIMATE GOAL AT THIS STAGE IS TO BREAK THINGS DOWN INTO DISCRETE TASKS AND CALL request_task_implementation FOR EACH TASK.
PROPOSE THE TASK BREAKDOWN TO THE AGENT. INCLUDE THIS AS A BULLETED LIST IN YOUR GUIDANCE.
WE ARE NOT WRITING ANY CODE AT THIS STAGE.
THE AGENT IS VERY FORGETFUL AND YOUR WRITING MUST INCLUDE REMARKS ABOUT HOW IT SHOULD USE *ALL* AVAILABLE TOOLS, INCLUDING AND ESPECIALLY ask_expert.
THE AGENT IS DUMB AND NEEDS REALLY DETAILED GUIDANCE LIKE LITERALLY REMINDING IT TO CALL request_task_implementation FOR EACH TASK IN YOUR BULLETED LIST.
YOU MUST MENTION request_task_implementation AT LEAST ONCE.
BREAK THE WORK DOWN INTO CHUNKS SMALL ENOUGH EVEN A DUMB/SIMPLE AGENT CAN HANDLE EACH TASK.
"""
REASONING_ASSIST_PROMPT_IMPLEMENTATION = """Current Date: {current_date}
@ -69,6 +89,10 @@ Working Directory: {working_directory}
{related_files}
</related files>
<project info>
{project_info}
</project info>
<environment information>
{env_inv}
</environment information>
@ -88,10 +112,24 @@ REMEMBER, IT IS *IMPERATIVE* TO RECORD KEY INFO SUCH AS BUILD/TEST COMMANDS, ETC
WE DO NOT WANT TO EMIT REDUNDANT KEY FACTS, SNIPPETS, ETC.
WE DO NOT WANT TO EXCESSIVELY EMIT TINY KEY SNIPPETS --THEY SHOULD BE "paragraphs" OF CODE TYPICALLY.
IF THERE IS COMPLEX LOGIC, COMPILATION ERRORS, DEBUGGING, THE AGENT SHOULD USE ask_expert.
EXISTING FILES MUST BE READ BEFORE THEY ARE WRITTEN OR MODIFIED.
IF ANYTHING AT ALL GOES WRONG, CALL ask_expert.
Given the available information, tools, and base task, write a couple paragraphs about how an agentic system might use the available tools to implement the given task definition. The agent will be writing code and making changes at this point.
The agent is so dumb it needs you to explicitly say how to use the parameters to the tools as well.
Answer quickly and confidently with a few sentences at most.
WRITE AT LEAST ONE SENTENCE
WRITE NO MORE THAN FIVE PARAGRAPHS.
WRITE ABOUT HOW THE AGENT WILL USE THE TOOLS AVAILABLE TO EFFICIENTLY ACCOMPLISH THE GOAL.
REFERENCE ACTUAL TOOL NAMES IN YOUR WRITING, BUT KEEP THE WRITING PLAIN LOGICAL ENGLISH.
BE DETAILED AND INCLUDE LOGIC BRANCHES FOR WHAT TO DO IF DIFFERENT TOOLS RETURN DIFFERENT THINGS.
THE AGENT IS VERY FORGETFUL AND YOUR WRITING MUST INCLUDE REMARKS ABOUT HOW IT SHOULD USE *ALL* AVAILABLE TOOLS, INCLUDING AND ESPECIALLY ask_expert.
THINK OF IT AS A FLOW CHART BUT IN NATURAL ENGLISH.
IT IS IMPERATIVE THE AGENT IS INSTRUCTED TO EMIT KEY FACTS AND KEY SNIPPETS AS IT WORKS. THESE MUST BE RELEVANT TO THE TASK AT HAND, ESPECIALLY ANY UPCOMING OR FUTURE WORK.
"""
REASONING_ASSIST_PROMPT_RESEARCH = """Current Date: {current_date}
@ -117,6 +155,10 @@ Working Directory: {working_directory}
{related_files}
</related files>
<project info>
{project_info}
</project info>
<environment information>
{env_inv}
</environment information>
@ -134,5 +176,22 @@ IF INFORMATION IS TOO COMPLEX TO UNDERSTAND, THE AGENT SHOULD USE ask_expert.
Given the available information, tools, and base task or query, write a couple paragraphs about how an agentic system might use the available tools to research the codebase, identify important components, gather key information, and emit key facts and snippets. The focus is on thorough investigation and understanding before any implementation. Remember, the research agent generally should emit research notes at the end of its execution, right before it calls request_implementation if a change or new work is required.
The agent is so dumb it needs you to explicitly say how to use the parameters to the tools as well.
ONLY FOR NEW PROJECTS: If this is a new project, most of the focus needs to be on asking the expert, reading/research available library files, emitting key snippets/facts, and most importantly research notes to lay out that we have a new project and what we are building. DO NOT INSTRUCT THE AGENT TO LIST PROJECT DIRECTORIES/READ FILES IF WE ALREADY KNOW THERE ARE NO PROJECT FILES.
Answer quickly and confidently with five sentences at most.
DO NOT WRITE CODE
WRITE AT LEAST ONE SENTENCE
WRITE NO MORE THAN FIVE PARAGRAPHS.
WRITE ABOUT HOW THE AGENT WILL USE THE TOOLS AVAILABLE TO EFFICIENTLY ACCOMPLISH THE GOAL.
REFERENCE ACTUAL TOOL NAMES IN YOUR WRITING, BUT KEEP THE WRITING PLAIN LOGICAL ENGLISH.
BE DETAILED AND INCLUDE LOGIC BRANCHES FOR WHAT TO DO IF DIFFERENT TOOLS RETURN DIFFERENT THINGS.
THINK OF IT AS A FLOW CHART BUT IN NATURAL ENGLISH.
THE AGENT IS VERY FORGETFUL AND YOUR WRITING MUST INCLUDE REMARKS ABOUT HOW IT SHOULD USE *ALL* AVAILABLE TOOLS, INCLUDING AND ESPECIALLY ask_expert.
REMEMBER WE ARE INSTRUCTING THE AGENT **HOW TO DO RESEARCH ABOUT WHAT ALREADY EXISTS** AT THIS POINT USING THE TOOLS AVAILABLE. YOU ARE NOT TO DO THE ACTUAL RESEARCH YOURSELF. IF AN IMPLEMENTATION IS REQUESTED, THE AGENT SHOULD BE INSTRUCTED TO CALL request_task_implementation BUT ONLY AFTER EMITTING RESEARCH NOTES, KEY FACTS, AND KEY SNIPPETS AS RELEVANT.
IT IS IMPERATIVE THAT WE DO NOT START DIRECTLY IMPLEMENTING ANYTHING AT THIS POINT. WE ARE RESEARCHING, THEN CALLING request_implementation *AT MOST ONCE*.
IT IS IMPERATIVE THE AGENT EMITS KEY FACTS AND THOROUGH RESEARCH NOTES AT THIS POINT. THE RESEARCH NOTES CAN JUST BE THOUGHTS AT THIS POINT IF IT IS A NEW PROJECT.
"""

View File

@ -66,7 +66,6 @@ You must:
Do so by incrementally and systematically exploring the filesystem with careful directory listing tool calls.
You can use fuzzy file search to quickly find relevant files matching a search pattern.
Use ripgrep_search extensively to do *exhaustive* searches for all references to anything that might be changed as part of the base level task.
Prefer to use ripgrep_search with context params rather than reading whole files in order to preserve context tokens.
Call emit_key_facts and emit_key_snippet on key information/facts/snippets of code you discover about this project during your research. This is information you will be writing down to be able to efficiently complete work in the future, so be on the lookout for these and make it count.
While it is important to emit key facts and snippets, only emit ones that are truly important info about the project or this task. Do not excessively emit key facts or snippets. Be strategic about it.
@ -124,7 +123,6 @@ If you find this is an empty directory, you can stop research immediately and as
{expert_section}
{human_section}
{web_research_section}
{expert_guidance_section}
You have often been criticized for:
- Needlessly requesting more research tasks, especially for general background knowledge which you already know.
@ -185,8 +183,6 @@ If the user explicitly requests implementation, that means you should first perf
{base_task}
</user query>
{expert_guidance_section}
USER QUERY *ALWAYS* TAKES PRECEDENCE OVER EVERYTHING IN PREVIOUS RESEARCH.
KEEP IT SIMPLE
@ -196,6 +192,8 @@ NEVER ANNOUNCE WHAT YOU ARE DOING, JUST DO IT!
AS THE RESEARCH AGENT, YOU MUST NOT WRITE OR MODIFY ANY FILES. IF FILE MODIFICATION OR IMPLEMENTATION IS REQUIRED, CALL request_implementation.
IF THE USER ASKED YOU TO UPDATE A FILE, JUST DO RESEARCH FIRST, EMIT YOUR RESEARCH NOTES, THEN CALL request_implementation.
CALL request_implementation ONLY ONCE! ONCE THE PLAN COMPLETES, YOU'RE DONE.
{expert_guidance_section}
"""
)
@ -212,12 +210,12 @@ When you emit research notes, keep it extremely concise and relevant only to the
{base_task}
</user query>
{expert_guidance_section}
USER QUERY *ALWAYS* TAKES PRECEDENCE OVER EVERYTHING IN PREVIOUS RESEARCH.
KEEP IT SIMPLE
NEVER ANNOUNCE WHAT YOU ARE DOING, JUST DO IT!
{expert_guidance_section}
"""
)

View File

@ -90,7 +90,7 @@ def request_research(query: str) -> ResearchResult:
try:
# Run research agent
from ..agent_utils import run_research_agent
from ..agents.research_agent import run_research_agent
_result = run_research_agent(
query,
@ -177,7 +177,7 @@ def request_web_research(query: str) -> ResearchResult:
try:
# Run web research agent
from ..agent_utils import run_web_research_agent
from ..agents.research_agent import run_web_research_agent
_result = run_web_research_agent(
query,
@ -254,7 +254,7 @@ def request_research_and_implementation(query: str) -> Dict[str, Any]:
try:
# Run research agent
from ..agent_utils import run_research_agent
from ..agents.research_agent import run_research_agent
_result = run_research_agent(
query,
@ -347,7 +347,7 @@ def request_task_implementation(task_spec: str) -> str:
try:
print_task_header(task_spec)
# Run implementation agent
from ..agent_utils import run_task_implementation_agent
from ..agents.implementation_agent import run_task_implementation_agent
reset_completion_flags()
@ -481,7 +481,7 @@ def request_implementation(task_spec: str) -> str:
try:
# Run planning agent
from ..agent_utils import run_planning_agent
from ..agents import run_planning_agent
reset_completion_flags()

View File

@ -200,7 +200,7 @@ def list_directory_tree(
"""
root_path = Path(path).resolve()
if not root_path.exists():
raise ValueError(f"Path does not exist: {path}")
return f"Error: Path does not exist: {path}"
# Load .gitignore patterns if present (only needed for directories)
spec = None

View File

@ -54,9 +54,7 @@ def emit_research_notes(notes: str) -> str:
human_input_id = None
try:
human_input_repo = get_human_input_repository()
recent_inputs = human_input_repo.get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
human_input_id = recent_inputs[0].id
human_input_id = human_input_repo.get_most_recent_id()
except RuntimeError as e:
logger.warning(f"No HumanInputRepository available: {str(e)}")
except Exception as e:
@ -109,9 +107,7 @@ def emit_key_facts(facts: List[str]) -> str:
human_input_id = None
try:
human_input_repo = get_human_input_repository()
recent_inputs = human_input_repo.get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
human_input_id = recent_inputs[0].id
human_input_id = human_input_repo.get_most_recent_id()
except RuntimeError as e:
logger.warning(f"No HumanInputRepository available: {str(e)}")
except Exception as e:
@ -186,9 +182,7 @@ def emit_key_snippet(snippet_info: SnippetInfo) -> str:
human_input_id = None
try:
human_input_repo = get_human_input_repository()
recent_inputs = human_input_repo.get_recent(1)
if recent_inputs and len(recent_inputs) > 0:
human_input_id = recent_inputs[0].id
human_input_id = human_input_repo.get_most_recent_id()
except RuntimeError as e:
logger.warning(f"No HumanInputRepository available: {str(e)}")
except Exception as e:

View File

@ -54,7 +54,7 @@ def mock_dependencies(monkeypatch):
monkeypatch.setattr("ra_aid.__main__.create_agent", lambda *args, **kwargs: None)
monkeypatch.setattr("ra_aid.__main__.run_agent_with_retry", lambda *args, **kwargs: None)
monkeypatch.setattr("ra_aid.__main__.run_research_agent", lambda *args, **kwargs: None)
monkeypatch.setattr("ra_aid.__main__.run_planning_agent", lambda *args, **kwargs: None)
monkeypatch.setattr("ra_aid.agents.planning_agent.run_planning_agent", lambda *args, **kwargs: None)
# Mock LLM initialization
def mock_config_update(*args, **kwargs):
@ -268,7 +268,7 @@ def test_temperature_validation(mock_dependencies, mock_config_repository):
with patch("ra_aid.__main__.initialize_llm", return_value=None) as mock_init_llm:
# Also patch any calls that would actually use the mocked initialize_llm function
with patch("ra_aid.__main__.run_research_agent", return_value=None):
with patch("ra_aid.__main__.run_planning_agent", return_value=None):
with patch("ra_aid.agents.planning_agent.run_planning_agent", return_value=None):
with patch.object(
sys, "argv", ["ra-aid", "-m", "test", "--temperature", "0.7"]
):

View File

@ -155,7 +155,7 @@ def mock_functions():
def test_request_research_uses_key_fact_repository(reset_memory, mock_functions):
"""Test that request_research uses KeyFactRepository directly with formatting."""
# Mock running the research agent
with patch('ra_aid.agent_utils.run_research_agent'):
with patch('ra_aid.agents.research_agent.run_research_agent'):
# Call the function
result = request_research("test query")
@ -197,7 +197,7 @@ def test_request_research_max_depth(reset_memory, mock_functions):
def test_request_research_and_implementation_uses_key_fact_repository(reset_memory, mock_functions):
"""Test that request_research_and_implementation uses KeyFactRepository correctly."""
# Mock running the research agent
with patch('ra_aid.agent_utils.run_research_agent'):
with patch('ra_aid.agents.research_agent.run_research_agent'):
# Call the function
result = request_research_and_implementation("test query")
@ -217,7 +217,7 @@ def test_request_research_and_implementation_uses_key_fact_repository(reset_memo
def test_request_implementation_uses_key_fact_repository(reset_memory, mock_functions):
"""Test that request_implementation uses KeyFactRepository correctly."""
# Mock running the planning agent
with patch('ra_aid.agent_utils.run_planning_agent'):
with patch('ra_aid.agents.planning_agent.run_planning_agent'):
# Call the function
result = request_implementation("test task")
@ -237,7 +237,7 @@ def test_request_implementation_uses_key_fact_repository(reset_memory, mock_func
def test_request_task_implementation_uses_key_fact_repository(reset_memory, mock_functions):
"""Test that request_task_implementation uses KeyFactRepository correctly."""
# Mock running the implementation agent
with patch('ra_aid.agent_utils.run_task_implementation_agent'):
with patch('ra_aid.agents.implementation_agent.run_task_implementation_agent'):
# Call the function
result = request_task_implementation("test task")

View File

@ -128,7 +128,7 @@ def test_gitignore_patterns():
def test_invalid_path():
"""Test error handling for invalid paths"""
with pytest.raises(ValueError, match="Path does not exist"):
list_directory_tree.invoke({"path": "/nonexistent/path"})
result = list_directory_tree.invoke({"path": "/nonexistent/path"})
assert "Error: Path does not exist: /nonexistent/path" in result
# We now allow files to be passed to list_directory_tree, so we don't test for this case anymore