import os import re import csv import logging from multiprocessing import Pool from tqdm import tqdm import pandas as pd # Set up logging def setup_logging(log_level=logging.INFO): logging.basicConfig( level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def find_max_memory_from_csv_by_tag(csv_path): """ Find the maximum memory used for each tag in the CSV file. Args: csv_path (str): Path to the CSV file. Returns: dict: Dictionary containing maximum memory usage for each tag. """ max_memory_per_tag = {} with open(csv_path, newline='') as csvfile: reader = csv.DictReader(csvfile) for row in reader: tag = row.get('tag', None) if not tag or tag.strip().lower() == 'none': continue # Skip rows without a valid tag or if the tag is 'None' for key, value in row.items(): if re.match(r'cuda:\d+ \(gpu:\d+\)/memory_used \(MiB\)/max', key): try: if value is not None and value.strip(): max_memory_per_tag[tag] = max(float(value), max_memory_per_tag.get(tag, 0)) except ValueError: logging.warning("Skipping invalid value '%s' in column '%s'", value, key) # Remove any None or invalid entries from the result return {k: v for k, v in max_memory_per_tag.items() if v is not None} def process_folder(args): """ Process a single folder, find and extract the maximum memory usage for each tag from its CSV files. Args: args (tuple): Contains base_dir, stage_folder, ub_folder, job_folder. Returns: job_id (str), max_memory_per_tag (dict): Returns the job ID and max memory for each tag. """ base_dir, stage_folder, ub_folder, job_folder = args job_id = job_folder.split('-')[-1] # Extract the job ID from the folder name job_path = os.path.join(base_dir, stage_folder, ub_folder, job_folder) metric_dir = os.path.join(job_path, 'metric') if not os.path.exists(metric_dir): logging.error(f"No 'metric' directory found in {job_path}") return job_id, None csv_files = [f for f in os.listdir(metric_dir) if f.startswith('node-') and f.endswith('.csv')] logging.debug(f"CSV files found in {metric_dir}: {csv_files}") if not csv_files: logging.error(f"No CSV files found in {metric_dir}") return job_id, None max_memory_per_tag = {} for csv_file in csv_files: csv_path = os.path.join(metric_dir, csv_file) logging.info("Processing CSV file: %s", csv_file) tag_max_memory = find_max_memory_from_csv_by_tag(csv_path) for tag, memory in tag_max_memory.items(): max_memory_per_tag[tag] = max(max_memory_per_tag.get(tag, 0), memory) return job_id, max_memory_per_tag def process_folders_in_parallel(base_dir): """ Process multiple folders in parallel using multiprocessing. """ stage_folders = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)) and d.startswith('stage-')] logging.debug(f"Discovered stage_folders: {stage_folders}") tasks = [] for stage_folder in stage_folders: stage_path = os.path.join(base_dir, stage_folder) ub_folders = [d for d in os.listdir(stage_path) if os.path.isdir(os.path.join(stage_path, d)) and d.startswith('llama-')] logging.debug(f"Discovered llama-ub folders in {stage_folder}: {ub_folders}") for ub_folder in ub_folders: ub_path = os.path.join(stage_path, ub_folder) job_folders = [d for d in os.listdir(ub_path) if os.path.isdir(os.path.join(ub_path, d))] logging.debug(f"Discovered job folders in {ub_folder}: {job_folders}") for job_folder in job_folders: tasks.append((base_dir, stage_folder, ub_folder, job_folder)) logging.info(f"Total tasks to process: {len(tasks)}") job_memory_map = {} with Pool() as pool, tqdm(total=len(tasks), desc="Processing folders") as progress_bar: for job_id, max_memory_per_tag in pool.imap_unordered(process_folder, tasks): if max_memory_per_tag: # Only include valid mappings job_memory_map[job_id] = max_memory_per_tag progress_bar.update(1) return job_memory_map def append_memory_to_csv(scaling_runtime_csv, job_memory_map): """ Append or update the max memory columns in the scaling_runtime.csv file based on the job ID. Args: scaling_runtime_csv (str): Path to the scaling_runtime.csv file. job_memory_map (dict): Dictionary with job ID as keys and max memory per tag as values. """ # Read the existing CSV file df = pd.read_csv(scaling_runtime_csv) # Create columns for each tag's max memory if not already present all_tags = set(tag for memory_map in job_memory_map.values() if memory_map for tag in memory_map) for tag in all_tags: tag_col = f"{tag}_max_memory" if tag_col not in df.columns: df[tag_col] = None # Update the tag max memory columns with new values from job_memory_map for job_id, memory_map in job_memory_map.items(): if memory_map is not None: for tag, new_max_memory in memory_map.items(): tag_col = f"{tag}_max_memory" if new_max_memory is not None: df.loc[df['Job ID'] == int(job_id), tag_col] = new_max_memory # Drop 'None_max_memory' column if present if 'None_max_memory' in df.columns: df.drop(columns=['None_max_memory'], inplace=True) # Drop any other column that ends with '_max_memory' but has no data empty_columns = [col for col in df.columns if col.endswith('_max_memory') and df[col].isnull().all()] df.drop(columns=empty_columns, inplace=True) # Save the updated CSV file back df.to_csv(scaling_runtime_csv, index=False) logging.info(f"Updated 'Max Memory' columns in {scaling_runtime_csv}") def remove_none_tags_from_json(job_memory_map): """ Remove any 'None' tags or entries with 'None' values from the job memory map. Args: job_memory_map (dict): Dictionary with job ID as keys and max memory per tag as values. Returns: dict: Cleaned job memory map with no 'None' tags or values. """ cleaned_map = {} for job_id, memory_map in job_memory_map.items(): cleaned_map[job_id] = {tag: memory for tag, memory in memory_map.items() if tag.lower() != 'none' and memory is not None} return cleaned_map # Base directory path BASE_DIR = "" SCALING_RUNTIME_CSV = "" # Set up logging setup_logging(log_level=logging.DEBUG) # Process folders in parallel and gather the job memory mapping job_memory_map = process_folders_in_parallel(BASE_DIR) # Append the max memory column to the scaling_runtime.csv append_memory_to_csv(SCALING_RUNTIME_CSV, job_memory_map)