fix junk in command capture

This commit is contained in:
AI Christianson 2025-02-25 14:46:53 -05:00
parent 2e13b2bf4d
commit eede183110
2 changed files with 43 additions and 24 deletions

View File

@ -184,10 +184,6 @@ def run_interactive_command(
cols, rows = get_terminal_size()
# Set up pyte screen and stream to capture terminal output.
screen = HistoryScreen(cols, rows, history=2000, ratio=0.5)
stream = pyte.Stream(screen)
# Set up environment variables for the subprocess using detected terminal size.
env = os.environ.copy()
env.update(
@ -243,8 +239,6 @@ def run_interactive_command(
if not data:
break
captured_data.append(data)
decoded = data.decode("utf-8", errors="ignore")
stream.feed(decoded)
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
except (OSError, IOError):
@ -261,8 +255,6 @@ def run_interactive_command(
if not data:
break
captured_data.append(data)
decoded = data.decode("utf-8", errors="ignore")
stream.feed(decoded)
sys.stderr.buffer.write(data)
sys.stderr.buffer.flush()
except (OSError, IOError):
@ -352,8 +344,6 @@ def run_interactive_command(
if not data: # EOF detected.
break
captured_data.append(data)
decoded = data.decode("utf-8", errors="ignore")
stream.feed(decoded)
os.write(1, data)
if stdin_fd in rlist:
try:
@ -386,8 +376,6 @@ def run_interactive_command(
if not data: # EOF detected.
break
captured_data.append(data)
decoded = data.decode("utf-8", errors="ignore")
stream.feed(decoded)
os.write(1, data)
except KeyboardInterrupt:
proc.terminate()
@ -400,24 +388,36 @@ def run_interactive_command(
# Ensure we have captured data even if the screen processing failed
raw_output = b"".join(captured_data)
# Assemble full scrollback from the terminal emulation
# Process the captured output through a fresh screen
try:
# Assemble full scrollback: combine history.top, the current display, and history.bottom.
top_lines = [render_line(line, cols) for line in screen.history.top]
# Create a new screen and stream for final processing
screen = HistoryScreen(cols, rows, history=2000, ratio=0.5)
stream = pyte.Stream(screen)
# Feed all captured data at once to get the final state
raw_output = b"".join(captured_data)
decoded = raw_output.decode("utf-8", errors="ignore")
stream.feed(decoded)
# Get only the current display (final screen state), not the entire history
display_lines = [render_line(line, cols) for line in screen.display]
bottom_lines = [render_line(line, cols) for line in screen.history.bottom]
# Combine all lines to get the complete terminal history
all_lines = top_lines + display_lines + bottom_lines
# Trim out empty lines to get only meaningful "history" lines
trimmed_lines = [line for line in all_lines if line and line.strip()]
# Trim out empty lines to get only meaningful lines
# Also strip trailing whitespace from each line
trimmed_lines = [line.rstrip() for line in display_lines if line and line.strip()]
final_output = "\n".join(trimmed_lines)
except Exception as e:
# If anything goes wrong with screen processing, fall back to raw output
print(f"Warning: Error processing terminal output: {e}", file=sys.stderr)
final_output = raw_output.decode('utf-8', errors='replace').strip()
try:
# Decode raw output, strip trailing whitespace from each line
decoded = raw_output.decode('utf-8', errors='replace')
lines = [line.rstrip() for line in decoded.splitlines()]
final_output = "\n".join(lines)
except Exception:
# Ultimate fallback if line processing fails
final_output = raw_output.decode('utf-8', errors='replace').strip()
# Add timeout message if process was terminated due to timeout.
if was_terminated:
@ -433,7 +433,8 @@ def run_interactive_command(
else:
# Handle any unexpected type
final_output = str(final_output)[-8000:].encode("utf-8")
print("HERE", final_output)
return final_output, proc.returncode

View File

@ -106,7 +106,8 @@ def test_unicode_handling():
output, retcode = run_interactive_command(
["/bin/bash", "-c", f"echo '{test_string}'"]
)
assert test_string.encode() in output
# Since we now strip trailing whitespace, we should check for the string without trailing space
assert test_string.strip().encode() in output
assert retcode == 0
@ -170,6 +171,23 @@ def test_realtime_output():
assert retcode == 0
def test_strip_trailing_whitespace():
"""Test that trailing whitespace is properly stripped from each line."""
# Create a command that outputs text with trailing whitespace
cmd = 'echo "Line with spaces at end "; echo "Another trailing space line "; echo "Line with tabs at end\t\t"'
output, retcode = run_interactive_command(["/bin/bash", "-c", cmd])
# Check that the output contains the lines without trailing whitespace
lines = output.splitlines()
assert b"Line with spaces at end" in lines[0]
assert not lines[0].endswith(b" ")
assert b"Another trailing space line" in lines[1]
assert not lines[1].endswith(b" ")
assert b"Line with tabs at end" in lines[2]
assert not lines[2].endswith(b"\t")
assert retcode == 0
def test_tty_available():
"""Test that commands have access to a TTY."""
output, retcode = run_interactive_command(["/bin/bash", "-c", "tty"])