#!/usr/bin/env python3
"""
Grafana SQLite to PostgreSQL Migration Script with Sequence Fixes

Migrates Grafana data from a SQLite backup to PostgreSQL and fixes sequences.
Handles boolean conversions, schema mappings, and data type conversions.

Usage:
1. Install dependencies: pip install psycopg2-binary
2. Update configuration below
3. Run: python migrate_grafana.py
"""
|
|
|
import os
import sqlite3
import sys
from typing import Dict, List, Any

import psycopg2
|
|
|
# -----------------------------------------------------------------------------
# USER CONFIGURATION - edit these values for your environment before running
# -----------------------------------------------------------------------------

# SQLite backup file that is read as the migration source.
SQLITE_DB = "grafana-backup.db"

# Keyword arguments passed to psycopg2.connect() for the target database.
POSTGRES_CONFIG = dict(
    host="your-postgres-host",
    port=5432,
    database="grafana",
    user="grafana_user",
    password="your_postgres_password",
    sslmode="require",
)

# -----------------------------------------------------------------------------
# MIGRATION SETTINGS - normally left untouched
# -----------------------------------------------------------------------------

# Per-table column handling for schema differences between the two databases.
# A column mapped to None is dropped before the row is inserted.
COLUMN_MAPPINGS = {
    # Dropped: the existing dashboard_id column is used instead.
    "annotation": {"dashboard_uid": None},
    # Dropped: always NULL in SQLite.
    "alert_instance": {"fired_at": None},
    # Dropped: the existing home_dashboard_id column is used instead.
    "preferences": {"home_dashboard_uid": None},
}

# SQLite system tables that are never migrated.
EXCLUDE_TABLES = {"sqlite_sequence"}

