Skip to content

Instantly share code, notes, and snippets.

@robxx
Created August 7, 2025 16:00
Show Gist options
  • Select an option

  • Save robxx/d1c45df74b389a2892cfe66b43d0f85b to your computer and use it in GitHub Desktop.

Select an option

Save robxx/d1c45df74b389a2892cfe66b43d0f85b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import os
from collections import defaultdict
from pathlib import Path
import xxhash
import sys
# Chunk size for hashing large files to keep memory usage low
CHUNK_SIZE = 65536 # 64KB
def format_bytes(byte_count):
    """Return *byte_count* as a human-readable string (e.g. '1.50 MB').

    Returns "N/A" for None. Values are divided by 1024 until they drop
    below one unit or the largest suffix (TB) is reached, then formatted
    with two decimal places.
    """
    if byte_count is None:
        return "N/A"
    units = ('', 'K', 'M', 'G', 'T')
    value = byte_count
    index = 0
    # Stop at TB even for absurdly large inputs rather than index past the table.
    while value >= 1024 and index < len(units) - 1:
        value /= 1024
        index += 1
    return f"{value:.2f} {units[index]}B"
def find_files_by_size(paths, min_bytes, max_bytes):
    """
    Scan paths recursively and group regular files by size.

    Args:
        paths (list): A list of directory paths to scan.
        min_bytes (int): The minimum file size in bytes (inclusive).
        max_bytes (int or None): The maximum file size in bytes (inclusive),
            or None for no upper bound.

    Returns:
        dict: Maps file size -> list of Path objects of that size; only
            sizes shared by two or more files are kept, since a unique
            size cannot have duplicates.
    """
    print("🔎 Starting file scan...")
    sizes = defaultdict(list)
    for base_path_str in paths:
        base_path = Path(base_path_str).expanduser().resolve()
        if not base_path.is_dir():
            print(f"⚠️ Warning: Path '{base_path}' is not a directory. Skipping.")
            continue
        for root, _, files in os.walk(base_path):
            for filename in files:
                # Built outside the try so the except clause can always name it.
                file_path = Path(root) / filename
                try:
                    # Skip symlinks explicitly: is_file() alone follows the
                    # link and reports True for a symlink to a regular file,
                    # which would let a link be "deduplicated" against its
                    # own target.
                    if file_path.is_symlink() or not file_path.is_file():
                        continue
                    file_size = file_path.stat().st_size
                    # Apply size filters
                    if file_size < min_bytes:
                        continue
                    if max_bytes is not None and file_size > max_bytes:
                        continue
                    sizes[file_size].append(file_path)
                except OSError as e:
                    # IOError is an alias of OSError; one clause covers both.
                    print(f"🚫 Error accessing {file_path}: {e}")
                    continue
    # Filter out sizes with only one file, as they can't be duplicates
    return {size: files for size, files in sizes.items() if len(files) > 1}
def find_duplicates_by_hash(potential_dupes):
    """
    Hash same-sized files to confirm which are true content duplicates.

    Args:
        potential_dupes (dict): Files grouped by size, as returned by
            find_files_by_size().

    Returns:
        list: A list of lists; each inner list holds the paths of files
            whose content hashed identically.
    """
    print(f"\nFound {len(potential_dupes)} size groups with potential duplicates. Hashing content...")
    all_duplicates = []
    for size, paths in potential_dupes.items():
        by_digest = defaultdict(list)
        for path in paths:
            try:
                # xxh64 is a fast non-cryptographic hash — ideal for dedup.
                digest = xxhash.xxh64()
                with open(path, 'rb') as fh:
                    # Read in fixed-size chunks to bound memory on large files.
                    for chunk in iter(lambda: fh.read(CHUNK_SIZE), b''):
                        digest.update(chunk)
                by_digest[digest.hexdigest()].append(path)
            except IOError as err:
                print(f"🚫 Could not read file {path} for hashing: {err}")
        # Only groups of two or more identical hashes are real duplicates.
        all_duplicates.extend(
            group for group in by_digest.values() if len(group) > 1
        )
    return all_duplicates
