Skip to content

Instantly share code, notes, and snippets.

@Codegass
Last active August 27, 2025 20:43
Show Gist options
  • Select an option

  • Save Codegass/f42622c2e5cad6418b9356c469535ceb to your computer and use it in GitHub Desktop.

Select an option

Save Codegass/f42622c2e5cad6418b9356c469535ceb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
checkout_fixed_baseline.py
Interactive downloader for the 2025-05-30 fixed baseline
"""
import os
import sys
import subprocess
import shutil
from pathlib import Path
# Constants

# Cut-off instant for the baseline. The explicit "+0000" offset is required:
# git interprets a date that carries no timezone in the local timezone of the
# machine running it, so without the offset the same script could select
# different commits on machines in different timezones.
TARGET_DATE_UTC = '2025-05-30 23:59:59 +0000'

# Project name -> clone URL for every repository that is part of the baseline.
# Insertion order is preserved and is the order used when "all" is selected.
ALL_REPOS = {
    'commons-configuration': 'https://github.com/apache/commons-configuration.git',
    'commons-lang': 'https://github.com/apache/commons-lang.git',
    'samza': 'https://github.com/apache/samza.git',
    'hudi': 'https://github.com/apache/hudi.git',
    'commons-text': 'https://github.com/apache/commons-text.git',
    'tika': 'https://github.com/apache/tika.git',
    'commons-compress': 'https://github.com/apache/commons-compress.git',
    'commons-vfs': 'https://github.com/apache/commons-vfs.git',
    'struts': 'https://github.com/apache/struts.git',
    'shiro': 'https://github.com/apache/shiro.git',
    'commons-beanutils': 'https://github.com/apache/commons-beanutils.git',
    'linkis': 'https://github.com/apache/linkis.git',
    'storm': 'https://github.com/apache/storm.git',
    'commons-jexl': 'https://github.com/apache/commons-jexl.git',
    'skywalking': 'https://github.com/apache/skywalking.git',
    'commons-dbcp': 'https://github.com/apache/commons-dbcp.git',
    'commons-csv': 'https://github.com/apache/commons-csv.git',
    'commons-validator': 'https://github.com/apache/commons-validator.git',
    'commons-cli': 'https://github.com/apache/commons-cli.git',
}
def run_command(cmd, cwd=None, capture_output=True, check=True):
    """Run a command and return the completed process.

    Args:
        cmd: Either a command string, which is executed through the shell,
            or a sequence of arguments, which is executed directly without
            a shell so that no quoting of the arguments is needed.
        cwd: Working directory for the command (None keeps the current one).
        capture_output: When true, stdout/stderr are captured as text on the
            returned object instead of being written to the terminal.
        check: When true, a non-zero exit status is treated as a failure.

    Returns:
        The ``subprocess.CompletedProcess`` for the command, or ``None`` when
        the command failed while both ``check`` and ``capture_output`` are
        true.

    Raises:
        subprocess.CalledProcessError: If the command fails with ``check``
            true and ``capture_output`` false.
    """
    # Only plain command strings need the shell; an argument list is passed
    # to the program verbatim, which sidesteps shell quoting entirely.
    use_shell = isinstance(cmd, (str, bytes))
    try:
        return subprocess.run(
            cmd,
            shell=use_shell,
            cwd=cwd,
            capture_output=capture_output,
            text=True,
            check=check,
        )
    except subprocess.CalledProcessError:
        if capture_output:
            # Callers in capture mode test the result for truthiness.
            return None
        raise
def check_git_available():
    """Exit with status 1 unless a ``git`` executable is found on PATH."""
    git_executable = shutil.which('git')
    if git_executable:
        return
    print("❌ Error: git is not installed or not available in PATH")
    sys.exit(1)
def ask_yes_no(prompt, default='n'):
    """Prompt until the user gives a yes/no answer and return it as a bool.

    An empty reply selects the default: yes when ``default`` is 'y'
    (case-insensitive), no for any other value. The bracketed hint shown
    after the prompt capitalises the default choice.
    """
    default_is_yes = default.lower() == 'y'
    hint = "[Y/n]" if default_is_yes else "[y/N]"
    fallback = 'y' if default_is_yes else 'n'

    while True:
        answer = input(f"{prompt} {hint}: ").strip().lower() or fallback
        if answer in ('y', 'yes'):
            return True
        if answer in ('n', 'no'):
            return False
        print("Please answer yes or no.")
def create_dir_if_needed(dir_path):
    """Make sure ``dir_path`` is a directory, creating it after confirmation.

    Returns True when the directory already exists or was created. In every
    other case (path is not a directory, user declines, creation fails) a
    message is printed and the process exits with status 1.
    """
    target = Path(dir_path)

    if target.exists():
        if target.is_dir():
            return True
        print(f"❌ Error: '{dir_path}' exists but is not a directory")
        sys.exit(1)

    if not ask_yes_no(f"Directory '{dir_path}' does not exist. Create it?"):
        print("Operation cancelled.")
        sys.exit(1)

    try:
        target.mkdir(parents=True, exist_ok=True)
        print(f"✓ Created directory: {dir_path}")
        return True
    except PermissionError:
        print(f"❌ Error: Permission denied creating directory '{dir_path}'")
        sys.exit(1)
    except Exception as err:
        print(f"❌ Error creating directory '{dir_path}': {err}")
        sys.exit(1)
def get_default_branch(repo_path):
    """Return the name of origin's default branch, or None if undetermined.

    The name is read from the "HEAD branch:" line printed by
    ``git remote show origin``.

    Args:
        repo_path: Path of the local clone to inspect.

    Returns:
        The branch name, or None when the git command fails, prints no
        "HEAD branch:" line, or reports a placeholder instead of a name.
    """
    print(" 🔍 Detecting default branch...")
    result = run_command("git remote show origin", cwd=repo_path)
    if not result:
        return None
    for line in result.stdout.splitlines():
        _, marker, value = line.partition('HEAD branch:')
        if not marker:
            continue
        branch = value.strip()
        # When git cannot determine the remote HEAD it prints a parenthesised
        # placeholder such as "(unknown)" in place of a branch name; passing
        # that on would only make the later fetch/rev-list commands fail.
        if not branch or branch.startswith('('):
            return None
        print(f" ✓ Default branch: {branch}")
        return branch
    return None
def get_commit_before_date(repo_path, branch, target_date):
    """Return the hash of the newest commit on origin/<branch> before a date.

    The hash comes from ``git rev-list -n 1 --before=...``. When one is
    found, its date and subject are also printed if both can be read.
    Returns None when the lookup fails or yields no commit.
    """
    print(f" 🔍 Finding commit before {target_date}...")
    rev_list = run_command(
        f'git rev-list -n 1 --before="{target_date}" "origin/{branch}"',
        cwd=repo_path,
    )
    commit_hash = rev_list.stdout.strip() if rev_list else ""
    if not commit_hash:
        return None

    # The two lookups below are informational only; a failure in either one
    # just suppresses the summary and does not affect the returned hash.
    date_result, subject_result = [
        run_command(f'git show -s --format={fmt} "{commit_hash}"', cwd=repo_path)
        for fmt in ('%ci', '%s')
    ]
    if date_result and subject_result:
        print(f" 📅 Found commit: {commit_hash}")
        print(f" Date: {date_result.stdout.strip()}")
        print(f" Subject: {subject_result.stdout.strip()}")
    return commit_hash
def has_uncommitted_changes(repo_path):
    """Return True when ``git diff-index --quiet HEAD`` exits non-zero."""
    # check=False: the exit status is the answer here, not an error.
    diff_status = run_command(
        "git diff-index --quiet HEAD",
        cwd=repo_path,
        check=False,
    )
    is_clean = diff_status.returncode == 0
    return not is_clean
def checkout_baseline(repo_name, repo_url, download_dir):
    """Clone (or reuse) a repository and check out its baseline commit.

    Steps: clone or optionally update the repository, make sure the working
    tree is clean (stashing on request), detect the default branch, fetch it,
    find the newest commit before TARGET_DATE_UTC and check that commit out.

    Args:
        repo_name: Directory name of the clone inside ``download_dir``.
        repo_url: URL to clone from when the repository is not present yet.
        download_dir: Directory that holds all clones.

    Returns:
        True when the baseline commit was checked out, False otherwise.
    """
    print(f"▶ Processing {repo_name}...")
    repo_path = os.path.join(download_dir, repo_name)

    if not _clone_or_update(repo_name, repo_url, download_dir, repo_path):
        return False
    if not _ensure_clean_worktree(repo_path):
        return False

    default_branch = get_default_branch(repo_path)
    if not default_branch:
        print(" ❌ Cannot determine default branch")
        return False

    # The branch name is quoted like every other interpolated value so that
    # the shell passes it to git as one literal argument.
    fetch_result = run_command(f'git fetch --quiet origin "{default_branch}"', cwd=repo_path)
    if not fetch_result:
        # Not fatal: the data already present locally may still be enough.
        print(f" ⚠️ Could not fetch branch {default_branch}")

    commit_hash = get_commit_before_date(repo_path, default_branch, TARGET_DATE_UTC)
    if not commit_hash:
        print(f" ⚠️ No commit found before {TARGET_DATE_UTC}")
        return False

    checkout_result = run_command(f'git checkout --quiet "{commit_hash}"', cwd=repo_path)
    if not checkout_result:
        print(" ❌ Failed to checkout commit")
        return False
    print(" ✅ Successfully checked out baseline commit")
    return True


def _clone_or_update(repo_name, repo_url, download_dir, repo_path):
    """Clone the repository if absent, else offer a fetch; False only if the clone fails."""
    if not os.path.exists(repo_path):
        print(" 📥 Cloning repository...")
        clone_result = run_command(f'git clone --quiet "{repo_url}" "{repo_name}"', cwd=download_dir)
        if not clone_result:
            print(" ❌ Failed to clone repository")
            return False
        print(" ✓ Cloned successfully")
        return True

    print(" 📁 Repository already exists")
    if ask_yes_no(" Update existing repository?"):
        print(" 🔄 Fetching latest changes...")
        if run_command("git fetch --quiet origin", cwd=repo_path):
            print(" ✓ Fetched successfully")
        else:
            # A failed fetch is tolerated; work continues with local data.
            print(" ⚠️ Failed to fetch updates, continuing with existing data")
    return True


def _ensure_clean_worktree(repo_path):
    """Return True when the tree is clean or the user agreed to a successful stash."""
    if not has_uncommitted_changes(repo_path):
        return True

    print(" ⚠️ Repository has uncommitted changes")
    if not ask_yes_no(" Stash changes and continue?"):
        print(" ❌ Skipping repository due to uncommitted changes")
        return False

    stash_result = run_command('git stash push -m "Automated stash before baseline checkout"', cwd=repo_path)
    if not stash_result:
        print(" ❌ Failed to stash changes")
        return False
    print(" ✓ Changes stashed")
    return True
def select_projects():
    """Interactively ask which projects to process.

    The user may answer "all" or give a comma-separated mix of project names
    and the list numbers shown on screen. The prompt repeats until at least
    one valid project is chosen and the user confirms the selection.

    Returns:
        List of selected project names, without duplicates, in the order
        they were chosen (or in ALL_REPOS order for "all").
    """
    sorted_projects = sorted(ALL_REPOS.keys())
    while True:
        print("Available projects:")
        for i, project in enumerate(sorted_projects, 1):
            print(f" {i:2d}. {project}")
        print()
        selection = input('Enter selection ("all", project names, or numbers - comma-separated): ').strip()
        if not selection:
            print("⚠️ Please enter a selection")
            continue
        if selection.lower() == 'all':
            selected_projects = list(ALL_REPOS.keys())
        else:
            selected_projects = _parse_selection(selection, sorted_projects)
        if not selected_projects:
            print("❌ No valid projects selected. Please try again.")
            continue
        # Show selection and confirm
        print(f"\n📝 Selected {len(selected_projects)} project(s):")
        for i, project in enumerate(selected_projects, 1):
            print(f" {i}. {project}")
        print()
        if ask_yes_no("Proceed with these projects?", default='y'):
            return selected_projects
        print("Please make a new selection.\n")


def _parse_selection(selection, sorted_projects):
    """Turn a comma-separated list of names/numbers into unique project names."""
    chosen = []
    for item in (part.strip() for part in selection.split(',')):
        if not item:
            # A stray or trailing comma ("1,2,") yields an empty token; it is
            # not a project the user asked for, so skip it without a warning.
            continue
        try:
            num = int(item)
        except ValueError:
            # Not a number, so it has to be an exact project name.
            if item not in ALL_REPOS:
                print(f"⚠️ Unknown project '{item}' (ignored)")
                continue
            project_name = item
        else:
            if not 1 <= num <= len(sorted_projects):
                print(f"⚠️ Number {num} is out of range (1-{len(sorted_projects)})")
                continue
            # Numbers refer to the 1-based positions of the displayed list.
            project_name = sorted_projects[num - 1]
        if project_name not in chosen:
            chosen.append(project_name)
    return chosen
def get_download_directory():
    """Ask for the download directory until a usable path is confirmed.

    The entered path is expanded (``~``) and resolved to an absolute path,
    shown to the user for confirmation, and then checked for writability:
    an existing directory must itself be writable; for a path that does not
    exist yet, its parent must exist and be writable so the directory can be
    created there.

    Returns:
        The absolute path of the chosen directory as a string. The
        directory itself may not exist yet.
    """
    while True:
        download_dir = input("Enter download directory path (absolute or relative): ").strip()
        if not download_dir:
            print("⚠️ Please enter a directory path")
            continue
        try:
            # Expand "~" first, then resolve to an absolute path.
            abs_path_obj = Path(download_dir).expanduser().resolve()
            abs_download_dir = str(abs_path_obj)
            # Show the absolute path to user for confirmation
            print(f"📍 Resolved absolute path: {abs_download_dir}")
            if not ask_yes_no("Is this the correct directory?", default='y'):
                continue
            if abs_path_obj.is_dir():
                # The directory already exists, so clones are written into it
                # directly; the parent's permissions are irrelevant here.
                if not os.access(abs_path_obj, os.W_OK):
                    print(f"❌ No write permission for directory '{abs_path_obj}'")
                    continue
                return abs_download_dir
            # Check if parent directory exists and is writable
            parent_dir = abs_path_obj.parent
            if not parent_dir.exists():
                print(f"❌ Parent directory '{parent_dir}' does not exist")
                continue
            if not os.access(parent_dir, os.W_OK):
                print(f"❌ No write permission for parent directory '{parent_dir}'")
                continue
            return abs_download_dir
        except Exception as e:
            # Any failure while handling the path just re-prompts the user.
            print(f"❌ Invalid path '{download_dir}': {e}")
            continue
def main():
    """Run the interactive workflow: pick projects, pick a directory, check out.

    Prints a banner, verifies git is available, collects the project
    selection and download directory, processes each project in turn and
    finishes with a success/failure summary.
    """
    print("🚀 Apache Projects Baseline Checkout Tool")
    print(f"Target baseline date: {TARGET_DATE_UTC}")
    print()

    # git is required for every later step.
    check_git_available()

    # Which projects to process.
    selected_projects = select_projects()
    print(f"📝 Selected {len(selected_projects)} project(s): {', '.join(selected_projects)}")
    print()

    # Where to put the clones.
    download_dir = get_download_directory()
    create_dir_if_needed(download_dir)
    print()
    print("🏁 Starting baseline checkout process...")
    print()

    # Process every project, separating their output with a blank line.
    outcomes = []
    for project in selected_projects:
        outcomes.append(checkout_baseline(project, ALL_REPOS[project], download_dir))
        print()

    total_count = len(selected_projects)
    success_count = sum(1 for succeeded in outcomes if succeeded)
    failed_count = total_count - success_count

    # Summary
    print("📊 Summary:")
    print(f" ✅ Successful: {success_count}/{total_count}")
    if failed_count > 0:
        print(f" ❌ Failed: {failed_count}/{total_count}")
    print(f" 📁 Location: {download_dir}")
    print()
    if failed_count == 0:
        print("🎉 All repositories successfully checked out to baseline!")
    else:
        print("⚠️ Some repositories failed to checkout. Check the output above for details.")
if __name__ == "__main__":
    # Script entry point: run main() and turn Ctrl-C or any unexpected error
    # into a short message plus exit status 1.
    failure_message = None
    try:
        main()
    except KeyboardInterrupt:
        failure_message = "\n\n❌ Operation cancelled by user"
    except Exception as err:
        failure_message = f"\n❌ Unexpected error: {err}"
    if failure_message is not None:
        print(failure_message)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment