RA.Aid/ra_aid/database/migrations.py

285 lines
9.4 KiB
Python

"""
Database migrations for ra_aid.
This module provides functionality for managing database schema migrations
using peewee-migrate. It includes tools for creating, checking, and applying
migrations automatically.
"""
import os
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
import peewee
from peewee_migrate import Router
from peewee_migrate.router import DEFAULT_MIGRATE_DIR
from ra_aid.database.connection import get_db, DatabaseManager
from ra_aid.logging_config import get_logger
logger = get_logger(__name__)
# Constants
MIGRATIONS_DIRNAME = "migrations"
MIGRATIONS_TABLE = "migrationshistory"
class MigrationManager:
"""
Manages database migrations for the ra_aid application.
This class provides methods to initialize the migrator, check for
pending migrations, apply migrations, and create new migrations.
"""
def __init__(self, db_path: Optional[str] = None, migrations_dir: Optional[str] = None):
"""
Initialize the MigrationManager.
Args:
db_path: Optional path to the database file. If None, uses the default.
migrations_dir: Optional path to the migrations directory. If None, uses default.
"""
self.db = get_db()
# Determine database path
if db_path is None:
# Get current working directory
cwd = os.getcwd()
ra_aid_dir = os.path.join(cwd, ".ra-aid")
db_path = os.path.join(ra_aid_dir, "pk.db")
self.db_path = db_path
# Determine migrations directory
if migrations_dir is None:
# Use a directory within .ra-aid
ra_aid_dir = os.path.dirname(self.db_path)
migrations_dir = os.path.join(ra_aid_dir, MIGRATIONS_DIRNAME)
self.migrations_dir = migrations_dir
# Ensure migrations directory exists
self._ensure_migrations_dir()
# Initialize router
self.router = self._init_router()
def _ensure_migrations_dir(self) -> None:
"""
Ensure that the migrations directory exists.
Creates the directory if it doesn't exist.
"""
try:
migrations_path = Path(self.migrations_dir)
if not migrations_path.exists():
logger.debug(f"Creating migrations directory at: {self.migrations_dir}")
migrations_path.mkdir(parents=True, exist_ok=True)
# Create __init__.py to make it a proper package
init_file = migrations_path / "__init__.py"
if not init_file.exists():
init_file.touch()
logger.debug(f"Using migrations directory: {self.migrations_dir}")
except Exception as e:
logger.error(f"Failed to create migrations directory: {str(e)}")
raise
def _init_router(self) -> Router:
"""
Initialize the peewee-migrate Router.
Returns:
Router: Configured peewee-migrate Router instance
"""
try:
router = Router(self.db, migrate_dir=self.migrations_dir, migrate_table=MIGRATIONS_TABLE)
logger.debug(f"Initialized migration router with table: {MIGRATIONS_TABLE}")
return router
except Exception as e:
logger.error(f"Failed to initialize migration router: {str(e)}")
raise
def check_migrations(self) -> Tuple[List[str], List[str]]:
"""
Check for pending migrations.
Returns:
Tuple[List[str], List[str]]: A tuple containing (applied_migrations, pending_migrations)
"""
try:
# Get all migrations
all_migrations = self.router.todo
# Get applied migrations
applied = self.router.done
# Calculate pending migrations
pending = [m for m in all_migrations if m not in applied]
logger.debug(f"Found {len(applied)} applied migrations and {len(pending)} pending migrations")
return applied, pending
except Exception as e:
logger.error(f"Failed to check migrations: {str(e)}")
return [], []
def apply_migrations(self, fake: bool = False) -> bool:
"""
Apply all pending migrations.
Args:
fake: If True, mark migrations as applied without running them
Returns:
bool: True if migrations were applied successfully, False otherwise
"""
try:
# Get pending migrations
_, pending = self.check_migrations()
if not pending:
logger.info("No pending migrations to apply")
return True
logger.info(f"Applying {len(pending)} pending migrations...")
# Apply migrations
for migration in pending:
try:
logger.info(f"Applying migration: {migration}")
self.router.run(migration, fake=fake)
logger.info(f"Successfully applied migration: {migration}")
except Exception as e:
logger.error(f"Failed to apply migration {migration}: {str(e)}")
return False
logger.info(f"Successfully applied {len(pending)} migrations")
return True
except Exception as e:
logger.error(f"Failed to apply migrations: {str(e)}")
return False
def create_migration(self, name: str, auto: bool = True) -> Optional[str]:
"""
Create a new migration.
Args:
name: Name of the migration
auto: If True, automatically detect model changes
Returns:
Optional[str]: The name of the created migration, or None if creation failed
"""
try:
# Sanitize migration name
safe_name = name.replace(' ', '_').lower()
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
migration_name = f"{timestamp}_{safe_name}"
logger.info(f"Creating new migration: {migration_name}")
# Create migration
self.router.create(migration_name, auto=auto)
logger.info(f"Successfully created migration: {migration_name}")
return migration_name
except Exception as e:
logger.error(f"Failed to create migration: {str(e)}")
return None
def get_migration_status(self) -> Dict[str, Any]:
"""
Get the current migration status.
Returns:
Dict[str, Any]: A dictionary containing migration status information
"""
applied, pending = self.check_migrations()
return {
"applied_count": len(applied),
"pending_count": len(pending),
"applied": applied,
"pending": pending,
"migrations_dir": self.migrations_dir,
"db_path": self.db_path,
}
def init_migrations(db_path: Optional[str] = None, migrations_dir: Optional[str] = None) -> MigrationManager:
"""
Initialize the migration manager.
Args:
db_path: Optional path to the database file
migrations_dir: Optional path to the migrations directory
Returns:
MigrationManager: Initialized migration manager
"""
return MigrationManager(db_path, migrations_dir)
def ensure_migrations_applied() -> bool:
"""
Check for and apply any pending migrations.
This function should be called during application startup to ensure
the database schema is up to date.
Returns:
bool: True if migrations were applied successfully or none were pending
"""
with DatabaseManager() as db:
try:
migration_manager = init_migrations()
return migration_manager.apply_migrations()
except Exception as e:
logger.error(f"Failed to apply migrations: {str(e)}")
return False
def create_new_migration(name: str, auto: bool = True) -> Optional[str]:
"""
Create a new migration with the given name.
Args:
name: Name of the migration
auto: If True, automatically detect model changes
Returns:
Optional[str]: The name of the created migration, or None if creation failed
"""
with DatabaseManager() as db:
try:
migration_manager = init_migrations()
return migration_manager.create_migration(name, auto)
except Exception as e:
logger.error(f"Failed to create migration: {str(e)}")
return None
def get_migration_status() -> Dict[str, Any]:
"""
Get the current migration status.
Returns:
Dict[str, Any]: A dictionary containing migration status information
"""
with DatabaseManager() as db:
try:
migration_manager = init_migrations()
return migration_manager.get_migration_status()
except Exception as e:
logger.error(f"Failed to get migration status: {str(e)}")
return {
"error": str(e),
"applied_count": 0,
"pending_count": 0,
"applied": [],
"pending": [],
}