def process_duplicates(duplicate_groups, perform_delete=False):
    """
    Print or delete-and-hard-link duplicate files.

    Args:
        duplicate_groups (list): A list of lists of duplicate file paths
            (Path objects pointing at byte-identical content).
        perform_delete (bool): If True, replaces every duplicate with a hard
            link to the oldest file in its group. Otherwise, just reports.
    """
    if not duplicate_groups:
        if not perform_delete:
            print("✅ All files with matching sizes had unique content. No duplicates found!")
        return
    total_dupe_files = sum(len(group) for group in duplicate_groups)
    print(f"🎉 Found {len(duplicate_groups)} sets of duplicate files ({total_dupe_files} total files).\n")
    total_saved_space = 0
    for group in duplicate_groups:
        # Sort by modification time so the oldest copy is the one we keep.
        try:
            sorted_group = sorted(group, key=lambda p: p.stat().st_mtime)
        except FileNotFoundError:
            print("⚠️ A file was moved or deleted during scan, skipping group.")
            continue
        file_to_keep = sorted_group[0]
        files_to_process = sorted_group[1:]
        size_in_bytes = file_to_keep.stat().st_size
        human_readable_size = format_bytes(size_in_bytes)
        print(f"--- Group (Size: {human_readable_size}) ---")
        print(f"✅ Keeping: {file_to_keep} (Oldest)")
        for file_to_link in files_to_process:
            if perform_delete:
                tmp_path = f"{file_to_link}.dedupe_tmp"
                try:
                    # Create the hard link under a temporary name first and
                    # atomically swap it over the duplicate. Unlike the
                    # remove-then-link sequence, this never leaves the path
                    # missing if link creation fails partway through.
                    os.link(file_to_keep, tmp_path)
                    os.replace(tmp_path, file_to_link)
                    print(f"🔗 Linked: {file_to_link}")
                    total_saved_space += size_in_bytes
                except Exception as e:
                    print(f"🚫 ERROR linking {file_to_link}: {e}")
                    # Best effort: don't leave the temporary link behind.
                    try:
                        if os.path.lexists(tmp_path):
                            os.remove(tmp_path)
                    except OSError:
                        pass  # cleanup is advisory only
            else:
                print(f"📄 Duplicate: {file_to_link}")
        print()  # Add a blank line for readability
    if perform_delete:
        print("--- Deletion Complete ---")
        print(f"💾 Reclaimed approximately {format_bytes(total_saved_space)} of space.")
def main():
    """Parse command-line arguments and orchestrate the duplicate scan."""
    parser = argparse.ArgumentParser(
        description="Find and optionally delete/link duplicate files.",
        # RawTextHelpFormatter preserves the explicit newlines in --delete's help.
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "paths",
        metavar="PATH",
        type=str,
        nargs='+',
        help="One or more absolute or relative paths to scan."
    )
    parser.add_argument(
        "--min-size",
        type=int,
        default=0,
        metavar="MB",
        help="Minimum file size in megabytes (MB). Default: 0."
    )
    parser.add_argument(
        "--max-size",
        type=int,
        default=None,
        metavar="MB",
        help="Maximum file size in megabytes (MB). Default: any."
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="!!! DANGEROUS !!!\n"
             "Delete duplicate files and replace them with hard links.\n"
             "This will keep the OLDEST file in each duplicate set."
    )
    args = parser.parse_args()
    if args.delete:
        print("🚨🚨🚨 WARNING: You have chosen to delete files. 🚨🚨🚨")
        print("This action is IRREVERSIBLE.")
        print("The program will keep the OLDEST file from each duplicate set and")
        print("replace all other identical files with a hard link to the kept file.")
        try:
            response = input("👉 To proceed, type 'yes' and press Enter: ")
            if response.lower() != 'yes':
                print("Aborting. No files have been changed.")
                sys.exit(0)
            print("\nConfirmation received. Proceeding with deletion...\n")
        except (KeyboardInterrupt, EOFError):
            # EOFError covers non-interactive or exhausted stdin (e.g. piped
            # input); without it the prompt would crash with a traceback
            # instead of aborting cleanly.
            print("\nAborting. No files have been changed.")
            sys.exit(0)
    # CLI sizes are given in MB; convert once to bytes for the scan helpers.
    min_bytes = args.min_size * 1024 * 1024
    max_bytes = args.max_size * 1024 * 1024 if args.max_size is not None else None
    potential_duplicates = find_files_by_size(args.paths, min_bytes, max_bytes)
    if not potential_duplicates:
        print("\n✅ No potential duplicates found based on file size. All clear!")
        return
    final_duplicates = find_duplicates_by_hash(potential_duplicates)
    print("\n--- Scan Complete ---")
    process_duplicates(final_duplicates, perform_delete=args.delete)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment