@RezaAmbler
Last active January 2, 2026 03:12
Duplicate file finder designed for QNAP NAS (Python 2.7, no dependencies). Uses a two-phase approach: groups files by size first, then computes MD5 hashes only for potential matches. Automatically skips QNAP system directories (@recycle, @eadir, etc.). Outputs a human-readable report with a wasted-space calculation.
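
The two-phase idea in miniature (an illustrative sketch only, not part of the script; the full version below adds directory skipping, error handling, a progress display, and a 64 KB partial-hash pre-filter):

import hashlib
from collections import defaultdict

def naive_duplicates(paths_with_sizes):
    """paths_with_sizes: iterable of (filepath, size_in_bytes) pairs."""
    by_size = defaultdict(list)              # phase 1: bucket paths by size
    for path, size in paths_with_sizes:
        by_size[size].append(path)
    duplicate_groups = []
    for size, group in by_size.items():
        if len(group) < 2:                   # a unique size cannot be a duplicate
            continue
        by_hash = defaultdict(list)          # phase 2: hash same-size candidates only
        for path in group:
            with open(path, 'rb') as f:      # whole-file read here; the script
                data = f.read()              # below reads in 1 MB chunks instead
            by_hash[hashlib.md5(data).hexdigest()].append(path)
        duplicate_groups.extend(g for g in by_hash.values() if len(g) > 1)
    return duplicate_groups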
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
find_duplicates.py - Duplicate File Finder for QNAP NAS
Finds duplicate files by comparing file sizes first, then MD5 hashes.
Designed for Python 2.7 on QNAP NAS systems (no external dependencies).
USAGE EXAMPLES:
# Basic scan of a directory
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia
# Scan multiple directories
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia /share/CACHEDEV1_DATA/Download
# Save results to a file
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --output duplicates_report.txt
# Change minimum file size (default 1MB)
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --min-size 10485760
# Include QNAP metadata directories (normally skipped)
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --include-metadata
# Follow symbolic links
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --follow-symlinks
# Show progress during scan
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --verbose
# Disable progress indicator (useful for logging to file)
python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia --no-progress
AUTHOR: Generated for QNAP NAS duplicate file management
PYTHON: 2.7 (standard library only)
"""
from __future__ import print_function
import os
import sys
import argparse
import hashlib
import time
from collections import defaultdict
# ============================================================================
# Constants
# ============================================================================
DEFAULT_MIN_SIZE = 1024 * 1024  # 1 MB in bytes

# Directories to skip by default on QNAP
SKIP_DIRS = frozenset([
    'lost+found',
    '.@__thumb',
])
# ============================================================================
# Utility Functions
# ============================================================================

def human_readable_size(size_bytes):
    """Convert bytes to a human-readable string (B/KB/MB/GB/TB)."""
    if size_bytes < 0:
        return "0 B"
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if abs(size_bytes) < 1024.0:
            if unit == 'B':
                return "{0} {1}".format(int(size_bytes), unit)
            return "{0:.2f} {1}".format(size_bytes, unit)
        size_bytes /= 1024.0
    return "{0:.2f} PB".format(size_bytes)

def should_skip_dir(dirname, include_metadata=False):
    """Check if a directory should be skipped."""
    # Skip directories starting with '@' (QNAP system/metadata)
    if not include_metadata and dirname.startswith('@'):
        return True
    # Skip known system directories
    if dirname in SKIP_DIRS:
        return True
    return False
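
# Expected behaviour, by way of example:
#   should_skip_dir('@Recycle')                        -> True   (QNAP metadata)
#   should_skip_dir('@Recycle', include_metadata=True) -> False
#   should_skip_dir('lost+found')                      -> True   (always skipped)
#   should_skip_dir('Movies')                          -> False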

def compute_md5(filepath, chunk_size=1048576, progress_callback=None):
    """
    Compute the MD5 hash of a file.

    Uses chunked reading for memory efficiency on large files.
    Returns None if the file cannot be read.

    Args:
        filepath: Path to file
        chunk_size: Read chunk size (default 1MB for faster I/O on large files)
        progress_callback: Optional function(bytes_read, total_bytes) called periodically
    """
    md5 = hashlib.md5()
    try:
        file_size = os.path.getsize(filepath)
        bytes_read = 0
        with open(filepath, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                md5.update(chunk)
                bytes_read += len(chunk)
                # Call progress callback if provided
                if progress_callback:
                    progress_callback(bytes_read, file_size)
        return md5.hexdigest()
    except (IOError, OSError):
        return None
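
# Memory use stays flat regardless of file size: even a multi-gigabyte video
# is hashed one 1 MB chunk at a time rather than being read whole.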

def compute_partial_md5(filepath, sample_size=65536):
    """
    Compute the MD5 of the first N bytes of a file for quick pre-filtering.

    This helps eliminate non-duplicates faster before the full hash.
    Returns None if the file cannot be read.
    """
    md5 = hashlib.md5()
    try:
        with open(filepath, 'rb') as f:
            chunk = f.read(sample_size)
            md5.update(chunk)
        return md5.hexdigest()
    except (IOError, OSError):
        return None
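
# Why the partial hash helps: two files whose first 64 KB already differ can
# never be identical, so large same-size groups can usually be thinned after
# reading only 64 KB per file instead of every byte (see find_duplicates below).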
# ============================================================================
# Progress Display
# ============================================================================
class ProgressDisplay(object):
    """Handles in-place progress updates on the terminal."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self.last_update = 0
        self.update_interval = 0.1  # Update every 100 ms at most
        self.last_line_length = 0

    def update(self, message, force=False):
        """Update the progress line in-place."""
        if not self.enabled:
            return
        # Throttle updates to avoid excessive I/O
        now = time.time()
        if not force and (now - self.last_update) < self.update_interval:
            return
        self.last_update = now
        # Truncate message if too long for a typical terminal
        max_width = 100
        if len(message) > max_width:
            message = message[:max_width - 3] + '...'
        # Pad with spaces to overwrite previous line content
        padded = message.ljust(self.last_line_length)
        self.last_line_length = len(message)
        sys.stdout.write('\r' + padded)
        sys.stdout.flush()

    def clear(self):
        """Clear the progress line."""
        if not self.enabled:
            return
        sys.stdout.write('\r' + ' ' * self.last_line_length + '\r')
        sys.stdout.flush()
        self.last_line_length = 0
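
# Typical use (sketch): the carriage return '\r' redraws a single line.
#   progress = ProgressDisplay(enabled=True)
#   progress.update("[Scan] Files: 120 | ...")   # overwrites the previous line
#   progress.clear()                             # blank the line before print()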
# ============================================================================
# Scanner Class
# ============================================================================
class DuplicateFinder(object):
    """Finds duplicate files using a size-first, then hash approach."""

    def __init__(self, min_size=DEFAULT_MIN_SIZE, include_metadata=False,
                 follow_symlinks=False, verbose=False, show_progress=True):
        self.min_size = min_size
        self.include_metadata = include_metadata
        self.follow_symlinks = follow_symlinks
        self.verbose = verbose
        self.progress = ProgressDisplay(enabled=show_progress)
        # Statistics
        self.total_files_scanned = 0
        self.total_bytes_scanned = 0
        self.files_skipped_size = 0
        self.files_skipped_error = 0
        self.dirs_scanned = 0
        self.dirs_skipped = 0
        # Phase 1: Group files by size
        # {size_in_bytes: [filepath1, filepath2, ...]}
        self.size_groups = defaultdict(list)

    def log(self, message):
        """Print message if verbose mode is enabled."""
        if self.verbose:
            self.progress.clear()
            print(message, file=sys.stderr)

    def _update_scan_progress(self, current_dir):
        """Update the scanning progress display."""
        # Shorten directory path for display
        display_dir = current_dir
        if len(display_dir) > 40:
            display_dir = '...' + display_dir[-37:]
        msg = "[Scan] Files: {0} | Size: {1} | Dirs: {2} | {3}".format(
            self.total_files_scanned,
            human_readable_size(self.total_bytes_scanned),
            self.dirs_scanned,
            display_dir
        )
        self.progress.update(msg)

    def scan_directories(self, paths):
        """
        Scan one or more directory paths for files.

        Phase 1: Groups all files by size.
        """
        for base_path in paths:
            if not os.path.exists(base_path):
                self.progress.clear()
                print("Warning: Path does not exist: {0}".format(base_path),
                      file=sys.stderr)
                continue
            if not os.path.isdir(base_path):
                self.progress.clear()
                print("Warning: Not a directory: {0}".format(base_path),
                      file=sys.stderr)
                continue
            self.log("Scanning: {0}".format(base_path))
            self._scan_directory(base_path)
        # Clear progress line when done
        self.progress.clear()

    def _scan_directory(self, base_path):
        """Recursively scan a single directory."""
        for root, dirs, files in os.walk(base_path,
                                         followlinks=self.follow_symlinks):
            self.dirs_scanned += 1
            self._update_scan_progress(root)
            # Filter out directories we should skip (modifies dirs in-place,
            # so os.walk never descends into them)
            original_dir_count = len(dirs)
            dirs[:] = [d for d in dirs
                       if not should_skip_dir(d, self.include_metadata)]
            self.dirs_skipped += original_dir_count - len(dirs)
            # Process files
            for filename in files:
                filepath = os.path.join(root, filename)
                # Skip symlinks unless explicitly following them
                if not self.follow_symlinks and os.path.islink(filepath):
                    continue
                try:
                    # Get file size
                    stat_info = os.stat(filepath)
                    file_size = stat_info.st_size
                    # Skip files below minimum size
                    if file_size < self.min_size:
                        self.files_skipped_size += 1
                        continue
                    # Track statistics
                    self.total_files_scanned += 1
                    self.total_bytes_scanned += file_size
                    # Group by size
                    self.size_groups[file_size].append(filepath)
                    # Update progress periodically
                    if self.total_files_scanned % 50 == 0:
                        self._update_scan_progress(root)
                except (OSError, IOError):
                    self.files_skipped_error += 1

    def find_duplicates(self):
        """
        Phase 2: Find duplicates by computing hashes for size-matched files.

        Returns a list of duplicate groups:
        [
            {
                'hash': 'abc123...',
                'size': 12345678,
                'files': ['/path/to/file1', '/path/to/file2', ...]
            },
            ...
        ]
        """
        duplicates = []
        # Only process size groups with more than one file
        size_groups_to_check = [
            (size, files) for size, files in self.size_groups.items()
            if len(files) > 1
        ]
        self.log("Found {0} size groups with potential duplicates".format(
            len(size_groups_to_check)))
        files_to_hash = sum(len(files) for _, files in size_groups_to_check)
        total_bytes_to_hash = sum(size * len(files)
                                  for size, files in size_groups_to_check)
        # Print summary before hashing
        self.progress.clear()
        print("")
        print("-" * 50)
        print("SCAN COMPLETE - CHECKSUM PHASE STARTING")
        print("-" * 50)
        print("  Unique file sizes found:   {0}".format(len(self.size_groups)))
        print("  Size groups with 2+ files: {0}".format(len(size_groups_to_check)))
        print("  Files to checksum:         {0}".format(files_to_hash))
        print("  Total data to read:        {0}".format(
            human_readable_size(total_bytes_to_hash)))
        print("-" * 50)
        print("")
        if files_to_hash == 0:
            print("No potential duplicates found - nothing to hash.")
            return duplicates
        self.log("Need to hash {0} files".format(files_to_hash))
        hashed_count = 0
        bytes_hashed = 0
        for size, filepaths in size_groups_to_check:
            # For groups with many files, use a partial hash first
            if len(filepaths) > 2:
                # Group by partial hash first
                partial_groups = defaultdict(list)
                for idx, fp in enumerate(filepaths):
                    # Show progress during partial hashing
                    display_fp = os.path.basename(fp)
                    if len(display_fp) > 30:
                        display_fp = display_fp[:27] + '...'
                    msg = "[Pre-filter] Group size {0} | {1}/{2} | {3}".format(
                        human_readable_size(size),
                        idx + 1,
                        len(filepaths),
                        display_fp
                    )
                    self.progress.update(msg)
                    partial = compute_partial_md5(fp)
                    if partial:
                        partial_groups[partial].append(fp)
                # Only full-hash files with matching partial hashes
                files_to_full_hash = []
                for partial, fps in partial_groups.items():
                    if len(fps) > 1:
                        files_to_full_hash.extend(fps)
            else:
                files_to_full_hash = filepaths
            # Group by full MD5 hash
            hash_groups = defaultdict(list)
            for fp in files_to_full_hash:
                hashed_count += 1
                # Shorten filepath for display
                display_fp = os.path.basename(fp)
                if len(display_fp) > 25:
                    display_fp = display_fp[:22] + '...'

                # Create a progress callback for this file
                def make_hash_progress(file_num, total_files, filename,
                                       prev_bytes, total_bytes_all):
                    def callback(bytes_read, file_total):
                        current_total = prev_bytes + bytes_read
                        pct = ((current_total * 100) // total_bytes_all
                               if total_bytes_all > 0 else 0)
                        file_pct = ((bytes_read * 100) // file_total
                                    if file_total > 0 else 100)
                        msg = "[Hash] {0}% | File {1}/{2}: {3}% | {4}".format(
                            pct,
                            file_num,
                            total_files,
                            file_pct,
                            filename
                        )
                        self.progress.update(msg)
                    return callback

                progress_cb = make_hash_progress(
                    hashed_count, files_to_hash, display_fp,
                    bytes_hashed, total_bytes_to_hash
                )
                # Show initial progress for this file
                msg = "[Hash] {0}% | File {1}/{2}: 0% | {3}".format(
                    (bytes_hashed * 100) // total_bytes_to_hash
                    if total_bytes_to_hash > 0 else 0,
                    hashed_count,
                    files_to_hash,
                    display_fp
                )
                self.progress.update(msg, force=True)
                file_hash = compute_md5(fp, progress_callback=progress_cb)
                bytes_hashed += size
                if file_hash:
                    hash_groups[file_hash].append(fp)
            # Collect groups with actual duplicates
            for file_hash, fps in hash_groups.items():
                if len(fps) > 1:
                    duplicates.append({
                        'hash': file_hash,
                        'size': size,
                        'files': sorted(fps)
                    })
        # Clear progress line when done
        self.progress.clear()
        # Sort by size (largest first) for more impactful results at top
        duplicates.sort(key=lambda x: x['size'], reverse=True)
        return duplicates
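
# Note: size groups of exactly two files skip the 64 KB pre-filter above and
# go straight to the full hash; the pre-read is aimed at larger groups, where
# it can eliminate many candidates cheaply.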
# ============================================================================
# Output Functions
# ============================================================================
def format_report(finder, duplicates):
    """Format the duplicate report as a string."""
    lines = []
    # Header
    lines.append("=" * 70)
    lines.append("DUPLICATE FILE REPORT")
    lines.append("=" * 70)
    lines.append("")
    # Statistics
    lines.append("SCAN STATISTICS:")
    lines.append("-" * 40)
    lines.append("  Directories scanned:    {0}".format(finder.dirs_scanned))
    lines.append("  Total files scanned:    {0}".format(finder.total_files_scanned))
    lines.append("  Total bytes scanned:    {0} ({1})".format(
        finder.total_bytes_scanned,
        human_readable_size(finder.total_bytes_scanned)))
    lines.append("  Files skipped (small):  {0}".format(finder.files_skipped_size))
    lines.append("  Files skipped (error):  {0}".format(finder.files_skipped_error))
    lines.append("  Directories skipped:    {0}".format(finder.dirs_skipped))
    lines.append("")
    # Duplicate summary
    if not duplicates:
        lines.append("No duplicate files found!")
        lines.append("")
        return "\n".join(lines)
    total_dup_groups = len(duplicates)
    total_dup_files = sum(len(d['files']) for d in duplicates)
    total_wasted = sum(d['size'] * (len(d['files']) - 1) for d in duplicates)
    lines.append("DUPLICATE SUMMARY:")
    lines.append("-" * 40)
    lines.append("  Duplicate groups found: {0}".format(total_dup_groups))
    lines.append("  Total duplicate files:  {0}".format(total_dup_files))
    lines.append("  Wasted space:           {0} ({1})".format(
        total_wasted, human_readable_size(total_wasted)))
    lines.append("")
    # Detailed duplicate listing
    lines.append("DUPLICATE FILES (largest first):")
    lines.append("=" * 70)
    for i, dup in enumerate(duplicates, 1):
        lines.append("")
        lines.append("Group {0}: {1} ({2} files)".format(
            i,
            human_readable_size(dup['size']),
            len(dup['files'])))
        lines.append("  MD5: {0}".format(dup['hash']))
        lines.append("  Files:")
        for fp in dup['files']:
            lines.append("    - {0}".format(fp))
    lines.append("")
    lines.append("=" * 70)
    lines.append("END OF REPORT")
    lines.append("=" * 70)
    return "\n".join(lines)

def print_report(report_text, output_file=None):
    """Print report to console and optionally save it to a file."""
    print(report_text)
    if output_file:
        try:
            with open(output_file, 'w') as f:
                f.write(report_text)
            print("")
            print("Report saved to: {0}".format(output_file))
        except (IOError, OSError) as e:
            print("Error saving report: {0}".format(e), file=sys.stderr)
# ============================================================================
# Main Entry Point
# ============================================================================
def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description='Find duplicate files by size and MD5 hash.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python find_duplicates.py /share/CACHEDEV1_DATA/Multimedia
  python find_duplicates.py /path1 /path2 --output report.txt
  python find_duplicates.py /path --min-size 10485760 --verbose
"""
    )
    parser.add_argument(
        'paths',
        nargs='+',
        help='One or more directory paths to scan'
    )
    parser.add_argument(
        '-o', '--output',
        metavar='FILE',
        help='Save report to a text file'
    )
    parser.add_argument(
        '-m', '--min-size',
        type=int,
        default=DEFAULT_MIN_SIZE,
        metavar='BYTES',
        help='Minimum file size in bytes (default: 1MB = 1048576)'
    )
    parser.add_argument(
        '--include-metadata',
        action='store_true',
        help='Include QNAP metadata directories (names starting with @)'
    )
    parser.add_argument(
        '--follow-symlinks',
        action='store_true',
        help='Follow symbolic links (default: skip symlinks)'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Show progress during scan'
    )
    parser.add_argument(
        '--no-progress',
        action='store_true',
        help='Disable the progress indicator'
    )
    return parser.parse_args()
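
# For example, "python find_duplicates.py /share/Public -m 4096 -v" parses to
# args.paths == ['/share/Public'], args.min_size == 4096, args.verbose == True.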

def main():
    """Main entry point."""
    args = parse_args()
    # Validate paths
    valid_paths = []
    for p in args.paths:
        if os.path.isdir(p):
            valid_paths.append(p)
        else:
            print("Warning: Skipping invalid path: {0}".format(p),
                  file=sys.stderr)
    if not valid_paths:
        print("Error: No valid directories to scan.", file=sys.stderr)
        sys.exit(1)
    # Create finder and scan
    finder = DuplicateFinder(
        min_size=args.min_size,
        include_metadata=args.include_metadata,
        follow_symlinks=args.follow_symlinks,
        verbose=args.verbose,
        show_progress=not args.no_progress
    )
    print("Starting scan...")
    print("Minimum file size: {0}".format(human_readable_size(args.min_size)))
    print("")
    # Phase 1: Scan and group by size
    finder.scan_directories(valid_paths)
    if finder.total_files_scanned == 0:
        print("No files found matching criteria.")
        sys.exit(0)
    # Phase 2: Find duplicates by hashing
    duplicates = finder.find_duplicates()
    # Generate and output report
    report = format_report(finder, duplicates)
    print_report(report, args.output)
    # Exit code: 0 if no duplicates, 1 if duplicates found
    sys.exit(1 if duplicates else 0)

if __name__ == '__main__':
    main()
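
The classes above can also be driven from another Python 2.7 script rather than the command line. A minimal sketch, assuming the file is saved as find_duplicates.py on the import path (the share path below is a placeholder):

# Hypothetical programmatic use of the script's public pieces.
from find_duplicates import DuplicateFinder, format_report

finder = DuplicateFinder(min_size=10 * 1024 * 1024,   # ignore files under 10 MB
                         show_progress=False)         # no terminal redraws
finder.scan_directories(['/share/CACHEDEV1_DATA/Multimedia'])  # phase 1: size groups
groups = finder.find_duplicates()                              # phase 2: MD5 hashing
print(format_report(finder, groups))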