Created
August 7, 2025 16:00
-
-
Save robxx/d1c45df74b389a2892cfe66b43d0f85b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import os | |
| from collections import defaultdict | |
| from pathlib import Path | |
| import xxhash | |
| import sys | |
# Read files in fixed-size chunks while hashing so memory stays flat even
# for very large files (64 KB per read).
CHUNK_SIZE = 65536  # 64KB
def format_bytes(byte_count):
    """Render a byte count as a human-readable string, e.g. '1.50 MB'.

    Args:
        byte_count (int or float or None): Number of bytes, or None.

    Returns:
        str: Two-decimal size with a B/KB/MB/GB/TB unit, or "N/A" for None.
    """
    if byte_count is None:
        return "N/A"
    units = ('', 'K', 'M', 'G', 'T')
    unit_idx = 0
    # Divide down by 1024 until the value fits the largest available unit.
    while byte_count >= 1024 and unit_idx < len(units) - 1:
        byte_count /= 1024
        unit_idx += 1
    return f"{byte_count:.2f} {units[unit_idx]}B"
def find_files_by_size(paths, min_bytes, max_bytes):
    """
    Scans paths recursively and groups files by size.

    Args:
        paths (list): A list of directory paths to scan.
        min_bytes (int): The minimum file size in bytes.
        max_bytes (int or None): The maximum file size in bytes (None = no cap).

    Returns:
        dict: Maps a file size (int) to the list of Path objects of that size;
              only sizes shared by two or more files are kept, since a unique
              size cannot have duplicates.
    """
    print("π Starting file scan...")
    sizes = defaultdict(list)
    for base_path_str in paths:
        base_path = Path(base_path_str).expanduser().resolve()
        if not base_path.is_dir():
            print(f"β οΈ Warning: Path '{base_path}' is not a directory. Skipping.")
            continue
        for root, _, files in os.walk(base_path):
            for filename in files:
                file_path = Path(root) / filename
                try:
                    # Skip symlinks explicitly: is_file() follows links, so a
                    # symlink to a regular file would otherwise slip through
                    # and could later be deleted/hard-linked over, destroying
                    # the link itself.
                    if file_path.is_symlink() or not file_path.is_file():
                        continue
                    file_size = file_path.stat().st_size
                    # Apply size filters
                    if file_size < min_bytes:
                        continue
                    if max_bytes is not None and file_size > max_bytes:
                        continue
                    sizes[file_size].append(file_path)
                except OSError as e:
                    # IOError is an alias of OSError in Python 3.
                    print(f"π« Error accessing {file_path}: {e}")
                    continue
    # Filter out sizes with only one file, as they can't be duplicates
    return {size: files for size, files in sizes.items() if len(files) > 1}
def find_duplicates_by_hash(potential_dupes):
    """
    Hashes files with identical sizes to find true content duplicates.

    Args:
        potential_dupes (dict): A dictionary of files grouped by size.

    Returns:
        list: A list of lists, where each inner list contains paths to
              duplicate files.
    """
    print(f"\nFound {len(potential_dupes)} size groups with potential duplicates. Hashing content...")
    duplicate_sets = []
    for same_size_files in potential_dupes.values():
        by_digest = defaultdict(list)
        for candidate in same_size_files:
            try:
                # xxh64 is a fast non-cryptographic hash, ideal for dedup.
                hasher = xxhash.xxh64()
                with open(candidate, 'rb') as handle:
                    for chunk in iter(lambda: handle.read(CHUNK_SIZE), b''):
                        hasher.update(chunk)
                by_digest[hasher.hexdigest()].append(candidate)
            except IOError as e:
                print(f"π« Could not read file {candidate} for hashing: {e}")
        # Any digest shared by 2+ files is a confirmed duplicate set.
        duplicate_sets.extend(
            matched for matched in by_digest.values() if len(matched) > 1
        )
    return duplicate_sets
