initial write_file implementation
This commit is contained in:
parent
35814362cd
commit
7b42ab569c
|
|
@ -200,6 +200,9 @@ Instructions:
|
||||||
Testing:
|
Testing:
|
||||||
|
|
||||||
- If your task involves writing unit tests, first inspect existing test suites and analyze at least one existing test to learn about testing organization and conventions.
|
- If your task involves writing unit tests, first inspect existing test suites and analyze at least one existing test to learn about testing organization and conventions.
|
||||||
|
- If you add or change any unit tests, run them using run_shell_command and ensure they pass (check docs or analyze directory structure/test files to infer how to run them.)
|
||||||
|
- Start with running very specific tests, then move to more general/complete test suites.
|
||||||
|
- If you have any doubts about logic or debugging (or how to best test something), ask the expert to perform deep analysis.
|
||||||
|
|
||||||
Once the task is complete, ensure all updated files are emitted.
|
Once the task is complete, ensure all updated files are emitted.
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ from .programmer import run_programming_task
|
||||||
from .expert import ask_expert, emit_expert_context
|
from .expert import ask_expert, emit_expert_context
|
||||||
from .read_file import read_file_tool
|
from .read_file import read_file_tool
|
||||||
from .file_str_replace import file_str_replace
|
from .file_str_replace import file_str_replace
|
||||||
|
from .write_file import write_file_tool
|
||||||
from .fuzzy_find import fuzzy_find_project_files
|
from .fuzzy_find import fuzzy_find_project_files
|
||||||
from .list_directory import list_directory_tree
|
from .list_directory import list_directory_tree
|
||||||
from .ripgrep import ripgrep_search
|
from .ripgrep import ripgrep_search
|
||||||
|
|
@ -31,6 +32,7 @@ __all__ = [
|
||||||
'run_programming_task',
|
'run_programming_task',
|
||||||
'run_shell_command',
|
'run_shell_command',
|
||||||
'skip_implementation',
|
'skip_implementation',
|
||||||
|
'write_file_tool',
|
||||||
'emit_research_subtask',
|
'emit_research_subtask',
|
||||||
'ripgrep_search',
|
'ripgrep_search',
|
||||||
'file_str_replace'
|
'file_str_replace'
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,90 @@
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Dict
|
||||||
|
from langchain_core.tools import tool
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.panel import Panel
|
||||||
|
|
||||||
|
console = Console()
|
||||||
|
|
||||||
|
@tool
|
||||||
|
def write_file_tool(
|
||||||
|
filepath: str,
|
||||||
|
content: str,
|
||||||
|
encoding: str = 'utf-8',
|
||||||
|
verbose: bool = True
|
||||||
|
) -> Dict[str, any]:
|
||||||
|
"""Write content to a text file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: Path to the file to write
|
||||||
|
content: String content to write to the file
|
||||||
|
encoding: File encoding to use (default: utf-8)
|
||||||
|
verbose: Whether to display a Rich panel with write statistics (default: True)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing:
|
||||||
|
- success: Boolean indicating if write was successful
|
||||||
|
- bytes_written: Number of bytes written
|
||||||
|
- elapsed_time: Time taken in seconds
|
||||||
|
- error: Error message if any (None if successful)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If file cannot be written
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
result = {
|
||||||
|
"success": False,
|
||||||
|
"bytes_written": 0,
|
||||||
|
"elapsed_time": 0,
|
||||||
|
"error": None,
|
||||||
|
"filepath": None,
|
||||||
|
"message": None
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Ensure directory exists
|
||||||
|
os.makedirs(os.path.dirname(filepath), exist_ok=True)
|
||||||
|
|
||||||
|
logging.debug(f"Starting to write file: {filepath}")
|
||||||
|
|
||||||
|
with open(filepath, 'w', encoding=encoding) as f:
|
||||||
|
f.write(content)
|
||||||
|
result["bytes_written"] = len(content.encode(encoding))
|
||||||
|
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
result["elapsed_time"] = elapsed
|
||||||
|
result["success"] = True
|
||||||
|
result["filepath"] = filepath
|
||||||
|
result["message"] = "Operation completed successfully"
|
||||||
|
|
||||||
|
logging.debug(f"File write complete: {result['bytes_written']} bytes in {elapsed:.2f}s")
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
console.print(Panel(
|
||||||
|
f"Wrote {result['bytes_written']} bytes to {filepath} in {elapsed:.2f}s",
|
||||||
|
title="💾 File Write",
|
||||||
|
border_style="bright_green"
|
||||||
|
))
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
error_msg = str(e)
|
||||||
|
logging.error(f"Error writing file {filepath} after {elapsed:.2f}s: {error_msg}")
|
||||||
|
|
||||||
|
result["elapsed_time"] = elapsed
|
||||||
|
result["error"] = error_msg
|
||||||
|
if "embedded null byte" in error_msg.lower():
|
||||||
|
result["message"] = "Invalid file path: contains null byte character"
|
||||||
|
else:
|
||||||
|
result["message"] = error_msg
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
console.print(Panel(
|
||||||
|
f"Failed to write {filepath}\nError: {error_msg}",
|
||||||
|
title="❌ File Write Error",
|
||||||
|
border_style="red"
|
||||||
|
))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
@ -0,0 +1,180 @@
|
||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch, mock_open
|
||||||
|
from ra_aid.tools.write_file import write_file_tool
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_test_dir(tmp_path):
|
||||||
|
"""Create a temporary test directory."""
|
||||||
|
test_dir = tmp_path / "test_write_dir"
|
||||||
|
test_dir.mkdir(exist_ok=True)
|
||||||
|
return test_dir
|
||||||
|
|
||||||
|
def test_basic_write_functionality(temp_test_dir):
|
||||||
|
"""Test basic successful file writing."""
|
||||||
|
test_file = temp_test_dir / "test.txt"
|
||||||
|
content = "Hello, World!\nTest content"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": content
|
||||||
|
})
|
||||||
|
|
||||||
|
# Verify file contents
|
||||||
|
assert test_file.read_text() == content
|
||||||
|
|
||||||
|
# Verify return dict format
|
||||||
|
assert isinstance(result, dict)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["filepath"] == str(test_file)
|
||||||
|
assert result["bytes_written"] == len(content.encode('utf-8'))
|
||||||
|
assert "Operation completed" in result["message"]
|
||||||
|
|
||||||
|
def test_directory_creation(temp_test_dir):
|
||||||
|
"""Test writing to a file in a non-existent directory."""
|
||||||
|
nested_dir = temp_test_dir / "nested" / "subdirs"
|
||||||
|
test_file = nested_dir / "test.txt"
|
||||||
|
content = "Test content"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": content
|
||||||
|
})
|
||||||
|
|
||||||
|
assert test_file.exists()
|
||||||
|
assert test_file.read_text() == content
|
||||||
|
assert result["success"] is True
|
||||||
|
|
||||||
|
def test_different_encodings(temp_test_dir):
|
||||||
|
"""Test writing files with different encodings."""
|
||||||
|
test_file = temp_test_dir / "encoded.txt"
|
||||||
|
content = "Hello 世界" # Mixed ASCII and Unicode
|
||||||
|
|
||||||
|
# Test UTF-8
|
||||||
|
result_utf8 = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": content,
|
||||||
|
"encoding": 'utf-8'
|
||||||
|
})
|
||||||
|
assert result_utf8["success"] is True
|
||||||
|
assert test_file.read_text(encoding='utf-8') == content
|
||||||
|
|
||||||
|
# Test UTF-16
|
||||||
|
result_utf16 = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": content,
|
||||||
|
"encoding": 'utf-16'
|
||||||
|
})
|
||||||
|
assert result_utf16["success"] is True
|
||||||
|
assert test_file.read_text(encoding='utf-16') == content
|
||||||
|
|
||||||
|
@patch('builtins.open')
|
||||||
|
def test_permission_error(mock_open_func, temp_test_dir):
|
||||||
|
"""Test handling of permission errors."""
|
||||||
|
mock_open_func.side_effect = PermissionError("Permission denied")
|
||||||
|
test_file = temp_test_dir / "noperm.txt"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": "test content"
|
||||||
|
})
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "Permission denied" in result["message"]
|
||||||
|
assert result["error"] is not None
|
||||||
|
|
||||||
|
@patch('builtins.open')
|
||||||
|
def test_io_error(mock_open_func, temp_test_dir):
|
||||||
|
"""Test handling of IO errors."""
|
||||||
|
mock_open_func.side_effect = IOError("IO Error occurred")
|
||||||
|
test_file = temp_test_dir / "ioerror.txt"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": "test content"
|
||||||
|
})
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "IO Error" in result["message"]
|
||||||
|
assert result["error"] is not None
|
||||||
|
|
||||||
|
def test_empty_content(temp_test_dir):
|
||||||
|
"""Test writing empty content to a file."""
|
||||||
|
test_file = temp_test_dir / "empty.txt"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": ""
|
||||||
|
})
|
||||||
|
|
||||||
|
assert test_file.exists()
|
||||||
|
assert test_file.read_text() == ""
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["bytes_written"] == 0
|
||||||
|
|
||||||
|
def test_overwrite_existing_file(temp_test_dir):
|
||||||
|
"""Test overwriting an existing file."""
|
||||||
|
test_file = temp_test_dir / "overwrite.txt"
|
||||||
|
|
||||||
|
# Write initial content
|
||||||
|
test_file.write_text("Initial content")
|
||||||
|
|
||||||
|
# Overwrite with new content
|
||||||
|
new_content = "New content"
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": new_content
|
||||||
|
})
|
||||||
|
|
||||||
|
assert test_file.read_text() == new_content
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["bytes_written"] == len(new_content.encode('utf-8'))
|
||||||
|
|
||||||
|
def test_large_file_write(temp_test_dir):
|
||||||
|
"""Test writing a large file and verify statistics."""
|
||||||
|
test_file = temp_test_dir / "large.txt"
|
||||||
|
content = "Large content\n" * 1000 # Create substantial content
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": content
|
||||||
|
})
|
||||||
|
|
||||||
|
assert test_file.exists()
|
||||||
|
assert test_file.read_text() == content
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["bytes_written"] == len(content.encode('utf-8'))
|
||||||
|
assert os.path.getsize(test_file) == len(content.encode('utf-8'))
|
||||||
|
|
||||||
|
def test_invalid_path_characters(temp_test_dir):
|
||||||
|
"""Test handling of invalid path characters."""
|
||||||
|
invalid_path = temp_test_dir / "invalid\0file.txt"
|
||||||
|
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(invalid_path),
|
||||||
|
"content": "test content"
|
||||||
|
})
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "Invalid file path" in result["message"]
|
||||||
|
|
||||||
|
def test_write_to_readonly_directory(temp_test_dir):
|
||||||
|
"""Test writing to a readonly directory."""
|
||||||
|
readonly_dir = temp_test_dir / "readonly"
|
||||||
|
readonly_dir.mkdir()
|
||||||
|
test_file = readonly_dir / "test.txt"
|
||||||
|
|
||||||
|
# Make directory readonly
|
||||||
|
os.chmod(readonly_dir, 0o444)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = write_file_tool.invoke({
|
||||||
|
"filepath": str(test_file),
|
||||||
|
"content": "test content"
|
||||||
|
})
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "Permission" in result["message"]
|
||||||
|
finally:
|
||||||
|
# Restore permissions for cleanup
|
||||||
|
os.chmod(readonly_dir, 0o755)
|
||||||
Loading…
Reference in New Issue