From cfb0ec148f47a57f52b4668bb9b4d117c3f9bc82 Mon Sep 17 00:00:00 2001 From: AI Christianson Date: Sat, 8 Mar 2025 14:48:46 -0500 Subject: [PATCH] extract thinking processing --- ra_aid/agent_backends/ciayn_agent.py | 17 +-- ra_aid/agent_utils.py | 21 ++-- ra_aid/text/__init__.py | 4 +- ra_aid/text/processing.py | 116 +++++++++++++++++- ra_aid/tools/expert.py | 60 ++-------- tests/ra_aid/text/test_process_thinking.py | 130 +++++++++++++++++++++ 6 files changed, 277 insertions(+), 71 deletions(-) create mode 100644 tests/ra_aid/text/test_process_thinking.py diff --git a/ra_aid/agent_backends/ciayn_agent.py b/ra_aid/agent_backends/ciayn_agent.py index 72d2609..b05488f 100644 --- a/ra_aid/agent_backends/ciayn_agent.py +++ b/ra_aid/agent_backends/ciayn_agent.py @@ -20,7 +20,7 @@ from ra_aid.tools.reflection import get_function_info from ra_aid.console.output import cpm from ra_aid.console.formatting import print_warning, print_error, console from ra_aid.agent_context import should_exit -from ra_aid.text import extract_think_tag +from ra_aid.text.processing import extract_think_tag, process_thinking_content from rich.panel import Panel from rich.markdown import Markdown @@ -631,13 +631,14 @@ class CiaynAgent: supports_think_tag = model_config.get("supports_think_tag", False) supports_thinking = model_config.get("supports_thinking", False) - # Extract think tags if supported - if supports_think_tag or supports_thinking: - think_content, remaining_text = extract_think_tag(response.content) - if think_content: - if self.config.get("show_thoughts", False): - console.print(Panel(Markdown(think_content), title="💭 Thoughts")) - response.content = remaining_text + # Process thinking content if supported + response.content, _ = process_thinking_content( + content=response.content, + supports_think_tag=supports_think_tag, + supports_thinking=supports_thinking, + panel_title="💭 Thoughts", + show_thoughts=self.config.get("show_thoughts", False) + ) # Check if the response is empty 
or doesn't contain a valid tool call if not response.content or not response.content.strip(): diff --git a/ra_aid/agent_utils.py b/ra_aid/agent_utils.py index 821ccc9..8f6f98d 100644 --- a/ra_aid/agent_utils.py +++ b/ra_aid/agent_utils.py @@ -52,6 +52,7 @@ from ra_aid.fallback_handler import FallbackHandler from ra_aid.logging_config import get_logger from ra_aid.llm import initialize_expert_llm from ra_aid.models_params import DEFAULT_TOKEN_LIMIT, models_params +from ra_aid.text.processing import process_thinking_content from ra_aid.project_info import ( display_project_status, format_project_info, @@ -804,16 +805,16 @@ def run_planning_agent( # Fallback: join list items if structured extraction failed logger.debug("No structured response text found, joining list items") content = "\n".join(str(item) for item in content) - elif (supports_think_tag or supports_thinking) and isinstance(content, str): - # Extract think tags if model supports them - think_content, remaining_text = extract_think_tag(content) - if think_content: - logger.debug(f"Found think tag content ({len(think_content)} chars)") - if get_config_repository().get("show_thoughts", False): - console.print( - Panel(Markdown(think_content), title="💭 Expert Thinking", border_style="yellow") - ) - content = remaining_text + elif (supports_think_tag or supports_thinking): + # Process thinking content using the centralized function + content, _ = process_thinking_content( + content=content, + supports_think_tag=supports_think_tag, + supports_thinking=supports_thinking, + panel_title="💭 Expert Thinking", + panel_style="yellow", + logger=logger + ) # Display the expert guidance in a panel console.print( diff --git a/ra_aid/text/__init__.py b/ra_aid/text/__init__.py index 3ab76b6..d5a0b2e 100644 --- a/ra_aid/text/__init__.py +++ b/ra_aid/text/__init__.py @@ -1,3 +1,3 @@ -from .processing import truncate_output, extract_think_tag +from .processing import truncate_output, extract_think_tag, 
def _display_thinking_panel(
    thinking: str,
    panel_title: str,
    panel_style: Optional[str] = None,
) -> None:
    """Print *thinking* as Markdown inside a rich panel; skip blank text."""
    if not thinking.strip():
        # Nothing readable to show; avoid rendering an empty panel.
        return

    # Imported at call time so rich is only loaded when a panel is shown.
    from rich.console import Console
    from rich.markdown import Markdown
    from rich.panel import Panel

    panel_kwargs = {"title": panel_title}
    if panel_style is not None:
        panel_kwargs["border_style"] = panel_style

    Console().print(Panel(Markdown(thinking), **panel_kwargs))