def process_duplicates(duplicate_groups, perform_delete=False):
    """
    Prints or deletes-and-hard-links duplicate files.

    Args:
        duplicate_groups (list): A list of lists of duplicate file paths.
        perform_delete (bool): If True, replaces each duplicate with a hard
            link to the oldest file in its group. Otherwise, just prints
            the found duplicates.
    """
    if not duplicate_groups:
        if not perform_delete:
            print("β All files with matching sizes had unique content. No duplicates found!")
        return
    total_dupe_files = sum(len(group) for group in duplicate_groups)
    print(f"π Found {len(duplicate_groups)} sets of duplicate files ({total_dupe_files} total files).\n")
    total_saved_space = 0
    for group in duplicate_groups:
        # Keep the oldest file (smallest mtime). Every stat() here can raise
        # if a file vanished since the scan, so guard the whole group setup
        # (the original guarded only the sort, not the size lookup).
        try:
            sorted_group = sorted(group, key=lambda p: p.stat().st_mtime)
            file_to_keep = sorted_group[0]
            size_in_bytes = file_to_keep.stat().st_size
        except FileNotFoundError:
            print("β οΈ A file was moved or deleted during scan, skipping group.")
            continue
        files_to_process = sorted_group[1:]
        human_readable_size = format_bytes(size_in_bytes)
        print(f"--- Group (Size: {human_readable_size}) ---")
        print(f"β Keeping: {file_to_keep} (Oldest)")
        for file_to_link in files_to_process:
            if perform_delete:
                # Link under a temporary name first, then atomically replace
                # the duplicate. Unlike remove-then-link, this cannot lose the
                # file's content if hard-linking fails (e.g. cross-device link
                # or permission error).
                tmp_link = file_to_link.with_name(file_to_link.name + ".dedup-tmp")
                try:
                    os.link(file_to_keep, tmp_link)
                    os.replace(tmp_link, file_to_link)
                    print(f"π Linked: {file_to_link}")
                    total_saved_space += size_in_bytes
                except Exception as e:
                    # Best-effort cleanup of a stranded temporary link.
                    try:
                        os.unlink(tmp_link)
                    except OSError:
                        pass
                    print(f"π« ERROR linking {file_to_link}: {e}")
            else:
                print(f"π Duplicate: {file_to_link}")
        print()  # Add a blank line for readability
    if perform_delete:
        print(f"--- Deletion Complete ---")
        print(f"πΎ Reclaimed approximately {format_bytes(total_saved_space)} of space.")
def _confirm_destructive_run():
    """Warn the user about irreversible deletion and require a typed 'yes'."""
    print("π¨π¨π¨ WARNING: You have chosen to delete files. π¨π¨π¨")
    print("This action is IRREVERSIBLE.")
    print("The program will keep the OLDEST file from each duplicate set and")
    print("replace all other identical files with a hard link to the kept file.")
    try:
        answer = input("π To proceed, type 'yes' and press Enter: ")
    except KeyboardInterrupt:
        print("\nAborting. No files have been changed.")
        sys.exit(0)
    if answer.lower() != 'yes':
        print("Aborting. No files have been changed.")
        sys.exit(0)
    print("\nConfirmation received. Proceeding with deletion...\n")


def main():
    """Parse command-line arguments and orchestrate the duplicate scan."""
    parser = argparse.ArgumentParser(
        description="Find and optionally delete/link duplicate files.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "paths", metavar="PATH", type=str, nargs='+',
        help="One or more absolute or relative paths to scan.",
    )
    parser.add_argument(
        "--min-size", type=int, default=0, metavar="MB",
        help="Minimum file size in megabytes (MB). Default: 0.",
    )
    parser.add_argument(
        "--max-size", type=int, default=None, metavar="MB",
        help="Maximum file size in megabytes (MB). Default: any.",
    )
    parser.add_argument(
        "--delete", action="store_true",
        help="!!! DANGEROUS !!!\n"
             "Delete duplicate files and replace them with hard links.\n"
             "This will keep the OLDEST file in each duplicate set.",
    )
    args = parser.parse_args()

    if args.delete:
        _confirm_destructive_run()

    # CLI sizes are given in MB; the scanner works in bytes.
    mb = 1024 * 1024
    min_bytes = args.min_size * mb
    max_bytes = None if args.max_size is None else args.max_size * mb

    potential_duplicates = find_files_by_size(args.paths, min_bytes, max_bytes)
    if not potential_duplicates:
        print("\nβ No potential duplicates found based on file size. All clear!")
        return

    final_duplicates = find_duplicates_by_hash(potential_duplicates)
    print("\n--- Scan Complete ---")
    process_duplicates(final_duplicates, perform_delete=args.delete)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment