Initial commit

AI Christianson 2024-12-10 19:01:20 -05:00
commit 341bc91e65
33 changed files with 3065 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,6 @@
/.pyenv/
/__pycache__/
__pycache__/
.aider*
.env
/work

LICENSE Normal file

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [2024] [AI Christianson]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

MANIFEST.in Normal file

@@ -0,0 +1,4 @@
include README.md
include LICENSE
include requirements*.txt
recursive-include ra_aid *.py

README.md Normal file

@@ -0,0 +1,218 @@
██▀███ ▄▄▄ ▄▄▄ ██▓▓█████▄
▓██ ▒ ██▒▒████▄ ▒████▄ ▓██▒▒██▀ ██▌
▓██ ░▄█ ▒▒██ ▀█▄ ▒██ ▀█▄ ▒██▒░██ █▌
▒██▀▀█▄ ░██▄▄▄▄██ ░██▄▄▄▄██ ░██░░▓█▄ ▌
░██▓ ▒██▒ ▓█ ▓██▒ ██▓ ▓█ ▓██▒░██░░▒████▓
░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ▒▓▒ ▒▒ ▓▒█░░▓ ▒▒▓ ▒
░▒ ░ ▒░ ▒ ▒▒ ░ ░▒ ▒ ▒▒ ░ ▒ ░ ░ ▒ ▒
░░ ░ ░ ▒ ░ ░ ▒ ▒ ░ ░ ░ ░
░ ░ ░ ░ ░ ░ ░ ░
░ ░
[![Python Versions](https://img.shields.io/badge/python-3.8%2B-blue)](https://www.python.org)
[![License](https://img.shields.io/badge/license-Apache%202.0-blue)](LICENSE)
[![Status](https://img.shields.io/badge/status-Beta-yellow)]()
# RA.Aid
RA.Aid is a powerful AI-driven command-line tool designed to assist developers and researchers in executing programming and research tasks efficiently. Built on top of LangChain and LLMs, it provides an intelligent assistant that can help with research, planning, and implementation of development tasks.
## Table of Contents
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
- [Architecture](#architecture)
- [Dependencies](#dependencies)
- [Development Setup](#development-setup)
- [Contributing](#contributing)
- [License](#license)
- [Contact](#contact)
## Features
- **Three-Stage Architecture**: Implements a sophisticated workflow with Research, Planning, and Implementation stages, each powered by dedicated AI agents and toolsets.
- **Advanced AI Integration**: Built on LangChain and leverages the latest LLMs for natural language understanding and generation.
- **Comprehensive Toolset**:
- Shell command execution
- Expert querying system
- File operations and management
- Memory management
- Research and planning tools
- Code analysis capabilities
- **Interactive CLI Interface**: Simple yet powerful command-line interface for seamless interaction
- **Modular Design**: Structured as a Python package with specialized modules for console output, processing, text utilities, and tools
- **Git Integration**: Built-in support for Git operations and repository management
## Installation
### Prerequisites
- Python 3.8 or higher
- pip package manager
### Steps
1. Install from PyPI:
```bash
pip install ra-aid
```
Or install from source:
```bash
git clone https://github.com/ai-christianson/ra-aid.git
cd ra-aid
pip install .
```
2. Install additional dependencies:
```bash
pip install -r requirements.txt
```
3. (Optional) Install development dependencies:
```bash
pip install -r requirements-dev.txt
```
## Usage
RA.Aid is used via the `ra-aid` command. The basic usage pattern is:
```bash
ra-aid [task]
```
### Examples
Research a topic:
```bash
ra-aid "Research best practices for Python package structure"
```
Plan a development task:
```bash
ra-aid "Plan the implementation of a new REST API endpoint"
```
Generate code or documentation:
```bash
ra-aid "Create a README.md template for my project"
```
### Interactive Mode
For an interactive session where you can enter multiple tasks:
```bash
ra-aid
```
This will start an interactive prompt where you can input tasks sequentially.
## Architecture
RA.Aid implements a three-stage architecture for handling development and research tasks:
1. **Research Stage**:
- Gathers information and context
- Analyzes requirements
- Identifies key components and dependencies
2. **Planning Stage**:
- Develops detailed implementation plans
- Breaks down tasks into manageable steps
- Identifies potential challenges and solutions
3. **Implementation Stage**:
- Executes planned tasks
- Generates code or documentation
- Performs necessary system operations
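The three stages above can be pictured as a simple pipeline, where each stage's output feeds the next. A minimal sketch in plain Python (the function names and return shapes here are illustrative, not RA.Aid's actual API):

```python
def research(task):
    # Research stage: gather context about what already exists.
    return f"notes for: {task}"

def plan(task, notes):
    # Planning stage: break the task into ordered steps.
    return [f"{task}: step {i}" for i in (1, 2)]

def implement(steps):
    # Implementation stage: execute each planned step in order.
    return [f"done: {step}" for step in steps]

task = "add a REST API endpoint"
results = implement(plan(task, research(task)))
```

In RA.Aid each stage is a dedicated agent with its own memory and toolset rather than a plain function, but the data flow is the same.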
### Core Components
- **Console Module** (`console/`): Handles console output formatting and user interaction
- **Processing Module** (`proc/`): Manages interactive processing and workflow control
- **Text Module** (`text/`): Provides text processing and manipulation utilities
- **Tools Module** (`tools/`): Contains various utility tools for file operations, search, and more
## Dependencies
### Core Dependencies
- `langchain-anthropic`: LangChain integration with Anthropic's Claude
- `langgraph`: Graph-based workflow management
- `rich>=13.0.0`: Terminal formatting and output
- `GitPython==3.1.41`: Git repository management
- `fuzzywuzzy==0.18.0`: Fuzzy string matching
- `python-Levenshtein==0.23.0`: Fast string matching
- `pathspec>=0.11.0`: Path specification utilities
### Development Dependencies
- `pytest>=7.0.0`: Testing framework
- `pytest-timeout>=2.2.0`: Test timeout management
## Development Setup
1. Clone the repository:
```bash
git clone https://github.com/ai-christianson/ra-aid.git
cd ra-aid
```
2. Create and activate a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
```
3. Install development dependencies:
```bash
pip install -r requirements-dev.txt
```
4. Run tests:
```bash
python -m pytest
```
## Contributing
Contributions are welcome! Please follow these steps:
1. Fork the repository
2. Create a feature branch:
```bash
git checkout -b feature/your-feature-name
```
3. Make your changes and commit:
```bash
git commit -m 'Add some feature'
```
4. Push to your fork:
```bash
git push origin feature/your-feature-name
```
5. Open a Pull Request
### Guidelines
- Follow PEP 8 style guidelines
- Add tests for new features
- Update documentation as needed
- Keep commits focused and commit messages clear
- Ensure all tests pass before submitting PR
## License
This project is licensed under the Apache License 2.0 - see the [LICENSE](LICENSE) file for details.
Copyright (c) 2024 AI Christianson
## Contact
- **Issues**: Please report bugs and feature requests on our [Issue Tracker](https://github.com/ai-christianson/ra-aid/issues)
- **Repository**: [https://github.com/ai-christianson/ra-aid](https://github.com/ai-christianson/ra-aid)
- **Documentation**: [https://github.com/ai-christianson/ra-aid#readme](https://github.com/ai-christianson/ra-aid#readme)

pyproject.toml Normal file

@@ -0,0 +1,53 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "ra-aid"
dynamic = ["version"]
description = "RA.Aid - ReAct Aid"
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.8"
keywords = ["langchain", "ai", "agent", "tools", "development"]
authors = [{name = "AI Christianson", email = "ai.christianson@christianson.ai"}]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Software Development :: Libraries :: Python Modules"
]
dependencies = [
"langchain-anthropic",
"langgraph",
"rich>=13.0.0",
"GitPython==3.1.41",
"fuzzywuzzy==0.18.0",
"python-Levenshtein==0.23.0",
"pathspec>=0.11.0",
]
[project.optional-dependencies]
dev = [
"pytest-timeout>=2.2.0",
"pytest>=7.0.0",
]
[project.scripts]
ra-aid = "ra_aid.__main__:main"
[project.urls]
Homepage = "https://github.com/ai-christianson/ra-aid"
Documentation = "https://github.com/ai-christianson/ra-aid#readme"
Repository = "https://github.com/ai-christianson/ra-aid.git"
Issues = "https://github.com/ai-christianson/ra-aid/issues"
[tool.hatch.version]
path = "ra_aid/version.py"
[tool.hatch.build.targets.wheel]
packages = ["ra_aid"]

ra_aid/__init__.py Normal file

@@ -0,0 +1,12 @@
from .version import __version__
from .console.formatting import print_stage_header, print_task_header
from .console.output import print_agent_output
from .text.processing import truncate_output
__all__ = [
'print_stage_header',
'print_task_header',
'print_agent_output',
'truncate_output',
'__version__'
]

ra_aid/__main__.py Normal file

@@ -0,0 +1,252 @@
import argparse
from anthropic import InternalServerError
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from ra_aid.tools import (
ask_expert, run_shell_command, run_programming_task,
emit_research_notes, emit_plan, emit_related_file, emit_task,
emit_expert_context, get_memory_value, emit_key_fact, delete_key_fact,
emit_key_snippet, delete_key_snippet,
request_implementation, read_file_tool, emit_research_subtask,
fuzzy_find_project_files, ripgrep_search, list_directory_tree
)
from ra_aid.tools.memory import _global_memory
from ra_aid import print_agent_output, print_stage_header, print_task_header
from ra_aid.tools.programmer import related_files
from ra_aid.prompts import (
RESEARCH_PROMPT,
PLANNING_PROMPT,
IMPLEMENTATION_PROMPT,
SUMMARY_PROMPT
)
def parse_arguments():
parser = argparse.ArgumentParser(
description='AI Agent for executing programming and research tasks',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'task',
type=str,
help='The task to be executed by the agent'
)
return parser.parse_args()
# Create the base model
model = ChatAnthropic(model_name="claude-3-5-sonnet-20241022")
# Create individual memory objects for each agent
research_memory = MemorySaver()
planning_memory = MemorySaver()
implementation_memory = MemorySaver()
# Define tool sets for each stage
research_tools = [list_directory_tree, emit_research_subtask, run_shell_command, emit_expert_context, ask_expert, emit_research_notes, emit_related_file, emit_key_fact, delete_key_fact, emit_key_snippet, delete_key_snippet, request_implementation, read_file_tool, fuzzy_find_project_files, ripgrep_search]
planning_tools = [list_directory_tree, emit_expert_context, ask_expert, emit_plan, emit_task, emit_related_file, emit_key_fact, delete_key_fact, emit_key_snippet, delete_key_snippet, read_file_tool, fuzzy_find_project_files, ripgrep_search]
implementation_tools = [list_directory_tree, run_shell_command, emit_expert_context, ask_expert, run_programming_task, emit_related_file, emit_key_fact, delete_key_fact, emit_key_snippet, delete_key_snippet, read_file_tool, fuzzy_find_project_files, ripgrep_search]
# Create stage-specific agents with individual memory objects
research_agent = create_react_agent(model, research_tools, checkpointer=research_memory)
planning_agent = create_react_agent(model, planning_tools, checkpointer=planning_memory)
implementation_agent = create_react_agent(model, implementation_tools, checkpointer=implementation_memory)
def is_informational_query() -> bool:
"""Determine if the current query is informational based on implementation_requested state.
Returns:
bool: True if query is informational (no implementation requested), False otherwise
"""
return not is_stage_requested('implementation')
def is_stage_requested(stage: str) -> bool:
"""Check if a stage has been requested to proceed.
Args:
stage: The stage to check ('implementation')
Returns:
True if the stage was requested, False otherwise
"""
if stage == 'implementation':
return len(_global_memory.get('implementation_requested', [])) > 0
return False
def run_implementation_stage(base_task, tasks, plan, related_files):
"""Run implementation stage with a distinct agent for each task."""
if not is_stage_requested('implementation'):
print_stage_header("SKIPPING IMPLEMENTATION STAGE (not requested)")
return
print_stage_header("IMPLEMENTATION STAGE")
# Get tasks directly from memory instead of using get_memory_value which joins with newlines
task_list = _global_memory['tasks']
print_task_header(f"Found {len(task_list)} tasks to implement")
for i, task in enumerate(task_list, 1):
print_task_header(task)
# Create a unique memory instance for this task
task_memory = MemorySaver()
# Create a fresh agent for each task with its own memory
task_agent = create_react_agent(model, implementation_tools, checkpointer=task_memory)
# Construct task-specific prompt
task_prompt = IMPLEMENTATION_PROMPT.format(
plan=plan,
key_facts=get_memory_value('key_facts'),
key_snippets=get_memory_value('key_snippets'),
task=task,
related_files="\n".join(related_files),
base_task=base_task
)
# Run agent for this task
while True:
try:
for chunk in task_agent.stream(
{"messages": [HumanMessage(content=task_prompt)]},
{"configurable": {"thread_id": "abc123"}, "recursion_limit": 100}
):
print_agent_output(chunk)
break
except InternalServerError as e:
print(f"Encountered Anthropic Internal Server Error: {e}. Retrying...")
continue
def summarize_research_findings(base_task: str, config: dict) -> None:
"""Summarize research findings for informational queries.
Generates and prints a concise summary of research findings including key facts
and research notes collected during the research stage.
Args:
base_task: The original user query
config: Configuration dictionary for the agent
"""
print_stage_header("RESEARCH SUMMARY")
# Create dedicated memory for research summarization
summary_memory = MemorySaver()
# Create fresh agent for summarization with its own memory
summary_agent = create_react_agent(model, implementation_tools, checkpointer=summary_memory)
summary_prompt = SUMMARY_PROMPT.format(
base_task=base_task,
research_notes=get_memory_value('research_notes'),
key_facts=get_memory_value('key_facts'),
key_snippets=get_memory_value('key_snippets')
)
while True:
try:
for chunk in summary_agent.stream(
{"messages": [HumanMessage(content=summary_prompt)]},
config
):
print_agent_output(chunk)
break
except InternalServerError as e:
print(f"Encountered Anthropic Internal Server Error: {e}. Retrying...")
continue
def run_research_subtasks(base_task: str, config: dict):
"""Run research subtasks with separate agents."""
subtasks = _global_memory.get('research_subtasks', [])
if not subtasks:
return
print_stage_header("RESEARCH SUBTASKS")
# Create tools for subtask agents (excluding spawn_research_subtask and request_implementation)
subtask_tools = [
tool for tool in research_tools
if tool.name not in ['emit_research_subtask', 'request_implementation']
]
for i, subtask in enumerate(subtasks, 1):
print_task_header(f"Research Subtask {i}/{len(subtasks)}")
# Create fresh memory and agent for each subtask
subtask_memory = MemorySaver()
subtask_agent = create_react_agent(
model,
subtask_tools,
checkpointer=subtask_memory
)
# Run the subtask agent
subtask_prompt = f"Research Subtask: {subtask}\n\n{RESEARCH_PROMPT}"
while True:
try:
for chunk in subtask_agent.stream(
{"messages": [HumanMessage(content=subtask_prompt)]},
config
):
print_agent_output(chunk)
break
except InternalServerError as e:
print(f"Encountered Anthropic Internal Server Error: {e}. Retrying...")
continue
if __name__ == "__main__":
args = parse_arguments()
base_task = args.task
config = {"configurable": {"thread_id": "abc123"}, "recursion_limit": 100}
# Run research stage
print_stage_header("RESEARCH STAGE")
while True:
try:
for chunk in research_agent.stream(
{"messages": [HumanMessage(content=f"User query: {base_task}\n\n{RESEARCH_PROMPT}\n\nBe very thorough in your research and emit lots of snippets, key facts. If you take more than a few steps, be eager to emit research subtasks. Only request implementation if the user explicitly asked for changes to be made.")]},
config
):
print_agent_output(chunk)
break
except InternalServerError as e:
print(f"Encountered Anthropic Internal Server Error: {e}. Retrying...")
continue
# Run any research subtasks
run_research_subtasks(base_task, config)
# For informational queries, summarize findings
if is_informational_query():
summarize_research_findings(base_task, config)
else:
# Only proceed with planning and implementation if not an informational query
print_stage_header("PLANNING STAGE")
planning_prompt = PLANNING_PROMPT.format(
research_notes=get_memory_value('research_notes'),
key_facts=get_memory_value('key_facts'),
key_snippets=get_memory_value('key_snippets'),
base_task=base_task
)
# Run planning agent
while True:
try:
for chunk in planning_agent.stream(
{"messages": [HumanMessage(content=planning_prompt)]},
config
):
print_agent_output(chunk)
break
except InternalServerError as e:
print(f"Encountered Anthropic Internal Server Error: {e}. Retrying...")
continue
# Run implementation stage with task-specific agents
run_implementation_stage(
base_task,
get_memory_value('tasks'),
get_memory_value('plan'),
related_files
)


@@ -0,0 +1,4 @@
from .formatting import print_stage_header, print_task_header
from .output import print_agent_output
__all__ = ['print_stage_header', 'print_task_header', 'print_agent_output']


@@ -0,0 +1,21 @@
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
console = Console()
def print_stage_header(stage: str) -> None:
"""Print a stage header with green styling and rocket emoji. Content is rendered as Markdown.
Args:
stage: The stage name to print (supports Markdown formatting)
"""
console.print(Panel(Markdown(stage), title="🚀 Stage", style="green bold"))
def print_task_header(task: str) -> None:
"""Print a task header with yellow styling and wrench emoji. Content is rendered as Markdown.
Args:
task: The task text to print (supports Markdown formatting)
"""
console.print(Panel(Markdown(task), title="🔧 Task", border_style="yellow bold"))
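The headers above lean on rich's `Panel` and `Markdown` for styling. A dependency-free sketch of the same idea, useful as a fallback when rich is unavailable (`plain_stage_header` is hypothetical, not part of RA.Aid):

```python
def plain_stage_header(stage, width=40):
    # Text-only stand-in for the rich Panel used above.
    bar = "=" * width
    return f"{bar}\n🚀 Stage: {stage}\n{bar}"

print(plain_stage_header("RESEARCH STAGE"))
```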

ra_aid/console/output.py Normal file

@@ -0,0 +1,26 @@
from typing import Any, Dict
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from langchain_core.messages import AIMessage
# Import shared console instance
from .formatting import console
def print_agent_output(chunk: Dict[str, Any]) -> None:
"""Print only the agent's message content, not tool calls.
Args:
chunk: A dictionary containing agent or tool messages
"""
if 'agent' in chunk and 'messages' in chunk['agent']:
messages = chunk['agent']['messages']
for msg in messages:
if isinstance(msg, AIMessage):
# Handle text content
if isinstance(msg.content, list):
for content in msg.content:
if content['type'] == 'text':
console.print(Panel(Markdown(content['text']), title="🤖 Assistant"))
else:
console.print(Panel(Markdown(msg.content), title="🤖 Assistant"))
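The chunk-unpacking logic above can be sketched without rich or langchain. The dict shape below mirrors what `create_react_agent` streams (an assumption for this sketch; the real code receives `AIMessage` objects, modeled here as plain dicts):

```python
def extract_texts(chunk):
    # Pull assistant text segments out of a streamed chunk,
    # skipping tool-call content blocks.
    texts = []
    for msg in chunk.get('agent', {}).get('messages', []):
        content = msg.get('content')
        if isinstance(content, list):
            texts.extend(c['text'] for c in content if c.get('type') == 'text')
        elif isinstance(content, str):
            texts.append(content)
    return texts

chunk = {'agent': {'messages': [
    {'content': [{'type': 'text', 'text': 'Reading files...'},
                 {'type': 'tool_use', 'name': 'read_file_tool'}]},
]}}
```

Note how the `tool_use` block is filtered out, which is exactly why only the agent's prose (not its tool calls) reaches the console.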


@@ -0,0 +1,78 @@
"""
Module for running interactive subprocesses with output capture.
"""
import os
import re
import tempfile
import shlex
import shutil
from typing import List, Tuple
def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]:
"""
Runs an interactive command with a pseudo-tty, capturing combined output.
Assumptions and constraints:
- We are on a Linux system with script available
- `cmd` is a non-empty list where cmd[0] is the executable
- The executable and script are assumed to be on PATH
- If anything is amiss (e.g., command not found), we fail early and cleanly
The output is cleaned to remove ANSI escape sequences and control characters.
Returns:
Tuple of (cleaned_output, return_code)
"""
# Fail early if cmd is empty
if not cmd:
raise ValueError("No command provided.")
# Check that the command exists
if shutil.which(cmd[0]) is None:
raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.")
# Create temp files (we'll always clean them up)
output_file = tempfile.NamedTemporaryFile(prefix="output_", delete=False)
retcode_file = tempfile.NamedTemporaryFile(prefix="retcode_", delete=False)
output_path = output_file.name
retcode_path = retcode_file.name
output_file.close()
retcode_file.close()
# Quote arguments for safety
quoted_cmd = ' '.join(shlex.quote(c) for c in cmd)
# Use script to capture output with TTY and save return code
shell_cmd = f"{quoted_cmd}; echo $? > {shlex.quote(retcode_path)}"
def cleanup():
for path in [output_path, retcode_path]:
if os.path.exists(path):
os.remove(path)
try:
# Run command with script for TTY and output capture
os.system(f"script -q -c {shlex.quote(shell_cmd)} {shlex.quote(output_path)}")
# Read and clean the output
with open(output_path, "rb") as f:
output = f.read()
# Clean ANSI escape sequences and control characters
output = re.sub(rb'\x1b\[[0-9;]*[a-zA-Z]', b'', output) # ANSI escape sequences
output = re.sub(rb'[\x00-\x08\x0b\x0c\x0e-\x1f]', b'', output) # Control chars
# Get the return code
with open(retcode_path, "r") as f:
return_code = int(f.read().strip())
except Exception as e:
# If something goes wrong, cleanup and re-raise
cleanup()
raise RuntimeError("Error running interactive capture") from e
finally:
# Ensure files are removed no matter what
cleanup()
return output, return_code
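The two-pass cleanup in `run_interactive_command` can be seen in isolation on a sample byte string: the first regex strips ANSI color/cursor escapes, the second strips stray control bytes while deliberately leaving tab, LF, and CR intact (the sample input here is invented for illustration):

```python
import re

raw = b"\x1b[32mREADY\x1b[0m\x07 done\r\n"
cleaned = re.sub(rb'\x1b\[[0-9;]*[a-zA-Z]', b'', raw)             # ANSI escapes
cleaned = re.sub(rb'[\x00-\x08\x0b\x0c\x0e-\x1f]', b'', cleaned)  # control chars
```

The color codes and the BEL (`\x07`) disappear, but the trailing `\r\n` survives because `\x09`, `\x0a`, and `\x0d` are excluded from the control-character class.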

ra_aid/prompts.py Normal file

@@ -0,0 +1,258 @@
"""
Stage-specific prompts for the AI agent system.
Each prompt constant uses str.format() style template substitution for variable replacement.
The prompts guide the agent through different stages of task execution.
"""
# Research stage prompt - guides initial codebase analysis
RESEARCH_PROMPT = """
Objective
Your only goal is to thoroughly research what currently exists in the codebase, and nothing else.
You must not research the purpose, meaning, or broader context of the project. Do not discuss or reason about the problem the code is trying to solve. Do not plan improvements or speculate on future changes.
Role
You are an autonomous research agent focused solely on enumerating and describing the current codebase and its related files. You are not a planner, not an implementer, and not a chatbot for general problem solving. You will not propose solutions, improvements, or modifications.
Strict Focus on Existing Artifacts
You must:
Identify directories and files currently in the codebase.
Describe what exists in these files (file names, directory structures, documentation found, code patterns, dependencies).
Do so by incrementally and systematically exploring the filesystem with careful directory listing tool calls.
You can use fuzzy file search to quickly find relevant files matching a search pattern.
You must not:
Explain why the code or files exist.
Discuss the project's purpose or the problem it may solve.
Suggest any future actions, improvements, or architectural changes.
Make assumptions or speculate about things not explicitly present in the files.
Tools and Methodology
Use only non-recursive, targeted fuzzy find, ripgrep_search tool (which provides context), list_directory_tree tool, shell commands, etc. (use your imagination) to efficiently explore the project structure. For example:
After identifying files, you may read them to confirm their contents only if needed to understand what currently exists (for example, to confirm if a file is a documentation file or a configuration file).
Be meticulous: If you find a directory, explore it thoroughly. If you find files of potential relevance, record them. Make sure you do not skip any directories you discover.
Prefer to use list_directory_tree and other tools over shell commands.
Do not produce huge outputs from your commands. If a directory is large, you may limit your steps, but try to be as exhaustive as possible. Incrementally gather details as needed.
Spawn subtasks for topics that require deeper investigation.
Reporting Findings
Use emit_research_notes to record detailed, fact-based observations about what currently exists.
For each significant file or directory that is part of the codebase, use emit_related_file to list it.
Your research notes should be strictly about what you have observed:
Document files by their names and locations.
Document discovered documentation files and their contents at a high level (e.g., "There is a README.md in the root directory that explains the folder structure").
Document code files by type or apparent purpose (e.g., "There is a main.py file containing code to launch an application").
Document configuration files, dependencies (like package.json, requirements.txt), testing files, and anything else present.
No Planning or Problem-Solving
Do not suggest fixes or improvements.
Do not mention what should be done.
Do not discuss how the code could be better structured.
Do not provide advice or commentary on the project's future.
You must remain strictly within the bounds of describing what currently exists.
Thoroughness and Completeness
If this is determined to be a new/empty project (no code or files), state that and stop.
If it is an existing project, explore it fully:
Start at the root directory, use ls to see what's there.
For each directory found, navigate in and run ls again.
Continue this process until you have discovered all directories and files at all levels.
Carefully report what you found, including all directories and files.
Do not move on until you are certain you have a complete picture of the codebase structure.
Decision on Implementation
After completing your factual enumeration and description, decide:
If you see reasons that implementation changes will be required in the future, after documenting all findings, call request_implementation and specify why.
If no changes are needed, simply state that no changes are required.
Do not do any implementation or planning now. Just request it if needed.
If there is a top-level README.md or docs/ folder, always start with that.
"""
# Planning stage prompt - guides task breakdown and implementation planning
PLANNING_PROMPT = """Base Task:
{base_task}
Research Notes:
<notes>
{research_notes}
</notes>
Key Facts:
{key_facts}
Key Snippets:
{key_snippets}
Fact Management:
Each fact is identified with [Fact ID: X].
Facts may be deleted if they become outdated, irrelevant, or duplicates.
Use delete_key_fact with the specific Fact ID to remove unnecessary facts.
Snippet Management:
Each snippet is identified with [Snippet ID: X].
Snippets include file path, line number, and source code.
Snippets may have optional descriptions explaining their significance.
Delete snippets with delete_key_snippet if they become outdated or irrelevant.
Use emit_key_snippet to store important code sections needed for reference.
Guidelines:
If you need additional input or assistance from the expert, first use emit_expert_context to provide all relevant context. Wait for the experts response before defining tasks in non-trivial scenarios.
When planning the implementation:
Break the overall work into sub-tasks that are as detailed as possible.
Each sub-task should be clear and unambiguous, and should fully describe what needs to be done, including:
Purpose and goals of the sub-task
Steps required to complete it
Any external interfaces it will integrate with
Data models and structures it will use
API contracts, endpoints, or protocols it requires or provides
Detailed testing strategies specific to the sub-task
Be explicit about inputs, outputs, error cases, and edge conditions.
For complex tasks, include:
Sample requests and responses (if APIs are involved)
Details on error handling and logging
Relevant data validation rules
Any performance, scalability, or security considerations
After finalizing the overall approach:
Use emit_plan to store the high-level implementation plan.
For each sub-task, use emit_task to store a thorough, step-by-step description.
The description should be so detailed that it could be handed to another engineer who could implement it without further clarification.
Only stop after all necessary tasks are fully detailed and cover the entire scope of the original request.
Avoid unnecessary complexity, but do not omit critical details.
Do not implement anything yet.
You are an autonomous agent, not a chatbot."""
# Research summary prompt - guides generation of research summaries
SUMMARY_PROMPT = """
Using only the information provided in the Research Notes and Key Facts below, write a concise and direct answer to the user's query.
User's Query:
{base_task}
Research Notes:
{research_notes}
Key Facts:
{key_facts}
Key Snippets:
{key_snippets}
Fact Management:
Each fact is identified with [Fact ID: X].
Facts may be deleted if they become outdated, irrelevant, or duplicates.
Use delete_key_fact with the specific Fact ID to remove unnecessary facts.
Snippet Management:
Each snippet is identified with [Snippet ID: X].
Snippets include file path, line number, and source code.
Snippets may have optional descriptions explaining their significance.
Delete snippets with delete_key_snippet if they become outdated or irrelevant.
Use emit_key_snippet to store important code sections needed for reference.
Instructions:
- **Stay Within Provided Information**: Do not include any information not present in the Research Notes or Key Facts. Avoid assumptions or external knowledge.
- **Handle Contradictions Appropriately**: If there are contradictions in the provided information, you may take further research steps to resolve the contradiction. If you cannot, note and explain the contradictions as best as you can.
- **Maintain Focus and Brevity**: Keep your response succinct yet comprehensive and focused solely on the user's query without adding unnecessary details.
- **Include technical details**: If it is a technical query or a query related to files on the filesystem, always take time to read those and include relevant snippets.
"""
# Implementation stage prompt - guides specific task implementation
IMPLEMENTATION_PROMPT = """Base-level task (for reference only):
{base_task}
Plan Overview:
{plan}
Key Facts:
{key_facts}
Key Snippets:
{key_snippets}
Relevant Files:
{related_files}
Important Notes:
- You must focus solely on the given task and implement it as described.
- Do not implement other tasks or deviate from the defined scope.
- Use the delete_key_fact tool to remove facts that become outdated, irrelevant, or duplicated.
- Whenever referencing facts, use their assigned **[Fact ID: X]** format.
- Aggressively manage code snippets throughout implementation:
**When to Add Snippets**
- Capture code with emit_key_snippet:
* Before modifying any existing code
* When discovering related code that impacts the task
* After implementing new code sections
* When finding code patterns that will be modified
**When to Remove Snippets**
- Use delete_key_snippet with [Snippet ID: X]:
* Immediately after modifying or replacing referenced code
* When the snippet becomes obsolete or irrelevant
* When newer versions of the code exist
* When the referenced code has been deleted
**Snippet Management Examples**
- Adding a snippet before modification:
emit_key_snippet with:
filepath: "path/to/file.py"
line_number: 10
snippet: "[code to be modified]"
description: "Original version before changes"
- Removing an outdated snippet:
delete_key_snippet with [Snippet ID: X] after the code is modified
**Maintaining Snippet Quality**
- Only keep snippets relevant to current or future task understanding
- Regularly review snippets to ensure they match current codebase
- Prioritize snippet management but don't let it block implementation progress
- Use snippets to complement version control by highlighting key code sections
Instructions:
1. Review the provided base task, plan, and key facts.
2. Implement only the specified task:
{task}
3. While implementing, follow these guidelines:
- Work incrementally, testing and validating as you go.
- Update or remove any key facts that no longer apply.
- Do not build features not explicitly required by the task.
- Only create or modify files directly related to this task.
4. Once the task is complete, ensure all updated files are emitted.
No other activities (such as discussing purpose, future improvements, or unrelated steps) are allowed. Stay fully focused on completing the defined implementation task.
"""

ra_aid/text/__init__.py Normal file

@ -0,0 +1,3 @@
from .processing import truncate_output
__all__ = ['truncate_output']

ra_aid/text/processing.py Normal file

@ -0,0 +1,42 @@
from typing import Optional
def truncate_output(output: str, max_lines: Optional[int] = 5000) -> str:
"""Truncate output string to keep only the most recent lines if it exceeds max_lines.
When truncation occurs, adds a message indicating how many lines were removed.
Preserves original line endings and handles Unicode characters correctly.
Args:
output: The string output to potentially truncate
max_lines: Maximum number of lines to keep (default: 5000)
Returns:
The truncated string if it exceeded max_lines, or the original string if not
"""
# Handle empty output
if not output:
return ""
# Set max_lines to default if None
if max_lines is None:
max_lines = 5000
# Split while preserving line endings
lines = output.splitlines(keepends=True)
total_lines = len(lines)
# Return original if under limit
if total_lines <= max_lines:
return output
# Calculate lines to remove
lines_removed = total_lines - max_lines
# Keep only the most recent lines
truncated_lines = lines[-max_lines:]
# Add truncation message at start
truncation_msg = f"[{lines_removed} lines of output truncated]\n"
# Combine message with remaining lines
return truncation_msg + "".join(truncated_lines)

ra_aid/tools/__init__.py Normal file

@ -0,0 +1,36 @@
from .shell import run_shell_command
from .programmer import run_programming_task, emit_related_file
from .expert import ask_expert, emit_expert_context
from .read_file import read_file_tool
from .fuzzy_find import fuzzy_find_project_files
from .list_directory import list_directory_tree
from .ripgrep import ripgrep_search
from .memory import (
emit_research_notes, emit_plan, emit_task, get_memory_value, emit_key_fact,
request_implementation, skip_implementation, delete_key_fact, emit_research_subtask,
emit_key_snippet, delete_key_snippet
)
__all__ = [
'ask_expert',
'delete_key_fact',
'delete_key_snippet',
'emit_expert_context',
'emit_key_fact',
'emit_key_snippet',
'emit_plan',
'emit_related_file',
'emit_research_notes',
'emit_task',
'fuzzy_find_project_files',
'get_memory_value',
'list_directory_tree',
'read_file_tool',
'request_implementation',
'run_programming_task',
'run_shell_command',
'skip_implementation',
'emit_research_subtask',
'ripgrep_search'
]

ra_aid/tools/expert.py Normal file

@ -0,0 +1,100 @@
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from langchain_openai import ChatOpenAI
from .memory import get_memory_value
console = Console()
model = ChatOpenAI(model_name="o1-preview")
# Keep track of context globally
expert_context = []
@tool("emit_expert_context")
def emit_expert_context(context: str) -> str:
"""Add context for the next expert question.
This should be highly detailed contents such as entire sections of source code, etc.
Do not include your question in the additional context.
Err on the side of adding more context rather than less.
Expert context will be reset after the ask_expert tool is called.
Args:
context: The context to add
Returns:
Confirmation message
"""
global expert_context
expert_context.append(context)
return f"Added context: {context}"
@tool("ask_expert")
def ask_expert(question: str) -> str:
"""Ask a question to an expert AI model.
Keep your questions specific, but long and detailed.
You only query the expert when you have a specific question in mind.
The expert can be extremely useful at logic questions, debugging, and reviewing complex source code, but you must provide all context including source manually.
Try to phrase your question in a way that it does not expand the scope of our top-level task.
The expert can be prone to overthinking depending on what and how you ask it.
Args:
question: The question to ask the expert
Returns:
The expert's response
"""
global expert_context
# Build query with context and key facts
query_parts = []
# Add key facts if they exist
key_facts = get_memory_value('key_facts')
if key_facts and len(key_facts) > 0:
query_parts.append("# Key Facts About This Project")
query_parts.append(key_facts)
# Add other context if it exists
if expert_context:
query_parts.append("\n# Additional Context")
query_parts.append("\n".join(expert_context))
# Add the question last
if query_parts: # If we have context/facts, add a newline before question
query_parts.append("\n# Question")
query_parts.append(question)
# Join all parts
query = "\n".join(query_parts)
# Display the query in a panel before making the call
console.print(Panel(
Markdown(query),
title="🤔 Expert Query",
border_style="yellow"
))
# Clear context after use
expert_context.clear()
# Get response
response = model.invoke(query)
# Format and display response
console.print(Panel(
Markdown(response.content),
title="Expert Response",
border_style="blue"
))
return response.content
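The query-assembly portion of ask_expert can be sketched without the model call or console rendering; all values below are made up for illustration:

```python
# Sketch of how ask_expert stitches key facts, accumulated context, and
# the question into one query, then resets the context afterwards.
expert_context = ["def handler(): ...", "traceback excerpt ..."]
key_facts = "## Key Fact #0\n\nuses asyncio"

query_parts = []
if key_facts:
    query_parts.append("# Key Facts About This Project")
    query_parts.append(key_facts)
if expert_context:
    query_parts.append("\n# Additional Context")
    query_parts.append("\n".join(expert_context))
query_parts.append("\n# Question")
query_parts.append("Why does handler() never return?")  # hypothetical question
query = "\n".join(query_parts)
expert_context.clear()  # context resets after each ask, as documented above
```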

ra_aid/tools/fuzzy_find.py Normal file

@ -0,0 +1,151 @@
from pathlib import Path
from typing import List, Tuple, Optional
import fnmatch
from git import Repo
from git.exc import InvalidGitRepositoryError
from fuzzywuzzy import process
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
console = Console()
DEFAULT_EXCLUDE_PATTERNS = [
'*.pyc',
'__pycache__/*',
'.git/*',
'*.so',
'*.o',
'*.class'
]
@tool
def fuzzy_find_project_files(
search_term: str,
*,
repo_path: str = ".",
threshold: int = 60,
max_results: int = 10,
include_paths: Optional[List[str]] = None,
exclude_patterns: Optional[List[str]] = None
) -> List[Tuple[str, int]]:
"""Fuzzy find files in a git repository matching the search term.
This tool searches for files within a git repository using fuzzy string matching,
allowing for approximate matches to the search term. It returns a list of matched
files along with their match scores.
Args:
search_term: String to match against file paths
repo_path: Path to git repository (defaults to current directory)
threshold: Minimum similarity score (0-100) for matches (default: 60)
max_results: Maximum number of results to return (default: 10)
include_paths: Optional list of path patterns to include in search
exclude_patterns: Optional list of path patterns to exclude from search
Returns:
List of tuples containing (file_path, match_score)
Raises:
InvalidGitRepositoryError: If repo_path is not a git repository
ValueError: If threshold is not between 0 and 100
"""
# Validate threshold
if not 0 <= threshold <= 100:
raise ValueError("Threshold must be between 0 and 100")
# Handle empty search term as special case
if not search_term:
return []
# Initialize repo for normal search
repo = Repo(repo_path)
# Get all tracked files
tracked_files = repo.git.ls_files().splitlines()
# Get all untracked files
untracked_files = repo.untracked_files
# Combine file lists
all_files = tracked_files + untracked_files
# Apply include patterns if specified
if include_paths:
filtered_files = []
for pattern in include_paths:
filtered_files.extend(
f for f in all_files
if fnmatch.fnmatch(f, pattern)
)
all_files = filtered_files
# Apply exclude patterns
patterns = DEFAULT_EXCLUDE_PATTERNS + (exclude_patterns or [])
for pattern in patterns:
all_files = [
f for f in all_files
if not fnmatch.fnmatch(f, pattern)
]
# Perform fuzzy matching
matches = process.extract(
search_term,
all_files,
limit=max_results
)
# Filter by threshold
filtered_matches = [
(path, score)
for path, score in matches
if score >= threshold
]
# Build info panel content
info_sections = []
# Search parameters section
params_section = [
"## Search Parameters",
f"**Search Term**: `{search_term}`",
f"**Repository**: `{repo_path}`",
f"**Threshold**: {threshold}",
f"**Max Results**: {max_results}"
]
if include_paths:
params_section.append("\n**Include Patterns**:")
for pattern in include_paths:
params_section.append(f"- `{pattern}`")
if exclude_patterns:
params_section.append("\n**Exclude Patterns**:")
for pattern in exclude_patterns:
params_section.append(f"- `{pattern}`")
info_sections.append("\n".join(params_section))
# Results statistics section
stats_section = [
"## Results Statistics",
f"**Total Files Scanned**: {len(all_files)}",
f"**Matches Found**: {len(filtered_matches)}"
]
info_sections.append("\n".join(stats_section))
# Top results section
if filtered_matches:
results_section = ["## Top Matches"]
for path, score in filtered_matches[:5]: # Show top 5 matches
results_section.append(f"- `{path}` (score: {score})")
info_sections.append("\n".join(results_section))
else:
info_sections.append("## Results\n*No matches found*")
# Display the panel
console.print(Panel(
Markdown("\n\n".join(info_sections)),
title="🔍 Fuzzy Find Results",
border_style="bright_blue"
))
return filtered_matches
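The score-then-threshold flow above can be sketched in a dependency-free way, with difflib standing in for fuzzywuzzy (scores from the two libraries differ, so this is only an approximation of the behavior):

```python
from difflib import SequenceMatcher

# Score each candidate path against the search term on a 0-100 scale,
# then keep only matches at or above the threshold, best first.
files = ["ra_aid/tools/fuzzy_find.py", "ra_aid/tools/memory.py", "README.md"]

def score(term: str, path: str) -> int:
    return int(SequenceMatcher(None, term.lower(), path.lower()).ratio() * 100)

matches = sorted(((f, score("fuzzy", f)) for f in files), key=lambda m: -m[1])
threshold = 20
filtered = [(f, s) for f, s in matches if s >= threshold]
```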

ra_aid/tools/list_directory.py Normal file

@ -0,0 +1,203 @@
from pathlib import Path
from typing import List, Optional, Dict, Any
import datetime
from dataclasses import dataclass
import pathspec
from rich.tree import Tree
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from langchain_core.tools import tool
import fnmatch
console = Console()
@dataclass
class DirScanConfig:
"""Configuration for directory scanning"""
max_depth: int
follow_links: bool
show_size: bool
show_modified: bool
exclude_patterns: List[str]
def format_size(size_bytes: int) -> str:
"""Format file size in human readable format"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024:
return f"{size_bytes:.1f}{unit}"
size_bytes /= 1024
return f"{size_bytes:.1f}TB"
def format_time(timestamp: float) -> str:
"""Format timestamp as readable date"""
dt = datetime.datetime.fromtimestamp(timestamp)
return dt.strftime("%Y-%m-%d %H:%M")
# Default patterns to exclude
DEFAULT_EXCLUDE_PATTERNS = [
".*", # Hidden files
"__pycache__", # Python cache
"*.pyc", # Python bytecode
"node_modules", # Node.js modules
"*.swp", # Vim swap files
"*.swo", # Vim swap files
"*.swn", # Vim swap files
"*.class", # Java bytecode
"*.o", # Object files
"*.so", # Shared libraries
"*.dll", # Dynamic libraries
"*.exe", # Executables
"*.log", # Log files
"*.bak", # Backup files
"*.tmp", # Temporary files
"*.cache", # Cache files
]
def load_gitignore_patterns(path: Path) -> pathspec.PathSpec:
"""Load gitignore patterns from .gitignore file or use defaults.
Args:
path: Directory path to search for .gitignore
Returns:
PathSpec object configured with the loaded patterns
"""
gitignore_path = path / '.gitignore'
patterns = []
# Load patterns from .gitignore if it exists
if gitignore_path.exists():
with open(gitignore_path) as f:
patterns.extend(line.strip() for line in f
if line.strip() and not line.startswith('#'))
# Add default patterns
patterns.extend(DEFAULT_EXCLUDE_PATTERNS)
return pathspec.PathSpec.from_lines(pathspec.patterns.GitWildMatchPattern, patterns)
def should_ignore(path: str, spec: pathspec.PathSpec) -> bool:
"""Check if a path should be ignored based on gitignore patterns"""
return spec.match_file(path)
def should_exclude(name: str, patterns: List[str]) -> bool:
"""Check if a file/directory name matches any exclude patterns"""
return any(fnmatch.fnmatch(name, pattern) for pattern in patterns)
def build_tree(
path: Path,
tree: Tree,
config: DirScanConfig,
current_depth: int = 0,
spec: Optional[pathspec.PathSpec] = None
) -> None:
"""Recursively build a Rich tree representation of the directory"""
if current_depth >= config.max_depth:
return
try:
# Get sorted list of directory contents
entries = sorted(path.iterdir(), key=lambda p: (not p.is_dir(), p.name.lower()))
for entry in entries:
# Get relative path from root for pattern matching
rel_path = entry.relative_to(path)
# Skip if path matches exclude patterns
if spec and should_ignore(str(rel_path), spec):
continue
if should_exclude(entry.name, config.exclude_patterns):
continue
# Skip if symlink and not following links
if entry.is_symlink() and not config.follow_links:
continue
try:
if entry.is_dir():
# Add directory node
branch = tree.add(
f"📁 {entry.name}/"
)
# Recursively process subdirectory
build_tree(entry, branch, config, current_depth + 1, spec)
else:
# Add file node with optional metadata
meta = []
if config.show_size:
meta.append(format_size(entry.stat().st_size))
if config.show_modified:
meta.append(format_time(entry.stat().st_mtime))
label = entry.name
if meta:
label = f"{label} ({', '.join(meta)})"
tree.add(label)
except PermissionError:
tree.add(f"🔒 {entry.name} (Permission denied)")
except PermissionError:
tree.add("🔒 (Permission denied)")
@tool
def list_directory_tree(
path: str = ".",
*,
max_depth: int = 1, # Default to no recursion
follow_links: bool = False,
show_size: bool = False, # Default to not showing size
show_modified: bool = False, # Default to not showing modified time
exclude_patterns: Optional[List[str]] = None
) -> str:
"""List directory contents in a tree format with optional metadata.
Args:
path: Directory path to list
max_depth: Maximum depth to traverse (default: 1 for no recursion)
follow_links: Whether to follow symbolic links
show_size: Show file sizes (default: False)
show_modified: Show last modified times (default: False)
exclude_patterns: List of patterns to exclude (uses gitignore syntax)
Returns:
Rendered tree string
"""
root_path = Path(path).resolve()
if not root_path.exists():
raise ValueError(f"Path does not exist: {path}")
if not root_path.is_dir():
raise ValueError(f"Path is not a directory: {path}")
# Load .gitignore patterns if present
spec = load_gitignore_patterns(root_path)
# Create tree
tree = Tree(f"📁 {root_path}/")
config = DirScanConfig(
max_depth=max_depth,
follow_links=follow_links,
show_size=show_size,
show_modified=show_modified,
exclude_patterns=DEFAULT_EXCLUDE_PATTERNS + (exclude_patterns or [])
)
# Build tree
build_tree(root_path, tree, config, 0, spec)
# Capture tree output
with console.capture() as capture:
console.print(tree)
tree_str = capture.get()
# Display panel
console.print(Panel(
Markdown(f"```\n{tree_str}\n```"),
title="📂 Directory Tree",
border_style="bright_blue"
))
return tree_str
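The name-based exclude check used by the tree walker reduces to fnmatch against the default patterns; a small sketch:

```python
import fnmatch

# Same check as should_exclude above, exercised against a few of the
# default exclude patterns listed earlier in the file.
patterns = [".*", "__pycache__", "*.pyc", "node_modules"]

def is_excluded(name: str) -> bool:
    return any(fnmatch.fnmatch(name, p) for p in patterns)
```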

ra_aid/tools/memory.py Normal file

@ -0,0 +1,298 @@
from typing import Dict, List, Any, Union, TypedDict, Optional
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from langchain_core.tools import tool
class SnippetInfo(TypedDict):
"""Type definition for source code snippet information"""
filepath: str
line_number: int
snippet: str
description: Optional[str]
console = Console()
# Global memory store
_global_memory: Dict[str, Union[List[Any], Dict[int, str], Dict[int, SnippetInfo], int]] = {
'research_notes': [],
'plans': [],
'tasks': [],
'research_subtasks': [],
'key_facts': {}, # Dict[int, str] - ID to fact mapping
'key_fact_id_counter': 0, # Counter for generating unique fact IDs
'key_snippets': {}, # Dict[int, SnippetInfo] - ID to snippet mapping
'key_snippet_id_counter': 0, # Counter for generating unique snippet IDs
'implementation_requested': [],
'implementation_skipped': []
}
@tool("emit_research_notes")
def emit_research_notes(notes: str) -> str:
"""Store research notes in global memory.
Args:
notes: The research notes to store
Returns:
The stored notes
"""
_global_memory['research_notes'].append(notes)
console.print(Panel(Markdown(notes), title="🔍 Research Notes"))
return notes
@tool("emit_plan")
def emit_plan(plan: str) -> str:
"""Store a plan step in global memory.
Args:
plan: The plan step to store
Returns:
The stored plan
"""
_global_memory['plans'].append(plan)
console.print(Panel(Markdown(plan), title="📋 Plan"))
return plan
@tool("emit_task")
def emit_task(task: str) -> str:
"""Store a task in global memory.
Args:
task: The task to store
Returns:
The stored task
"""
_global_memory['tasks'].append(task)
console.print(Panel(Markdown(task), title="✅ Task"))
return task
@tool("emit_research_subtask")
def emit_research_subtask(subtask: str) -> str:
"""Spawn a research subtask for deeper investigation of a specific topic.
Only use this when a topic requires dedicated focused research beyond the main task.
This should be used sparingly for truly complex research needs.
Args:
subtask: Detailed description of the research subtask
Returns:
Confirmation message
"""
_global_memory['research_subtasks'].append(subtask)
console.print(Panel(Markdown(subtask), title="🔬 Research Subtask"))
return f"Added research subtask: {subtask}"
@tool("emit_key_fact")
def emit_key_fact(fact: str) -> str:
"""Store a key fact about the project or current task in global memory.
Key facts are things like:
- Specific files/functions to look at and what they do
- Coding conventions
- Specific external interfaces related to the task
Key facts should be objective and not restating things already specified in our top-level task.
They are generally things that will not change throughout the duration of our top-level task.
Args:
fact: The key fact to store
Returns:
The stored fact
"""
# Get and increment fact ID
fact_id = _global_memory['key_fact_id_counter']
_global_memory['key_fact_id_counter'] += 1
# Store fact with ID
_global_memory['key_facts'][fact_id] = fact
# Display panel with ID
console.print(Panel(Markdown(fact), title=f"💡 Key Fact #{fact_id}", border_style="bright_cyan"))
# Return fact with ID
return f"Stored fact #{fact_id}: {fact}"
@tool("delete_key_fact")
def delete_key_fact(fact_id: int) -> str:
"""Delete a key fact from global memory by its ID.
Args:
fact_id: The ID of the fact to delete
Returns:
A message indicating success or failure
"""
if fact_id not in _global_memory['key_facts']:
error_msg = f"Error: No fact found with ID #{fact_id}"
console.print(Panel(Markdown(error_msg), title="❌ Delete Failed", border_style="red"))
return error_msg
# Delete the fact
deleted_fact = _global_memory['key_facts'].pop(fact_id)
success_msg = f"Successfully deleted fact #{fact_id}: {deleted_fact}"
console.print(Panel(Markdown(success_msg), title="🗑️ Fact Deleted", border_style="green"))
return success_msg
@tool("request_implementation")
def request_implementation(reason: str) -> str:
"""Request that implementation proceed after research/planning.
Used to indicate the agent should move to implementation stage.
Args:
reason: Why implementation should proceed
Returns:
The stored reason
"""
_global_memory['implementation_requested'].append(reason)
console.print(Panel(Markdown(reason), title="🚀 Implementation Requested"))
return reason
@tool("skip_implementation")
def skip_implementation(reason: str) -> str:
"""Indicate that implementation can be skipped.
Used when research/planning determines no changes are needed.
Args:
reason: Why implementation can be skipped
Returns:
The stored reason
"""
_global_memory['implementation_skipped'].append(reason)
console.print(Panel(Markdown(reason), title="⏭️ Implementation Skipped"))
return reason
@tool("emit_key_snippet")
def emit_key_snippet(filepath: str, line_number: int, snippet: str, description: Optional[str] = None) -> str:
"""Store a key source code snippet in global memory.
Args:
filepath: Path to the source file
line_number: Line number where the snippet starts
snippet: The source code snippet text
description: Optional description of the snippet's significance
Returns:
The stored snippet information
"""
# Get and increment snippet ID
snippet_id = _global_memory['key_snippet_id_counter']
_global_memory['key_snippet_id_counter'] += 1
# Store snippet info
snippet_info: SnippetInfo = {
'filepath': filepath,
'line_number': line_number,
'snippet': snippet,
'description': description
}
_global_memory['key_snippets'][snippet_id] = snippet_info
# Format display text as markdown
display_text = [
f"**Source Location**:",
f"- File: `{filepath}`",
f"- Line: `{line_number}`",
"", # Empty line before code block
"**Code**:",
"```python",
snippet.rstrip(), # Remove trailing whitespace
"```"
]
if description:
display_text.extend(["", "**Description**:", description])
# Display panel
console.print(Panel(Markdown("\n".join(display_text)), title=f"📝 Key Snippet #{snippet_id}", border_style="bright_cyan"))
return f"Stored snippet #{snippet_id}"
@tool("delete_key_snippet")
def delete_key_snippet(snippet_id: int) -> str:
"""Delete a key snippet from global memory by its ID.
Args:
snippet_id: The ID of the snippet to delete
Returns:
A message indicating success or failure
"""
if snippet_id not in _global_memory['key_snippets']:
error_msg = f"Error: No snippet found with ID #{snippet_id}"
console.print(Panel(Markdown(error_msg), title="❌ Delete Failed", border_style="red"))
return error_msg
# Delete the snippet
deleted_snippet = _global_memory['key_snippets'].pop(snippet_id)
success_msg = f"Successfully deleted snippet #{snippet_id} from {deleted_snippet['filepath']}"
console.print(Panel(Markdown(success_msg), title="🗑️ Snippet Deleted", border_style="green"))
return success_msg
def get_memory_value(key: str) -> str:
"""Get a value from global memory.
Different memory types return different formats:
- key_facts: Returns numbered list of facts in format '#ID: fact'
- key_snippets: Returns formatted snippets with file path, line number and content
- All other types: Returns newline-separated list of values
Args:
key: The key to get from memory
Returns:
String representation of the memory values:
- For key_facts: '#ID: fact' format, one per line
- For key_snippets: Formatted snippet blocks
- For other types: One value per line
"""
values = _global_memory.get(key, [])
if key == 'key_facts':
# For empty dict, return empty string
if not values:
return ""
# Sort by ID for consistent output and format as markdown sections
facts = []
for k, v in sorted(values.items()):
facts.extend([
f"## 🔑 Key Fact #{k}",
"", # Empty line for better markdown spacing
v,
"" # Empty line between facts
])
return "\n".join(facts).rstrip() # Remove trailing newline
if key == 'key_snippets':
if not values:
return ""
# Format each snippet with file info and content using markdown
snippets = []
for k, v in sorted(values.items()):
snippet_text = [
f"## 📝 Code Snippet #{k}",
"", # Empty line for better markdown spacing
f"**Source Location**:",
f"- File: `{v['filepath']}`",
f"- Line: `{v['line_number']}`",
"", # Empty line before code block
"**Code**:",
"```python",
v['snippet'].rstrip(), # Remove trailing whitespace
"```"
]
if v['description']:
# Add empty line and description
snippet_text.extend(["", "**Description**:", v['description']])
snippets.append("\n".join(snippet_text))
return "\n\n".join(snippets)
# For other types (lists), join with newlines
return "\n".join(str(v) for v in values)

ra_aid/tools/programmer.py Normal file
@ -0,0 +1,153 @@
from typing import List, Optional, Dict, Union
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from rich.text import Text
from ra_aid.proc.interactive import run_interactive_command
from pydantic import BaseModel, Field
from .memory import get_memory_value
from ra_aid.text.processing import truncate_output
console = Console()
# Keep track of related files globally
related_files = []
@tool
def emit_related_file(file: str) -> str:
"""Add a single file to the list of files that the programmer tool should work with.
Args:
file: File path to add
Returns:
A confirmation message with the added file
"""
global related_files
# Check if file is already in the list
if file not in related_files:
related_files.append(file)
md_content = f"`{file}`"
# Display in a panel
console.print(Panel(Markdown(md_content), title="📁 Related File Added", border_style="green"))
return md_content
class RunProgrammingTaskInput(BaseModel):
instructions: str = Field(description="Instructions for the programming task")
files: Optional[List[str]] = Field(None, description="Optional list of files for Aider to examine")
@tool
def run_programming_task(input: RunProgrammingTaskInput) -> Dict[str, Union[str, int, bool]]:
"""Execute a programming task using Aider.
Be very detailed in your instructions, but do not write the full code for the programmer, as that's the job of the programmer.
The programmer can edit multiple files at once and is intelligent.
If any new files are created, remember to emit them using the emit_related_file tool once this tool completes.
Additionally, before invoking this tool, make sure all existing related files have been emitted using the emit_related_file tool.
Args:
    input: A RunProgrammingTaskInput with:
        instructions: Instructions for the programming task
        files: Optional list of files for Aider to examine. If not provided, uses related_files.
Returns:
A dictionary containing:
- output: The command output (stdout + stderr combined)
- return_code: The process return code (0 typically means success)
- success: Boolean indicating if the command succeeded
"""
# Build command
command = [
"aider",
"--sonnet",
"--yes-always",
"--no-auto-commits",
"--dark-mode",
"--no-suggest-shell-commands",
"-m"
]
# Gather key facts and snippets to prepend to the instructions
key_facts = get_memory_value('key_facts')
key_snippets = get_memory_value('key_snippets')
# Combine all sections
enhanced_instructions = f"""Key Facts About This Project:
{key_facts}
Key Code Snippets:
{key_snippets}
Instructions:
{input.instructions}
Only implement the immediate instructions, do not expand scope.
"""
command.append(enhanced_instructions)
# Use both input files and related files
files_to_use = set(related_files) # Start with related files
if input.files: # Add any additional input files
files_to_use.update(input.files)
if files_to_use:
command.extend(list(files_to_use))
# Create a pretty display of what we're doing
task_display = [
"## Instructions\n",
f"{enhanced_instructions}\n"
]
if files_to_use:
task_display.extend([
"\n## Files\n",
*[f"- `{file}`\n" for file in files_to_use]
])
markdown_content = "".join(task_display)
console.print(Panel(Markdown(markdown_content), title="🤖 Aider Task", border_style="bright_blue"))
try:
# Run the command interactively
print()
output, return_code = run_interactive_command(command)
print()
# Return structured output
return {
"output": truncate_output(output.decode() if output else ""),
"return_code": return_code,
"success": return_code == 0
}
except Exception as e:
print()
error_text = Text()
error_text.append("Error running programming task:\n", style="bold red")
error_text.append(str(e), style="red")
console.print(error_text)
return {
"output": str(e),
"return_code": 1,
"success": False
}
# Export the functions
__all__ = ['run_programming_task', 'emit_related_file']
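The prompt assembly inside `run_programming_task` is plain string composition. A minimal sketch of that step, with `build_enhanced_instructions` as an illustrative name and the section spacing as an assumption:

```python
def build_enhanced_instructions(instructions: str, key_facts: str, key_snippets: str) -> str:
    """Prepend stored facts and snippets to the task instructions."""
    return f"""Key Facts About This Project:
{key_facts}

Key Code Snippets:
{key_snippets}

Instructions:
{instructions}

Only implement the immediate instructions, do not expand scope.
"""

prompt = build_enhanced_instructions("Add a CLI flag", "#0: Uses argparse", "")
```

The final prompt is what gets passed to Aider via `-m`, so the scope guard travels with every task.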

ra_aid/tools/read_file.py Normal file
@ -0,0 +1,78 @@
import os.path
import logging
import time
from typing import Dict
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from ra_aid.text.processing import truncate_output
console = Console()
# Standard buffer size for file reading
CHUNK_SIZE = 8192
@tool
def read_file_tool(
filepath: str,
verbose: bool = True,
encoding: str = 'utf-8'
) -> Dict[str, str]:
"""Read and return the contents of a text file.
Args:
filepath: Path to the file to read
verbose: Whether to display a Rich panel with read statistics (default: True)
encoding: File encoding to use (default: utf-8)
Returns:
Dict containing:
- content: The file contents as a string (truncated if needed)
Raises:
    FileNotFoundError: If the file does not exist
    OSError: If the file cannot be read
"""
start_time = time.time()
try:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
logging.debug(f"Starting to read file: {filepath}")
content = []
line_count = 0
total_bytes = 0
with open(filepath, 'r', encoding=encoding) as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk:
break
content.append(chunk)
total_bytes += len(chunk)
line_count += chunk.count('\n')
logging.debug(f"Read chunk: {len(chunk)} bytes, running total: {total_bytes} bytes")
full_content = ''.join(content)
elapsed = time.time() - start_time
logging.debug(f"File read complete: {total_bytes} bytes in {elapsed:.2f}s")
logging.debug(f"Pre-truncation stats: {total_bytes} bytes, {line_count} lines")
if verbose:
console.print(Panel(
f"Read {line_count} lines ({total_bytes} bytes) from {filepath} in {elapsed:.2f}s",
title="📄 File Read",
border_style="bright_blue"
))
# Truncate if needed
truncated = truncate_output(full_content) if full_content else ""
return {"content": truncated}
except Exception as e:
elapsed = time.time() - start_time
logging.error(f"Error reading file {filepath} after {elapsed:.2f}s: {str(e)}")
raise
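The chunked read loop above can be exercised standalone. This sketch mirrors the loop with an illustrative helper name; note that for a text-mode file handle, `len(chunk)` counts characters, which equals bytes only for ASCII content:

```python
import os
import tempfile

CHUNK_SIZE = 8192

def read_in_chunks(filepath: str, encoding: str = "utf-8"):
    """Read a file in CHUNK_SIZE pieces, tracking size and newline count."""
    chunks, total_chars, line_count = [], 0, 0
    with open(filepath, "r", encoding=encoding) as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            chunks.append(chunk)
            total_chars += len(chunk)
            line_count += chunk.count("\n")
    return "".join(chunks), total_chars, line_count

# Exercise the helper on a throwaway file
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("alpha\nbeta\n")
    path = f.name
content, nchars, nlines = read_in_chunks(path)
os.unlink(path)
```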

ra_aid/tools/ripgrep.py Normal file
@ -0,0 +1,116 @@
from typing import Dict, Union, Optional, List
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from ra_aid.proc.interactive import run_interactive_command
from ra_aid.text.processing import truncate_output
console = Console()
DEFAULT_EXCLUDE_DIRS = [
'.git',
'node_modules',
'vendor',
'.venv',
'__pycache__',
'.cache',
'dist',
'build',
'env',
'.env',
'venv',
'.idea',
'.vscode'
]
@tool
def ripgrep_search(
pattern: str,
*,
file_type: Optional[str] = None,
case_sensitive: bool = True,
include_hidden: bool = False,
follow_links: bool = False,
exclude_dirs: Optional[List[str]] = None
) -> Dict[str, Union[str, int, bool]]:
"""Execute a ripgrep (rg) search with formatting and common options.
Args:
pattern: Search pattern to find
file_type: Optional file type to filter results (e.g. 'py' for Python files)
case_sensitive: Whether to do case-sensitive search (default: True)
include_hidden: Whether to search hidden files and directories (default: False)
follow_links: Whether to follow symbolic links (default: False)
exclude_dirs: Additional directories to exclude (combines with defaults)
Returns:
Dict containing:
- output: The formatted search results
- return_code: Process return code (0 means success)
- success: Boolean indicating if search succeeded
"""
# Build rg command with options
cmd = ['rg', '--color', 'always']
if not case_sensitive:
cmd.append('-i')
if include_hidden:
cmd.append('--hidden')
if follow_links:
cmd.append('--follow')
if file_type:
cmd.extend(['-t', file_type])
# Add exclusions
exclusions = DEFAULT_EXCLUDE_DIRS + (exclude_dirs or [])
for exclude_dir in exclusions:
    cmd.extend(['--glob', f'!{exclude_dir}'])
# Add the search pattern
cmd.append(pattern)
# Build info sections for display
info_sections = []
# Search parameters section
params = [
"## Search Parameters",
f"**Pattern**: `{pattern}`",
f"**Case Sensitive**: {case_sensitive}",
f"**File Type**: {file_type or 'all'}"
]
if include_hidden:
params.append("**Including Hidden Files**: yes")
if follow_links:
params.append("**Following Symlinks**: yes")
if exclude_dirs:
params.append("\n**Additional Exclusions**:")
for dir in exclude_dirs:
params.append(f"- `{dir}`")
info_sections.append("\n".join(params))
# Execute command
try:
print()
output, return_code = run_interactive_command(cmd)
print()
decoded_output = output.decode() if output else ""
return {
"output": truncate_output(decoded_output),
"return_code": return_code,
"success": return_code == 0
}
except Exception as e:
error_msg = str(e)
console.print(Panel(error_msg, title="❌ Error", border_style="red"))
return {
"output": error_msg,
"return_code": 1,
"success": False
}
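The command-construction half of `ripgrep_search` is a pure function of its options and can be sketched without running `rg` at all. The helper name and the trimmed exclusion list below are illustrative:

```python
from typing import List, Optional

DEFAULT_EXCLUDE_DIRS = ['.git', 'node_modules', '__pycache__']

def build_rg_command(
    pattern: str,
    *,
    file_type: Optional[str] = None,
    case_sensitive: bool = True,
    include_hidden: bool = False,
    follow_links: bool = False,
    exclude_dirs: Optional[List[str]] = None,
) -> List[str]:
    """Assemble an rg argv list mirroring the tool's option handling."""
    cmd = ['rg', '--color', 'always']
    if not case_sensitive:
        cmd.append('-i')
    if include_hidden:
        cmd.append('--hidden')
    if follow_links:
        cmd.append('--follow')
    if file_type:
        cmd.extend(['-t', file_type])
    # Exclusions are expressed as negated globs
    for exclude_dir in DEFAULT_EXCLUDE_DIRS + (exclude_dirs or []):
        cmd.extend(['--glob', f'!{exclude_dir}'])
    cmd.append(pattern)  # Pattern goes last
    return cmd

cmd = build_rg_command('TODO', file_type='py', case_sensitive=False)
```

Keeping the pattern as the final argument avoids any ambiguity with option parsing.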

ra_aid/tools/shell.py Normal file
@ -0,0 +1,59 @@
from typing import Dict, Union
from langchain_core.tools import tool
from rich.console import Console
from rich.panel import Panel
from ra_aid.proc.interactive import run_interactive_command
from ra_aid.text.processing import truncate_output
console = Console()
@tool
def run_shell_command(command: str) -> Dict[str, Union[str, int, bool]]:
"""Execute a shell command and return its output.
Assume these are available:
- rg
- tree
- standard linux utilities
Important notes:
1. Try to constrain/limit the output. Output processing is expensive, and infinite/looping output will cause us to fail.
2. When using commands like 'find', 'grep', or similar recursive search tools, always exclude common
development directories and files that can cause excessive output or slow performance:
- Version control: .git
- Dependencies: node_modules, vendor, .venv
- Cache: __pycache__, .cache
- Build: dist, build
- Environment: .env, venv, env
- IDE: .idea, .vscode
3. Avoid recursive lists, finds, etc. that could be slow or produce a ton of output, and avoid flags like '-l' that needlessly increase the output. Use them only when truly necessary.
Args:
    command: The shell command string to execute (run via /bin/bash -c)
Returns:
A dictionary containing:
- output: The command output (stdout + stderr combined)
- return_code: The process return code (0 typically means success)
- success: Boolean indicating if the command succeeded (return code == 0)
"""
# Show just the command in a simple panel
console.print(Panel(command, title="🐚 Shell", border_style="bright_yellow"))
try:
print()
output, return_code = run_interactive_command(['/bin/bash', '-c', command])
print()
return {
"output": truncate_output(output.decode()) if output else "",
"return_code": return_code,
"success": return_code == 0
}
except Exception as e:
print()
console.print(Panel(str(e), title="❌ Error", border_style="red"))
return {
"output": str(e),
"return_code": 1,
"success": False
}
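The tool's result shape can be reproduced with plain `subprocess` instead of the PTY-backed `run_interactive_command`. A non-interactive sketch assuming a POSIX shell at `/bin/bash`; `run_shell` is an illustrative name:

```python
import subprocess

def run_shell(command: str) -> dict:
    """Simplified stand-in: run via bash -c, merging stderr into stdout."""
    proc = subprocess.run(
        ['/bin/bash', '-c', command],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Combine streams, as the tool does
    )
    return {
        "output": proc.stdout.decode() if proc.stdout else "",
        "return_code": proc.returncode,
        "success": proc.returncode == 0,
    }

result = run_shell("echo hello")
```

The real tool differs in that it allocates a PTY so interactive programs behave normally; this sketch only captures the output contract.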

ra_aid/version.py Normal file
@ -0,0 +1,2 @@
"""Version information."""
__version__ = "0.1.0"

requirements-dev.txt Normal file
@ -0,0 +1,2 @@
pytest-timeout>=2.2.0
pytest>=7.0.0

requirements.txt Normal file
@ -0,0 +1,7 @@
langchain-anthropic
langgraph
rich>=13.0.0
GitPython==3.1.41
fuzzywuzzy==0.18.0
python-Levenshtein==0.23.0
pathspec>=0.11.0

tests/pytest.ini Normal file
@ -0,0 +1,2 @@
[pytest]
timeout = 30

@ -0,0 +1,132 @@
"""Tests for the interactive subprocess module."""
import os
import pytest
import tempfile
from ra_aid.proc.interactive import run_interactive_command
def test_basic_command():
"""Test running a basic command."""
output, retcode = run_interactive_command(["echo", "hello world"])
assert b"hello world" in output
assert retcode == 0
def test_shell_pipeline():
"""Test running a shell pipeline command."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "echo 'hello world' | grep 'world'"])
assert b"world" in output
assert retcode == 0
def test_stderr_capture():
"""Test that stderr is properly captured in combined output."""
# Use a command that definitely writes to stderr
output, retcode = run_interactive_command(["/bin/bash", "-c", "ls /nonexistent/path"])
assert b"No such file or directory" in output
assert retcode == 2 # ls returns 2 for file not found
def test_command_not_found():
"""Test handling of non-existent commands."""
with pytest.raises(FileNotFoundError):
run_interactive_command(["nonexistentcommand"])
def test_empty_command():
"""Test handling of empty commands."""
with pytest.raises(ValueError):
run_interactive_command([])
def test_interactive_command():
"""Test running an interactive command.
This test verifies that output appears in real-time using process substitution.
We use a command that prints to both stdout and stderr to verify capture."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "echo stdout; echo stderr >&2"])
assert b"stdout" in output
assert b"stderr" in output
assert retcode == 0
def test_large_output():
"""Test handling of commands that produce large output."""
# Generate a large output with predictable content
cmd = "for i in {1..10000}; do echo \"Line $i of test output\"; done"
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
# Filter out script header/footer
lines = [line for line in output.splitlines() if b"Script" not in line and line.strip()]
# Verify we got all 10000 lines
assert len(lines) == 10000
# Verify content of some lines
assert lines[0] == b"Line 1 of test output"
assert lines[999] == b"Line 1000 of test output"
assert lines[-1] == b"Line 10000 of test output"
assert retcode == 0
def test_unicode_handling():
"""Test handling of unicode characters."""
test_string = "Hello "
output, retcode = run_interactive_command(["/bin/bash", "-c", f"echo '{test_string}'"])
assert test_string.encode() in output
assert retcode == 0
def test_multiple_commands():
"""Test running multiple commands in sequence."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "echo 'first'; echo 'second'"])
assert b"first" in output
assert b"second" in output
assert retcode == 0
def test_cat_medium_file():
"""Test that cat command properly captures output for medium-length files."""
# Create a temporary file with known content
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
for i in range(500):
f.write(f"This is test line {i}\n")
temp_path = f.name
try:
output, retcode = run_interactive_command(["/bin/bash", "-c", f"cat {temp_path}"])
# Split by newlines and filter out script header/footer lines
lines = [line for line in output.splitlines() if b"Script" not in line and line.strip()]
assert len(lines) == 500
assert retcode == 0
# Verify content integrity by checking first and last lines
assert b"This is test line 0" in lines[0]
assert b"This is test line 499" in lines[-1]
finally:
os.unlink(temp_path)
def test_realtime_output():
"""Test that output appears in real-time and is captured correctly."""
# Create a command that sleeps briefly between outputs
cmd = "echo 'first'; sleep 0.1; echo 'second'; sleep 0.1; echo 'third'"
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
# Filter out script header/footer lines
lines = [line for line in output.splitlines() if b"Script" not in line and line.strip()]
assert b"first" in lines[0]
assert b"second" in lines[1]
assert b"third" in lines[2]
assert retcode == 0
def test_tty_available():
"""Test that commands have access to a TTY."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "tty"])
assert b"/dev/pts/" in output # Should show a PTY device
assert retcode == 0
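The contract these tests pin down — combined bytes output plus return code — can be satisfied without a PTY. A simplified, non-interactive sketch (the real implementation allocates a pseudo-terminal, which this stand-in deliberately omits, so the `tty` test above would not pass against it):

```python
import subprocess
from typing import List, Tuple

def run_interactive_command_simplified(cmd: List[str]) -> Tuple[bytes, int]:
    """Run cmd, returning (combined stdout+stderr bytes, return code)."""
    if not cmd:
        raise ValueError("No command provided")
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return proc.stdout, proc.returncode

out, code = run_interactive_command_simplified(
    ["/bin/bash", "-c", "echo stdout; echo stderr >&2"]
)
```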

tests/ra_aid/test_utils.py Normal file
@ -0,0 +1,116 @@
"""Tests for utility functions."""
import pytest
from ra_aid.text.processing import truncate_output
def test_normal_truncation():
"""Test normal truncation behavior with more lines than max."""
# Create input with 10 lines
input_lines = [f"Line {i}\n" for i in range(10)]
input_text = "".join(input_lines)
# Truncate to 5 lines
result = truncate_output(input_text, max_lines=5)
# Verify truncation message and content
assert "[5 lines of output truncated]" in result
assert "Line 5\n" in result
assert "Line 9\n" in result
assert "Line 0\n" not in result
assert "Line 4\n" not in result
def test_no_truncation_needed():
"""Test when input is shorter than max_lines."""
input_text = "Line 1\nLine 2\nLine 3\n"
result = truncate_output(input_text, max_lines=5)
# Should return original text unchanged
assert result == input_text
assert "[lines of output truncated]" not in result
def test_empty_input():
"""Test with empty input."""
assert truncate_output("") == ""
assert truncate_output(None) == ""
def test_exact_max_lines():
"""Test when input is exactly max_lines."""
# Create input with exactly 5 lines
input_lines = [f"Line {i}\n" for i in range(5)]
input_text = "".join(input_lines)
result = truncate_output(input_text, max_lines=5)
# Should return original text unchanged
assert result == input_text
assert "[lines of output truncated]" not in result
def test_different_line_endings():
"""Test with different line endings (\\n, \\r\\n, \\r)."""
# Mix of different line endings
input_text = "Line 1\nLine 2\r\nLine 3\rLine 4\nLine 5\r\nLine 6"
result = truncate_output(input_text, max_lines=3)
# Should preserve line endings in truncated output
assert "[3 lines of output truncated]" in result
assert "Line 4" in result
assert "Line 6" in result
assert "Line 1" not in result
def test_ansi_sequences():
"""Test with ANSI escape sequences."""
# Input with ANSI color codes
input_lines = [
"\033[31mRed Line 1\033[0m\n",
"\033[32mGreen Line 2\033[0m\n",
"\033[34mBlue Line 3\033[0m\n",
"\033[33mYellow Line 4\033[0m\n"
]
input_text = "".join(input_lines)
result = truncate_output(input_text, max_lines=2)
# Should preserve ANSI sequences in truncated output
assert "[2 lines of output truncated]" in result
assert "\033[34mBlue Line 3\033[0m" in result
assert "\033[33mYellow Line 4\033[0m" in result
assert "\033[31mRed Line 1\033[0m" not in result
def test_custom_max_lines():
"""Test with custom max_lines value."""
# Create input with 100 lines
input_lines = [f"Line {i}\n" for i in range(100)]
input_text = "".join(input_lines)
# Test with custom max_lines=10
result = truncate_output(input_text, max_lines=10)
# Should have truncation message and last 10 lines
assert "[90 lines of output truncated]" in result
assert "Line 90\n" in result
assert "Line 99\n" in result
assert "Line 0\n" not in result
assert "Line 89\n" not in result
def test_no_trailing_newline():
"""Test with input that doesn't end in newline."""
input_lines = [f"Line {i}" for i in range(10)]
input_text = "\n".join(input_lines) # No trailing newline
result = truncate_output(input_text, max_lines=5)
# Should handle truncation correctly without trailing newline
assert "[5 lines of output truncated]" in result
assert "Line 5" in result
assert "Line 9" in result
assert "Line 0" not in result
assert "Line 4" not in result

@ -0,0 +1,134 @@
import pytest
from git import Repo
from git.exc import InvalidGitRepositoryError
from ra_aid.tools import fuzzy_find_project_files
@pytest.fixture
def git_repo(tmp_path):
"""Create a temporary git repository with some test files"""
repo = Repo.init(tmp_path)
# Create some files
(tmp_path / "main.py").write_text("print('hello')")
(tmp_path / "test_main.py").write_text("def test_main(): pass")
(tmp_path / "lib").mkdir()
(tmp_path / "lib/utils.py").write_text("def util(): pass")
(tmp_path / "lib/__pycache__").mkdir()
(tmp_path / "lib/__pycache__/utils.cpython-39.pyc").write_text("cache")
# Create some untracked files
(tmp_path / "untracked.txt").write_text("untracked content")
(tmp_path / "draft.py").write_text("# draft code")
# Add and commit only some files
repo.index.add(["main.py", "lib/utils.py"])
repo.index.commit("Initial commit")
return tmp_path
def test_basic_fuzzy_search(git_repo):
"""Test basic fuzzy matching functionality"""
results = fuzzy_find_project_files.invoke({"search_term": "utils", "repo_path": str(git_repo)})
assert len(results) >= 1
assert any("lib/utils.py" in match[0] for match in results)
assert all(isinstance(match[1], int) for match in results)
def test_threshold_filtering(git_repo):
"""Test threshold parameter behavior"""
# Should match with high threshold
results_high = fuzzy_find_project_files.invoke({
"search_term": "main",
"threshold": 80,
"repo_path": str(git_repo)
})
assert len(results_high) >= 1
assert any("main.py" in match[0] for match in results_high)
# Should not match with very high threshold
results_very_high = fuzzy_find_project_files.invoke({
"search_term": "mian",
"threshold": 99,
"repo_path": str(git_repo)
})
assert len(results_very_high) == 0
def test_max_results_limit(git_repo):
"""Test max_results parameter"""
max_results = 1
results = fuzzy_find_project_files.invoke({
"search_term": "py",
"max_results": max_results,
"repo_path": str(git_repo)
})
assert len(results) <= max_results
def test_include_paths_filter(git_repo):
"""Test include_paths filtering"""
results = fuzzy_find_project_files.invoke({
"search_term": "py",
"include_paths": ["lib/*"],
"repo_path": str(git_repo)
})
assert all("lib/" in match[0] for match in results)
def test_exclude_patterns_filter(git_repo):
"""Test exclude_patterns filtering"""
results = fuzzy_find_project_files.invoke({
"search_term": "py",
"exclude_patterns": ["*test*"],
"repo_path": str(git_repo)
})
assert not any("test" in match[0] for match in results)
def test_invalid_threshold():
"""Test error handling for invalid threshold"""
with pytest.raises(ValueError):
fuzzy_find_project_files.invoke({
"search_term": "test",
"threshold": 101
})
def test_non_git_repo(tmp_path):
"""Test error handling outside git repo"""
with pytest.raises(InvalidGitRepositoryError):
fuzzy_find_project_files.invoke({
"search_term": "test",
"repo_path": str(tmp_path)
})
def test_exact_match(git_repo):
"""Test exact matching returns 100% score"""
results = fuzzy_find_project_files.invoke({
"search_term": "main.py",
"repo_path": str(git_repo)
})
assert len(results) >= 1
assert any(match[1] == 100 for match in results)
def test_empty_search_term(git_repo):
"""Test behavior with empty search term"""
results = fuzzy_find_project_files.invoke({
"search_term": "",
"repo_path": str(git_repo)
})
assert len(results) == 0
def test_untracked_files(git_repo):
"""Test that untracked files are included in search results"""
results = fuzzy_find_project_files.invoke({
"search_term": "untracked",
"repo_path": str(git_repo)
})
assert len(results) >= 1
assert any("untracked.txt" in match[0] for match in results)
def test_no_matches(git_repo):
"""Test behavior when no files match the search term"""
results = fuzzy_find_project_files.invoke({
"search_term": "nonexistentfile",
"threshold": 80,
"repo_path": str(git_repo)
})
assert len(results) == 0
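The scoring and filtering behavior these tests describe can be sketched with stdlib `difflib` in place of fuzzywuzzy; `fuzzy_score` and `fuzzy_find` are illustrative names, and the 0–100 scale mirrors fuzzywuzzy's convention:

```python
from difflib import SequenceMatcher
from typing import List, Tuple

def fuzzy_score(search_term: str, candidate: str) -> int:
    """0-100 similarity score; 100 for an exact (case-insensitive) match."""
    return int(SequenceMatcher(None, search_term.lower(), candidate.lower()).ratio() * 100)

def fuzzy_find(search_term: str, filenames: List[str],
               threshold: int = 60, max_results: int = 10) -> List[Tuple[str, int]]:
    """Score each filename, keep those at or above threshold, best first."""
    if not search_term:
        return []  # Empty search matches nothing, as the test above expects
    scored = [(name, fuzzy_score(search_term, name)) for name in filenames]
    matches = [(n, s) for n, s in scored if s >= threshold]
    matches.sort(key=lambda pair: pair[1], reverse=True)
    return matches[:max_results]

hits = fuzzy_find("main.py", ["main.py", "lib/utils.py", "test_main.py"])
```

`difflib` and fuzzywuzzy score differently in detail, so thresholds tuned for one will not transfer directly to the other.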

@ -0,0 +1,123 @@
import os
import pytest
import tempfile
from pathlib import Path
from ra_aid.tools import list_directory_tree
from ra_aid.tools.list_directory import load_gitignore_patterns, should_ignore
@pytest.fixture
def temp_dir():
"""Create a temporary directory for testing"""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
def create_test_directory_structure(path: Path):
"""Create a test directory structure"""
# Create files
(path / "file1.txt").write_text("content1")
(path / "file2.py").write_text("content2")
(path / ".hidden").write_text("hidden")
# Create subdirectories
subdir1 = path / "subdir1"
subdir1.mkdir()
(subdir1 / "subfile1.txt").write_text("subcontent1")
(subdir1 / "subfile2.py").write_text("subcontent2")
subdir2 = path / "subdir2"
subdir2.mkdir()
(subdir2 / ".git").mkdir()
(subdir2 / "__pycache__").mkdir()
def test_list_directory_basic(temp_dir):
"""Test basic directory listing functionality"""
create_test_directory_structure(temp_dir)
result = list_directory_tree.invoke({
"path": str(temp_dir),
"max_depth": 2,
"follow_links": False
})
# Check basic structure
assert isinstance(result, str)
assert "file1.txt" in result
assert "file2.py" in result
assert "subdir1" in result
assert "subdir2" in result
# Hidden files should be excluded by default
assert ".hidden" not in result
assert ".git" not in result
assert "__pycache__" not in result
# File details should not be present by default
assert "bytes" not in result.lower()
assert "2024-" not in result
def test_list_directory_with_details(temp_dir):
"""Test directory listing with file details"""
create_test_directory_structure(temp_dir)
result = list_directory_tree.invoke({
"path": str(temp_dir),
"max_depth": 2,
"show_size": True,
"show_modified": True
})
# File details should be present
assert "bytes" in result.lower() or "kb" in result.lower() or "b" in result.lower()
assert "2024-" in result
def test_list_directory_depth_limit(temp_dir):
"""Test max_depth parameter"""
create_test_directory_structure(temp_dir)
# Test with depth 1 (default)
result = list_directory_tree.invoke({
"path": str(temp_dir) # Use defaults
})
assert isinstance(result, str)
assert "subdir1" in result # Directory name should be visible
assert "subfile1.txt" not in result # But not its contents
assert "subfile2.py" not in result
def test_list_directory_ignore_patterns(temp_dir):
"""Test exclude patterns"""
create_test_directory_structure(temp_dir)
result = list_directory_tree.invoke({
"path": str(temp_dir),
"max_depth": 2,
"exclude_patterns": ["*.py"]
})
assert isinstance(result, str)
assert "file1.txt" in result
assert "file2.py" not in result
assert "subfile2.py" not in result
def test_gitignore_patterns():
"""Test gitignore pattern loading and matching"""
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir)
# Create a .gitignore file
(path / ".gitignore").write_text("*.log\n*.tmp\n")
spec = load_gitignore_patterns(path)
assert should_ignore("test.log", spec) is True
assert should_ignore("test.tmp", spec) is True
assert should_ignore("test.txt", spec) is False
assert should_ignore("dir/test.log", spec) is True
def test_invalid_path():
"""Test error handling for invalid paths"""
with pytest.raises(ValueError, match="Path does not exist"):
list_directory_tree.invoke({"path": "/nonexistent/path"})
with pytest.raises(ValueError, match="Path is not a directory"):
list_directory_tree.invoke({"path": __file__}) # Try to list the test file itself
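The `should_ignore` behavior tested above can be approximated with stdlib `fnmatch` instead of `pathspec`; this is a simplified sketch that handles only plain glob patterns, not the full gitignore syntax, and `should_ignore_sketch` is an illustrative name:

```python
import os
from fnmatch import fnmatch
from typing import Iterable

def should_ignore_sketch(path: str, patterns: Iterable[str]) -> bool:
    """Match a path (or its basename) against gitignore-style glob patterns."""
    name = os.path.basename(path)
    # A pattern like '*.log' should match both 'test.log' and 'dir/test.log'
    return any(fnmatch(path, pat) or fnmatch(name, pat) for pat in patterns)

patterns = ["*.log", "*.tmp"]
```

Real gitignore semantics (negation with `!`, directory-only patterns, anchoring) need `pathspec` or equivalent.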

@ -0,0 +1,104 @@
import pytest
from ra_aid.tools.memory import (
_global_memory,
emit_key_fact,
delete_key_fact,
get_memory_value,
emit_research_subtask
)
@pytest.fixture
def reset_memory():
"""Reset global memory before each test"""
_global_memory['key_facts'] = {}
_global_memory['key_fact_id_counter'] = 0
_global_memory['research_notes'] = []
_global_memory['plans'] = []
_global_memory['tasks'] = []
_global_memory['research_subtasks'] = []
yield
# Clean up after test
_global_memory['key_facts'] = {}
_global_memory['key_fact_id_counter'] = 0
_global_memory['research_notes'] = []
_global_memory['plans'] = []
_global_memory['tasks'] = []
_global_memory['research_subtasks'] = []
def test_emit_key_fact(reset_memory):
"""Test emitting key facts with ID assignment"""
# First fact should get ID 0
result = emit_key_fact("First fact")
assert result == "Stored fact #0: First fact"
assert _global_memory['key_facts'][0] == "First fact"
# Second fact should get ID 1
result = emit_key_fact("Second fact")
assert result == "Stored fact #1: Second fact"
assert _global_memory['key_facts'][1] == "Second fact"
# Counter should be at 2
assert _global_memory['key_fact_id_counter'] == 2
def test_delete_key_fact(reset_memory):
"""Test deleting key facts"""
# Add some facts
emit_key_fact("First fact")
emit_key_fact("Second fact")
# Delete fact #0
result = delete_key_fact({'fact_id': 0})
assert result == "Successfully deleted fact #0: First fact"
assert 0 not in _global_memory['key_facts']
assert 1 in _global_memory['key_facts']
def test_delete_invalid_fact(reset_memory):
"""Test error handling when deleting non-existent facts"""
result = delete_key_fact({'fact_id': 999})
assert result == "Error: No fact found with ID #999"
# Add and delete a fact, then try to delete it again
emit_key_fact("Test fact")
delete_key_fact({'fact_id': 0})
result = delete_key_fact({'fact_id': 0})
assert result == "Error: No fact found with ID #0"
def test_get_memory_value_key_facts(reset_memory):
"""Test get_memory_value with key facts dictionary"""
# Empty key facts should return empty string
assert get_memory_value('key_facts') == ""
# Add some facts
emit_key_fact("First fact")
emit_key_fact("Second fact")
# Should return markdown formatted list
expected = "## 🔑 Key Fact #0\n\nFirst fact\n\n## 🔑 Key Fact #1\n\nSecond fact"
assert get_memory_value('key_facts') == expected
def test_get_memory_value_other_types(reset_memory):
"""Test get_memory_value remains compatible with other memory types"""
# Add some research notes
_global_memory['research_notes'].append("Note 1")
_global_memory['research_notes'].append("Note 2")
assert get_memory_value('research_notes') == "Note 1\nNote 2"
# Test with empty list
assert get_memory_value('plans') == ""
# Test with non-existent key
assert get_memory_value('nonexistent') == ""
def test_emit_research_subtask(reset_memory):
"""Test emitting research subtasks"""
# Test adding a research subtask
subtask = "Research Python async patterns"
result = emit_research_subtask(subtask)
# Verify return message
assert result == f"Added research subtask: {subtask}"
# Verify it was stored in memory
assert len(_global_memory['research_subtasks']) == 1
assert _global_memory['research_subtasks'][0] == subtask

@ -0,0 +1,70 @@
import pytest
from ra_aid.tools import read_file_tool
def test_basic_file_reading(tmp_path):
"""Test basic file reading functionality"""
# Create a test file
test_file = tmp_path / "test.txt"
test_content = "Hello\nWorld\n"
test_file.write_text(test_content)
# Read the file
result = read_file_tool(str(test_file))
# Verify return format and content
assert isinstance(result, dict)
assert 'content' in result
assert result['content'] == test_content
def test_no_truncation(tmp_path):
"""Test that files under max_lines are not truncated"""
# Create a test file with content under the limit
test_file = tmp_path / "small.txt"
line_count = 4000 # Well under 5000 limit
test_content = "line\n" * line_count
test_file.write_text(test_content)
# Read the file
result = read_file_tool(str(test_file))
# Verify no truncation occurred
assert isinstance(result, dict)
assert '[lines of output truncated]' not in result['content']
assert len(result['content'].splitlines()) == line_count
@pytest.mark.timeout(30)
def test_with_truncation(tmp_path):
"""Test that files over max_lines are properly truncated"""
# Create a test file exceeding the limit
test_file = tmp_path / "large.txt"
line_count = 6000 # Exceeds 5000 limit
test_content = "line\n" * line_count
test_file.write_text(test_content)
# Read the file
result = read_file_tool(str(test_file))
# Verify truncation occurred correctly
assert isinstance(result, dict)
assert '[1000 lines of output truncated]' in result['content']
assert len(result['content'].splitlines()) == 5001 # 5000 content lines + 1 truncation message
def test_nonexistent_file():
"""Test error handling for non-existent files"""
with pytest.raises(FileNotFoundError):
read_file_tool("/nonexistent/file.txt")
def test_empty_file(tmp_path):
"""Test reading an empty file"""
# Create an empty test file
test_file = tmp_path / "empty.txt"
test_file.write_text("")
# Read the file
result = read_file_tool(str(test_file))
# Verify return format and empty content
assert isinstance(result, dict)
assert 'content' in result
assert result['content'] == ""