def process_thinking_content(
    content: Union[str, List[Any]],
    supports_think_tag: bool = False,
    supports_thinking: bool = False,
    panel_title: str = "💭 Thoughts",
    panel_style: Optional[str] = None,
    show_thoughts: Optional[bool] = None,
    logger=None,
) -> Tuple[Union[str, List[Any]], Optional[str]]:
    """Split thinking content out of a model response and optionally show it.

    Two response shapes are handled:

    * ``str`` content, where the thinking is wrapped in a ``<think>`` tag and
      is extracted with ``extract_think_tag``.
    * ``list`` content, where dict items with ``"type": "thinking"`` carry the
      thinking text and every other item is regular content.

    Args:
        content: The model response content (string or list).
        supports_think_tag: Whether the model emits ``<think>`` tags.
        supports_thinking: Whether the model emits structured thinking blocks.
        panel_title: Title of the panel used to display the thinking.
        panel_style: Border style for the panel; ``None`` keeps rich's default.
        show_thoughts: Whether to display the thinking. When ``None`` the
            ``show_thoughts`` value from the config repository is used.
        logger: Optional logger; only its ``debug`` method is called.

    Returns:
        A ``(processed_content, thinking)`` tuple. ``processed_content`` is the
        content with thinking removed (the input itself when nothing was
        found); ``thinking`` is the extracted text or ``None``.
    """
    # Nothing to do for models without any thinking feature.
    if not (supports_think_tag or supports_thinking):
        return content, None

    if show_thoughts is None:
        try:
            from ra_aid.database.repositories.config_repository import (
                get_config_repository,
            )

            show_thoughts = get_config_repository().get("show_thoughts", False)
        except (ImportError, RuntimeError):
            # NOTE(review): presumably RuntimeError means no config repository
            # is initialised in this context -- confirm against the repository.
            show_thoughts = False

    if isinstance(content, list):
        thinking_items = []
        regular_items = []
        for item in content:
            if isinstance(item, dict) and item.get("type") == "thinking":
                # Thinking blocks keep their text under the "thinking" key;
                # "text" is still accepted for blocks shaped that way.
                thinking_items.append(
                    item.get("thinking") or item.get("text", "")
                )
            else:
                regular_items.append(item)

        if not thinking_items:
            return content, None

        extracted_thinking = "\n\n".join(thinking_items)
        if logger:
            logger.debug(
                f"Found structured thinking content ({len(extracted_thinking)} chars)"
            )
        if show_thoughts:
            _display_thinking_panel(extracted_thinking, panel_title, panel_style)
        return regular_items, extracted_thinking

    if isinstance(content, str):
        if logger:
            logger.debug("Checking for think tags in response")

        think_content, remaining_text = extract_think_tag(content)
        if not think_content:
            if logger:
                logger.debug("No think tag content found in response")
            return content, None

        if logger:
            logger.debug(f"Found think tag content ({len(think_content)} chars)")
        if show_thoughts:
            _display_thinking_panel(think_content, panel_title, panel_style)
        return remaining_text, think_content

    # Any other content type is passed through untouched.
    return content, None
