RA.Aid/tests/ra_aid/proc/test_interactive.py

183 lines
6.0 KiB
Python

"""Tests for the interactive subprocess module."""
import os
import tempfile
import pytest
from ra_aid.proc.interactive import run_interactive_command
def test_basic_command():
"""Test running a basic command."""
output, retcode = run_interactive_command(["echo", "hello world"])
assert b"hello world" in output
assert retcode == 0
def test_shell_pipeline():
"""Test running a shell pipeline command."""
output, retcode = run_interactive_command(
["/bin/bash", "-c", "echo 'hello world' | grep 'world'"]
)
assert b"world" in output
assert retcode == 0
def test_stderr_capture():
"""Test that stderr is properly captured in combined output."""
# Use a command that definitely writes to stderr.
output, retcode = run_interactive_command(
["/bin/bash", "-c", "ls /nonexistent/path"]
)
assert b"No such file or directory" in output
assert retcode != 0 # ls returns non-zero on failure.
def test_command_not_found():
"""Test handling of non-existent commands."""
with pytest.raises(FileNotFoundError):
run_interactive_command(["nonexistentcommand"])
def test_empty_command():
"""Test handling of empty commands."""
with pytest.raises(ValueError):
run_interactive_command([])
def test_interactive_command():
"""Test running an interactive command.
This test verifies that output appears in real-time using process substitution.
We use a command that prints to both stdout and stderr.
"""
output, retcode = run_interactive_command(
["/bin/bash", "-c", "echo stdout; echo stderr >&2"]
)
assert b"stdout" in output
assert b"stderr" in output
assert retcode == 0
def test_large_output():
"""Test handling of commands that produce large output."""
# Generate a large output with predictable content
# Each line will be approximately 30 bytes
cmd = 'for i in {1..1000}; do echo "Line $i of test output"; done'
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
# Clean up any leading artifacts
output_cleaned = output.lstrip(b"^D")
# Verify the output size is limited to 8000 bytes
assert (
len(output_cleaned) <= 8000
), f"Output exceeded 8000 bytes: {len(output_cleaned)} bytes"
# Verify we have the last lines (should contain the highest numbers)
assert b"Line 1000" in output_cleaned, "Missing last line of output"
assert retcode == 0
def test_byte_limit():
"""Test that output is properly limited to 8000 bytes."""
# Create a string that's definitely over 8000 bytes
# Each line will be about 80 bytes
cmd = 'for i in {1..200}; do printf "%04d: %s\\n" "$i" "This is a line with padding to ensure we go over the byte limit quickly"; done'
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
output_cleaned = output.lstrip(b"^D")
# Verify exact 8000 byte limit
assert (
len(output_cleaned) <= 8000
), f"Output exceeded 8000 bytes: {len(output_cleaned)} bytes"
# Get the last line number from the output
last_line = output_cleaned.splitlines()[-1]
last_num = int(last_line.split(b":")[0])
# Verify we have a high number in the last line (should be near 200)
assert last_num > 150, f"Expected last line number to be near 200, got {last_num}"
assert retcode == 0
def test_unicode_handling():
"""Test handling of unicode characters."""
test_string = "Hello "
output, retcode = run_interactive_command(
["/bin/bash", "-c", f"echo '{test_string}'"]
)
assert test_string.encode() in output
assert retcode == 0
def test_multiple_commands():
"""Test running multiple commands in sequence."""
output, retcode = run_interactive_command(
["/bin/bash", "-c", "echo 'first'; echo 'second'"]
)
assert b"first" in output
assert b"second" in output
assert retcode == 0
def test_cat_medium_file():
"""Test that cat command properly captures output for medium-length files."""
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
for i in range(500):
f.write(f"This is test line {i}\n")
temp_path = f.name
try:
output, retcode = run_interactive_command(
["/bin/bash", "-c", f"cat {temp_path}"]
)
output_cleaned = output.lstrip(b"^D")
lines = [
line
for line in output_cleaned.splitlines()
if b"Script" not in line and line.strip()
]
# With 8000 byte limit, we expect to see the last portion of lines
# The exact number may vary due to terminal settings, but we should
# at least have the last lines of the file
assert (
len(lines) >= 90
), f"Expected at least 90 lines due to 8000 byte limit, got {len(lines)}"
# Most importantly, verify we have the last lines
last_line = lines[-1].decode("utf-8")
assert (
"This is test line 499" in last_line
), f"Expected last line to be 499, got: {last_line}"
assert retcode == 0
finally:
os.unlink(temp_path)
def test_realtime_output():
"""Test that output appears in real-time and is captured correctly."""
# Create a command that sleeps briefly between outputs.
cmd = "echo 'first'; sleep 0.1; echo 'second'; sleep 0.1; echo 'third'"
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
lines = [
line for line in output.splitlines() if b"Script" not in line and line.strip()
]
assert b"first" in lines[0]
assert b"second" in lines[1]
assert b"third" in lines[2]
assert retcode == 0
def test_tty_available():
"""Test that commands have access to a TTY."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "tty"])
output_cleaned = output.lstrip(b"^D")
print(f"Cleaned TTY Output: {output_cleaned}")
# Check if the output contains a valid TTY path.
assert (
b"/dev/pts/" in output_cleaned or b"/dev/ttys" in output_cleaned
), f"Unexpected TTY output: {output_cleaned}"
assert retcode == 0