"""Command-line tool that anonymizes DICOM files in a directory tree.

Every ``*.dcm`` file found below the input directory is read with pydicom,
has its patient-identifying attributes overwritten or removed, and is written
to the same relative location below the output directory.
"""
import argparse
import logging
import os
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from datetime import timedelta
from functools import partial
from pathlib import Path

import pydicom

# Suppresses every UserWarning for the whole process.
# NOTE(review): presumably aimed at pydicom's warnings on non-conformant
# files - confirm this blanket filter is intended.
warnings.filterwarnings("ignore", category=UserWarning)

logger = logging.getLogger("anonymizer")


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the anonymizer.

    Returns:
        Namespace with ``input_dicom_dir`` and ``output_dicom_dir`` (``Path``),
        ``verbose`` (``bool``) and ``concurrency`` (``int`` or ``None``).
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        "input_dicom_dir",
        type=Path,
        help="path to the directory with dicom files",
    )
    parser.add_argument(
        "output_dicom_dir",
        type=Path,
        help="path to the directory for anonymized files",
    )
    parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
    parser.add_argument("-c", "--concurrency", help="number of processes to run in parallel", type=int, default=None)
    return parser.parse_args()


def configure_logging(is_verbose: bool = False) -> None:
    """Attach a stream handler to the module logger and set its level.

    Args:
        is_verbose: When true the logger level is DEBUG, otherwise INFO.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)-15s %(levelname)-8s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(level=logging.DEBUG if is_verbose else logging.INFO)


def scan_input_dir_or_path(path_to_dicoms: Path) -> list[Path]:
    """Collect the ``.dcm`` files designated by ``path_to_dicoms``.

    Args:
        path_to_dicoms: A directory, which is walked recursively, or a single
            file path.

    Returns:
        Paths whose name ends with ``.dcm`` (case-sensitive). The list is
        empty when nothing matches, including when a single path without a
        ``.dcm`` suffix is given.
    """
    dicom_paths: list[Path] = []
    if path_to_dicoms.is_dir():
        logger.info("Scanning dicoms in %s", path_to_dicoms)
        for dir_path, _, filenames in os.walk(path_to_dicoms):
            for filename in filenames:
                if filename.endswith(".dcm"):
                    dicom_paths.append(Path(dir_path) / filename)
    else:
        if path_to_dicoms.suffix == ".dcm":
            dicom_paths.append(path_to_dicoms)
    logger.info("Found %s files", len(dicom_paths))
    return dicom_paths


def _anonymize(dataset):
    """Overwrite or remove patient-identifying attributes of ``dataset`` in place.

    ``PatientID`` and ``PatientName`` become ``"ANON"``, ``PatientBirthDate``
    becomes ``"19000101"``, and ``OtherPatientIDs`` /
    ``OtherPatientIDsSequence`` are deleted when present. No other attribute
    is touched.

    Args:
        dataset: The dataset to modify, as returned by ``pydicom.dcmread``.
    """
    dataset.PatientID = "ANON"
    dataset.PatientName = "ANON"
    dataset.PatientBirthDate = "19000101"
    if "OtherPatientIDs" in dataset:
        del dataset.OtherPatientIDs
    if "OtherPatientIDsSequence" in dataset:
        del dataset.OtherPatientIDsSequence


def anonymize(filename: Path, input_dicom_dir: Path, output_dicom_dir: Path):
    """Read one DICOM file, anonymize it and save it under the output directory.

    Args:
        filename: Path of the DICOM file to process.
        input_dicom_dir: Root that ``filename`` is made relative to.
        output_dicom_dir: Root under which that relative path is recreated.

    Raises:
        ValueError: From ``Path.relative_to`` if ``filename`` is not located
            under ``input_dicom_dir``.
    """
    dataset = pydicom.dcmread(filename)
    _anonymize(dataset)
    # When input_dicom_dir is the file itself, the relative path is "." and
    # the dataset is therefore written to output_dicom_dir as a file.
    output_filename = output_dicom_dir / filename.relative_to(input_dicom_dir)
    Path(output_filename.parent).mkdir(parents=True, exist_ok=True)
    dataset.save_as(output_filename)


def main(args: argparse.Namespace) -> None:
    """Anonymize every DICOM file found under ``args.input_dicom_dir``.

    Files are processed in a process pool of ``args.concurrency`` workers
    (``None`` lets ``ProcessPoolExecutor`` pick its default). A file that
    fails is logged with its traceback and does not stop the remaining files.

    Args:
        args: Parsed command-line arguments as produced by ``parse_args``.
    """
    tic = time.monotonic()
    input_dicom_dir = args.input_dicom_dir
    output_dicom_dir = args.output_dicom_dir
    dicom_paths = scan_input_dir_or_path(input_dicom_dir)
    worker = partial(anonymize, input_dicom_dir=input_dicom_dir, output_dicom_dir=output_dicom_dir)
    failed_count = 0
    with ProcessPoolExecutor(args.concurrency) as executor:
        # Futures are tracked individually because an unconsumed
        # ``executor.map`` iterator discards exceptions raised in workers.
        future_to_path = {executor.submit(worker, path): path for path in dicom_paths}
        for future in as_completed(future_to_path):
            try:
                future.result()
            except Exception:
                failed_count += 1
                logger.exception("Failed to anonymize %s", future_to_path[future])
    toc = time.monotonic()
    if failed_count:
        logger.error("Failed to anonymize %s of %s files", failed_count, len(dicom_paths))
    logger.info("Anonymized dicoms saved to %s", output_dicom_dir)
    logger.debug("Anonymization of %s files finished at %s", len(dicom_paths), timedelta(seconds=toc - tic))


if __name__ == "__main__":
    args = parse_args()
    configure_logging(args.verbose)
    main(args)