# Tables with foreign key dependencies; they are migrated after all other
# tables to avoid constraint issues.
DEPENDENT_TABLES = {
    "dashboard_acl",
    "dashboard_tag",
    "dashboard_version",
    "alert_instance",
    "alert_rule_tag",
    "annotation_tag",
    "org_user",
    "user_role",
    "team_member",
    "user_auth",
    "user_auth_token",
    "playlist_item",
    "library_element_connection",
    "query_history_details",
    "query_history_star",
    "alert_rule_state",
    "resource_version",
    "resource_history",
}
|
|
|
class GrafanaMigrator:
    """Migrate Grafana tables from a SQLite backup into PostgreSQL.

    Workflow (see ``run_migration``): connect to both databases, read the
    PostgreSQL column metadata, copy every SQLite table into the PostgreSQL
    table of the same name, re-align a fixed list of sequences and finally
    compare row counts.

    NOTE(review): several log messages start with a literal "π". These look
    like emoji damaged by an encoding round-trip; the intended glyphs cannot
    be determined from the text, so they are kept exactly as found.
    """

    def __init__(self):
        """Create an unconnected migrator with empty bookkeeping state."""
        self.sqlite_conn = None
        self.postgres_conn = None
        # Names of the tables whose migration succeeded / failed.
        self.migrated_tables = set()
        self.failed_tables = set()
        # table name -> list of boolean column names (read from PostgreSQL).
        self.boolean_columns_cache = {}
        # table name -> set of all column names (read from PostgreSQL).
        self.postgres_columns_cache = {}

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so a table or column name cannot
        terminate the quoting early.
        """
        return '"' + name.replace('"', '""') + '"'

    def _close_connections(self):
        """Close whichever database connections have been opened."""
        for conn in (self.sqlite_conn, self.postgres_conn):
            if conn is not None:
                conn.close()

    def connect_databases(self):
        """Open the SQLite source and the PostgreSQL target connection.

        On any failure, troubleshooting hints are printed, connections that
        were already opened are closed, and the process exits with status 1.
        """
        try:
            # sqlite3.connect() creates a new, empty database when the path
            # does not exist, which would look like a backup without tables.
            if not os.path.isfile(SQLITE_DB):
                raise FileNotFoundError(f"SQLite backup not found: {SQLITE_DB}")

            # Connect to SQLite backup
            self.sqlite_conn = sqlite3.connect(SQLITE_DB)
            self.sqlite_conn.row_factory = sqlite3.Row
            print(f"✅ Connected to SQLite: {SQLITE_DB}")

            # Connect to PostgreSQL; commits and rollbacks are issued
            # explicitly, so autocommit stays off.
            self.postgres_conn = psycopg2.connect(**POSTGRES_CONFIG)
            self.postgres_conn.autocommit = False
            print(f"✅ Connected to PostgreSQL: {POSTGRES_CONFIG['host']}:{POSTGRES_CONFIG['port']}")

        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            print("\n💡 Make sure:")
            print(" - SQLite backup file exists and is readable")
            print(" - PostgreSQL credentials are correct")
            print(" - PostgreSQL server is accessible")
            self._close_connections()
            sys.exit(1)

    def discover_boolean_columns(self):
        """Fill ``boolean_columns_cache`` from the PostgreSQL catalog.

        Reads every column of data type ``boolean`` in the ``public`` schema
        from ``information_schema.columns`` and groups the names by table.
        """
        print("\nπ Discovering boolean columns from PostgreSQL schema...")

        with self.postgres_conn.cursor() as cursor:
            cursor.execute("""
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = 'public'
                AND data_type = 'boolean'
                ORDER BY table_name, column_name
            """)
            found = cursor.fetchall()

        for table_name, column_name in found:
            self.boolean_columns_cache.setdefault(table_name, []).append(column_name)

        print(f" Found {len(found)} boolean columns in {len(self.boolean_columns_cache)} tables")

    def cache_postgres_schemas(self):
        """Fill ``postgres_columns_cache`` with the column names of each table.

        Reads all columns of the ``public`` schema from
        ``information_schema.columns``.
        """
        print("π Caching PostgreSQL table schemas...")

        with self.postgres_conn.cursor() as cursor:
            cursor.execute("""
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)
            found = cursor.fetchall()

        for table_name, column_name in found:
            self.postgres_columns_cache.setdefault(table_name, set()).add(column_name)

        print(f" Cached schemas for {len(self.postgres_columns_cache)} tables")

    def map_columns_for_postgres(self, table_name: str, row_data: Dict) -> Dict:
        """Return the part of *row_data* that fits the PostgreSQL table.

        Columns mapped to ``None`` in ``COLUMN_MAPPINGS`` and columns missing
        from the cached PostgreSQL schema are left out. If the table is not
        in the schema cache, *row_data* is returned unchanged.
        """
        postgres_columns = self.postgres_columns_cache.get(table_name)
        if postgres_columns is None:
            return row_data

        table_mappings = COLUMN_MAPPINGS.get(table_name, {})
        dropped_columns = {
            column for column, target in table_mappings.items() if target is None
        }

        return {
            column: value
            for column, value in row_data.items()
            if column in postgres_columns and column not in dropped_columns
        }

    def convert_data_types(self, table_name: str, row_data: Dict) -> Dict:
        """Return a copy of *row_data* with every ``bytes`` value decoded.

        Bytes are decoded as UTF-8, falling back to Latin-1 (which accepts
        any byte sequence) when that fails. Decoding of the ``data`` column
        of the ``dashboard`` table is additionally logged. All other values,
        including ``None``, are copied as they are.

        NOTE(review): every bytes value becomes text; if a target column is
        a binary type this would alter its content - confirm against the
        PostgreSQL schema.
        """
        converted_data = {}

        for column, value in row_data.items():
            if isinstance(value, bytes):
                is_dashboard_json = table_name == 'dashboard' and column == 'data'
                try:
                    value = value.decode('utf-8')
                except UnicodeDecodeError:
                    value = value.decode('latin-1')
                    if is_dashboard_json:
                        print(f" ⚠️ Used fallback encoding for dashboard {row_data.get('id', '?')}")
                else:
                    if is_dashboard_json:
                        print(f" π Decoded dashboard {row_data.get('id', '?')} JSON data")
            converted_data[column] = value

        return converted_data

    def convert_boolean_values(self, table_name: str, row_data: Dict) -> Dict:
        """Return a copy of *row_data* with boolean columns as ``bool``.

        For the columns listed in ``boolean_columns_cache`` for this table,
        integers are converted with ``bool()`` and strings are true when they
        equal '1', 'true' or 'yes' (case-insensitive). ``None`` and values of
        other types are kept. The input dict is not modified.
        """
        converted = dict(row_data)

        for column in self.boolean_columns_cache.get(table_name, ()):
            value = converted.get(column)
            if isinstance(value, int):
                converted[column] = bool(value)
            elif isinstance(value, str):
                converted[column] = value.lower() in ('1', 'true', 'yes')

        return converted

    def get_table_list(self) -> List[str]:
        """Return the SQLite table names in migration order.

        Tables in ``EXCLUDE_TABLES`` are left out. Tables in
        ``DEPENDENT_TABLES`` are placed after all other tables to help avoid
        foreign key constraint issues; the relative order is otherwise the
        one returned by ``sqlite_master``.
        """
        cursor = self.sqlite_conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        )
        tables = [row[0] for row in cursor.fetchall() if row[0] not in EXCLUDE_TABLES]

        # sorted() is stable: False (independent) sorts before True
        # (dependent) and the original order is kept within each group.
        return sorted(tables, key=lambda name: name in DEPENDENT_TABLES)

    def get_table_data(self, table_name: str) -> List[Dict]:
        """Return every row of a SQLite table as a ``column -> value`` dict."""
        cursor = self.sqlite_conn.execute(
            f"SELECT * FROM {self._quote_identifier(table_name)}"
        )
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def clear_table(self, table_name: str):
        """Delete all rows of a PostgreSQL table (best effort).

        The DELETE runs inside a savepoint. If it fails, only the savepoint
        is rolled back and a warning is printed, so the surrounding
        transaction stays usable for the statements that follow.
        """
        with self.postgres_conn.cursor() as cursor:
            cursor.execute("SAVEPOINT clear_table")
            try:
                cursor.execute(f"DELETE FROM {self._quote_identifier(table_name)}")
            except Exception as e:
                cursor.execute("ROLLBACK TO SAVEPOINT clear_table")
                print(f" ⚠️ Warning: Could not clear {table_name}: {e}")
            else:
                cursor.execute("RELEASE SAVEPOINT clear_table")
                print(f" Cleared table: {table_name}")

    def insert_table_data(self, table_name: str, rows: List[Dict]) -> bool:
        """Insert *rows* into a PostgreSQL table with one batched statement.

        The column list is taken from the first row. Returns ``True`` on
        success or when *rows* is empty, ``False`` when the insert fails or
        when the rows have no columns left to insert. Nothing is committed
        here; the caller decides whether to commit or roll back.
        """
        if not rows:
            print(f" No data for {table_name}")
            return True

        columns = list(rows[0].keys())
        if not columns:
            # An empty column list would produce invalid SQL.
            print(f" ❌ Error inserting data: no insertable columns for {table_name}")
            return False

        placeholders = ', '.join(['%s'] * len(columns))
        column_names = ', '.join(self._quote_identifier(col) for col in columns)
        sql = (
            f"INSERT INTO {self._quote_identifier(table_name)} "
            f"({column_names}) VALUES ({placeholders})"
        )
        values_list = [[row.get(col) for col in columns] for row in rows]

        try:
            with self.postgres_conn.cursor() as cursor:
                cursor.executemany(sql, values_list)
        except Exception as e:
            print(f" ❌ Error inserting data: {e}")
            return False

        print(f" Inserted {len(rows)} rows")
        return True

    def migrate_table(self, table_name: str) -> bool:
        """Migrate one table from SQLite to PostgreSQL.

        Rows are read from SQLite, passed through column mapping, data type
        conversion and boolean conversion, and then replace the content of
        the PostgreSQL table in a single transaction. Tables without rows in
        SQLite are skipped and count as success.

        Returns ``True`` on success and ``False`` on any error, in which case
        the transaction is rolled back.
        """
        print(f"\nπ Migrating: {table_name}")

        try:
            # Clear any previous failed transaction
            self.postgres_conn.rollback()

            # Get data from SQLite
            rows = self.get_table_data(table_name)
            print(f" Found {len(rows)} rows in SQLite")

            if not rows:
                print(" ✅ Empty table - skipping")
                return True

            # Conversion pipeline: map columns, decode bytes, convert booleans.
            processed_rows = [
                self.convert_boolean_values(
                    table_name,
                    self.convert_data_types(
                        table_name,
                        self.map_columns_for_postgres(table_name, row),
                    ),
                )
                for row in rows
            ]

            # Clear PostgreSQL table and insert migrated data
            self.clear_table(table_name)
            if not self.insert_table_data(table_name, processed_rows):
                self.postgres_conn.rollback()
                return False

            self.postgres_conn.commit()
            print(f" ✅ Successfully migrated {table_name}")
            return True

        except Exception as e:
            print(f" ❌ Error migrating {table_name}: {e}")
            try:
                self.postgres_conn.rollback()
            except Exception:
                pass  # Rollback might fail if connection is broken
            return False

    def fix_sequences(self):
        """Align a fixed list of PostgreSQL sequences with the imported ids.

        For every listed sequence that exists in ``pg_sequences``, the
        sequence is set to ``MAX(id)`` of its table. For an empty table the
        sequence is set to 1 with ``is_called = false``, so the next
        generated id is 1. A failure for one sequence is reported and rolled
        back so that the remaining sequences can still be processed.
        """
        print("\n🔧 Fixing PostgreSQL sequences...")

        # (sequence name, table name as written in SQL)
        sequences_to_fix = [
            ('folder_id_seq', 'folder'),
            ('dashboard_id_seq1', 'dashboard'),
            ('user_id_seq1', '"user"'),  # user is reserved keyword in PostgreSQL
            ('org_id_seq', 'org'),
            ('data_source_id_seq1', 'data_source'),
            ('alert_id_seq', 'alert'),
            ('alert_rule_id_seq', 'alert_rule'),
            ('annotation_id_seq', 'annotation'),
            ('api_key_id_seq1', 'api_key'),
            ('dashboard_version_id_seq', 'dashboard_version'),
            ('library_element_id_seq', 'library_element'),
            ('playlist_id_seq', 'playlist'),
            ('preferences_id_seq', 'preferences'),
            ('role_id_seq', 'role'),
            ('team_id_seq', 'team'),
            ('permission_id_seq', 'permission'),  # Critical for folder creation
            ('alert_rule_version_id_seq', 'alert_rule_version'),
            ('dashboard_acl_id_seq', 'dashboard_acl'),
            ('org_user_id_seq', 'org_user'),
        ]

        fixed_count = 0
        with self.postgres_conn.cursor() as cursor:
            for seq_name, table_name in sequences_to_fix:
                try:
                    # Check if sequence exists
                    cursor.execute(
                        "SELECT 1 FROM pg_sequences WHERE sequencename = %s",
                        (seq_name,),
                    )
                    if cursor.fetchone() is None:
                        continue

                    # table_name comes from the constant list above, so it
                    # is interpolated as written (it may already be quoted).
                    cursor.execute(
                        "SELECT setval(%s, COALESCE(MAX(id), 1), MAX(id) IS NOT NULL) "
                        f"FROM {table_name}",
                        (seq_name,),
                    )
                    fixed_count += 1
                    print(f" ✅ Fixed {seq_name}")
                except Exception as e:
                    print(f" ⚠️ Could not fix {seq_name}: {e}")
                    # A failed statement aborts the transaction; without a
                    # rollback every following statement would fail as well.
                    self.postgres_conn.rollback()

        self.postgres_conn.commit()
        print(f" Fixed {fixed_count} sequences")

    def verify_migration(self, max_tables: int = 10):
        """Compare SQLite and PostgreSQL row counts of migrated tables.

        Args:
            max_tables: Number of migrated tables to check, taken in
                alphabetical order.

        Each table is reported as matching, differing, or failed to verify;
        nothing is returned.
        """
        print("\nπ Verifying migration...")

        tables_to_check = sorted(self.migrated_tables)[:max_tables]

        with self.postgres_conn.cursor() as cursor_postgres:
            for table_name in tables_to_check:
                quoted = self._quote_identifier(table_name)
                try:
                    sqlite_count = self.sqlite_conn.execute(
                        f"SELECT COUNT(*) FROM {quoted}"
                    ).fetchone()[0]

                    cursor_postgres.execute(f"SELECT COUNT(*) FROM {quoted}")
                    postgres_count = cursor_postgres.fetchone()[0]

                    if sqlite_count == postgres_count:
                        print(f" ✅ {table_name}: {postgres_count} rows")
                    else:
                        print(f" ⚠️ {table_name}: SQLite={sqlite_count}, PostgreSQL={postgres_count}")

                except Exception as e:
                    print(f" ❌ Error verifying {table_name}: {e}")
                    # Leave the aborted transaction so later tables can
                    # still be counted.
                    self.postgres_conn.rollback()

    def run_migration(self):
        """Run the complete migration and report the outcome.

        Returns:
            ``True`` if no table failed to migrate, ``False`` otherwise.

        Both database connections are closed before returning, also when a
        step raises. A connection failure exits the process (see
        ``connect_databases``).
        """
        print("π Grafana SQLite to PostgreSQL Migration")
        print("=" * 60)

        # Step 1: Connect to databases
        self.connect_databases()

        try:
            # Step 2: Analyze PostgreSQL schema
            self.discover_boolean_columns()
            self.cache_postgres_schemas()

            # Step 3: Get tables to migrate
            tables = self.get_table_list()
            print(f"\nπ Found {len(tables)} tables to migrate")

            # Step 4: Migrate each table
            print("\nπ Starting table migration...")
            for table_name in tables:
                if self.migrate_table(table_name):
                    self.migrated_tables.add(table_name)
                else:
                    self.failed_tables.add(table_name)

            # Step 5: Fix PostgreSQL sequences
            if self.migrated_tables:
                self.fix_sequences()

            # Step 6: Print summary
            print("\n" + "=" * 60)
            print("π MIGRATION SUMMARY")
            print("=" * 60)
            print(f"✅ Successfully migrated: {len(self.migrated_tables)} tables")
            print(f"❌ Failed to migrate: {len(self.failed_tables)} tables")

            if self.failed_tables:
                print(f"\nFailed tables: {', '.join(sorted(self.failed_tables))}")

            # Step 7: Verify migration results
            if self.migrated_tables:
                self.verify_migration()
        finally:
            # Step 8: Clean up connections
            self._close_connections()

        print("\nπ Migration process completed!")
        return not self.failed_tables
