diff --git a/ra_aid/prompts/research_prompts.py b/ra_aid/prompts/research_prompts.py
index ec2d66e..5232cb7 100644
--- a/ra_aid/prompts/research_prompts.py
+++ b/ra_aid/prompts/research_prompts.py
@@ -168,6 +168,8 @@ Decision on Implementation
 If this is a top-level README.md or docs folder, start there.
 
+If the user explicitly requested implementation, first perform all the background research for that task, then call request_implementation so the implementation can be carried out.
+
 NEVER ANNOUNCE WHAT YOU ARE DOING, JUST DO IT!
 
 AS THE RESEARCH AGENT, YOU MUST NOT WRITE OR MODIFY ANY FILES. IF FILE MODIFICATION OR IMPLEMENTATION IS REQUIRED, CALL request_implementation.
diff --git a/ra_aid/tools/memory.py b/ra_aid/tools/memory.py
index 4ae0120..00a8ac7 100644
--- a/ra_aid/tools/memory.py
+++ b/ra_aid/tools/memory.py
@@ -500,12 +500,20 @@ def is_binary_file(filepath):
         mime = magic.from_file(filepath, mime=True)
         file_type = magic.from_file(filepath)
 
-        if not mime.startswith("text/"):
-            return True
-
-        if "ASCII text" in file_type:
+        # If the MIME type starts with 'text/', treat it as a text file
+        if mime.startswith("text/"):
             return False
-
+
+        # Also treat 'application/x-python' and similar script MIME types as text
+        if any(mime.startswith(prefix) for prefix in ['application/x-python', 'application/javascript']):
+            return False
+
+        # Check the human-readable descriptor for common text file indicators
+        text_indicators = ["text", "script", "xml", "json", "yaml", "markdown", "html"]
+        if any(indicator in file_type.lower() for indicator in text_indicators):
+            return False
+
+        # No text indicator matched: assume the file is binary
         return True
     except Exception:
         return _is_binary_fallback(filepath)
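Note on the change above: the new is_binary_file logic is a three-step cascade. It trusts a text/* MIME type first, then a small whitelist of script MIME types, then keyword matching on the human-readable descriptor. The standalone sketch below mirrors that cascade for illustration only; it assumes the python-magic package, and the name classify_file is hypothetical rather than part of ra_aid.

    import magic  # python-magic

    TEXT_MIME_PREFIXES = ("text/", "application/x-python", "application/javascript")
    TEXT_INDICATORS = ("text", "script", "xml", "json", "yaml", "markdown", "html")

    def classify_file(filepath: str) -> bool:
        """Return True if filepath looks binary, mirroring the patched logic."""
        mime = magic.from_file(filepath, mime=True)  # e.g. "text/x-python"
        file_type = magic.from_file(filepath)        # e.g. "Python script text executable"

        # Steps 1 and 2: accept text/* and whitelisted script MIME types as text
        if any(mime.startswith(prefix) for prefix in TEXT_MIME_PREFIXES):
            return False

        # Step 3: keyword match on the descriptor, so "Python script text
        # executable" counts as text even without the literal "ASCII text"
        if any(indicator in file_type.lower() for indicator in TEXT_INDICATORS):
            return False

        # Nothing matched: assume binary
        return True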
diff --git a/tests/ra_aid/mocks/agent_utils_mock.py b/tests/ra_aid/mocks/agent_utils_mock.py
new file mode 100644
index 0000000..e08dc2a
--- /dev/null
+++ b/tests/ra_aid/mocks/agent_utils_mock.py
@@ -0,0 +1,311 @@
+"""Simplified mock of agent_utils.py for testing binary file detection.
+
+This file includes typical Python constructs like imports, functions, classes,
+and docstrings to replicate the characteristics of the real agent_utils.py
+that's causing issues with binary file detection.
+"""
+
+import os
+import sys
+import time
+import uuid
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Literal, Sequence
+
+from rich.console import Console
+from rich.markdown import Markdown
+from rich.panel import Panel
+
+# Define a logger
+logger = None  # In real code, this would be an actual logger
+
+
+class MockAgent:
+    """Mock agent class to simulate the real agent class structure."""
+
+    def __init__(self, model=None, tools=None, max_tokens=4096, config=None):
+        """Initialize a mock agent.
+
+        Args:
+            model: The language model to use
+            tools: List of tools available to the agent
+            max_tokens: Maximum tokens to use in context
+            config: Additional configuration
+        """
+        self.model = model
+        self.tools = tools or []
+        self.max_tokens = max_tokens
+        self.config = config or {}
+        self._initialized = True
+
+    def run(self, input_text, config=None):
+        """Run the agent on input text.
+
+        Args:
+            input_text: The text to process
+            config: Optional runtime configuration
+
+        Returns:
+            Mock agent response
+        """
+        # Simulate processing with a delay
+        time.sleep(0.1)
+        return f"Processed: {input_text[:20]}..."
+
+    @staticmethod
+    def _estimate_tokens(text):
+        """Estimate number of tokens in text.
+
+        Args:
+            text: Text to estimate tokens for
+
+        Returns:
+            Estimated token count (roughly 1 token per 4 characters)
+        """
+        return len(text) // 4
+
+
+def run_mock_agent(task: str, model=None, **kwargs) -> Optional[str]:
+    """Run a mock agent on a task.
+
+    This function creates a new agent, sets up tools, and runs the agent on
+    the task. It includes various parameters and logic to mimic the
+    complexity of the real agent_utils.py.
+
+    Args:
+        task: The task to process
+        model: The model to use
+        **kwargs: Additional keyword arguments
+
+    Returns:
+        Optional[str]: Result from the agent
+    """
+    # Create a unique ID for this run
+    run_id = str(uuid.uuid4())
+
+    # Set up mock console for output
+    console = Console()
+
+    # Log the start of execution
+    console.print(Panel(Markdown(f"Starting agent with ID: {run_id}"), title="🤖 Agent"))
+
+    # Set up some complex nested data structures to mimic real code
+    memory = {
+        "task_history": [],
+        "agent_state": {
+            "initialized": True,
+            "tools_enabled": True
+        },
+        "config": {
+            "max_retries": 3,
+            "timeout": 30,
+            "debug": False
+        }
+    }
+
+    # Track some metrics
+    metrics = {
+        "start_time": datetime.now(),
+        "steps": 0,
+        "tokens_used": 0
+    }
+
+    # Create a mock agent
+    agent = MockAgent(model=model, config=kwargs.get("config"))
+
+    try:
+        # Process the task
+        memory["task_history"].append(task)
+        metrics["steps"] += 1
+
+        # Simulate token counting
+        task_tokens = MockAgent._estimate_tokens(task)
+        metrics["tokens_used"] += task_tokens
+
+        # Check if we should short-circuit for any reason
+        if task.lower() == "exit" or task.lower() == "quit":
+            return "Exit requested"
+
+        # Run the main agent logic
+        result = agent.run(task)
+
+        # Update completion time
+        metrics["end_time"] = datetime.now()
+        metrics["duration"] = (metrics["end_time"] - metrics["start_time"]).total_seconds()
+
+        # Generate a fancy completion message with some complex formatting
+        completion_message = f"""
+## Agent Run Complete
+
+- **Task**: {task[:50]}{"..." if len(task) > 50 else ""}
+- **Duration**: {metrics["duration"]:.2f}s
+- **Tokens**: {metrics["tokens_used"]}
+- **Steps**: {metrics["steps"]}
+- **Result**: Success
+        """
+
+        console.print(Panel(Markdown(completion_message), title="✅ Complete"))
+
+        return result
+    except Exception as e:
+        # Handle errors
+        error_message = f"Agent failed: {str(e)}"
+        console.print(Panel(error_message, title="❌ Error", style="red"))
+        return None
+
+
+def calculate_something_complex(a: int, b: int, operation: str = "add") -> int:
+    """Calculate something using the specified operation.
+
+    Args:
+        a: First number
+        b: Second number
+        operation: Operation to perform (add, subtract, multiply, divide)
+
+    Returns:
+        Result of the calculation
+
+    Raises:
+        ValueError: If operation is invalid
+    """
+    if operation == "add":
+        return a + b
+    elif operation == "subtract":
+        return a - b
+    elif operation == "multiply":
+        return a * b
+    elif operation == "divide":
+        if b == 0:
+            raise ValueError("Cannot divide by zero")
+        return a / b
+    else:
+        raise ValueError(f"Unknown operation: {operation}")
+
+
+class DataProcessor:
+    """Example class that processes data in various ways."""
+
+    def __init__(self, data: List[Any]):
+        """Initialize with data.
+
+        Args:
+            data: List of data to process
+        """
+        self.data = data
+        self.processed = False
+        self.results = {}
+
+    def process(self, method: str = "default"):
+        """Process the data using the specified method.
+
+        Args:
+            method: Processing method to use
+
+        Returns:
+            Processed data
+        """
+        if method == "default":
+            result = [item for item in self.data if item is not None]
+        elif method == "sum":
+            result = sum(self.data)
+        elif method == "count":
+            result = len(self.data)
+        else:
+            result = self.data
+
+        self.results[method] = result
+        self.processed = True
+        return result
+
+    def get_stats(self) -> Dict[str, Any]:
+        """Get statistics about the data.
+
+        Returns:
+            Dictionary of statistics
+        """
+        if not self.data:
+            return {"count": 0, "empty": True}
+
+        return {
+            "count": len(self.data),
+            "empty": len(self.data) == 0,
+            "methods_used": list(self.results.keys()),
+            "has_nulls": any(item is None for item in self.data)
+        }
+
+    def __str__(self):
+        """String representation.
+
+        Returns:
+            String describing the processor
+        """
+        return f"DataProcessor(items={len(self.data)}, processed={self.processed})"
+
+
+# Some multi-line strings with various quotes and formatting
+TEMPLATE = """
+# Agent Report
+
+## Overview
+This report was generated by the agent system.
+
+## Details
+- Task: {task}
+- Date: {date}
+- Status: {status}
+
+## Summary
+{summary}
+"""
+
+SQL_QUERY = '''
+SELECT *
+FROM users
+WHERE status = 'active'
+  AND last_login > '2023-01-01'
+ORDER BY last_login DESC
+LIMIT 10;
+'''
+
+# Regular expression pattern with escapes
+PATTERN = r"def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\("
+
+# A global dictionary with mixed types
+GLOBAL_CONFIG = {
+    "debug": False,
+    "max_retries": 3,
+    "timeout": 30,
+    "endpoints": ["api/v1", "api/v2"],
+    "rate_limits": {
+        "minute": 60,
+        "hour": 3600,
+        "day": 86400
+    },
+    "features": {
+        "experimental": True,
+        "beta_tools": False
+    }
+}
+
+# Some unusual unicode characters to ensure encoding handling
+UNICODE_EXAMPLE = "Hello 世界! This has unicode: ™ ® © ♥ ⚡ ☁ ☀"
+
+
+def main():
+    """Main function to demonstrate the module."""
+    # Run a simple example
+    result = run_mock_agent("Test the mock agent")
+    print(f"Result: {result}")
+
+    # Try the data processor
+    processor = DataProcessor([1, 2, 3, None, 5])
+    processor.process("default")
+    stats = processor.get_stats()
+    print(f"Stats: {stats}")
+
+    # Do a calculation
+    calc = calculate_something_complex(10, 5, "multiply")
+    print(f"Calculation: {calc}")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
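The fixture above is designed to yield a libmagic descriptor that lacks the literal string 'ASCII text'. With python-magic installed, the mismatch can be observed directly; the outputs in the comments below are typical values and may vary across libmagic builds.

    import magic

    path = "tests/ra_aid/mocks/agent_utils_mock.py"
    print(magic.from_file(path, mime=True))  # typically "text/x-python" or "text/x-script.python"
    print(magic.from_file(path))             # typically "Python script text executable" (no "ASCII text")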
diff --git a/tests/ra_aid/tools/test_memory.py b/tests/ra_aid/tools/test_memory.py
index 860de40..e381095 100644
--- a/tests/ra_aid/tools/test_memory.py
+++ b/tests/ra_aid/tools/test_memory.py
@@ -1,4 +1,5 @@
 import sys
+import os
 import types
 import importlib
 import pytest
@@ -19,6 +20,8 @@ from ra_aid.tools.memory import (
     log_work_event,
     reset_work_log,
     swap_task_order,
+    is_binary_file,
+    _is_binary_fallback,
 )
 from ra_aid.database.repositories.key_fact_repository import get_key_fact_repository
 from ra_aid.database.repositories.key_snippet_repository import get_key_snippet_repository
@@ -955,4 +958,53 @@ def test_is_binary_file_with_null_bytes(reset_memory, monkeypatch):
     finally:
         # Clean up
         if os.path.exists(binary_file.name):
-            os.unlink(binary_file.name)
\ No newline at end of file
+            os.unlink(binary_file.name)
+
+
+def test_python_file_detection():
+    """Test that Python files are correctly identified as text files.
+
+    This test demonstrates an issue where certain Python files are
+    incorrectly identified as binary files when using the magic library.
+    The root cause is that the file doesn't have 'ASCII text' in its file
+    type description despite being a valid text file.
+ """ + # Path to our mock Python file + mock_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), + '..', 'mocks', 'agent_utils_mock.py')) + + # Verify the file exists + assert os.path.exists(mock_file_path), f"Test file not found: {mock_file_path}" + + # Verify using fallback method correctly identifies as text file + is_binary_fallback = _is_binary_fallback(mock_file_path) + assert not is_binary_fallback, "Fallback method should identify Python file as text" + + # The following test will fail with the current implementation when using magic + try: + import magic + if magic: + # Only run this part of the test if magic is available + with patch('ra_aid.tools.memory.magic') as mock_magic: + # Mock magic to simulate the behavior that causes the issue + mock_magic.from_file.side_effect = [ + "text/x-python", # First call with mime=True + "Python script text executable" # Second call without mime=True + ] + + # This should return False (not binary) but currently returns True + is_binary = is_binary_file(mock_file_path) + + # Verify the magic library was called correctly + mock_magic.from_file.assert_any_call(mock_file_path, mime=True) + mock_magic.from_file.assert_any_call(mock_file_path) + + # This assertion is EXPECTED TO FAIL with the current implementation + # It demonstrates the bug we need to fix + assert not is_binary, ( + "Python file incorrectly identified as binary. " + "The current implementation requires 'ASCII text' in file_type description, " + "but Python files often have 'Python script text' instead." + ) + except ImportError: + pytest.skip("magic library not available, skipping magic-specific test") \ No newline at end of file