Last active
August 27, 2025 20:43
-
-
Save Codegass/f42622c2e5cad6418b9356c469535ceb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| checkout_fixed_baseline.py | |
| Interactive downloader for the 2025-05-30 fixed baseline | |
| """ | |
| import os | |
| import sys | |
| import subprocess | |
| import shutil | |
| from pathlib import Path | |
# Constants

# Cut-off instant for the baseline. The explicit "+0000" offset pins the value
# to UTC: git resolves a date that carries no offset in the local time zone of
# the machine running the script, which would move the baseline between hosts.
TARGET_DATE_UTC = '2025-05-30 23:59:59 +0000'

# Project name -> clone URL for every repository this tool can check out.
ALL_REPOS = {
    'commons-configuration': 'https://github.com/apache/commons-configuration.git',
    'commons-lang': 'https://github.com/apache/commons-lang.git',
    'samza': 'https://github.com/apache/samza.git',
    'hudi': 'https://github.com/apache/hudi.git',
    'commons-text': 'https://github.com/apache/commons-text.git',
    'tika': 'https://github.com/apache/tika.git',
    'commons-compress': 'https://github.com/apache/commons-compress.git',
    'commons-vfs': 'https://github.com/apache/commons-vfs.git',
    'struts': 'https://github.com/apache/struts.git',
    'shiro': 'https://github.com/apache/shiro.git',
    'commons-beanutils': 'https://github.com/apache/commons-beanutils.git',
    'linkis': 'https://github.com/apache/linkis.git',
    'storm': 'https://github.com/apache/storm.git',
    'commons-jexl': 'https://github.com/apache/commons-jexl.git',
    'skywalking': 'https://github.com/apache/skywalking.git',
    'commons-dbcp': 'https://github.com/apache/commons-dbcp.git',
    'commons-csv': 'https://github.com/apache/commons-csv.git',
    'commons-validator': 'https://github.com/apache/commons-validator.git',
    'commons-cli': 'https://github.com/apache/commons-cli.git',
}
def run_command(cmd, cwd=None, capture_output=True, check=True):
    """Run a command and return its ``subprocess.CompletedProcess``.

    Args:
        cmd: Either a shell command line (``str``), which is handed to the
            shell exactly as before, or a sequence of arguments, which is
            executed directly without a shell so no quoting is required.
        cwd: Working directory for the command; ``None`` keeps the current one.
        capture_output: Capture stdout/stderr (as text) on the result.
        check: Treat a non-zero exit status as a failure.

    Returns:
        The completed process, or ``None`` when the command exits non-zero
        while both ``check`` and ``capture_output`` are true. Callers use that
        ``None`` as their "command failed" signal.

    Raises:
        subprocess.CalledProcessError: Non-zero exit status with
            ``check=True`` and ``capture_output=False``.
    """
    # Only a plain string needs the shell to split it into arguments.
    use_shell = isinstance(cmd, str)
    try:
        return subprocess.run(
            cmd,
            shell=use_shell,
            cwd=cwd,
            capture_output=capture_output,
            text=True,
            check=check,
        )
    except subprocess.CalledProcessError:
        if capture_output:
            return None
        # Nothing was captured, so let the caller see the failure itself.
        raise
def check_git_available():
    """Terminate the script with exit status 1 unless ``git`` is found on PATH."""
    git_executable = shutil.which('git')
    if git_executable:
        return
    print("❌ Error: git is not installed or not available in PATH")
    sys.exit(1)
def ask_yes_no(prompt, default='n'):
    """Prompt until the user gives a yes/no answer and return it as a bool.

    An empty reply selects the default: yes when ``default`` is ``'y'``
    (case-insensitive), no for any other value. Accepted replies are
    ``y``/``yes`` and ``n``/``no`` in any letter case.
    """
    default_is_yes = default.lower() == 'y'
    hint = "[Y/n]" if default_is_yes else "[y/N]"
    fallback = 'y' if default_is_yes else 'n'

    while True:
        answer = input(f"{prompt} {hint}: ").strip().lower() or fallback
        if answer in ('y', 'yes'):
            return True
        if answer in ('n', 'no'):
            return False
        print("Please answer yes or no.")
def create_dir_if_needed(dir_path):
    """Ensure ``dir_path`` is a directory, creating it after user confirmation.

    Returns ``True`` when the directory already exists or was created. Every
    other outcome (path is not a directory, user declines, creation fails)
    prints a message and exits the script with status 1.
    """
    target = Path(dir_path)

    if target.exists():
        if target.is_dir():
            return True
        print(f"❌ Error: '{dir_path}' exists but is not a directory")
        sys.exit(1)

    if not ask_yes_no(f"Directory '{dir_path}' does not exist. Create it?"):
        print("Operation cancelled.")
        sys.exit(1)

    try:
        target.mkdir(parents=True, exist_ok=True)
        print(f"✓ Created directory: {dir_path}")
        return True
    except PermissionError:
        print(f"❌ Error: Permission denied creating directory '{dir_path}'")
        sys.exit(1)
    except Exception as exc:
        print(f"❌ Error creating directory '{dir_path}': {exc}")
        sys.exit(1)
def get_default_branch(repo_path):
    """Return the default branch that the ``origin`` remote advertises.

    The name is read from the ``HEAD branch:`` line printed by
    ``git remote show origin``.

    Args:
        repo_path: Path of the local clone to query.

    Returns:
        The branch name, or ``None`` when the command fails, the line is
        missing, or git printed a placeholder instead of a branch name.
    """
    print(" 🔍 Detecting default branch...")
    result = run_command("git remote show origin", cwd=repo_path)
    if not result:
        return None

    marker = 'HEAD branch:'
    # git prints these in place of a name when it has no branch to report;
    # passing them on would produce broken git commands further down.
    placeholders = ('(unknown)', '(not queried)')

    for line in result.stdout.splitlines():
        if marker not in line:
            continue
        branch = line.split(marker, 1)[1].strip()
        if not branch or branch in placeholders:
            return None
        print(f" ✓ Default branch: {branch}")
        return branch
    return None
def get_commit_before_date(repo_path, branch, target_date):
    """Return the newest commit on ``origin/<branch>`` not after ``target_date``.

    ``target_date`` is passed verbatim to ``git rev-list --before``. When a
    commit is found, its hash, commit date and subject are printed.

    Returns:
        The commit hash, or ``None`` when the lookup fails or matches nothing.
    """
    print(f" 🔍 Finding commit before {target_date}...")
    rev_list_cmd = f'git rev-list -n 1 --before="{target_date}" "origin/{branch}"'
    rev_list = run_command(rev_list_cmd, cwd=repo_path)
    commit_hash = rev_list.stdout.strip() if rev_list else ''
    if not commit_hash:
        return None

    # The details are informational only; the hash is returned either way.
    date_info = run_command(f'git show -s --format=%ci "{commit_hash}"', cwd=repo_path)
    subject_info = run_command(f'git show -s --format=%s "{commit_hash}"', cwd=repo_path)
    if not (date_info and subject_info):
        return commit_hash

    print(f" 📅 Found commit: {commit_hash}")
    print(f" Date: {date_info.stdout.strip()}")
    print(f" Subject: {subject_info.stdout.strip()}")
    return commit_hash
def has_uncommitted_changes(repo_path):
    """Return ``True`` unless ``git diff-index --quiet HEAD`` exits with status 0."""
    # check=False: a non-zero status is the answer here, not an error.
    status = run_command("git diff-index --quiet HEAD", cwd=repo_path, check=False)
    is_clean = status.returncode == 0
    return not is_clean
def _clone_or_refresh(repo_name, repo_url, download_dir, repo_path):
    """Make sure a clone exists at ``repo_path``; return ``False`` only if cloning fails."""
    if not os.path.exists(repo_path):
        print(" 📥 Cloning repository...")
        cloned = run_command(f'git clone --quiet "{repo_url}" "{repo_name}"', cwd=download_dir)
        if not cloned:
            print(" ❌ Failed to clone repository")
            return False
        print(" ✓ Cloned successfully")
        return True

    print(" 📁 Repository already exists")
    if ask_yes_no(" Update existing repository?"):
        print(" 🔄 Fetching latest changes...")
        if run_command("git fetch --quiet origin", cwd=repo_path):
            print(" ✓ Fetched successfully")
        else:
            # Best effort: the data already on disk may still contain the baseline.
            print(" ⚠️ Failed to fetch updates, continuing with existing data")
    return True


def _stash_if_dirty(repo_path):
    """Offer to stash uncommitted changes; return ``False`` if the repo must be skipped."""
    if not has_uncommitted_changes(repo_path):
        return True

    print(" ⚠️ Repository has uncommitted changes")
    if not ask_yes_no(" Stash changes and continue?"):
        print(" ❌ Skipping repository due to uncommitted changes")
        return False

    if run_command('git stash push -m "Automated stash before baseline checkout"', cwd=repo_path):
        print(" ✓ Changes stashed")
        return True
    print(" ❌ Failed to stash changes")
    return False


def checkout_baseline(repo_name, repo_url, download_dir):
    """Clone (or reuse) a repository and check out its baseline commit.

    The baseline is the newest commit on the remote's default branch that is
    not after ``TARGET_DATE_UTC``. The commit is checked out by hash.

    Args:
        repo_name: Directory name of the clone inside ``download_dir``.
        repo_url: URL to clone from when the directory does not exist yet.
        download_dir: Directory that holds all clones.

    Returns:
        ``True`` when the baseline commit was checked out, ``False`` on any
        failure or when the user chooses to skip the repository.
    """
    print(f"▶ Processing {repo_name}...")
    repo_path = os.path.join(download_dir, repo_name)

    if not _clone_or_refresh(repo_name, repo_url, download_dir, repo_path):
        return False

    # A fresh clone is normally clean, so this only prompts for reused clones.
    if not _stash_if_dirty(repo_path):
        return False

    default_branch = get_default_branch(repo_path)
    if not default_branch:
        print(" ❌ Cannot determine default branch")
        return False

    # The branch name comes from command output, so quote it like every other
    # interpolated git argument in this file. A failed fetch is not fatal.
    if not run_command(f'git fetch --quiet origin "{default_branch}"', cwd=repo_path):
        print(f" ⚠️ Could not fetch branch {default_branch}")

    commit_hash = get_commit_before_date(repo_path, default_branch, TARGET_DATE_UTC)
    if not commit_hash:
        print(f" ⚠️ No commit found before {TARGET_DATE_UTC}")
        return False

    if not run_command(f'git checkout --quiet "{commit_hash}"', cwd=repo_path):
        print(" ❌ Failed to checkout commit")
        return False

    print(" ✅ Successfully checked out baseline commit")
    return True
def _parse_selection(selection, sorted_projects):
    """Turn a comma-separated answer into an ordered, de-duplicated project list."""
    chosen = []
    for item in (part.strip() for part in selection.split(',')):
        if not item:
            # A trailing or doubled comma is not a selection; skip it quietly.
            continue
        try:
            number = int(item)
        except ValueError:
            # Not a number: treat it as a project name, ignoring letter case
            # (the special "all" keyword is case-insensitive as well).
            name = item if item in ALL_REPOS else item.lower()
            if name not in ALL_REPOS:
                print(f"⚠️ Unknown project '{item}' (ignored)")
                continue
        else:
            if not 1 <= number <= len(sorted_projects):
                print(f"⚠️ Number {number} is out of range (1-{len(sorted_projects)})")
                continue
            name = sorted_projects[number - 1]
        if name not in chosen:
            chosen.append(name)
    return chosen


def select_projects():
    """Ask the user which projects to process and return their names.

    The projects are listed alphabetically with 1-based numbers. The answer is
    either ``all`` or a comma-separated mix of those numbers and project
    names. Invalid entries are reported and ignored. The prompt repeats until
    at least one valid project is chosen and the user confirms the list.

    Returns:
        Selected project names (keys of ``ALL_REPOS``) without duplicates.
    """
    sorted_projects = sorted(ALL_REPOS)

    while True:
        print("Available projects:")
        for index, project in enumerate(sorted_projects, 1):
            print(f" {index:2d}. {project}")
        print()

        selection = input('Enter selection ("all", project names, or numbers - comma-separated): ').strip()
        if not selection:
            print("⚠️ Please enter a selection")
            continue

        if selection.lower() == 'all':
            selected_projects = list(ALL_REPOS)
        else:
            selected_projects = _parse_selection(selection, sorted_projects)

        if not selected_projects:
            print("❌ No valid projects selected. Please try again.")
            continue

        # Show the selection and let the user confirm or start over.
        print(f"\n📝 Selected {len(selected_projects)} project(s):")
        for index, project in enumerate(selected_projects, 1):
            print(f" {index}. {project}")
        print()

        if ask_yes_no("Proceed with these projects?", default='y'):
            return selected_projects
        print("Please make a new selection.\n")
def get_download_directory():
    """Ask for the download directory until a usable path is confirmed.

    The answer is expanded (``~``) and made absolute, then shown to the user
    for confirmation. A path that already exists must be a writable directory.
    A path that does not exist yet needs an existing, writable parent; this
    function does not create anything itself.

    Returns:
        The absolute directory path as a string.
    """
    while True:
        download_dir = input("Enter download directory path (absolute or relative): ").strip()
        if not download_dir:
            print("⚠️ Please enter a directory path")
            continue

        # Only the path handling can fail on bad input; keep the try narrow so
        # errors from the prompts below are not misreported as path errors.
        try:
            abs_path_obj = Path(download_dir).expanduser().resolve()
        except (OSError, RuntimeError, ValueError) as e:
            print(f"❌ Invalid path '{download_dir}': {e}")
            continue
        abs_download_dir = str(abs_path_obj)

        print(f"📍 Resolved absolute path: {abs_download_dir}")
        if not ask_yes_no("Is this the correct directory?", default='y'):
            continue

        if abs_path_obj.exists():
            # The target is already there, so its own permissions decide
            # whether clones can be written into it, not the parent's.
            if not abs_path_obj.is_dir():
                print(f"❌ '{abs_download_dir}' exists but is not a directory")
                continue
            if not os.access(abs_path_obj, os.W_OK | os.X_OK):
                print(f"❌ No write permission for directory '{abs_download_dir}'")
                continue
            return abs_download_dir

        # The target still has to be created: its parent must exist and be writable.
        parent_dir = abs_path_obj.parent
        if not parent_dir.exists():
            print(f"❌ Parent directory '{parent_dir}' does not exist")
            continue
        if not os.access(parent_dir, os.W_OK):
            print(f"❌ No write permission for parent directory '{parent_dir}'")
            continue
        return abs_download_dir
def main():
    """Run the interactive workflow: pick projects, pick a directory, check out.

    Prints a banner, verifies git is available, collects the project
    selection and download directory, processes each selected project in
    order, and finishes with a success/failure summary.
    """
    print("🚀 Apache Projects Baseline Checkout Tool")
    print(f"Target baseline date: {TARGET_DATE_UTC}")
    print()

    check_git_available()

    selected_projects = select_projects()
    print(f"📝 Selected {len(selected_projects)} project(s): {', '.join(selected_projects)}")
    print()

    download_dir = get_download_directory()
    create_dir_if_needed(download_dir)
    print()

    print("🏁 Starting baseline checkout process...")
    print()

    # Record one outcome per project; a blank line separates their output.
    outcomes = []
    for name in selected_projects:
        outcomes.append(checkout_baseline(name, ALL_REPOS[name], download_dir))
        print()

    total = len(outcomes)
    succeeded = sum(1 for ok in outcomes if ok)
    failed = total - succeeded

    print("📊 Summary:")
    print(f" ✅ Successful: {succeeded}/{total}")
    if failed:
        print(f" ❌ Failed: {failed}/{total}")
    print(f" 📁 Location: {download_dir}")
    print()

    if failed:
        print("⚠️ Some repositories failed to checkout. Check the output above for details.")
    else:
        print("🎉 All repositories successfully checked out to baseline!")
# Script entry point: run the tool and turn Ctrl-C or any unexpected error
# into a short message plus exit status 1.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n❌ Operation cancelled by user")
        raise SystemExit(1)
    except Exception as exc:
        print(f"\n❌ Unexpected error: {exc}")
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment