import os
from typing import Dict, List, Optional, Set, Union

try:
    import magic
except ImportError:
    magic = None

from langchain_core.tools import tool
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from typing_extensions import TypedDict


class WorkLogEntry(TypedDict):
    timestamp: str
    event: str


class SnippetInfo(TypedDict):
    filepath: str
    line_number: int
    snippet: str
    description: Optional[str]


console = Console()

# Global memory store
_global_memory: Dict[
    str,
    Union[
        Dict[int, str],
        Dict[int, SnippetInfo],
        Dict[int, WorkLogEntry],
        int,
        Set[str],
        bool,
        str,
        List[str],
        List[WorkLogEntry],
    ],
] = {
    "research_notes": [],
    "plans": [],
    "tasks": {},  # Dict[int, str] - ID to task mapping
    "task_completed": False,  # Flag indicating if task is complete
    "completion_message": "",  # Message explaining completion
    "task_id_counter": 1,  # Counter for generating unique task IDs
    "key_facts": {},  # Dict[int, str] - ID to fact mapping
    "key_fact_id_counter": 1,  # Counter for generating unique fact IDs
    "key_snippets": {},  # Dict[int, SnippetInfo] - ID to snippet mapping
    "key_snippet_id_counter": 1,  # Counter for generating unique snippet IDs
    "implementation_requested": False,
    "related_files": {},  # Dict[int, str] - ID to filepath mapping
    "related_file_id_counter": 1,  # Counter for generating unique file IDs
    "plan_completed": False,
    "agent_depth": 0,
    "work_log": [],  # List[WorkLogEntry] - Timestamped work events
}


@tool("emit_research_notes")
def emit_research_notes(notes: str) -> str:
    """Use this when you have completed your research to share your notes in markdown format.

    Keep your research notes information dense and no more than 300 words.

    Args:
        notes: REQUIRED The research notes to store
    """
    _global_memory["research_notes"].append(notes)
    console.print(Panel(Markdown(notes), title="🔍 Research Notes"))
    return "Research notes stored."


@tool("emit_plan")
def emit_plan(plan: str) -> str:
    """Store a plan step in global memory.

    Args:
        plan: The plan step to store (markdown format; be clear, complete, use newlines,
            and use as many tokens as you need)
    """
    _global_memory["plans"].append(plan)
    console.print(Panel(Markdown(plan), title="📋 Plan"))
    log_work_event(f"Added plan step:\n\n{plan}")
    return "Plan stored."


@tool("emit_task")
def emit_task(task: str) -> str:
    """Store a task in global memory.

    Args:
        task: The task to store
    """
    # Get and increment task ID
    task_id = _global_memory["task_id_counter"]
    _global_memory["task_id_counter"] += 1

    # Store task with ID
    _global_memory["tasks"][task_id] = task

    console.print(Panel(Markdown(task), title=f"✅ Task #{task_id}"))
    log_work_event(f"Task #{task_id} added:\n\n{task}")
    return f"Task #{task_id} stored."


@tool("emit_key_facts")
def emit_key_facts(facts: List[str]) -> str:
    """Store multiple key facts about the project or current task in global memory.

    Args:
        facts: List of key facts to store
    """
    results = []
    for fact in facts:
        # Get and increment fact ID
        fact_id = _global_memory["key_fact_id_counter"]
        _global_memory["key_fact_id_counter"] += 1

        # Store fact with ID
        _global_memory["key_facts"][fact_id] = fact

        # Display panel with ID
        console.print(
            Panel(
                Markdown(fact),
                title=f"💡 Key Fact #{fact_id}",
                border_style="bright_cyan",
            )
        )

        # Add result message
        results.append(f"Stored fact #{fact_id}: {fact}")

    log_work_event(f"Stored {len(facts)} key facts.")
    return "Facts stored."

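
# Illustrative usage (a minimal sketch; the @tool decorator wraps each function as a
# LangChain tool, so callers go through the Runnable `invoke` interface with a dict of
# arguments, the same pattern emit_key_snippet uses internally for emit_related_files;
# the notes and task text below are hypothetical):
#
#     emit_research_notes.invoke({"notes": "## Findings\n- Auth lives in `src/auth.py`"})
#     # -> "Research notes stored."
#     emit_task.invoke({"task": "Add unit tests for the login handler"})
#     # -> "Task #1 stored."
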
@tool("delete_key_facts")
def delete_key_facts(fact_ids: List[int]) -> str:
    """Delete multiple key facts from global memory by their IDs.

    Silently skips any IDs that don't exist.

    Args:
        fact_ids: List of fact IDs to delete
    """
    results = []
    for fact_id in fact_ids:
        if fact_id in _global_memory["key_facts"]:
            # Delete the fact
            deleted_fact = _global_memory["key_facts"].pop(fact_id)
            success_msg = f"Successfully deleted fact #{fact_id}: {deleted_fact}"
            console.print(
                Panel(Markdown(success_msg), title="Fact Deleted", border_style="green")
            )
            results.append(success_msg)

    log_work_event(f"Deleted facts {fact_ids}.")
    return "Facts deleted."


@tool("delete_tasks")
def delete_tasks(task_ids: List[int]) -> str:
    """Delete multiple tasks from global memory by their IDs.

    Silently skips any IDs that don't exist.

    Args:
        task_ids: List of task IDs to delete
    """
    results = []
    for task_id in task_ids:
        if task_id in _global_memory["tasks"]:
            # Delete the task
            deleted_task = _global_memory["tasks"].pop(task_id)
            success_msg = f"Successfully deleted task #{task_id}: {deleted_task}"
            console.print(
                Panel(Markdown(success_msg), title="Task Deleted", border_style="green")
            )
            results.append(success_msg)

    log_work_event(f"Deleted tasks {task_ids}.")
    return "Tasks deleted."


@tool("request_implementation")
def request_implementation() -> str:
    """Request that implementation proceed after research/planning.

    Used to indicate the agent should move to implementation stage.

    Think carefully before requesting implementation. Do you need to request research
    subtasks first? Have you run relevant unit tests, if they exist, to get a baseline
    (this can be a subtask)? Do you need to crawl deeper to find all related files and
    symbols?
    """
    _global_memory["implementation_requested"] = True
    console.print(Panel("🚀 Implementation Requested", style="yellow", padding=0))
    log_work_event("Implementation requested.")
    return "Implementation requested."


@tool("emit_key_snippet")
def emit_key_snippet(snippet_info: SnippetInfo) -> str:
    """Store a single source code snippet in global memory which represents key information.

    Automatically adds the filepath of the snippet to related files.

    This is for **existing**, or **just-written** files, not for things to be created in
    the future.

    ONLY emit snippets if they will be relevant to UPCOMING work. Focus on external
    interfaces and things that are very specific and relevant to UPCOMING work.

    Args:
        snippet_info: Dict with keys:
            - filepath: Path to the source file
            - line_number: Line number where the snippet starts
            - snippet: The source code snippet text
            - description: Optional description of the significance
    """
    # Add filepath to related files
    emit_related_files.invoke({"files": [snippet_info["filepath"]]})

    # Get and increment snippet ID
    snippet_id = _global_memory["key_snippet_id_counter"]
    _global_memory["key_snippet_id_counter"] += 1

    # Store snippet info
    _global_memory["key_snippets"][snippet_id] = snippet_info

    # Format display text as markdown
    display_text = [
        "**Source Location**:",
        f"- File: `{snippet_info['filepath']}`",
        f"- Line: `{snippet_info['line_number']}`",
        "",  # Empty line before code block
        "**Code**:",
        "```python",
        snippet_info["snippet"].rstrip(),  # Remove trailing whitespace
        "```",
    ]
    if snippet_info["description"]:
        display_text.extend(["", "**Description**:", snippet_info["description"]])

    # Display panel
    console.print(
        Panel(
            Markdown("\n".join(display_text)),
            title=f"📝 Key Snippet #{snippet_id}",
            border_style="bright_cyan",
        )
    )
    log_work_event(f"Stored code snippet #{snippet_id}.")
    return f"Snippet #{snippet_id} stored."

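
# Illustrative usage (a sketch; SnippetInfo is a total TypedDict, so all four keys are
# expected, with `description` allowed to be None; the filepath below is hypothetical
# and must exist on disk because the tool registers it via emit_related_files):
#
#     emit_key_snippet.invoke(
#         {
#             "snippet_info": {
#                 "filepath": "src/auth.py",
#                 "line_number": 42,
#                 "snippet": "def login(user: str) -> bool: ...",
#                 "description": "Public entry point for authentication",
#             }
#         }
#     )
#     # -> "Snippet #1 stored."
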
@tool("delete_key_snippets") def delete_key_snippets(snippet_ids: List[int]) -> str: """Delete multiple key snippets from global memory by their IDs. Silently skips any IDs that don't exist. Args: snippet_ids: List of snippet IDs to delete """ results = [] for snippet_id in snippet_ids: if snippet_id in _global_memory["key_snippets"]: # Delete the snippet deleted_snippet = _global_memory["key_snippets"].pop(snippet_id) success_msg = f"Successfully deleted snippet #{snippet_id} from {deleted_snippet['filepath']}" console.print( Panel( Markdown(success_msg), title="Snippet Deleted", border_style="green" ) ) results.append(success_msg) log_work_event(f"Deleted snippets {snippet_ids}.") return "Snippets deleted." @tool("swap_task_order") def swap_task_order(id1: int, id2: int) -> str: """Swap the order of two tasks in global memory by their IDs. Args: id1: First task ID id2: Second task ID """ # Validate IDs are different if id1 == id2: return "Cannot swap task with itself" # Validate both IDs exist if id1 not in _global_memory["tasks"] or id2 not in _global_memory["tasks"]: return "Invalid task ID(s)" # Swap the tasks _global_memory["tasks"][id1], _global_memory["tasks"][id2] = ( _global_memory["tasks"][id2], _global_memory["tasks"][id1], ) # Display what was swapped console.print( Panel( Markdown(f"Swapped:\n- Task #{id1} ↔️ Task #{id2}"), title="🔄 Tasks Reordered", border_style="green", ) ) return "Tasks deleted." @tool("one_shot_completed") def one_shot_completed(message: str) -> str: """Signal that a one-shot task has been completed and execution should stop. Only call this if you have already **fully** completed the original request. Args: message: Completion message to display """ if _global_memory.get("implementation_requested", False): return "Cannot complete in one shot - implementation was requested" _global_memory["task_completed"] = True _global_memory["completion_message"] = message console.print(Panel(Markdown(message), title="✅ Task Completed")) log_work_event(f"Task completed\n\n{message}") return "Completion noted." @tool("task_completed") def task_completed(message: str) -> str: """Mark the current task as completed with a completion message. Args: message: Message explaining how/why the task is complete """ _global_memory["task_completed"] = True _global_memory["completion_message"] = message console.print(Panel(Markdown(message), title="✅ Task Completed")) return "Completion noted." @tool("plan_implementation_completed") def plan_implementation_completed(message: str) -> str: """Mark the entire implementation plan as completed. Args: message: Message explaining how the implementation plan was completed """ _global_memory["task_completed"] = True _global_memory["completion_message"] = message _global_memory["plan_completed"] = True _global_memory["completion_message"] = message _global_memory["tasks"].clear() # Clear task list when plan is completed _global_memory["task_id_counter"] = 1 console.print(Panel(Markdown(message), title="✅ Plan Executed")) log_work_event(f"Plan execution completed:\n\n{message}") return "Plan completion noted and task list cleared." def get_related_files() -> List[str]: """Get the current list of related files. Returns: List of formatted strings in the format 'ID#X path/to/file.py' """ files = _global_memory["related_files"] return [f"ID#{file_id} {filepath}" for file_id, filepath in sorted(files.items())] @tool("emit_related_files") def emit_related_files(files: List[str]) -> str: """Store multiple related files that tools should work with. 
@tool("emit_related_files")
def emit_related_files(files: List[str]) -> str:
    """Store multiple related files that tools should work with.

    Args:
        files: List of file paths to add
    """
    results = []
    added_files = []
    invalid_paths = []
    binary_files = []

    # Process files
    for file in files:
        # First check if path exists
        if not os.path.exists(file):
            invalid_paths.append(file)
            results.append(f"Error: Path '{file}' does not exist")
            continue

        # Then check if it's a directory
        if os.path.isdir(file):
            invalid_paths.append(file)
            results.append(f"Error: Path '{file}' is a directory, not a file")
            continue

        # Finally validate it's a regular file
        if not os.path.isfile(file):
            invalid_paths.append(file)
            results.append(f"Error: Path '{file}' exists but is not a regular file")
            continue

        # Check if it's a binary file
        if is_binary_file(file):
            binary_files.append(file)
            results.append(f"Skipped binary file: '{file}'")
            continue

        # Normalize the path
        normalized_path = os.path.abspath(file)

        # Check if normalized path already exists in values
        existing_id = None
        for fid, fpath in _global_memory["related_files"].items():
            if fpath == normalized_path:
                existing_id = fid
                break

        if existing_id is not None:
            # File exists, use existing ID
            results.append(f"File ID #{existing_id}: {file}")
        else:
            # New file, assign new ID
            file_id = _global_memory["related_file_id_counter"]
            _global_memory["related_file_id_counter"] += 1

            # Store normalized path with ID
            _global_memory["related_files"][file_id] = normalized_path
            added_files.append((file_id, file))  # Keep original path for display
            results.append(f"File ID #{file_id}: {file}")

    # Rich output - single consolidated panel for added files
    if added_files:
        files_added_md = "\n".join(f"- `{file}`" for _, file in added_files)
        md_content = f"**Files Noted:**\n{files_added_md}"
        console.print(
            Panel(
                Markdown(md_content),
                title="📁 Related Files Noted",
                border_style="green",
            )
        )

    # Display skipped binary files
    if binary_files:
        binary_files_md = "\n".join(f"- `{file}`" for file in binary_files)
        md_content = f"**Binary Files Skipped:**\n{binary_files_md}"
        console.print(
            Panel(
                Markdown(md_content),
                title="⚠️ Binary Files Not Added",
                border_style="yellow",
            )
        )

    # Return summary message
    if binary_files:
        binary_files_list = ", ".join(f"'{file}'" for file in binary_files)
        return f"Files noted. Binary files skipped: {binary_files_list}"
    else:
        return "Files noted."


def log_work_event(event: str) -> str:
    """Add timestamped entry to work log.

    Internal function used to track major events during agent execution. Each entry is
    stored with an ISO format timestamp.

    Args:
        event: Description of the event to log

    Returns:
        Confirmation message

    Note:
        Entries can be retrieved with get_work_log() as markdown formatted text.
    """
    from datetime import datetime

    entry = WorkLogEntry(timestamp=datetime.now().isoformat(), event=event)
    _global_memory["work_log"].append(entry)
    return f"Event logged: {event}"


def is_binary_file(filepath: str) -> bool:
    """Check if a file is binary, using the magic library if available."""
    if magic:
        try:
            mime = magic.from_file(filepath, mime=True)
            return not mime.startswith("text/")
        except Exception:
            # Fallback if magic fails
            return False
    else:
        # Basic binary detection if magic is not available
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                f.read(1024)  # Try to read as text
            return False
        except UnicodeDecodeError:
            return True

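
# Illustrative usage (a sketch; the paths below are hypothetical and must exist on disk,
# since emit_related_files validates them with os.path and skips binaries before storing):
#
#     emit_related_files.invoke({"files": ["src/auth.py", "assets/logo.png"]})
#     # -> "Files noted. Binary files skipped: 'assets/logo.png'"
#     get_related_files()
#     # -> ["ID#1 /abs/path/to/src/auth.py"]
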
def get_work_log() -> str:
    """Return formatted markdown of work log entries.

    Returns:
        Markdown formatted text with timestamps as headings and events as content,
        or 'No work log entries' if the log is empty.

    Example:
        ## 2024-12-23T11:39:10

        Task #1 added: Create login form
    """
    if not _global_memory["work_log"]:
        return "No work log entries"

    entries = []
    for entry in _global_memory["work_log"]:
        entries.extend(
            [
                f"## {entry['timestamp']}",
                "",
                entry["event"],
                "",  # Blank line between entries
            ]
        )

    return "\n".join(entries).rstrip()  # Remove trailing newline


def reset_work_log() -> str:
    """Clear the work log.

    Returns:
        Confirmation message

    Note:
        This permanently removes all work log entries. The operation cannot be undone.
    """
    _global_memory["work_log"].clear()
    return "Work log cleared"


@tool("deregister_related_files")
def deregister_related_files(file_ids: List[int]) -> str:
    """Delete multiple related files from global memory by their IDs.

    Silently skips any IDs that don't exist.

    Args:
        file_ids: List of file IDs to delete
    """
    results = []
    for file_id in file_ids:
        if file_id in _global_memory["related_files"]:
            # Delete the file reference
            deleted_file = _global_memory["related_files"].pop(file_id)
            success_msg = (
                f"Successfully removed related file #{file_id}: {deleted_file}"
            )
            console.print(
                Panel(
                    Markdown(success_msg),
                    title="File Reference Removed",
                    border_style="green",
                )
            )
            results.append(success_msg)

    return "Files deregistered."


def get_memory_value(key: str) -> str:
    """Get a value from global memory.

    Different memory types return different formats:
    - key_facts: Markdown sections, one '## 🔑 Key Fact #ID' heading per fact
    - key_snippets: Markdown blocks with file path, line number, code, and description
    - work_log: Markdown sections with timestamps as headings
    - All other types: Newline-separated list of values

    Args:
        key: The key to get from memory

    Returns:
        String representation of the memory values in the formats described above,
        or an empty string if the key has no entries.
    """
    values = _global_memory.get(key, [])

    if key == "key_facts":
        # For empty dict, return empty string
        if not values:
            return ""

        # Sort by ID for consistent output and format as markdown sections
        facts = []
        for k, v in sorted(values.items()):
            facts.extend(
                [
                    f"## 🔑 Key Fact #{k}",
                    "",  # Empty line for better markdown spacing
                    v,
                    "",  # Empty line between facts
                ]
            )
        return "\n".join(facts).rstrip()  # Remove trailing newline

    if key == "key_snippets":
        if not values:
            return ""

        # Format each snippet with file info and content using markdown
        snippets = []
        for k, v in sorted(values.items()):
            snippet_text = [
                f"## 📝 Code Snippet #{k}",
                "",  # Empty line for better markdown spacing
                "**Source Location**:",
                f"- File: `{v['filepath']}`",
                f"- Line: `{v['line_number']}`",
                "",  # Empty line before code block
                "**Code**:",
                "```python",
                v["snippet"].rstrip(),  # Remove trailing whitespace
                "```",
            ]
            if v["description"]:
                # Add empty line and description
                snippet_text.extend(["", "**Description**:", v["description"]])
            snippets.append("\n".join(snippet_text))
        return "\n\n".join(snippets)

    if key == "work_log":
        if not values:
            return ""
        entries = [f"## {entry['timestamp']}\n{entry['event']}" for entry in values]
        return "\n\n".join(entries)

    # For other types (lists), join with newlines
    return "\n".join(str(v) for v in values)
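

if __name__ == "__main__":
    # Minimal, optional demo sketch (not part of the tool API): exercises the in-memory
    # store end to end using only functions defined above. The fact text is hypothetical.
    emit_key_facts.invoke({"facts": ["The CLI entry point is main.py"]})
    print(get_memory_value("key_facts"))
    print(get_work_log())
    reset_work_log()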