
@silvercondor
Last active March 15, 2026 14:53
Grafana 12.1 db migration from sqlite to postgres 17

Grafana SQLite to PostgreSQL Migration Guide

Complete guide for migrating Grafana v12.1 from SQLite to PostgreSQL v17, resolving PVC multi-attach issues.

Overview

This migration eliminates persistent volume conflicts by moving Grafana's data from SQLite to a PostgreSQL database backend.

Prerequisites

  • Grafana SQLite backup file
  • PostgreSQL v17 instance access
  • Python 3.x with psycopg2-binary

Step 1: PostgreSQL Setup

-- Connect as an admin user
CREATE DATABASE grafana;
CREATE USER grafana_user WITH PASSWORD 'your_secure_password';
GRANT ALL PRIVILEGES ON DATABASE grafana TO grafana_user;

-- On PostgreSQL 15+, run this while connected to the grafana database,
-- since CREATE on schema public is no longer granted to all users by default
GRANT ALL ON SCHEMA public TO grafana_user;
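Before moving on, it can be worth smoke-testing the new role's privileges. A minimal Python sketch (the host name is a placeholder; `check_grants` needs a live server plus psycopg2-binary, so it is defined here but not called):

```python
import getpass

def make_dsn(host: str, port: int, dbname: str, user: str) -> str:
    """Build a libpq-style keyword/value DSN (password supplied separately)."""
    return f"host={host} port={port} dbname={dbname} user={user} sslmode=require"

def check_grants(host: str = "your-postgres-host") -> None:
    """Prove grafana_user can create tables in schema public (needs a live server)."""
    import psycopg2  # pip install psycopg2-binary
    conn = psycopg2.connect(make_dsn(host, 5432, "grafana", "grafana_user"),
                            password=getpass.getpass("password: "))
    with conn, conn.cursor() as cur:
        # CREATE then DROP a scratch table; failure here means the grants did not take
        cur.execute("CREATE TABLE _grant_check (id int)")
        cur.execute("DROP TABLE _grant_check")
    conn.close()
    print("grafana_user can create tables in schema public")
```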

Step 2: Update Grafana Configuration

Configure Grafana to use PostgreSQL and let it create the initial schema:

# grafana values/config
persistence:
  enabled: false

grafana.ini:
  database:
    type: postgres
    host: your-postgres-host:5432
    name: grafana
    user: grafana_user
    password: $GF_DATABASE_PASSWORD
    ssl_mode: require

Deploy this configuration and let Grafana create the PostgreSQL schema tables.
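Once Grafana has started against PostgreSQL, it is worth confirming the schema tables actually exist before migrating data. A sketch (the core-table set is an illustrative subset, not Grafana's full schema; `list_public_tables` expects an open psycopg2 connection):

```python
# A small subset of tables Grafana is expected to create (illustrative, not exhaustive)
CORE_TABLES = {"dashboard", "data_source", "org", "user", "folder"}

def missing_core_tables(found_tables):
    """Return core Grafana tables absent from the discovered table list."""
    return sorted(CORE_TABLES - set(found_tables))

def list_public_tables(conn):
    """List tables in schema public (requires a live psycopg2 connection)."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return [r[0] for r in cur.fetchall()]
```

If `missing_core_tables(list_public_tables(conn))` is non-empty, Grafana has not finished creating its schema yet.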

Step 3: Run Migration Script

Run the migration script below (listed in full at the end of this guide): python3 migrate_grafana.py

Step 4: Usage Instructions

  1. Install Dependencies:

    pip install psycopg2-binary
  2. Update Configuration:

    • Set SQLITE_DB to your backup file path
    • Update POSTGRES_CONFIG with your database details
    • Replace placeholder passwords with real values
  3. Run Migration:

    python3 migrate_grafana.py

Step 5: Verification

After migration completes:

  1. Check Grafana UI:

    • Login with existing credentials
    • Verify dashboards load correctly
    • Test data source connections
    • Try creating new folders/dashboards
  2. Database Verification:

    -- Check key table counts
    SELECT 'dashboards' as type, COUNT(*) as count FROM dashboard
    UNION ALL
    SELECT 'users', COUNT(*) FROM "user"
    UNION ALL
    SELECT 'folders', COUNT(*) FROM folder;
    
    -- Test sequence functionality
    SELECT nextval('dashboard_id_seq1') as next_dashboard_id;
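The SQLite side of the comparison can be scripted with nothing but the standard library; a sketch that tallies row counts per table in the backup file:

```python
import sqlite3

def sqlite_table_counts(db_path):
    """Return {table: row_count} for every user table in a SQLite file."""
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name != 'sqlite_sequence'"
        )
        tables = [r[0] for r in cur.fetchall()]
        # Double quotes are valid identifier quoting in SQLite
        return {t: cur.execute(f'SELECT COUNT(*) FROM "{t}"').fetchone()[0]
                for t in tables}
    finally:
        conn.close()
```

Compare its output against the PostgreSQL counts from the SQL above.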

Migration Features

  • ✅ Automatic Schema Discovery: Detects PostgreSQL boolean columns
  • ✅ Data Type Conversion: Handles SQLite to PostgreSQL type differences
  • ✅ Column Mapping: Manages schema differences between versions
  • ✅ Sequence Fixes: Automatically fixes auto-increment sequences
  • ✅ JSON Data Handling: Properly converts dashboard JSON from bytes
  • ✅ Dependency Ordering: Migrates tables in correct order
  • ✅ Error Recovery: Robust transaction handling
Troubleshooting

  • Dashboard not loading: check that the dashboard JSON data was converted properly
  • Boolean errors: verify that boolean column detection worked
  • ID conflicts: ensure sequences were fixed after migration
  • Permission errors: check that the PostgreSQL user has the proper grants
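For the dashboard symptom, a quick programmatic check is that every migrated dashboard's data column still parses as JSON. A sketch (pure function; feed it (id, data) pairs fetched from the dashboard table):

```python
import json

def broken_dashboard_json(rows):
    """Return ids of dashboard rows whose 'data' field is not valid JSON text.

    rows: iterable of (id, data) pairs as fetched from the dashboard table.
    """
    bad = []
    for dash_id, data in rows:
        if isinstance(data, (bytes, bytearray)):
            bad.append(dash_id)  # bytes survived migration without being decoded
            continue
        try:
            json.loads(data)
        except (TypeError, ValueError):
            bad.append(dash_id)
    return bad
```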

Success Criteria

  • ✅ Grafana loads without errors
  • ✅ All dashboards display correctly
  • ✅ Data sources functional
  • ✅ New dashboards/folders can be created
  • ✅ No PVC multi-attach issues

This migration eliminates persistent volume conflicts while preserving all Grafana data and functionality.

migrate_grafana.py

#!/usr/bin/env python3
"""
Grafana SQLite to PostgreSQL Migration Script with Sequence Fixes
Migrates Grafana data from SQLite backup to PostgreSQL and fixes sequences.
Handles boolean conversions, schema mappings, and data type conversions.
Usage:
1. Install dependencies: pip install psycopg2-binary
2. Update configuration below
3. Run: python migrate_grafana.py
"""
import sqlite3
import psycopg2
import sys
from typing import Dict, List, Any
# =============================================================================
# CONFIGURATION - UPDATE THESE VALUES FOR YOUR ENVIRONMENT
# =============================================================================
# Path to your SQLite backup file
SQLITE_DB = "grafana-backup.db"
# PostgreSQL connection details
POSTGRES_CONFIG = {
    "host": "your-postgres-host",
    "port": 5432,
    "database": "grafana",
    "user": "grafana_user",
    "password": "your_postgres_password",
    "sslmode": "require"
}
# =============================================================================
# MIGRATION CONFIGURATION (usually no changes needed)
# =============================================================================
# Column mappings for schema differences between SQLite and PostgreSQL
COLUMN_MAPPINGS = {
    "annotation": {"dashboard_uid": None},       # Drop - use existing dashboard_id
    "alert_instance": {"fired_at": None},        # Drop - always NULL in SQLite
    "preferences": {"home_dashboard_uid": None}  # Drop - use existing home_dashboard_id
}
# Exclude system tables from migration
EXCLUDE_TABLES = {"sqlite_sequence"}
# Tables with foreign key dependencies (migrate last to avoid constraint issues)
DEPENDENT_TABLES = {
    "dashboard_acl", "dashboard_tag", "dashboard_version", "alert_instance",
    "alert_rule_tag", "annotation_tag", "org_user", "user_role", "team_member",
    "user_auth", "user_auth_token", "playlist_item", "library_element_connection",
    "query_history_details", "query_history_star", "alert_rule_state",
    "resource_version", "resource_history"
}
class GrafanaMigrator:
    def __init__(self):
        self.sqlite_conn = None
        self.postgres_conn = None
        self.migrated_tables = set()
        self.failed_tables = set()
        self.boolean_columns_cache = {}
        self.postgres_columns_cache = {}

    def connect_databases(self):
        """Connect to both SQLite and PostgreSQL databases"""
        try:
            # Connect to SQLite backup
            self.sqlite_conn = sqlite3.connect(SQLITE_DB)
            self.sqlite_conn.row_factory = sqlite3.Row
            print(f"✅ Connected to SQLite: {SQLITE_DB}")
            # Connect to PostgreSQL
            self.postgres_conn = psycopg2.connect(**POSTGRES_CONFIG)
            self.postgres_conn.autocommit = False
            print(f"✅ Connected to PostgreSQL: {POSTGRES_CONFIG['host']}:{POSTGRES_CONFIG['port']}")
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            print("\n💡 Make sure:")
            print("  - SQLite backup file exists and is readable")
            print("  - PostgreSQL credentials are correct")
            print("  - PostgreSQL server is accessible")
            sys.exit(1)
    def discover_boolean_columns(self):
        """Discover boolean columns from PostgreSQL schema"""
        print("\n🔍 Discovering boolean columns from PostgreSQL schema...")
        cursor = self.postgres_conn.cursor()
        cursor.execute("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
              AND data_type = 'boolean'
            ORDER BY table_name, column_name
        """)
        boolean_count = 0
        for row in cursor.fetchall():
            table_name, column_name = row
            if table_name not in self.boolean_columns_cache:
                self.boolean_columns_cache[table_name] = []
            self.boolean_columns_cache[table_name].append(column_name)
            boolean_count += 1
        print(f"  Found {boolean_count} boolean columns in {len(self.boolean_columns_cache)} tables")

    def cache_postgres_schemas(self):
        """Cache PostgreSQL table schemas for column filtering"""
        print("📋 Caching PostgreSQL table schemas...")
        cursor = self.postgres_conn.cursor()
        cursor.execute("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
            ORDER BY table_name, ordinal_position
        """)
        for row in cursor.fetchall():
            table_name, column_name = row
            if table_name not in self.postgres_columns_cache:
                self.postgres_columns_cache[table_name] = set()
            self.postgres_columns_cache[table_name].add(column_name)
        print(f"  Cached schemas for {len(self.postgres_columns_cache)} tables")
    def map_columns_for_postgres(self, table_name: str, row_data: Dict) -> Dict:
        """Map and filter columns to match PostgreSQL schema"""
        if table_name not in self.postgres_columns_cache:
            return row_data
        postgres_columns = self.postgres_columns_cache[table_name]
        mapped_data = {}
        # Apply column mappings if they exist for this table
        dropped_columns = set()
        if table_name in COLUMN_MAPPINGS:
            table_mappings = COLUMN_MAPPINGS[table_name]
            for sqlite_column in table_mappings:
                if sqlite_column in row_data and table_mappings[sqlite_column] is None:
                    dropped_columns.add(sqlite_column)
        # Copy columns that exist in PostgreSQL schema (excluding dropped ones)
        for column, value in row_data.items():
            if column in dropped_columns:
                continue  # Skip dropped columns
            if column in postgres_columns:
                mapped_data[column] = value
        return mapped_data
    def convert_data_types(self, table_name: str, row_data: Dict) -> Dict:
        """Convert data types for PostgreSQL compatibility"""
        converted_data = {}
        for column, value in row_data.items():
            if value is not None:
                # Handle dashboard JSON data specifically
                if table_name == 'dashboard' and column == 'data':
                    if isinstance(value, bytes):
                        try:
                            converted_data[column] = value.decode('utf-8')
                            print(f"  🔄 Decoded dashboard {row_data.get('id', '?')} JSON data")
                        except UnicodeDecodeError:
                            converted_data[column] = value.decode('latin-1', errors='replace')
                            print(f"  ⚠️ Used fallback encoding for dashboard {row_data.get('id', '?')}")
                    else:
                        converted_data[column] = value
                # Handle other byte data
                elif isinstance(value, bytes):
                    try:
                        converted_data[column] = value.decode('utf-8')
                    except UnicodeDecodeError:
                        converted_data[column] = value.decode('latin-1', errors='replace')
                else:
                    converted_data[column] = value
            else:
                converted_data[column] = value
        return converted_data

    def convert_boolean_values(self, table_name: str, row_data: Dict) -> Dict:
        """Convert SQLite 0/1 integers to PostgreSQL booleans"""
        if table_name in self.boolean_columns_cache:
            for column in self.boolean_columns_cache[table_name]:
                if column in row_data and row_data[column] is not None:
                    value = row_data[column]
                    if isinstance(value, int):
                        row_data[column] = bool(value)
                    elif isinstance(value, str):
                        row_data[column] = value.lower() in ('1', 'true', 'yes')
        return row_data
    def get_table_list(self) -> List[str]:
        """Get ordered list of tables from SQLite database"""
        cursor = self.sqlite_conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        # Filter excluded tables
        tables = [t for t in tables if t not in EXCLUDE_TABLES]
        # Sort tables: independent tables first, dependent tables last
        # This helps avoid foreign key constraint issues
        independent_tables = [t for t in tables if t not in DEPENDENT_TABLES]
        dependent_tables = [t for t in tables if t in DEPENDENT_TABLES]
        return independent_tables + dependent_tables

    def get_table_data(self, table_name: str) -> List[Dict]:
        """Get all data from SQLite table"""
        cursor = self.sqlite_conn.cursor()
        cursor.execute(f"SELECT * FROM `{table_name}`")
        columns = [desc[0] for desc in cursor.description]
        rows = []
        for row in cursor.fetchall():
            rows.append(dict(zip(columns, row)))
        return rows

    def clear_table(self, table_name: str):
        """Clear PostgreSQL table data"""
        cursor = self.postgres_conn.cursor()
        try:
            cursor.execute(f'DELETE FROM "{table_name}"')
            print(f"  Cleared table: {table_name}")
        except Exception as e:
            print(f"  ⚠️ Warning: Could not clear {table_name}: {e}")
    def insert_table_data(self, table_name: str, rows: List[Dict]) -> bool:
        """Insert data into PostgreSQL table using batch insert"""
        if not rows:
            print(f"  No data for {table_name}")
            return True
        cursor = self.postgres_conn.cursor()
        try:
            first_row = rows[0]
            columns = list(first_row.keys())
            # Create INSERT statement with proper escaping
            placeholders = ', '.join(['%s'] * len(columns))
            column_names = ', '.join([f'"{col}"' for col in columns])
            sql = f'INSERT INTO "{table_name}" ({column_names}) VALUES ({placeholders})'
            # Prepare data for batch insert
            values_list = [[row.get(col) for col in columns] for row in rows]
            cursor.executemany(sql, values_list)
            print(f"  Inserted {len(rows)} rows")
            return True
        except Exception as e:
            print(f"  ❌ Error inserting data: {e}")
            return False
    def migrate_table(self, table_name: str) -> bool:
        """Migrate a single table from SQLite to PostgreSQL"""
        print(f"\n📄 Migrating: {table_name}")
        try:
            # Clear any previous failed transaction
            self.postgres_conn.rollback()
            # Get data from SQLite
            rows = self.get_table_data(table_name)
            print(f"  Found {len(rows)} rows in SQLite")
            if not rows:
                print("  ✅ Empty table - skipping")
                return True
            # Process data through the conversion pipeline
            processed_rows = []
            for row in rows:
                # Step 1: Map columns to match PostgreSQL schema
                row = self.map_columns_for_postgres(table_name, row)
                # Step 2: Convert data types for PostgreSQL compatibility
                row = self.convert_data_types(table_name, row)
                # Step 3: Convert boolean values from SQLite integers
                row = self.convert_boolean_values(table_name, row)
                processed_rows.append(row)
            # Clear PostgreSQL table and insert migrated data
            self.clear_table(table_name)
            success = self.insert_table_data(table_name, processed_rows)
            if success:
                self.postgres_conn.commit()
                print(f"  ✅ Successfully migrated {table_name}")
                return True
            else:
                self.postgres_conn.rollback()
                return False
        except Exception as e:
            print(f"  ❌ Error migrating {table_name}: {e}")
            try:
                self.postgres_conn.rollback()
            except Exception:
                pass  # Rollback might fail if connection is broken
            return False
    def fix_sequences(self):
        """Fix PostgreSQL sequences after migration"""
        print("\n🔧 Fixing PostgreSQL sequences...")
        cursor = self.postgres_conn.cursor()
        # List of important sequences to fix
        sequences_to_fix = [
            ('folder_id_seq', 'folder'),
            ('dashboard_id_seq1', 'dashboard'),
            ('user_id_seq1', '"user"'),  # user is a reserved keyword in PostgreSQL
            ('org_id_seq', 'org'),
            ('data_source_id_seq1', 'data_source'),
            ('alert_id_seq', 'alert'),
            ('alert_rule_id_seq', 'alert_rule'),
            ('annotation_id_seq', 'annotation'),
            ('api_key_id_seq1', 'api_key'),
            ('dashboard_version_id_seq', 'dashboard_version'),
            ('library_element_id_seq', 'library_element'),
            ('playlist_id_seq', 'playlist'),
            ('preferences_id_seq', 'preferences'),
            ('role_id_seq', 'role'),
            ('team_id_seq', 'team'),
            ('permission_id_seq', 'permission'),  # Critical for folder creation
            ('alert_rule_version_id_seq', 'alert_rule_version'),
            ('dashboard_acl_id_seq', 'dashboard_acl'),
            ('org_user_id_seq', 'org_user'),
        ]
        fixed_count = 0
        for seq_name, table_name in sequences_to_fix:
            try:
                # Check if sequence exists
                cursor.execute("""
                    SELECT 1 FROM pg_sequences
                    WHERE sequencename = %s
                """, (seq_name,))
                if cursor.fetchone():
                    # Fix the sequence to match the max ID in the table
                    cursor.execute(f"""
                        SELECT setval('{seq_name}',
                                      (SELECT COALESCE(MAX(id), 1) FROM {table_name}),
                                      true)
                    """)
                    fixed_count += 1
                    print(f"  ✅ Fixed {seq_name}")
            except Exception as e:
                print(f"  ⚠️ Could not fix {seq_name}: {e}")
        self.postgres_conn.commit()
        print(f"  Fixed {fixed_count} sequences")
    def verify_migration(self):
        """Verify migration results by comparing table counts"""
        print("\n🔍 Verifying migration...")
        cursor_sqlite = self.sqlite_conn.cursor()
        cursor_postgres = self.postgres_conn.cursor()
        # Check first 10 migrated tables for verification
        tables_to_check = list(self.migrated_tables)[:10]
        for table_name in tables_to_check:
            try:
                # Get SQLite count
                cursor_sqlite.execute(f"SELECT COUNT(*) FROM `{table_name}`")
                sqlite_count = cursor_sqlite.fetchone()[0]
                # Get PostgreSQL count
                cursor_postgres.execute(f'SELECT COUNT(*) FROM "{table_name}"')
                postgres_count = cursor_postgres.fetchone()[0]
                if sqlite_count == postgres_count:
                    print(f"  ✅ {table_name}: {postgres_count} rows")
                else:
                    print(f"  ⚠️ {table_name}: SQLite={sqlite_count}, PostgreSQL={postgres_count}")
            except Exception as e:
                print(f"  ❌ Error verifying {table_name}: {e}")
    def run_migration(self):
        """Run the complete migration process"""
        print("🚀 Grafana SQLite to PostgreSQL Migration")
        print("=" * 60)
        # Step 1: Connect to databases
        self.connect_databases()
        # Step 2: Analyze PostgreSQL schema
        self.discover_boolean_columns()
        self.cache_postgres_schemas()
        # Step 3: Get tables to migrate
        tables = self.get_table_list()
        print(f"\n📋 Found {len(tables)} tables to migrate")
        # Step 4: Migrate each table
        print("\n🔄 Starting table migration...")
        for table_name in tables:
            if self.migrate_table(table_name):
                self.migrated_tables.add(table_name)
            else:
                self.failed_tables.add(table_name)
        # Step 5: Fix PostgreSQL sequences
        if self.migrated_tables:
            self.fix_sequences()
        # Step 6: Print summary
        print("\n" + "=" * 60)
        print("📊 MIGRATION SUMMARY")
        print("=" * 60)
        print(f"✅ Successfully migrated: {len(self.migrated_tables)} tables")
        print(f"❌ Failed to migrate: {len(self.failed_tables)} tables")
        if self.failed_tables:
            print(f"\nFailed tables: {', '.join(self.failed_tables)}")
        # Step 7: Verify migration results
        if self.migrated_tables:
            self.verify_migration()
        # Step 8: Clean up connections
        if self.sqlite_conn:
            self.sqlite_conn.close()
        if self.postgres_conn:
            self.postgres_conn.close()
        print("\n🎉 Migration process completed!")
        return len(self.failed_tables) == 0
def main():
    """Main entry point"""
    print("Grafana SQLite to PostgreSQL Migration Tool")
    print("=" * 50)
    # Display configuration
    print(f"SQLite backup: {SQLITE_DB}")
    print(f"PostgreSQL host: {POSTGRES_CONFIG['host']}:{POSTGRES_CONFIG['port']}")
    print(f"Database: {POSTGRES_CONFIG['database']}")
    print(f"User: {POSTGRES_CONFIG['user']}")
    # Safety confirmation
    print("\n⚠️ This will:")
    print("  - Clear all data in PostgreSQL Grafana tables")
    print("  - Import data from SQLite backup")
    print("  - Fix PostgreSQL sequences")
    response = input("\nProceed with migration? Type 'yes' to continue: ")
    if response.lower() != 'yes':
        print("Migration cancelled.")
        sys.exit(0)
    # Run the migration
    migrator = GrafanaMigrator()
    success = migrator.run_migration()
    if success:
        print("\n" + "=" * 60)
        print("✅ MIGRATION COMPLETED SUCCESSFULLY!")
        print("=" * 60)
        print("\nNext steps:")
        print("1. Restart Grafana to use the PostgreSQL backend")
        print("2. Login and verify dashboards load correctly")
        print("3. Test data source connections")
        print("4. Try creating new folders and dashboards")
        print("5. Check that alerts and other features work")
        sys.exit(0)
    else:
        print("\n" + "=" * 60)
        print("❌ MIGRATION COMPLETED WITH ERRORS")
        print("=" * 60)
        print("\nSome tables failed to migrate. Check the error messages above.")
        print("You may need to:")
        print("- Check PostgreSQL permissions")
        print("- Verify schema compatibility")
        print("- Review failed table structures")
        sys.exit(1)

if __name__ == "__main__":
    main()