diff --git a/ra_aid/agent_utils.py b/ra_aid/agent_utils.py
index 989b327..e325745 100644
--- a/ra_aid/agent_utils.py
+++ b/ra_aid/agent_utils.py
@@ -116,7 +116,6 @@ from ra_aid.tools.memory import (
)
from ra_aid.database.repositories.config_repository import get_config_repository
from ra_aid.env_inv_context import get_env_inv
-from ra_aid.agents.research_agent import run_web_research_agent
console = Console()
@@ -364,1060 +363,8 @@ def create_agent(
)
-def run_research_agent(
- base_task_or_query: str,
- model,
- *,
- expert_enabled: bool = False,
- research_only: bool = False,
- hil: bool = False,
- web_research_enabled: bool = False,
- memory: Optional[Any] = None,
- thread_id: Optional[str] = None,
- console_message: Optional[str] = None,
-) -> Optional[str]:
- """Run a research agent with the given configuration.
-
- Args:
- base_task_or_query: The main task or query for research
- model: The LLM model to use
- expert_enabled: Whether expert mode is enabled
- research_only: Whether this is a research-only task
- hil: Whether human-in-the-loop mode is enabled
- web_research_enabled: Whether web research is enabled
- memory: Optional memory instance to use
- config: Optional configuration dictionary
- thread_id: Optional thread ID (defaults to new UUID)
- console_message: Optional message to display before running
-
- Returns:
- Optional[str]: The completion message if task completed successfully
-
- Example:
- result = run_research_agent(
- "Research Python async patterns",
- model,
- expert_enabled=True,
- research_only=True
- )
- """
- thread_id = thread_id or str(uuid.uuid4())
- logger.debug("Starting research agent with thread_id=%s", thread_id)
- logger.debug(
- "Research configuration: expert=%s, research_only=%s, hil=%s, web=%s",
- expert_enabled,
- research_only,
- hil,
- web_research_enabled,
- )
-
- if memory is None:
- memory = MemorySaver()
-
- current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- working_directory = os.getcwd()
-
- # Get the last human input, if it exists
- base_task = base_task_or_query
- try:
- human_input_repository = get_human_input_repository()
- recent_inputs = human_input_repository.get_recent(1)
- if recent_inputs and len(recent_inputs) > 0:
- last_human_input = recent_inputs[0].content
- base_task = (
- f"{last_human_input}\n{base_task}"
- )
- except RuntimeError as e:
- logger.error(f"Failed to access human input repository: {str(e)}")
- # Continue without appending last human input
-
- try:
- key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
- except RuntimeError as e:
- logger.error(f"Failed to access key fact repository: {str(e)}")
- key_facts = ""
- key_snippets = format_key_snippets_dict(
- get_key_snippet_repository().get_snippets_dict()
- )
- related_files = get_related_files()
-
- try:
- project_info = get_project_info(".", file_limit=2000)
- formatted_project_info = format_project_info(project_info)
- except Exception as e:
- logger.warning(f"Failed to get project info: {e}")
- formatted_project_info = ""
-
- tools = get_research_tools(
- research_only=research_only,
- expert_enabled=expert_enabled,
- human_interaction=hil,
- web_research_enabled=get_config_repository().get("web_research_enabled", False),
- )
-
- # Get model info for reasoning assistance configuration
- provider = get_config_repository().get("provider", "")
- model_name = get_config_repository().get("model", "")
-
- # Get model configuration to check for reasoning_assist_default
- model_config = {}
- provider_models = models_params.get(provider, {})
- if provider_models and model_name in provider_models:
- model_config = provider_models[model_name]
-
- # Check if reasoning assist is explicitly enabled/disabled
- force_assistance = get_config_repository().get("force_reasoning_assistance", False)
- disable_assistance = get_config_repository().get(
- "disable_reasoning_assistance", False
- )
- if force_assistance:
- reasoning_assist_enabled = True
- elif disable_assistance:
- reasoning_assist_enabled = False
- else:
- # Fall back to model default
- reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)
-
- logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)
- expert_guidance = ""
-
- # Get research note information for reasoning assistance
- try:
- research_notes = format_research_notes_dict(
- get_research_note_repository().get_notes_dict()
- )
- except Exception as e:
- logger.warning(f"Failed to get research notes: {e}")
- research_notes = ""
-
- # If reasoning assist is enabled, make a one-off call to the expert model
- if reasoning_assist_enabled:
- try:
- logger.info(
- "Reasoning assist enabled for model %s, getting expert guidance",
- model_name,
- )
-
- # Collect tool descriptions
- tool_metadata = []
- from ra_aid.tools.reflection import get_function_info as get_tool_info
-
- for tool in tools:
- try:
- tool_info = get_tool_info(tool.func)
- name = tool.func.__name__
- description = inspect.getdoc(tool.func)
- tool_metadata.append(f"Tool: {name}\nDescription: {description}\n")
- except Exception as e:
- logger.warning(f"Error getting tool info for {tool}: {e}")
-
- # Format tool metadata
- formatted_tool_metadata = "\n".join(tool_metadata)
-
- # Initialize expert model
- expert_model = initialize_expert_llm(provider, model_name)
-
- # Format the reasoning assist prompt
- reasoning_assist_prompt = REASONING_ASSIST_PROMPT_RESEARCH.format(
- current_date=current_date,
- working_directory=working_directory,
- base_task=base_task,
- key_facts=key_facts,
- key_snippets=key_snippets,
- research_notes=research_notes,
- related_files=related_files,
- env_inv=get_env_inv(),
- tool_metadata=formatted_tool_metadata,
- )
-
- # Show the reasoning assist query in a panel
- console.print(
- Panel(
- Markdown(
- "Consulting with the reasoning model on the best research approach."
- ),
- title="📝 Thinking about research strategy...",
- border_style="yellow",
- )
- )
-
- logger.debug("Invoking expert model for reasoning assist")
- # Make the call to the expert model
- response = expert_model.invoke(reasoning_assist_prompt)
-
- # Check if the model supports think tags
- supports_think_tag = model_config.get("supports_think_tag", False)
- supports_thinking = model_config.get("supports_thinking", False)
-
- # Get response content, handling if it's a list (for Claude thinking mode)
- content = None
-
- if hasattr(response, "content"):
- content = response.content
- else:
- # Fallback if content attribute is missing
- content = str(response)
-
- # Process content based on its type
- if isinstance(content, list):
- # Handle structured thinking mode (e.g., Claude 3.7)
- thinking_content = None
- response_text = None
-
- # Process each item in the list
- for item in content:
- if isinstance(item, dict):
- # Extract thinking content
- if item.get("type") == "thinking" and "thinking" in item:
- thinking_content = item["thinking"]
- logger.debug("Found structured thinking content")
- # Extract response text
- elif item.get("type") == "text" and "text" in item:
- response_text = item["text"]
- logger.debug("Found structured response text")
-
- # Display thinking content in a separate panel if available
- if thinking_content and get_config_repository().get(
- "show_thoughts", False
- ):
- logger.debug(
- f"Displaying structured thinking content ({len(thinking_content)} chars)"
- )
- console.print(
- Panel(
- Markdown(thinking_content),
- title="💭 Expert Thinking",
- border_style="yellow",
- )
- )
-
- # Use response_text if available, otherwise fall back to joining
- if response_text:
- content = response_text
- else:
- # Fallback: join list items if structured extraction failed
- logger.debug(
- "No structured response text found, joining list items"
- )
- content = "\n".join(str(item) for item in content)
- elif supports_think_tag or supports_thinking:
- # Process thinking content using the centralized function
- content, _ = process_thinking_content(
- content=content,
- supports_think_tag=supports_think_tag,
- supports_thinking=supports_thinking,
- panel_title="💭 Expert Thinking",
- panel_style="yellow",
- logger=logger,
- )
-
- # Display the expert guidance in a panel
- console.print(
- Panel(
- Markdown(content),
- title="Research Strategy Guidance",
- border_style="blue",
- )
- )
-
- # Use the content as expert guidance
- expert_guidance = (
- content + "\n\nCONSULT WITH THE EXPERT FREQUENTLY DURING RESEARCH"
- )
-
- logger.info("Received expert guidance for research")
- except Exception as e:
- logger.error("Error getting expert guidance for research: %s", e)
- expert_guidance = ""
-
- agent = create_agent(model, tools, checkpointer=memory, agent_type="research")
-
- expert_section = EXPERT_PROMPT_SECTION_RESEARCH if expert_enabled else ""
- human_section = HUMAN_PROMPT_SECTION_RESEARCH if hil else ""
- web_research_section = (
- WEB_RESEARCH_PROMPT_SECTION_RESEARCH
- if get_config_repository().get("web_research_enabled")
- else ""
- )
-
- # Prepare expert guidance section if expert guidance is available
- expert_guidance_section = ""
- if expert_guidance:
- expert_guidance_section = f"""
-{expert_guidance}
-"""
-
- # Format research notes if available
- # We get research notes earlier for reasoning assistance
-
- # Get environment inventory information
-
- prompt = (RESEARCH_ONLY_PROMPT if research_only else RESEARCH_PROMPT).format(
- current_date=current_date,
- working_directory=working_directory,
- base_task=base_task,
- research_only_note=(
- ""
- if research_only
- else " Only request implementation if the user explicitly asked for changes to be made."
- ),
- expert_section=expert_section,
- human_section=human_section,
- web_research_section=web_research_section,
- key_facts=key_facts,
- work_log=get_work_log_repository().format_work_log(),
- key_snippets=key_snippets,
- related_files=related_files,
- project_info=formatted_project_info,
- new_project_hints=NEW_PROJECT_HINTS if project_info.is_new else "",
- env_inv=get_env_inv(),
- expert_guidance_section=expert_guidance_section,
- )
-
- config = get_config_repository().get_all()
- recursion_limit = config.get("recursion_limit", DEFAULT_RECURSION_LIMIT)
- run_config = {
- "configurable": {"thread_id": thread_id},
- "recursion_limit": recursion_limit,
- }
- run_config.update(config)
-
- try:
- if console_message:
- console.print(
- Panel(Markdown(console_message), title="🔬 Looking into it...")
- )
-
- if project_info:
- display_project_status(project_info)
-
- if agent is not None:
- logger.debug("Research agent created successfully")
- none_or_fallback_handler = init_fallback_handler(agent, tools)
- _result = run_agent_with_retry(agent, prompt, none_or_fallback_handler)
- if _result:
- # Log research completion
- log_work_event(f"Completed research phase for: {base_task_or_query}")
- return _result
- else:
- logger.debug("No model provided, running web research tools directly")
- return run_web_research_agent(
- base_task_or_query,
- model=None,
- expert_enabled=expert_enabled,
- hil=hil,
- web_research_enabled=web_research_enabled,
- memory=memory,
- thread_id=thread_id,
- console_message=console_message,
- )
- except (KeyboardInterrupt, AgentInterrupt):
- raise
- except Exception as e:
- logger.error("Research agent failed: %s", str(e), exc_info=True)
- raise
-
-
-def run_web_research_agent(
- query: str,
- model,
- *,
- expert_enabled: bool = False,
- hil: bool = False,
- web_research_enabled: bool = False,
- memory: Optional[Any] = None,
- thread_id: Optional[str] = None,
- console_message: Optional[str] = None,
-) -> Optional[str]:
- """Run a web research agent with the given configuration.
-
- Args:
- query: The mainquery for web research
- model: The LLM model to use
- expert_enabled: Whether expert mode is enabled
- hil: Whether human-in-the-loop mode is enabled
- web_research_enabled: Whether web research is enabled
- memory: Optional memory instance to use
- config: Optional configuration dictionary
- thread_id: Optional thread ID (defaults to new UUID)
- console_message: Optional message to display before running
-
- Returns:
- Optional[str]: The completion message if task completed successfully
-
- Example:
- result = run_web_research_agent(
- "Research latest Python async patterns",
- model,
- expert_enabled=True
- )
- """
- thread_id = thread_id or str(uuid.uuid4())
- logger.debug("Starting web research agent with thread_id=%s", thread_id)
- logger.debug(
- "Web research configuration: expert=%s, hil=%s, web=%s",
- expert_enabled,
- hil,
- web_research_enabled,
- )
-
- if memory is None:
- memory = MemorySaver()
-
- if thread_id is None:
- thread_id = str(uuid.uuid4())
-
- tools = get_web_research_tools(expert_enabled=expert_enabled)
-
- agent = create_agent(model, tools, checkpointer=memory, agent_type="research")
-
- expert_section = EXPERT_PROMPT_SECTION_RESEARCH if expert_enabled else ""
- human_section = HUMAN_PROMPT_SECTION_RESEARCH if hil else ""
-
- try:
- key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
- except RuntimeError as e:
- logger.error(f"Failed to access key fact repository: {str(e)}")
- key_facts = ""
- try:
- key_snippets = format_key_snippets_dict(
- get_key_snippet_repository().get_snippets_dict()
- )
- except RuntimeError as e:
- logger.error(f"Failed to access key snippet repository: {str(e)}")
- key_snippets = ""
- related_files = get_related_files()
-
- current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- working_directory = os.getcwd()
-
- # Get environment inventory information
-
- prompt = WEB_RESEARCH_PROMPT.format(
- current_date=current_date,
- working_directory=working_directory,
- web_research_query=query,
- expert_section=expert_section,
- human_section=human_section,
- key_facts=key_facts,
- work_log=get_work_log_repository().format_work_log(),
- key_snippets=key_snippets,
- related_files=related_files,
- env_inv=get_env_inv(),
- )
-
- config = get_config_repository().get_all()
-
- recursion_limit = config.get("recursion_limit", DEFAULT_RECURSION_LIMIT)
- run_config = {
- "configurable": {"thread_id": thread_id},
- "recursion_limit": recursion_limit,
- }
- if config:
- run_config.update(config)
-
- try:
- if console_message:
- console.print(Panel(Markdown(console_message), title="🔬 Researching..."))
-
- logger.debug("Web research agent completed successfully")
- none_or_fallback_handler = init_fallback_handler(agent, tools)
- _result = run_agent_with_retry(agent, prompt, none_or_fallback_handler)
- if _result:
- # Log web research completion
- log_work_event(f"Completed web research phase for: {query}")
- return _result
-
- except (KeyboardInterrupt, AgentInterrupt):
- raise
- except Exception as e:
- logger.error("Web research agent failed: %s", str(e), exc_info=True)
- raise
-
-
-def run_planning_agent(
- base_task: str,
- model,
- *,
- expert_enabled: bool = False,
- hil: bool = False,
- memory: Optional[Any] = None,
- thread_id: Optional[str] = None,
-) -> Optional[str]:
- """Run a planning agent to create implementation plans.
-
- Args:
- base_task: The main task to plan implementation for
- model: The LLM model to use
- expert_enabled: Whether expert mode is enabled
- hil: Whether human-in-the-loop mode is enabled
- memory: Optional memory instance to use
- thread_id: Optional thread ID (defaults to new UUID)
-
- Returns:
- Optional[str]: The completion message if planning completed successfully
- """
- thread_id = thread_id or str(uuid.uuid4())
- logger.debug("Starting planning agent with thread_id=%s", thread_id)
- logger.debug("Planning configuration: expert=%s, hil=%s", expert_enabled, hil)
-
- if memory is None:
- memory = MemorySaver()
-
- if thread_id is None:
- thread_id = str(uuid.uuid4())
-
- # Get latest project info
- try:
- project_info = get_project_info(".")
- formatted_project_info = format_project_info(project_info)
- except Exception as e:
- logger.warning("Failed to get project info: %s", str(e))
- formatted_project_info = "Project info unavailable"
-
- tools = get_planning_tools(
- expert_enabled=expert_enabled,
- web_research_enabled=get_config_repository().get("web_research_enabled", False),
- )
-
- # Get model configuration
- provider = get_config_repository().get("provider", "")
- model_name = get_config_repository().get("model", "")
- logger.debug("Checking for reasoning_assist_default on %s/%s", provider, model_name)
-
- # Get model configuration to check for reasoning_assist_default
- model_config = {}
- provider_models = models_params.get(provider, {})
- if provider_models and model_name in provider_models:
- model_config = provider_models[model_name]
-
- # Check if reasoning assist is explicitly enabled/disabled
- force_assistance = get_config_repository().get("force_reasoning_assistance", False)
- disable_assistance = get_config_repository().get(
- "disable_reasoning_assistance", False
- )
-
- if force_assistance:
- reasoning_assist_enabled = True
- elif disable_assistance:
- reasoning_assist_enabled = False
- else:
- # Fall back to model default
- reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)
-
- logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)
-
- # Get all the context information (used both for normal planning and reasoning assist)
- current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- working_directory = os.getcwd()
-
- # Make sure key_facts is defined before using it
- try:
- key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
- except RuntimeError as e:
- logger.error(f"Failed to access key fact repository: {str(e)}")
- key_facts = ""
-
- # Make sure key_snippets is defined before using it
- try:
- key_snippets = format_key_snippets_dict(
- get_key_snippet_repository().get_snippets_dict()
- )
- except RuntimeError as e:
- logger.error(f"Failed to access key snippet repository: {str(e)}")
- key_snippets = ""
-
- # Get formatted research notes using repository
- try:
- repository = get_research_note_repository()
- notes_dict = repository.get_notes_dict()
- formatted_research_notes = format_research_notes_dict(notes_dict)
- except RuntimeError as e:
- logger.error(f"Failed to access research note repository: {str(e)}")
- formatted_research_notes = ""
-
- # Get related files
- related_files = "\n".join(get_related_files())
-
- # Get environment inventory information
- env_inv = get_env_inv()
-
- # Display the planning stage header before any reasoning assistance
- print_stage_header("Planning Stage")
-
- # Initialize expert guidance section
- expert_guidance = ""
-
- # If reasoning assist is enabled, make a one-off call to the expert model
- if reasoning_assist_enabled:
- try:
- logger.info(
- "Reasoning assist enabled for model %s, getting expert guidance",
- model_name,
- )
-
- # Collect tool descriptions
- tool_metadata = []
- from ra_aid.tools.reflection import get_function_info as get_tool_info
-
- for tool in tools:
- try:
- tool_info = get_tool_info(tool.func)
- name = tool.func.__name__
- description = inspect.getdoc(tool.func)
- tool_metadata.append(f"Tool: {name}\nDescription: {description}\n")
- except Exception as e:
- logger.warning(f"Error getting tool info for {tool}: {e}")
-
- # Format tool metadata
- formatted_tool_metadata = "\n".join(tool_metadata)
-
- # Initialize expert model
- expert_model = initialize_expert_llm(provider, model_name)
-
- # Format the reasoning assist prompt
- reasoning_assist_prompt = REASONING_ASSIST_PROMPT_PLANNING.format(
- current_date=current_date,
- working_directory=working_directory,
- base_task=base_task,
- key_facts=key_facts,
- key_snippets=key_snippets,
- research_notes=formatted_research_notes,
- related_files=related_files,
- env_inv=env_inv,
- tool_metadata=formatted_tool_metadata,
- )
-
- # Show the reasoning assist query in a panel
- console.print(
- Panel(
- Markdown(
- "Consulting with the reasoning model on the best way to do this."
- ),
- title="📝 Thinking about the plan...",
- border_style="yellow",
- )
- )
-
- logger.debug("Invoking expert model for reasoning assist")
- # Make the call to the expert model
- response = expert_model.invoke(reasoning_assist_prompt)
-
- # Check if the model supports think tags
- supports_think_tag = model_config.get("supports_think_tag", False)
- supports_thinking = model_config.get("supports_thinking", False)
-
- # Get response content, handling if it's a list (for Claude thinking mode)
- content = None
-
- if hasattr(response, "content"):
- content = response.content
- else:
- # Fallback if content attribute is missing
- content = str(response)
-
- # Process content based on its type
- if isinstance(content, list):
- # Handle structured thinking mode (e.g., Claude 3.7)
- thinking_content = None
- response_text = None
-
- # Process each item in the list
- for item in content:
- if isinstance(item, dict):
- # Extract thinking content
- if item.get("type") == "thinking" and "thinking" in item:
- thinking_content = item["thinking"]
- logger.debug("Found structured thinking content")
- # Extract response text
- elif item.get("type") == "text" and "text" in item:
- response_text = item["text"]
- logger.debug("Found structured response text")
-
- # Display thinking content in a separate panel if available
- if thinking_content and get_config_repository().get(
- "show_thoughts", False
- ):
- logger.debug(
- f"Displaying structured thinking content ({len(thinking_content)} chars)"
- )
- console.print(
- Panel(
- Markdown(thinking_content),
- title="💭 Expert Thinking",
- border_style="yellow",
- )
- )
-
- # Use response_text if available, otherwise fall back to joining
- if response_text:
- content = response_text
- else:
- # Fallback: join list items if structured extraction failed
- logger.debug(
- "No structured response text found, joining list items"
- )
- content = "\n".join(str(item) for item in content)
- elif supports_think_tag or supports_thinking:
- # Process thinking content using the centralized function
- content, _ = process_thinking_content(
- content=content,
- supports_think_tag=supports_think_tag,
- supports_thinking=supports_thinking,
- panel_title="💭 Expert Thinking",
- panel_style="yellow",
- logger=logger,
- )
-
- # Display the expert guidance in a panel
- console.print(
- Panel(
- Markdown(content), title="Reasoning Guidance", border_style="blue"
- )
- )
-
- # Use the content as expert guidance
- expert_guidance = (
- content + "\n\nCONSULT WITH THE EXPERT FREQUENTLY ON THIS TASK"
- )
-
- logger.info("Received expert guidance for planning")
- except Exception as e:
- logger.error("Error getting expert guidance for planning: %s", e)
- expert_guidance = ""
-
- agent = create_agent(model, tools, checkpointer=memory, agent_type="planner")
-
- expert_section = EXPERT_PROMPT_SECTION_PLANNING if expert_enabled else ""
- human_section = HUMAN_PROMPT_SECTION_PLANNING if hil else ""
- web_research_section = (
- WEB_RESEARCH_PROMPT_SECTION_PLANNING
- if get_config_repository().get("web_research_enabled", False)
- else ""
- )
-
- # Prepare expert guidance section if expert guidance is available
- expert_guidance_section = ""
- if expert_guidance:
- expert_guidance_section = f"""
-{expert_guidance}
-"""
-
- planning_prompt = PLANNING_PROMPT.format(
- current_date=current_date,
- working_directory=working_directory,
- expert_section=expert_section,
- human_section=human_section,
- web_research_section=web_research_section,
- base_task=base_task,
- project_info=formatted_project_info,
- research_notes=formatted_research_notes,
- related_files=related_files,
- key_facts=key_facts,
- key_snippets=key_snippets,
- work_log=get_work_log_repository().format_work_log(),
- research_only_note=(
- ""
- if get_config_repository().get("research_only", False)
- else " Only request implementation if the user explicitly asked for changes to be made."
- ),
- env_inv=env_inv,
- expert_guidance_section=expert_guidance_section,
- )
-
- config_values = get_config_repository().get_all()
- recursion_limit = get_config_repository().get(
- "recursion_limit", DEFAULT_RECURSION_LIMIT
- )
- run_config = {
- "configurable": {"thread_id": thread_id},
- "recursion_limit": recursion_limit,
- }
- run_config.update(config_values)
-
- try:
- logger.debug("Planning agent completed successfully")
- none_or_fallback_handler = init_fallback_handler(agent, tools)
- _result = run_agent_with_retry(agent, planning_prompt, none_or_fallback_handler)
- if _result:
- # Log planning completion
- log_work_event(f"Completed planning phase for: {base_task}")
- return _result
- except (KeyboardInterrupt, AgentInterrupt):
- raise
- except Exception as e:
- logger.error("Planning agent failed: %s", str(e), exc_info=True)
- raise
-
-
-def run_task_implementation_agent(
- base_task: str,
- tasks: list,
- task: str,
- plan: str,
- related_files: list,
- model,
- *,
- expert_enabled: bool = False,
- web_research_enabled: bool = False,
- memory: Optional[Any] = None,
- thread_id: Optional[str] = None,
-) -> Optional[str]:
- """Run an implementation agent for a specific task.
-
- Args:
- base_task: The main task being implemented
- tasks: List of tasks to implement
- plan: The implementation plan
- related_files: List of related files
- model: The LLM model to use
- expert_enabled: Whether expert mode is enabled
- web_research_enabled: Whether web research is enabled
- memory: Optional memory instance to use
- thread_id: Optional thread ID (defaults to new UUID)
-
- Returns:
- Optional[str]: The completion message if task completed successfully
- """
- thread_id = thread_id or str(uuid.uuid4())
- logger.debug("Starting implementation agent with thread_id=%s", thread_id)
- logger.debug(
- "Implementation configuration: expert=%s, web=%s",
- expert_enabled,
- web_research_enabled,
- )
- logger.debug("Task details: base_task=%s, current_task=%s", base_task, task)
- logger.debug("Related files: %s", related_files)
-
- if memory is None:
- memory = MemorySaver()
-
- if thread_id is None:
- thread_id = str(uuid.uuid4())
-
- tools = get_implementation_tools(
- expert_enabled=expert_enabled,
- web_research_enabled=get_config_repository().get("web_research_enabled", False),
- )
-
- agent = create_agent(model, tools, checkpointer=memory, agent_type="planner")
-
- current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- working_directory = os.getcwd()
-
- # Make sure key_facts is defined before using it
- try:
- key_facts = format_key_facts_dict(get_key_fact_repository().get_facts_dict())
- except RuntimeError as e:
- logger.error(f"Failed to access key fact repository: {str(e)}")
- key_facts = ""
-
- # Get formatted research notes using repository
- try:
- repository = get_research_note_repository()
- notes_dict = repository.get_notes_dict()
- formatted_research_notes = format_research_notes_dict(notes_dict)
- except RuntimeError as e:
- logger.error(f"Failed to access research note repository: {str(e)}")
- formatted_research_notes = ""
-
- # Get latest project info
- try:
- project_info = get_project_info(".")
- formatted_project_info = format_project_info(project_info)
- except Exception as e:
- logger.warning("Failed to get project info: %s", str(e))
- formatted_project_info = "Project info unavailable"
-
- # Get environment inventory information
- env_inv = get_env_inv()
-
- # Get model configuration to check for reasoning_assist_default
- provider = get_config_repository().get("provider", "")
- model_name = get_config_repository().get("model", "")
- logger.debug("Checking for reasoning_assist_default on %s/%s", provider, model_name)
-
- model_config = {}
- provider_models = models_params.get(provider, {})
- if provider_models and model_name in provider_models:
- model_config = provider_models[model_name]
-
- # Check if reasoning assist is explicitly enabled/disabled
- force_assistance = get_config_repository().get("force_reasoning_assistance", False)
- disable_assistance = get_config_repository().get(
- "disable_reasoning_assistance", False
- )
-
- if force_assistance:
- reasoning_assist_enabled = True
- elif disable_assistance:
- reasoning_assist_enabled = False
- else:
- # Fall back to model default
- reasoning_assist_enabled = model_config.get("reasoning_assist_default", False)
-
- logger.debug("Reasoning assist enabled: %s", reasoning_assist_enabled)
-
- # Initialize implementation guidance section
- implementation_guidance_section = ""
-
- # If reasoning assist is enabled, make a one-off call to the expert model
- if reasoning_assist_enabled:
- try:
- logger.info(
- "Reasoning assist enabled for model %s, getting implementation guidance",
- model_name,
- )
-
- # Collect tool descriptions
- tool_metadata = []
- from ra_aid.tools.reflection import get_function_info as get_tool_info
-
- for tool in tools:
- try:
- tool_info = get_tool_info(tool.func)
- name = tool.func.__name__
- description = inspect.getdoc(tool.func)
- tool_metadata.append(
- f"Tool: {name}\\nDescription: {description}\\n"
- )
- except Exception as e:
- logger.warning(f"Error getting tool info for {tool}: {e}")
-
- # Format tool metadata
- formatted_tool_metadata = "\\n".join(tool_metadata)
-
- # Initialize expert model
- expert_model = initialize_expert_llm(provider, model_name)
-
- # Format the reasoning assist prompt for implementation
- reasoning_assist_prompt = REASONING_ASSIST_PROMPT_IMPLEMENTATION.format(
- current_date=current_date,
- working_directory=working_directory,
- task=task,
- key_facts=key_facts,
- key_snippets=format_key_snippets_dict(
- get_key_snippet_repository().get_snippets_dict()
- ),
- research_notes=formatted_research_notes,
- related_files="\\n".join(related_files),
- env_inv=env_inv,
- tool_metadata=formatted_tool_metadata,
- )
-
- # Show the reasoning assist query in a panel
- console.print(
- Panel(
- Markdown(
- "Consulting with the reasoning model on the best implementation approach."
- ),
- title="📝 Thinking about implementation...",
- border_style="yellow",
- )
- )
-
- logger.debug("Invoking expert model for implementation reasoning assist")
- # Make the call to the expert model
- response = expert_model.invoke(reasoning_assist_prompt)
-
- # Check if the model supports think tags
- supports_think_tag = model_config.get("supports_think_tag", False)
- supports_thinking = model_config.get("supports_thinking", False)
-
- # Process response content
- content = None
-
- if hasattr(response, "content"):
- content = response.content
- else:
- # Fallback if content attribute is missing
- content = str(response)
-
- # Process the response content using the centralized function
- content, extracted_thinking = process_thinking_content(
- content=content,
- supports_think_tag=supports_think_tag,
- supports_thinking=supports_thinking,
- panel_title="💭 Implementation Thinking",
- panel_style="yellow",
- logger=logger,
- )
-
- # Display the implementation guidance in a panel
- console.print(
- Panel(
- Markdown(content),
- title="Implementation Guidance",
- border_style="blue",
- )
- )
-
- # Format the implementation guidance section for the prompt
- implementation_guidance_section = f"""
-{content}
-"""
-
- logger.info("Received implementation guidance")
- except Exception as e:
- logger.error("Error getting implementation guidance: %s", e)
- implementation_guidance_section = ""
-
- prompt = IMPLEMENTATION_PROMPT.format(
- current_date=current_date,
- working_directory=working_directory,
- base_task=base_task,
- task=task,
- tasks=tasks,
- plan=plan,
- related_files=related_files,
- key_facts=key_facts,
- key_snippets=format_key_snippets_dict(
- get_key_snippet_repository().get_snippets_dict()
- ),
- research_notes=formatted_research_notes,
- work_log=get_work_log_repository().format_work_log(),
- expert_section=EXPERT_PROMPT_SECTION_IMPLEMENTATION if expert_enabled else "",
- human_section=(
- HUMAN_PROMPT_SECTION_IMPLEMENTATION
- if get_config_repository().get("hil", False)
- else ""
- ),
- web_research_section=(
- WEB_RESEARCH_PROMPT_SECTION_CHAT
- if get_config_repository().get("web_research_enabled", False)
- else ""
- ),
- env_inv=env_inv,
- project_info=formatted_project_info,
- implementation_guidance_section=implementation_guidance_section,
- )
-
- config_values = get_config_repository().get_all()
- recursion_limit = get_config_repository().get(
- "recursion_limit", DEFAULT_RECURSION_LIMIT
- )
- run_config = {
- "configurable": {"thread_id": thread_id},
- "recursion_limit": recursion_limit,
- }
- run_config.update(config_values)
-
- try:
- logger.debug("Implementation agent completed successfully")
- none_or_fallback_handler = init_fallback_handler(agent, tools)
- _result = run_agent_with_retry(agent, prompt, none_or_fallback_handler)
- if _result:
- # Log task implementation completion
- log_work_event(f"Completed implementation of task: {task}")
- return _result
- except (KeyboardInterrupt, AgentInterrupt):
- raise
- except Exception as e:
- logger.error("Implementation agent failed: %s", str(e), exc_info=True)
- raise
- )
-
-
+from ra_aid.agents.research_agent import run_research_agent, run_web_research_agent
+from ra_aid.agents.implementation_agent import run_task_implementation_agent
_CONTEXT_STACK = []
@@ -1595,7 +542,7 @@ def _run_agent_stream(agent: RAgents, msg_list: list[BaseMessage]):
logger.debug("Agent output: %s", chunk)
check_interrupt()
agent_type = get_agent_type(agent)
- print_agent_output(chunk, agent_type, cb)
+ print_agent_output(chunk, agent_type, cost_cb=cb)
if is_completed() or should_exit():
reset_completion_flags()