RA.Aid/ra_aid/proc/interactive.py

89 lines
2.9 KiB
Python

"""
Module for running interactive subprocesses with output capture.
"""
import os
import re
import tempfile
import shlex
import shutil
from typing import List, Tuple
# Add macOS detection
IS_MACOS = os.uname().sysname == "Darwin"
def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]:
"""
Runs an interactive command with a pseudo-tty, capturing combined output.
Assumptions and constraints:
- We are on a Linux system with script available
- `cmd` is a non-empty list where cmd[0] is the executable
- The executable and script are assumed to be on PATH
- If anything is amiss (e.g., command not found), we fail early and cleanly
The output is cleaned to remove ANSI escape sequences and control characters.
Returns:
Tuple of (cleaned_output, return_code)
"""
# Fail early if cmd is empty
if not cmd:
raise ValueError("No command provided.")
# Check that the command exists
if shutil.which(cmd[0]) is None:
raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.")
# Create temp files (we'll always clean them up)
output_file = tempfile.NamedTemporaryFile(prefix="output_", delete=False)
retcode_file = tempfile.NamedTemporaryFile(prefix="retcode_", delete=False)
output_path = output_file.name
retcode_path = retcode_file.name
output_file.close()
retcode_file.close()
# Quote arguments for safety
quoted_cmd = ' '.join(shlex.quote(c) for c in cmd)
# Use script to capture output with TTY and save return code
shell_cmd = f"{quoted_cmd}; echo $? > {shlex.quote(retcode_path)}"
def cleanup():
for path in [output_path, retcode_path]:
if os.path.exists(path):
os.remove(path)
try:
# Disable pagers by setting environment variables
os.environ['GIT_PAGER'] = ''
os.environ['PAGER'] = ''
# Run command with script for TTY and output capture
if IS_MACOS:
os.system(f"script -q {shlex.quote(output_path)} {shell_cmd}")
else:
os.system(f"script -q -c {shlex.quote(shell_cmd)} {shlex.quote(output_path)}")
# Read and clean the output
with open(output_path, "rb") as f:
output = f.read()
# Clean ANSI escape sequences and control characters
output = re.sub(rb'\x1b\[[0-9;]*[a-zA-Z]', b'', output) # ANSI escape sequences
output = re.sub(rb'[\x00-\x08\x0b\x0c\x0e-\x1f]', b'', output) # Control chars
# Get the return code
with open(retcode_path, "r") as f:
return_code = int(f.read().strip())
except Exception as e:
# If something goes wrong, cleanup and re-raise
cleanup()
raise RuntimeError("Error running interactive capture") from e
finally:
# Ensure files are removed no matter what
cleanup()
return output, return_code