Duplicate file finder designed for QNAP NAS (Python 2.7, no external dependencies). Uses a two-phase approach: groups files by size first, then computes MD5 hashes only for potential matches. Automatically skips QNAP system directories (@recycle, @eadir, etc.). Outputs a human-readable report with a wasted-space calculation.
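Before the full script, here is a minimal, self-contained sketch of the same two-phase idea (illustrative only; the real script below adds a minimum-size filter, QNAP skip rules, a partial-hash pre-filter, progress display, and reporting):

import hashlib
import os
from collections import defaultdict

def sketch_find_duplicates(paths):
    # Phase 1: bucket files by size; files of different sizes cannot be equal
    by_size = defaultdict(list)
    for base in paths:
        for root, _, files in os.walk(base):
            for name in files:
                path = os.path.join(root, name)
                try:
                    by_size[os.path.getsize(path)].append(path)
                except OSError:
                    pass
    # Phase 2: MD5-hash only the files whose sizes collide
    groups = []
    for size, candidates in by_size.items():
        if len(candidates) < 2:
            continue
        by_hash = defaultdict(list)
        for path in candidates:
            md5 = hashlib.md5()
            with open(path, 'rb') as f:
                for chunk in iter(lambda: f.read(1 << 20), b''):
                    md5.update(chunk)
            by_hash[md5.hexdigest()].append(path)
        groups.extend(g for g in by_hash.values() if len(g) > 1)
    return groups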
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
find_duplicates.py - Duplicate File Finder for QNAP NAS

Finds duplicate files by comparing file sizes first, then MD5 hashes.
Designed for Python 2.7 on QNAP NAS systems (no external dependencies).

USAGE EXAMPLES:

    # Basic scan of a directory
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia

    # Scan multiple directories
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia /share/CACHEDEV1_DATA/Download

    # Save results to a file
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --output duplicates_report.txt

    # Change minimum file size (default 1 MB)
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --min-size 10485760

    # Include QNAP metadata directories (normally skipped)
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --include-metadata

    # Follow symbolic links
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --follow-symlinks

    # Print verbose log messages during the scan
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --verbose

    # Disable the progress indicator (useful when logging to a file)
    python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --no-progress

AUTHOR: Generated for QNAP NAS duplicate file management
PYTHON: 2.7 (standard library only)
"""
from __future__ import print_function

import os
import sys
import argparse
import hashlib
import time
from collections import defaultdict

# ============================================================================
# Constants
# ============================================================================

DEFAULT_MIN_SIZE = 1024 * 1024  # 1 MB in bytes

# Directories to skip by default on QNAP (names starting with '@' are
# handled separately in should_skip_dir)
SKIP_DIRS = frozenset([
    'lost+found',
    '.@__thumb',
])

# ============================================================================
# Utility Functions
# ============================================================================

def human_readable_size(size_bytes):
    """Convert bytes to a human-readable string (B/KB/MB/GB/TB/PB)."""
    if size_bytes < 0:
        return "0 B"
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if abs(size_bytes) < 1024.0:
            if unit == 'B':
                return "{0} {1}".format(int(size_bytes), unit)
            return "{0:.2f} {1}".format(size_bytes, unit)
        size_bytes /= 1024.0
    return "{0:.2f} PB".format(size_bytes)

def should_skip_dir(dirname, include_metadata=False):
    """Check if a directory should be skipped."""
    # Skip directories starting with '@' (QNAP system/metadata)
    if not include_metadata and dirname.startswith('@'):
        return True
    # Skip known system directories
    if dirname in SKIP_DIRS:
        return True
    return False

def compute_md5(filepath, chunk_size=1048576, progress_callback=None):
    """
    Compute the MD5 hash of a file.

    Uses chunked reading for memory efficiency on large files.
    Returns None if the file cannot be read.

    Args:
        filepath: Path to the file
        chunk_size: Read chunk size (default 1 MB for faster I/O on large files)
        progress_callback: Optional function(bytes_read, total_bytes) called periodically
    """
    md5 = hashlib.md5()
    try:
        file_size = os.path.getsize(filepath)
        bytes_read = 0
        with open(filepath, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                md5.update(chunk)
                bytes_read += len(chunk)
                # Call the progress callback if provided
                if progress_callback:
                    progress_callback(bytes_read, file_size)
        return md5.hexdigest()
    except (IOError, OSError):
        return None

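# Illustrative usage of compute_md5 with a progress callback (the path here is
# hypothetical; the script itself wires the callback up in find_duplicates):
#
#   def report(bytes_read, total_bytes):
#       print("{0}/{1} bytes hashed".format(bytes_read, total_bytes))
#
#   digest = compute_md5('/share/CACHEDEV1_DATA/example.bin',
#                        progress_callback=report)
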
def compute_partial_md5(filepath, sample_size=65536):
    """
    Compute the MD5 of the first N bytes of a file for quick pre-filtering.

    This helps eliminate non-duplicates faster before the full hash.
    Returns None if the file cannot be read.
    """
    md5 = hashlib.md5()
    try:
        with open(filepath, 'rb') as f:
            chunk = f.read(sample_size)
            md5.update(chunk)
        return md5.hexdigest()
    except (IOError, OSError):
        return None

# ============================================================================
# Progress Display
# ============================================================================

class ProgressDisplay(object):
    """Handles in-place progress updates on the terminal."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self.last_update = 0
        self.update_interval = 0.1  # Update every 100 ms at most
        self.last_line_length = 0

    def update(self, message, force=False):
        """Update the progress line in-place."""
        if not self.enabled:
            return
        # Throttle updates to avoid excessive I/O
        now = time.time()
        if not force and (now - self.last_update) < self.update_interval:
            return
        self.last_update = now
        # Truncate message if too long for a typical terminal
        max_width = 100
        if len(message) > max_width:
            message = message[:max_width - 3] + '...'
        # Pad with spaces to overwrite previous line content
        padded = message.ljust(self.last_line_length)
        self.last_line_length = len(message)
        sys.stdout.write('\r' + padded)
        sys.stdout.flush()

    def clear(self):
        """Clear the progress line."""
        if not self.enabled:
            return
        sys.stdout.write('\r' + ' ' * self.last_line_length + '\r')
        sys.stdout.flush()
        self.last_line_length = 0

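# Illustrative, standalone use of ProgressDisplay (not how the script calls
# it, but it shows the in-place update/clear contract):
#
#   pd = ProgressDisplay(enabled=True)
#   pd.update("[Scan] Files: 120 | Size: 3.50 GB | Dirs: 14", force=True)
#   pd.clear()  # erases the line before normal print output
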
# ============================================================================
# Scanner Class
# ============================================================================

class DuplicateFinder(object):
    """Finds duplicate files using a size-first, then hash approach."""

    def __init__(self, min_size=DEFAULT_MIN_SIZE, include_metadata=False,
                 follow_symlinks=False, verbose=False, show_progress=True):
        self.min_size = min_size
        self.include_metadata = include_metadata
        self.follow_symlinks = follow_symlinks
        self.verbose = verbose
        self.progress = ProgressDisplay(enabled=show_progress)

        # Statistics
        self.total_files_scanned = 0
        self.total_bytes_scanned = 0
        self.files_skipped_size = 0
        self.files_skipped_error = 0
        self.dirs_scanned = 0
        self.dirs_skipped = 0

        # Phase 1: Group files by size
        # {size_in_bytes: [filepath1, filepath2, ...]}
        self.size_groups = defaultdict(list)

    def log(self, message):
        """Print message if verbose mode is enabled."""
        if self.verbose:
            self.progress.clear()
            print(message, file=sys.stderr)

    def _update_scan_progress(self, current_dir):
        """Update the scanning progress display."""
        # Shorten directory path for display
        display_dir = current_dir
        if len(display_dir) > 40:
            display_dir = '...' + display_dir[-37:]
        msg = "[Scan] Files: {0} | Size: {1} | Dirs: {2} | {3}".format(
            self.total_files_scanned,
            human_readable_size(self.total_bytes_scanned),
            self.dirs_scanned,
            display_dir
        )
        self.progress.update(msg)

    def scan_directories(self, paths):
        """
        Scan one or more directory paths for files.

        Phase 1: Groups all files by size.
        """
        for base_path in paths:
            if not os.path.exists(base_path):
                self.progress.clear()
                print("Warning: Path does not exist: {0}".format(base_path),
                      file=sys.stderr)
                continue
            if not os.path.isdir(base_path):
                self.progress.clear()
                print("Warning: Not a directory: {0}".format(base_path),
                      file=sys.stderr)
                continue
            self.log("Scanning: {0}".format(base_path))
            self._scan_directory(base_path)
        # Clear progress line when done
        self.progress.clear()

    def _scan_directory(self, base_path):
        """Recursively scan a single directory."""
        for root, dirs, files in os.walk(base_path,
                                         followlinks=self.follow_symlinks):
            self.dirs_scanned += 1
            self._update_scan_progress(root)

            # Filter out directories we should skip (modifies dirs in-place)
            original_dir_count = len(dirs)
            dirs[:] = [d for d in dirs
                       if not should_skip_dir(d, self.include_metadata)]
            self.dirs_skipped += original_dir_count - len(dirs)

            # Process files
            for filename in files:
                filepath = os.path.join(root, filename)

                # Skip symlinks unless explicitly following them
                if not self.follow_symlinks and os.path.islink(filepath):
                    continue

                try:
                    # Get file size
                    stat_info = os.stat(filepath)
                    file_size = stat_info.st_size

                    # Skip files below minimum size
                    if file_size < self.min_size:
                        self.files_skipped_size += 1
                        continue

                    # Track statistics
                    self.total_files_scanned += 1
                    self.total_bytes_scanned += file_size

                    # Group by size
                    self.size_groups[file_size].append(filepath)

                    # Update progress periodically
                    if self.total_files_scanned % 50 == 0:
                        self._update_scan_progress(root)
                except (OSError, IOError):
                    self.files_skipped_error += 1

    def find_duplicates(self):
        """
        Phase 2: Find duplicates by computing hashes for size-matched files.

        Returns a list of duplicate groups:
        [
            {
                'hash': 'abc123...',
                'size': 12345678,
                'files': ['/path/to/file1', '/path/to/file2', ...]
            },
            ...
        ]
        """
        duplicates = []

        # Only process size groups with more than one file
        size_groups_to_check = [
            (size, files) for size, files in self.size_groups.items()
            if len(files) > 1
        ]
        self.log("Found {0} size groups with potential duplicates".format(
            len(size_groups_to_check)))

        files_to_hash = sum(len(files) for _, files in size_groups_to_check)
        total_bytes_to_hash = sum(size * len(files)
                                  for size, files in size_groups_to_check)

        # Print summary before hashing
        self.progress.clear()
        print("")
        print("-" * 50)
        print("SCAN COMPLETE - CHECKSUM PHASE STARTING")
        print("-" * 50)
        print("  Unique file sizes found:   {0}".format(len(self.size_groups)))
        print("  Size groups with 2+ files: {0}".format(len(size_groups_to_check)))
        print("  Files to checksum:         {0}".format(files_to_hash))
        print("  Total data to read:        {0}".format(
            human_readable_size(total_bytes_to_hash)))
        print("-" * 50)
        print("")

        if files_to_hash == 0:
            print("No potential duplicates found - nothing to hash.")
            return duplicates

        self.log("Need to hash {0} files".format(files_to_hash))

        hashed_count = 0
        bytes_hashed = 0
        for size, filepaths in size_groups_to_check:
            # For groups with many files, use a partial hash first
            if len(filepaths) > 2:
                # Group by partial hash first
                partial_groups = defaultdict(list)
                for idx, fp in enumerate(filepaths):
                    # Show progress during partial hashing
                    display_fp = os.path.basename(fp)
                    if len(display_fp) > 30:
                        display_fp = display_fp[:27] + '...'
                    msg = "[Pre-filter] Group size {0} | {1}/{2} | {3}".format(
                        human_readable_size(size),
                        idx + 1,
                        len(filepaths),
                        display_fp
                    )
                    self.progress.update(msg)
                    partial = compute_partial_md5(fp)
                    if partial:
                        partial_groups[partial].append(fp)

                # Only full-hash files with matching partial hashes
                files_to_full_hash = []
                for partial, fps in partial_groups.items():
                    if len(fps) > 1:
                        files_to_full_hash.extend(fps)
            else:
                files_to_full_hash = filepaths

            # Group by full MD5 hash
            hash_groups = defaultdict(list)
            for fp in files_to_full_hash:
                hashed_count += 1

                # Shorten filepath for display
                display_fp = os.path.basename(fp)
                if len(display_fp) > 25:
                    display_fp = display_fp[:22] + '...'

                # Create a progress callback for this file
                def make_hash_progress(file_num, total_files, filename,
                                       prev_bytes, total_bytes_all):
                    def callback(bytes_read, file_total):
                        current_total = prev_bytes + bytes_read
                        pct = (current_total * 100) // total_bytes_all if total_bytes_all > 0 else 0
                        file_pct = (bytes_read * 100) // file_total if file_total > 0 else 100
                        msg = "[Hash] {0}% | File {1}/{2}: {3}% | {4}".format(
                            pct,
                            file_num,
                            total_files,
                            file_pct,
                            filename
                        )
                        self.progress.update(msg)
                    return callback

                progress_cb = make_hash_progress(
                    hashed_count, files_to_hash, display_fp,
                    bytes_hashed, total_bytes_to_hash
                )

                # Show initial progress for this file
                msg = "[Hash] {0}% | File {1}/{2}: 0% | {3}".format(
                    (bytes_hashed * 100) // total_bytes_to_hash if total_bytes_to_hash > 0 else 0,
                    hashed_count,
                    files_to_hash,
                    display_fp
                )
                self.progress.update(msg, force=True)

                file_hash = compute_md5(fp, progress_callback=progress_cb)
                bytes_hashed += size
                if file_hash:
                    hash_groups[file_hash].append(fp)

            # Collect groups with actual duplicates
            for file_hash, fps in hash_groups.items():
                if len(fps) > 1:
                    duplicates.append({
                        'hash': file_hash,
                        'size': size,
                        'files': sorted(fps)
                    })

        # Clear progress line when done
        self.progress.clear()

        # Sort by size (largest first) for more impactful results at top
        duplicates.sort(key=lambda x: x['size'], reverse=True)
        return duplicates

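# Illustrative programmatic use of DuplicateFinder (the path is one of the
# usage examples above; main() performs these same steps plus reporting):
#
#   finder = DuplicateFinder(min_size=DEFAULT_MIN_SIZE)
#   finder.scan_directories(['/share/CACHEDEV1_DATA/Multimedia'])
#   for group in finder.find_duplicates():
#       print(group['size'], group['hash'], len(group['files']))
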
# ============================================================================
# Output Functions
# ============================================================================

def format_report(finder, duplicates):
    """Format the duplicate report as a string."""
    lines = []

    # Header
    lines.append("=" * 70)
    lines.append("DUPLICATE FILE REPORT")
    lines.append("=" * 70)
    lines.append("")

    # Statistics
    lines.append("SCAN STATISTICS:")
    lines.append("-" * 40)
    lines.append("  Directories scanned:    {0}".format(finder.dirs_scanned))
    lines.append("  Total files scanned:    {0}".format(finder.total_files_scanned))
    lines.append("  Total bytes scanned:    {0} ({1})".format(
        finder.total_bytes_scanned,
        human_readable_size(finder.total_bytes_scanned)))
    lines.append("  Files skipped (small):  {0}".format(finder.files_skipped_size))
    lines.append("  Files skipped (error):  {0}".format(finder.files_skipped_error))
    lines.append("  Directories skipped:    {0}".format(finder.dirs_skipped))
    lines.append("")

    # Duplicate summary
    if not duplicates:
        lines.append("No duplicate files found!")
        lines.append("")
        return "\n".join(lines)

    total_dup_groups = len(duplicates)
    total_dup_files = sum(len(d['files']) for d in duplicates)
    total_wasted = sum(d['size'] * (len(d['files']) - 1) for d in duplicates)

    lines.append("DUPLICATE SUMMARY:")
    lines.append("-" * 40)
    lines.append("  Duplicate groups found: {0}".format(total_dup_groups))
    lines.append("  Total duplicate files:  {0}".format(total_dup_files))
    lines.append("  Wasted space:           {0} ({1})".format(
        total_wasted, human_readable_size(total_wasted)))
    lines.append("")

    # Detailed duplicate listing
    lines.append("DUPLICATE FILES (largest first):")
    lines.append("=" * 70)
    for i, dup in enumerate(duplicates, 1):
        lines.append("")
        lines.append("Group {0}: {1} ({2} files)".format(
            i,
            human_readable_size(dup['size']),
            len(dup['files'])))
        lines.append("  MD5: {0}".format(dup['hash']))
        lines.append("  Files:")
        for fp in dup['files']:
            lines.append("    - {0}".format(fp))

    lines.append("")
    lines.append("=" * 70)
    lines.append("END OF REPORT")
    lines.append("=" * 70)
    return "\n".join(lines)


def print_report(report_text, output_file=None):
    """Print report to console and optionally save to file."""
    print(report_text)
    if output_file:
        try:
            with open(output_file, 'w') as f:
                f.write(report_text)
            print("")
            print("Report saved to: {0}".format(output_file))
        except (IOError, OSError) as e:
            print("Error saving report: {0}".format(e), file=sys.stderr)

# ============================================================================
# Main Entry Point
# ============================================================================

def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description='Find duplicate files by size and MD5 hash.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia
  python find_duplicates.py /path1 /path2 --output report.txt
  python find_duplicates.py /path --min-size 10485760 --verbose
"""
    )
    parser.add_argument(
        'paths',
        nargs='+',
        help='One or more directory paths to scan'
    )
    parser.add_argument(
        '-o', '--output',
        metavar='FILE',
        help='Save report to a text file'
    )
    parser.add_argument(
        '-m', '--min-size',
        type=int,
        default=DEFAULT_MIN_SIZE,
        metavar='BYTES',
        help='Minimum file size in bytes (default: 1MB = 1048576)'
    )
    parser.add_argument(
        '--include-metadata',
        action='store_true',
        help='Include QNAP metadata directories (starting with @)'
    )
    parser.add_argument(
        '--follow-symlinks',
        action='store_true',
        help='Follow symbolic links (default: skip symlinks)'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Print verbose log messages to stderr'
    )
    parser.add_argument(
        '--no-progress',
        action='store_true',
        help='Disable the in-place progress indicator'
    )
    return parser.parse_args()

def main():
    """Main entry point."""
    args = parse_args()

    # Validate paths
    valid_paths = []
    for p in args.paths:
        if os.path.isdir(p):
            valid_paths.append(p)
        else:
            print("Warning: Skipping invalid path: {0}".format(p),
                  file=sys.stderr)
    if not valid_paths:
        print("Error: No valid directories to scan.", file=sys.stderr)
        sys.exit(1)

    # Create finder and scan
    finder = DuplicateFinder(
        min_size=args.min_size,
        include_metadata=args.include_metadata,
        follow_symlinks=args.follow_symlinks,
        verbose=args.verbose,
        show_progress=not args.no_progress
    )

    print("Starting scan...")
    print("Minimum file size: {0}".format(human_readable_size(args.min_size)))
    print("")

    # Phase 1: Scan and group by size
    finder.scan_directories(valid_paths)
    if finder.total_files_scanned == 0:
        print("No files found matching criteria.")
        sys.exit(0)

    # Phase 2: Find duplicates by hashing
    duplicates = finder.find_duplicates()

    # Generate and output report
    report = format_report(finder, duplicates)
    print_report(report, args.output)

    # Exit code: 0 if no duplicates, 1 if duplicates found
    sys.exit(1 if duplicates else 0)


if __name__ == '__main__':
    main()
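One caveat worth noting: MD5 is fast but not collision-resistant, so if you plan to delete files based on the report, a byte-for-byte confirmation is cheap insurance. A minimal sketch using only the standard library (the helper name is hypothetical, not part of the script):

import filecmp

def confirm_group(files):
    """Byte-compare every file in a reported group against the first one."""
    first = files[0]
    return all(filecmp.cmp(first, other, shallow=False) for other in files[1:])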