diff --git a/ra_aid/proc/interactive.py b/ra_aid/proc/interactive.py index 9d7722b..c8aaac5 100644 --- a/ra_aid/proc/interactive.py +++ b/ra_aid/proc/interactive.py @@ -1,90 +1,121 @@ +#!/usr/bin/env python3 """ Module for running interactive subprocesses with output capture. + +It uses a pseudo-tty and integrates pyte's HistoryScreen to +simulate a terminal and capture the final scrollback history (non-blank lines). +The interface remains compatible with external callers expecting a tuple (output, return_code), +where output is a bytes object (UTF-8 encoded). """ import os -import re import shlex import shutil -import tempfile +import errno +import subprocess from typing import List, Tuple -# Add macOS detection -IS_MACOS = os.uname().sysname == "Darwin" +import pyte +from pyte.screens import HistoryScreen +def render_line(line, columns: int) -> str: + """Render a single screen line from the pyte buffer (a mapping of column to Char).""" + return "".join(line[x].data for x in range(columns)) def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]: """ - Runs an interactive command with a pseudo-tty, capturing combined output. - + Runs an interactive command with a pseudo-tty, capturing final scrollback history. + Assumptions and constraints: - - We are on a Linux system with script available - - `cmd` is a non-empty list where cmd[0] is the executable - - The executable and script are assumed to be on PATH - - If anything is amiss (e.g., command not found), we fail early and cleanly - - The output is cleaned to remove ANSI escape sequences and control characters. - + - Running on a Linux system. + - `cmd` is a non-empty list where cmd[0] is the executable. + - The executable is on PATH. + Returns: - Tuple of (cleaned_output, return_code) + A tuple of (captured_output, return_code), where captured_output is a UTF-8 encoded + bytes object containing the trimmed non-empty history lines from the terminal session. + + Raises: + ValueError: If no command is provided. + FileNotFoundError: If the command is not found in PATH. + RuntimeError: If an error occurs during execution. """ - # Fail early if cmd is empty + # Fail early if cmd is empty. if not cmd: raise ValueError("No command provided.") - - # Check that the command exists + + # Check that the command exists. if shutil.which(cmd[0]) is None: raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.") - - # Create temp files (we'll always clean them up) - output_file = tempfile.NamedTemporaryFile(prefix="output_", delete=False) - retcode_file = tempfile.NamedTemporaryFile(prefix="retcode_", delete=False) - output_path = output_file.name - retcode_path = retcode_file.name - output_file.close() - retcode_file.close() - - # Quote arguments for safety - quoted_cmd = " ".join(shlex.quote(c) for c in cmd) - # Use script to capture output with TTY and save return code - shell_cmd = f"{quoted_cmd}; echo $? > {shlex.quote(retcode_path)}" - - def cleanup(): - for path in [output_path, retcode_path]: - if os.path.exists(path): - os.remove(path) - + + # Determine terminal dimensions; use os.get_terminal_size if available. try: - # Disable pagers by setting environment variables - os.environ["GIT_PAGER"] = "" - os.environ["PAGER"] = "" + term_size = os.get_terminal_size() + cols, rows = term_size.columns, term_size.lines + except OSError: + cols, rows = 80, 24 - # Run command with script for TTY and output capture - if IS_MACOS: - os.system(f"script -q {shlex.quote(output_path)} {shell_cmd}") - else: - os.system( - f"script -q -c {shlex.quote(shell_cmd)} {shlex.quote(output_path)}" - ) + # Instantiate HistoryScreen with a large history (scrollback) buffer. + screen = HistoryScreen(cols, rows, history=1000, ratio=0.5) + stream = pyte.Stream(screen) - # Read and clean the output - with open(output_path, "rb") as f: - output = f.read() - - # Clean ANSI escape sequences and control characters - output = re.sub(rb"\x1b\[[0-9;]*[a-zA-Z]", b"", output) # ANSI escape sequences - output = re.sub(rb"[\x00-\x08\x0b\x0c\x0e-\x1f]", b"", output) # Control chars - - # Get the return code - with open(retcode_path, "r") as f: - return_code = int(f.read().strip()) - - except Exception as e: - # If something goes wrong, cleanup and re-raise - cleanup() - raise RuntimeError("Error running interactive capture") from e + # Open a new pseudo-tty. + master_fd, slave_fd = os.openpty() + + # Spawn the subprocess with its stdio attached to the slave end. + proc = subprocess.Popen( + cmd, + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True + ) + os.close(slave_fd) # Close slave in the parent. + + # Read output from the master file descriptor in real time. + try: + while True: + try: + data = os.read(master_fd, 1024) + except OSError as e: + if e.errno == errno.EIO: + # Expected error when the slave side is closed. + break + else: + raise + if not data: + break + # Feed the decoded data into pyte to update the screen and history. + stream.feed(data.decode("utf-8", errors="ignore")) + # Also write the raw data to stdout for live output. + os.write(1, data) + except KeyboardInterrupt: + proc.terminate() finally: - # Ensure files are removed no matter what - cleanup() + os.close(master_fd) + proc.wait() + + # Assemble full scrollback: combine history.top, the current display, and history.bottom. + top_lines = [render_line(line, cols) for line in screen.history.top] + bottom_lines = [render_line(line, cols) for line in screen.history.bottom] + display_lines = screen.display # List of strings representing the current screen. + all_lines = top_lines + display_lines + bottom_lines - return output, return_code + # Trim out empty lines to get only meaningful "history" lines. + trimmed_lines = [line for line in all_lines if line.strip()] + final_output = "\n".join(trimmed_lines) + + # Return as bytes for compatibility. + return final_output.encode("utf-8"), proc.returncode + +# if __name__ == "__main__": +# # Test command: output 100 lines so that history goes beyond the screen height. +# test_cmd = [ +# "bash", +# "-c", +# "for i in $(seq 1 100); do echo \"Line $i\"; sleep 0.05; done" +# ] +# output, ret = run_interactive_command(test_cmd) +# print("\n=== Captured Scrollback (trimmed history lines) ===") +# print(output.decode("utf-8")) +# print("Return code:", ret) diff --git a/ra_aid/tool_configs.py b/ra_aid/tool_configs.py index edc03a5..181a09e 100644 --- a/ra_aid/tool_configs.py +++ b/ra_aid/tool_configs.py @@ -77,8 +77,8 @@ COMMON_TOOLS = get_read_only_tools() EXPERT_TOOLS = [emit_expert_context, ask_expert] RESEARCH_TOOLS = [ emit_research_notes, + one_shot_completed, # *TEMPORARILY* disabled to improve tool calling perf. - # one_shot_completed, # monorepo_detected, # ui_detected, ] @@ -133,9 +133,9 @@ def get_planning_tools( # Add planning-specific tools planning_tools = [ request_task_implementation, + plan_implementation_completed, # *TEMPORARILY* disabled to improve tool calling perf. # emit_plan, - # plan_implementation_completed, ] tools.extend(planning_tools)