(content is a string) + # Process thinking content using the common processing function try: - # Case 1: Check for think tags if the model supports them - if (supports_think_tag or supports_thinking) and isinstance(content, str): - logger.debug("Checking for think tags in expert response") - think_content, remaining_text = extract_think_tag(content) - if think_content: - logger.debug(f"Found think tag content ({len(think_content)} chars)") - if get_config_repository().get("show_thoughts", False): - console.print( - Panel(Markdown(think_content), title="💭 Thoughts", border_style="yellow") - ) - content = remaining_text - else: - logger.debug("No think tag content found in expert response") - - # Case 2: Handle structured thinking (content is a list of dictionaries) - elif isinstance(content, list): - logger.debug("Expert response content is a list, processing structured thinking") - # Extract thinking content and response text from structured response - thinking_content = None - response_text = None - - # Process each item in the list - for item in content: - if isinstance(item, dict): - # Extract thinking content - if item.get('type') == 'thinking' and 'thinking' in item: - thinking_content = item['thinking'] - logger.debug("Found structured thinking content") - # Extract response text - elif item.get('type') == 'text' and 'text' in item: - response_text = item['text'] - logger.debug("Found structured response text") - - # Display thinking content in a separate panel if available - if thinking_content and get_config_repository().get("show_thoughts", False): - logger.debug(f"Displaying structured thinking content ({len(thinking_content)} chars)") - console.print( - Panel(Markdown(thinking_content), title="Expert Thinking", border_style="yellow") - ) - - # Use response_text if available, otherwise fall back to joining - if response_text: - content = response_text - else: - # Fallback: join list items if structured extraction failed - logger.debug("No structured 
class TestProcessThinkingContent:
    """Unit tests for ``process_thinking_content``."""

    def test_unsupported_model(self):
        """Content is returned untouched when no thinking feature is supported."""
        content = "This is a test response"
        result, thinking = process_thinking_content(
            content, supports_think_tag=False, supports_thinking=False
        )
        assert result == content
        assert thinking is None

    def test_string_with_think_tag(self):
        """A leading <think> tag is split off from string content."""
        content = "<think>This is thinking content</think>This is the actual response"
        result, thinking = process_thinking_content(
            content,
            supports_think_tag=True,
            show_thoughts=False,
            logger=MagicMock(),
        )
        assert result == "This is the actual response"
        assert thinking == "This is thinking content"

    def test_string_without_think_tag(self):
        """String content without a think tag passes through unchanged."""
        content = "This is a response without thinking"
        logger = MagicMock()
        result, thinking = process_thinking_content(
            content,
            supports_think_tag=True,
            show_thoughts=False,
            logger=logger,
        )
        assert result == content
        assert thinking is None
        logger.debug.assert_any_call("Checking for think tags in response")
        logger.debug.assert_any_call("No think tag content found in response")

    def test_structured_thinking(self):
        """Thinking items are removed from list content and joined together."""
        content = [
            {"type": "thinking", "text": "First thinking step"},
            {"type": "thinking", "text": "Second thinking step"},
            {"text": "Actual response"},
        ]
        logger = MagicMock()
        result, thinking = process_thinking_content(
            content,
            supports_thinking=True,
            show_thoughts=False,
            logger=logger,
        )
        assert result == [{"text": "Actual response"}]
        assert thinking == "First thinking step\n\nSecond thinking step"
        # Only the message prefix is checked; the character count may vary.
        debug_calls = [call[0][0] for call in logger.debug.call_args_list]
        assert any(
            call.startswith("Found structured thinking content")
            for call in debug_calls
        )

    def test_mixed_content_types(self):
        """Non-dict items and dicts without a thinking type are kept as-is."""
        content = [
            {"type": "thinking", "text": "Thinking"},
            "Plain string",
            {"other": "data"},
        ]
        result, thinking = process_thinking_content(
            content,
            supports_thinking=True,
            show_thoughts=False,
        )
        assert result == ["Plain string", {"other": "data"}]
        assert thinking == "Thinking"

    def test_config_lookup(self):
        """show_thoughts is read from the config repository when not given."""
        content = "<think>Thinking</think>Response"

        with patch(
            "ra_aid.database.repositories.config_repository.get_config_repository"
        ) as mock_get_config, \
                patch("rich.panel.Panel") as mock_panel, \
                patch("rich.markdown.Markdown") as mock_markdown, \
                patch("rich.console.Console") as mock_console:
            mock_repo = MagicMock()
            mock_repo.get.return_value = True
            mock_get_config.return_value = mock_repo
            mock_console_instance = MagicMock()
            mock_console.return_value = mock_console_instance

            result, thinking = process_thinking_content(
                content,
                supports_think_tag=True,
            )

            mock_repo.get.assert_called_once_with("show_thoughts", False)
            mock_console_instance.print.assert_called_once()
            mock_panel.assert_called_once()
            mock_markdown.assert_called_once()
            assert result == "Response"
            assert thinking == "Thinking"

    def test_panel_styling(self):
        """A custom panel title and border style are passed to the panel."""
        content = "<think>Custom thinking</think>Response"

        with patch("rich.panel.Panel") as mock_panel, \
                patch("rich.markdown.Markdown"), \
                patch("rich.console.Console") as mock_console:
            mock_console.return_value = MagicMock()

            process_thinking_content(
                content,
                supports_think_tag=True,
                show_thoughts=True,
                panel_title="Custom Title",
                panel_style="red",
            )

            _, kwargs = mock_panel.call_args
            assert kwargs["title"] == "Custom Title"
            assert kwargs["border_style"] == "red"