From 2e13b2bf4dae9e6a86c4eb8436e27a00cef20511 Mon Sep 17 00:00:00 2001 From: AI Christianson Date: Tue, 25 Feb 2025 14:16:50 -0500 Subject: [PATCH] remove file --- ra_aid/proc/interactive.py.master | 229 ------------------------------ 1 file changed, 229 deletions(-) delete mode 100644 ra_aid/proc/interactive.py.master diff --git a/ra_aid/proc/interactive.py.master b/ra_aid/proc/interactive.py.master deleted file mode 100644 index ef21b33..0000000 --- a/ra_aid/proc/interactive.py.master +++ /dev/null @@ -1,229 +0,0 @@ -#!/usr/bin/env python3 -""" -Module for running interactive subprocesses with output capture, -with full raw input passthrough for interactive commands. - -It uses a pseudo-tty and integrates pyte's HistoryScreen to simulate -a terminal and capture the final scrollback history (non-blank lines). -The interface remains compatible with external callers expecting a tuple (output, return_code), -where output is a bytes object (UTF-8 encoded). -""" - -import errno -import io -import os -import select -import shutil -import signal -import subprocess -import sys -import termios -import time -import tty -from typing import List, Tuple - -import pyte -from pyte.screens import HistoryScreen - - -def render_line(line, columns: int) -> str: - """Render a single screen line from the pyte buffer (a mapping of column to Char).""" - return "".join(line[x].data for x in range(columns)) - - -def run_interactive_command( - cmd: List[str], expected_runtime_seconds: int = 30 -) -> Tuple[bytes, int]: - """ - Runs an interactive command with a pseudo-tty, capturing final scrollback history. - - Assumptions and constraints: - - Running on a Linux system. - - `cmd` is a non-empty list where cmd[0] is the executable. - - The executable is on PATH. - - Args: - cmd: A list containing the command and its arguments. - expected_runtime_seconds: Expected runtime in seconds, defaults to 30. - If process exceeds 2x this value, it will be terminated gracefully. - If process exceeds 3x this value, it will be killed forcefully. - Must be between 1 and 1800 seconds (30 minutes). - - Returns: - A tuple of (captured_output, return_code), where captured_output is a UTF-8 encoded - bytes object containing the trimmed non-empty history lines from the terminal session. - - Raises: - ValueError: If no command is provided. - FileNotFoundError: If the command is not found in PATH. - ValueError: If expected_runtime_seconds is less than or equal to 0 or greater than 1800. - RuntimeError: If an error occurs during execution. - """ - if not cmd: - raise ValueError("No command provided.") - if shutil.which(cmd[0]) is None: - raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.") - if expected_runtime_seconds <= 0 or expected_runtime_seconds > 1800: - raise ValueError( - "expected_runtime_seconds must be between 1 and 1800 seconds (30 minutes)" - ) - - try: - term_size = os.get_terminal_size() - cols, rows = term_size.columns, term_size.lines - except OSError: - cols, rows = 80, 24 - - # Set up pyte screen and stream to capture terminal output. - screen = HistoryScreen(cols, rows, history=2000, ratio=0.5) - stream = pyte.Stream(screen) - - # Open a new pseudo-tty. - master_fd, slave_fd = os.openpty() - # Set master_fd to non-blocking to avoid indefinite blocking. - os.set_blocking(master_fd, False) - - try: - stdin_fd = sys.stdin.fileno() - except (AttributeError, io.UnsupportedOperation): - stdin_fd = None - - # Set up environment variables for the subprocess using detected terminal size. - env = os.environ.copy() - env.update( - { - "DEBIAN_FRONTEND": "noninteractive", - "GIT_PAGER": "", - "PYTHONUNBUFFERED": "1", - "CI": "true", - "LANG": "C.UTF-8", - "LC_ALL": "C.UTF-8", - "COLUMNS": str(cols), - "LINES": str(rows), - "FORCE_COLOR": "1", - "GIT_TERMINAL_PROMPT": "0", - "PYTHONDONTWRITEBYTECODE": "1", - "NODE_OPTIONS": "--unhandled-rejections=strict", - } - ) - - proc = subprocess.Popen( - cmd, - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - bufsize=0, - close_fds=True, - env=env, - preexec_fn=os.setsid, # Create new process group for proper signal handling. - ) - os.close(slave_fd) # Close slave end in the parent process. - - captured_data = [] - start_time = time.time() - was_terminated = False - - def check_timeout(): - elapsed = time.time() - start_time - if elapsed > 3 * expected_runtime_seconds: - os.killpg(os.getpgid(proc.pid), signal.SIGKILL) - return True - elif elapsed > 2 * expected_runtime_seconds: - os.killpg(os.getpgid(proc.pid), signal.SIGTERM) - return True - return False - - # Interactive mode: forward input if running in a TTY. - if stdin_fd is not None and sys.stdin.isatty(): - old_settings = termios.tcgetattr(stdin_fd) - tty.setraw(stdin_fd) - try: - while True: - if check_timeout(): - was_terminated = True - break - # Use a finite timeout to avoid indefinite blocking. - rlist, _, _ = select.select([master_fd, stdin_fd], [], [], 1.0) - if master_fd in rlist: - try: - data = os.read(master_fd, 1024) - except OSError as e: - if e.errno == errno.EIO: - break - else: - raise - if not data: # EOF detected. - break - captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) - os.write(1, data) - if stdin_fd in rlist: - try: - input_data = os.read(stdin_fd, 1024) - except OSError: - input_data = b"" - if input_data: - os.write(master_fd, input_data) - except KeyboardInterrupt: - proc.terminate() - finally: - termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_settings) - else: - # Non-interactive mode. - try: - while True: - if check_timeout(): - was_terminated = True - break - rlist, _, _ = select.select([master_fd], [], [], 1.0) - if not rlist: - continue - try: - data = os.read(master_fd, 1024) - except OSError as e: - if e.errno == errno.EIO: - break - else: - raise - if not data: # EOF detected. - break - captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) - os.write(1, data) - except KeyboardInterrupt: - proc.terminate() - - os.close(master_fd) - proc.wait() - - # Assemble full scrollback: combine history.top, the current display, and history.bottom. - top_lines = [render_line(line, cols) for line in screen.history.top] - bottom_lines = [render_line(line, cols) for line in screen.history.bottom] - display_lines = screen.display # List of strings representing the current screen. - all_lines = top_lines + display_lines + bottom_lines - - # Trim out empty lines to get only meaningful "history" lines. - trimmed_lines = [line for line in all_lines if line.strip()] - final_output = "\n".join(trimmed_lines) - - # Add timeout message if process was terminated due to timeout. - if was_terminated: - timeout_msg = f"\n[Process exceeded timeout ({expected_runtime_seconds} seconds expected)]" - final_output += timeout_msg - - # Limit output to the last 8000 bytes. - final_output = final_output[-8000:] - - return final_output.encode("utf-8"), proc.returncode - - -if __name__ == "__main__": - import sys - - if len(sys.argv) < 2: - print("Usage: interactive.py [args...]") - sys.exit(1) - output, return_code = run_interactive_command(sys.argv[1:]) - sys.exit(return_code)