From 18b0ce230e1fb30c3005c4c947df3b9f0f2e31d2 Mon Sep 17 00:00:00 2001 From: AI Christianson Date: Wed, 12 Feb 2025 17:59:34 -0500 Subject: [PATCH] handle raw input in interactive --- ra_aid/proc/interactive.py | 124 ++++++++++++++++---------- tests/ra_aid/proc/test_interactive.py | 81 +++++------------ 2 files changed, 97 insertions(+), 108 deletions(-) diff --git a/ra_aid/proc/interactive.py b/ra_aid/proc/interactive.py index 8d2d1bd..79d1135 100644 --- a/ra_aid/proc/interactive.py +++ b/ra_aid/proc/interactive.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 """ -Module for running interactive subprocesses with output capture. +Module for running interactive subprocesses with output capture, +with full raw input passthrough for interactive commands. -It uses a pseudo-tty and integrates pyte's HistoryScreen to -simulate a terminal and capture the final scrollback history (non-blank lines). +It uses a pseudo-tty and integrates pyte's HistoryScreen to simulate +a terminal and capture the final scrollback history (non-blank lines). The interface remains compatible with external callers expecting a tuple (output, return_code), where output is a bytes object (UTF-8 encoded). """ @@ -15,6 +16,9 @@ import errno import sys import io import subprocess +import select +import termios +import tty from typing import List, Tuple import pyte @@ -42,68 +46,97 @@ def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]: FileNotFoundError: If the command is not found in PATH. RuntimeError: If an error occurs during execution. """ - # Fail early if cmd is empty. if not cmd: raise ValueError("No command provided.") - - # Check that the command exists. if shutil.which(cmd[0]) is None: raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.") - # Determine terminal dimensions; use os.get_terminal_size if available. try: term_size = os.get_terminal_size() cols, rows = term_size.columns, term_size.lines except OSError: cols, rows = 80, 24 - # Instantiate HistoryScreen with a large history (scrollback) buffer. + # Set up pyte screen and stream to capture terminal output. screen = HistoryScreen(cols, rows, history=2000, ratio=0.5) stream = pyte.Stream(screen) # Open a new pseudo-tty. master_fd, slave_fd = os.openpty() - + try: - # Try to use real TTY stdin if available stdin_fd = sys.stdin.fileno() except (AttributeError, io.UnsupportedOperation): - # Fallback to pseudo-TTY for tests - stdin_fd = slave_fd + stdin_fd = None proc = subprocess.Popen( cmd, - stdin=stdin_fd, + stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, bufsize=0, close_fds=True ) - os.close(slave_fd) # Close slave in the parent. - - # Read output from the master file descriptor in real time. - try: - while True: - try: - data = os.read(master_fd, 1024) - except OSError as e: - if e.errno == errno.EIO: - # Expected error when the slave side is closed. + os.close(slave_fd) # Close slave end in the parent process. + + captured_data = [] + + # If we're in an interactive TTY, set raw mode and forward input. + if stdin_fd is not None and sys.stdin.isatty(): + old_settings = termios.tcgetattr(stdin_fd) + tty.setraw(stdin_fd) + try: + while True: + rlist, _, _ = select.select([master_fd, stdin_fd], [], []) + if master_fd in rlist: + try: + data = os.read(master_fd, 1024) + except OSError as e: + if e.errno == errno.EIO: + break + else: + raise + if not data: + break + captured_data.append(data) + # Update pyte's screen state. + stream.feed(data.decode("utf-8", errors="ignore")) + # Write to stdout for live output. + os.write(1, data) + if stdin_fd in rlist: + try: + input_data = os.read(stdin_fd, 1024) + except OSError: + input_data = b"" + if input_data: + # Forward raw keystrokes directly to the subprocess. + os.write(master_fd, input_data) + except KeyboardInterrupt: + proc.terminate() + finally: + termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_settings) + else: + # Non-interactive mode (e.g., during unit tests). + try: + while True: + try: + data = os.read(master_fd, 1024) + except OSError as e: + if e.errno == errno.EIO: + break + else: + raise + if not data: break - else: - raise - if not data: - break - # Feed the decoded data into pyte to update the screen and history. - stream.feed(data.decode("utf-8", errors="ignore")) - # Also write the raw data to stdout for live output. - os.write(1, data) - except KeyboardInterrupt: - proc.terminate() - finally: - os.close(master_fd) + captured_data.append(data) + stream.feed(data.decode("utf-8", errors="ignore")) + os.write(1, data) + except KeyboardInterrupt: + proc.terminate() + + os.close(master_fd) proc.wait() - + # Assemble full scrollback: combine history.top, the current display, and history.bottom. top_lines = [render_line(line, cols) for line in screen.history.top] bottom_lines = [render_line(line, cols) for line in screen.history.bottom] @@ -114,17 +147,12 @@ def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]: trimmed_lines = [line for line in all_lines if line.strip()] final_output = "\n".join(trimmed_lines) - # Return as bytes for compatibility. return final_output.encode("utf-8"), proc.returncode -# if __name__ == "__main__": -# # Test command: output 100 lines so that history goes beyond the screen height. -# test_cmd = [ -# "bash", -# "-c", -# "for i in $(seq 1 100); do echo \"Line $i\"; sleep 0.05; done" -# ] -# output, ret = run_interactive_command(test_cmd) -# print("\n=== Captured Scrollback (trimmed history lines) ===") -# print(output.decode("utf-8")) -# print("Return code:", ret) +if __name__ == "__main__": + import sys + if len(sys.argv) < 2: + print("Usage: interactive.py [args...]") + sys.exit(1) + output, return_code = run_interactive_command(sys.argv[1:]) + sys.exit(return_code) diff --git a/tests/ra_aid/proc/test_interactive.py b/tests/ra_aid/proc/test_interactive.py index 15afeb2..4213540 100644 --- a/tests/ra_aid/proc/test_interactive.py +++ b/tests/ra_aid/proc/test_interactive.py @@ -27,12 +27,12 @@ def test_shell_pipeline(): def test_stderr_capture(): """Test that stderr is properly captured in combined output.""" - # Use a command that definitely writes to stderr + # Use a command that definitely writes to stderr. output, retcode = run_interactive_command( ["/bin/bash", "-c", "ls /nonexistent/path"] ) assert b"No such file or directory" in output - assert retcode != 0 # ls returns 0 upon success + assert retcode != 0 # ls returns non-zero on failure. def test_command_not_found(): @@ -49,9 +49,10 @@ def test_empty_command(): def test_interactive_command(): """Test running an interactive command. - + This test verifies that output appears in real-time using process substitution. - We use a command that prints to both stdout and stderr to verify capture.""" + We use a command that prints to both stdout and stderr. + """ output, retcode = run_interactive_command( ["/bin/bash", "-c", "echo stdout; echo stderr >&2"] ) @@ -62,30 +63,23 @@ def test_interactive_command(): def test_large_output(): """Test handling of commands that produce large output.""" - # Generate a large output with predictable content + # Generate a large output with predictable content. cmd = 'for i in {1..3000}; do echo "Line $i of test output"; done' output, retcode = run_interactive_command(["/bin/bash", "-c", cmd]) - - # Clean up specific artifacts (e.g., ^D) - output_cleaned = output.lstrip(b"^D") # Remove the leading ^D if present - - # Split and filter lines + # Clean up any leading artifacts. + output_cleaned = output.lstrip(b"^D") lines = [ line.strip() for line in output_cleaned.splitlines() if b"Script" not in line and line.strip() ] - - # We expect around 2000 lines plus some additional lines from the current display - # The exact number may vary slightly due to terminal buffering, but should be close to 2024 - # (2000 history lines + 24 terminal lines) - assert 2000 <= len(lines) <= 2050, f"Expected between 2000-2050 lines due to history limit plus terminal display, but got {len(lines)}" - - # Verify that we have the last lines (should include line 3000) + # Expect roughly 2000 history lines plus the display lines. + assert 2000 <= len(lines) <= 2050, ( + f"Expected between 2000-2050 lines due to history limit plus terminal display, but got {len(lines)}" + ) + # Verify that we have the last line. assert lines[-1] == b"Line 3000 of test output", f"Unexpected last line: {lines[-1]}" - - # Verify return code - assert retcode == 0, f"Unexpected return code: {retcode}" + assert retcode == 0 def test_unicode_handling(): @@ -110,7 +104,6 @@ def test_multiple_commands(): def test_cat_medium_file(): """Test that cat command properly captures output for medium-length files.""" - # Create a temporary file with known content with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: for i in range(500): f.write(f"This is test line {i}\n") @@ -120,7 +113,6 @@ def test_cat_medium_file(): output, retcode = run_interactive_command( ["/bin/bash", "-c", f"cat {temp_path}"] ) - # Split by newlines and filter out script header/footer lines lines = [ line for line in output.splitlines() @@ -129,7 +121,7 @@ def test_cat_medium_file(): assert len(lines) == 500 assert retcode == 0 - # Verify content integrity by checking first and last lines + # Verify content integrity. assert b"This is test line 0" in lines[0] assert b"This is test line 499" in lines[-1] finally: @@ -138,15 +130,14 @@ def test_cat_medium_file(): def test_realtime_output(): """Test that output appears in real-time and is captured correctly.""" - # Create a command that sleeps briefly between outputs + # Create a command that sleeps briefly between outputs. cmd = "echo 'first'; sleep 0.1; echo 'second'; sleep 0.1; echo 'third'" output, retcode = run_interactive_command(["/bin/bash", "-c", cmd]) - - # Filter out script header/footer lines lines = [ - line for line in output.splitlines() if b"Script" not in line and line.strip() + line + for line in output.splitlines() + if b"Script" not in line and line.strip() ] - assert b"first" in lines[0] assert b"second" in lines[1] assert b"third" in lines[2] @@ -155,41 +146,11 @@ def test_realtime_output(): def test_tty_available(): """Test that commands have access to a TTY.""" - # Run the tty command output, retcode = run_interactive_command(["/bin/bash", "-c", "tty"]) - - # Clean up specific artifacts (e.g., ^D) - output_cleaned = output.lstrip(b"^D") # Remove leading ^D if present - - # Debug: Print cleaned output + output_cleaned = output.lstrip(b"^D") print(f"Cleaned TTY Output: {output_cleaned}") - - # Check if the output contains a valid TTY path + # Check if the output contains a valid TTY path. assert ( b"/dev/pts/" in output_cleaned or b"/dev/ttys" in output_cleaned ), f"Unexpected TTY output: {output_cleaned}" assert retcode == 0 - - -def test_interactive_input(): - """Test that interactive input works properly with cat command.""" - # Create a temporary file to store expected input - with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: - f.write("test input\n") - temp_path = f.name - - try: - # Redirect the temp file as stdin and run cat - with open(temp_path, 'rb') as stdin_file: - old_stdin = sys.stdin - sys.stdin = stdin_file - try: - output, retcode = run_interactive_command(["cat"]) - finally: - sys.stdin = old_stdin - - # Verify the output matches input - assert b"test input" in output - assert retcode == 0 - finally: - os.unlink(temp_path)