From eede18311049b585c4144e7e32d4ad543cc32629 Mon Sep 17 00:00:00 2001 From: AI Christianson Date: Tue, 25 Feb 2025 14:46:53 -0500 Subject: [PATCH] fix junk in command capture --- ra_aid/proc/interactive.py | 47 ++++++++++++++------------- tests/ra_aid/proc/test_interactive.py | 20 +++++++++++- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/ra_aid/proc/interactive.py b/ra_aid/proc/interactive.py index af66818..1428e67 100644 --- a/ra_aid/proc/interactive.py +++ b/ra_aid/proc/interactive.py @@ -184,10 +184,6 @@ def run_interactive_command( cols, rows = get_terminal_size() - # Set up pyte screen and stream to capture terminal output. - screen = HistoryScreen(cols, rows, history=2000, ratio=0.5) - stream = pyte.Stream(screen) - # Set up environment variables for the subprocess using detected terminal size. env = os.environ.copy() env.update( @@ -243,8 +239,6 @@ def run_interactive_command( if not data: break captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) sys.stdout.buffer.write(data) sys.stdout.buffer.flush() except (OSError, IOError): @@ -261,8 +255,6 @@ def run_interactive_command( if not data: break captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) sys.stderr.buffer.write(data) sys.stderr.buffer.flush() except (OSError, IOError): @@ -352,8 +344,6 @@ def run_interactive_command( if not data: # EOF detected. break captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) os.write(1, data) if stdin_fd in rlist: try: @@ -386,8 +376,6 @@ def run_interactive_command( if not data: # EOF detected. break captured_data.append(data) - decoded = data.decode("utf-8", errors="ignore") - stream.feed(decoded) os.write(1, data) except KeyboardInterrupt: proc.terminate() @@ -400,24 +388,36 @@ def run_interactive_command( # Ensure we have captured data even if the screen processing failed raw_output = b"".join(captured_data) - # Assemble full scrollback from the terminal emulation + # Process the captured output through a fresh screen try: - # Assemble full scrollback: combine history.top, the current display, and history.bottom. - top_lines = [render_line(line, cols) for line in screen.history.top] + # Create a new screen and stream for final processing + screen = HistoryScreen(cols, rows, history=2000, ratio=0.5) + stream = pyte.Stream(screen) + + # Feed all captured data at once to get the final state + raw_output = b"".join(captured_data) + decoded = raw_output.decode("utf-8", errors="ignore") + stream.feed(decoded) + + # Get only the current display (final screen state), not the entire history display_lines = [render_line(line, cols) for line in screen.display] - bottom_lines = [render_line(line, cols) for line in screen.history.bottom] - # Combine all lines to get the complete terminal history - all_lines = top_lines + display_lines + bottom_lines - - # Trim out empty lines to get only meaningful "history" lines - trimmed_lines = [line for line in all_lines if line and line.strip()] + # Trim out empty lines to get only meaningful lines + # Also strip trailing whitespace from each line + trimmed_lines = [line.rstrip() for line in display_lines if line and line.strip()] final_output = "\n".join(trimmed_lines) except Exception as e: # If anything goes wrong with screen processing, fall back to raw output print(f"Warning: Error processing terminal output: {e}", file=sys.stderr) - final_output = raw_output.decode('utf-8', errors='replace').strip() + try: + # Decode raw output, strip trailing whitespace from each line + decoded = raw_output.decode('utf-8', errors='replace') + lines = [line.rstrip() for line in decoded.splitlines()] + final_output = "\n".join(lines) + except Exception: + # Ultimate fallback if line processing fails + final_output = raw_output.decode('utf-8', errors='replace').strip() # Add timeout message if process was terminated due to timeout. if was_terminated: @@ -433,7 +433,8 @@ def run_interactive_command( else: # Handle any unexpected type final_output = str(final_output)[-8000:].encode("utf-8") - + + print("HERE", final_output) return final_output, proc.returncode diff --git a/tests/ra_aid/proc/test_interactive.py b/tests/ra_aid/proc/test_interactive.py index ba45c91..341b0ee 100644 --- a/tests/ra_aid/proc/test_interactive.py +++ b/tests/ra_aid/proc/test_interactive.py @@ -106,7 +106,8 @@ def test_unicode_handling(): output, retcode = run_interactive_command( ["/bin/bash", "-c", f"echo '{test_string}'"] ) - assert test_string.encode() in output + # Since we now strip trailing whitespace, we should check for the string without trailing space + assert test_string.strip().encode() in output assert retcode == 0 @@ -170,6 +171,23 @@ def test_realtime_output(): assert retcode == 0 +def test_strip_trailing_whitespace(): + """Test that trailing whitespace is properly stripped from each line.""" + # Create a command that outputs text with trailing whitespace + cmd = 'echo "Line with spaces at end "; echo "Another trailing space line "; echo "Line with tabs at end\t\t"' + output, retcode = run_interactive_command(["/bin/bash", "-c", cmd]) + + # Check that the output contains the lines without trailing whitespace + lines = output.splitlines() + assert b"Line with spaces at end" in lines[0] + assert not lines[0].endswith(b" ") + assert b"Another trailing space line" in lines[1] + assert not lines[1].endswith(b" ") + assert b"Line with tabs at end" in lines[2] + assert not lines[2].endswith(b"\t") + assert retcode == 0 + + def test_tty_available(): """Test that commands have access to a TTY.""" output, retcode = run_interactive_command(["/bin/bash", "-c", "tty"])