|
|
|
def main():
    """Interactive entry point: show the configuration, confirm, migrate.

    Prints the configured source and target, asks for an explicit 'yes'
    and then runs ``GrafanaMigrator.run_migration()``. The process always
    ends through ``sys.exit``: status 0 when the migration succeeded or was
    not confirmed, status 1 when at least one table failed to migrate.
    """
    print("Grafana SQLite to PostgreSQL Migration Tool")
    print("=" * 50)

    # Display configuration
    print(f"SQLite backup: {SQLITE_DB}")
    print(f"PostgreSQL host: {POSTGRES_CONFIG['host']}:{POSTGRES_CONFIG['port']}")
    print(f"Database: {POSTGRES_CONFIG['database']}")
    print(f"User: {POSTGRES_CONFIG['user']}")

    # Safety confirmation
    print("\n⚠️ This will:")
    print(" - Clear all data in PostgreSQL Grafana tables")
    print(" - Import data from SQLite backup")
    print(" - Fix PostgreSQL sequences")

    try:
        response = input("\nProceed with migration? Type 'yes' to continue: ")
    except EOFError:
        # No interactive stdin: a destructive run must never start without
        # an explicit confirmation, so treat this as "not confirmed".
        response = ""

    if response.strip().lower() != 'yes':
        print("Migration cancelled.")
        sys.exit(0)

    # Run the migration
    migrator = GrafanaMigrator()
    success = migrator.run_migration()

    if not success:
        print("\n" + "=" * 60)
        print("❌ MIGRATION COMPLETED WITH ERRORS")
        print("=" * 60)
        print("\nSome tables failed to migrate. Check the error messages above.")
        print("You may need to:")
        print("- Check PostgreSQL permissions")
        print("- Verify schema compatibility")
        print("- Review failed table structures")
        sys.exit(1)

    print("\n" + "=" * 60)
    print("✅ MIGRATION COMPLETED SUCCESSFULLY!")
    print("=" * 60)
    print("\nNext steps:")
    print("1. Restart Grafana to use the PostgreSQL backend")
    print("2. Login and verify dashboards load correctly")
    print("3. Test data source connections")
    print("4. Try creating new folders and dashboards")
    print("5. Check that alerts and other features work")
    sys.exit(0)
|
|
|
# Start the interactive migration only when this file is executed as a script.
if __name__ == "__main__":
    main()