RA.Aid/ra_aid/proc/interactive.py

79 lines
2.6 KiB
Python

"""
Module for running interactive subprocesses with output capture.
"""
import os
import re
import tempfile
import shlex
import shutil
from typing import List, Tuple
def run_interactive_command(cmd: List[str]) -> Tuple[bytes, int]:
"""
Runs an interactive command with a pseudo-tty, capturing combined output.
Assumptions and constraints:
- We are on a Linux system with script available
- `cmd` is a non-empty list where cmd[0] is the executable
- The executable and script are assumed to be on PATH
- If anything is amiss (e.g., command not found), we fail early and cleanly
The output is cleaned to remove ANSI escape sequences and control characters.
Returns:
Tuple of (cleaned_output, return_code)
"""
# Fail early if cmd is empty
if not cmd:
raise ValueError("No command provided.")
# Check that the command exists
if shutil.which(cmd[0]) is None:
raise FileNotFoundError(f"Command '{cmd[0]}' not found in PATH.")
# Create temp files (we'll always clean them up)
output_file = tempfile.NamedTemporaryFile(prefix="output_", delete=False)
retcode_file = tempfile.NamedTemporaryFile(prefix="retcode_", delete=False)
output_path = output_file.name
retcode_path = retcode_file.name
output_file.close()
retcode_file.close()
# Quote arguments for safety
quoted_cmd = ' '.join(shlex.quote(c) for c in cmd)
# Use script to capture output with TTY and save return code
shell_cmd = f"{quoted_cmd}; echo $? > {shlex.quote(retcode_path)}"
def cleanup():
for path in [output_path, retcode_path]:
if os.path.exists(path):
os.remove(path)
try:
# Run command with script for TTY and output capture
os.system(f"script -q -c {shlex.quote(shell_cmd)} {shlex.quote(output_path)}")
# Read and clean the output
with open(output_path, "rb") as f:
output = f.read()
# Clean ANSI escape sequences and control characters
output = re.sub(rb'\x1b\[[0-9;]*[a-zA-Z]', b'', output) # ANSI escape sequences
output = re.sub(rb'[\x00-\x08\x0b\x0c\x0e-\x1f]', b'', output) # Control chars
# Get the return code
with open(retcode_path, "r") as f:
return_code = int(f.read().strip())
except Exception as e:
# If something goes wrong, cleanup and re-raise
cleanup()
raise RuntimeError("Error running interactive capture") from e
finally:
# Ensure files are removed no matter what
cleanup()
return output, return_code