Last active
September 30, 2024 02:52
-
-
Save Kentakoong/836bbf8d2cf3a1c42e203b69dbb8e828 to your computer and use it in GitHub Desktop.
mtnlog - Map Max Memory from PerformanceLogger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import re | |
| import csv | |
| import logging | |
| from multiprocessing import Pool | |
| from tqdm import tqdm | |
| import pandas as pd | |
| # Set up logging | |
def setup_logging(log_level=logging.INFO):
    """Configure the root logger with a timestamped message format.

    Args:
        log_level (int): Logging level to apply; defaults to logging.INFO.
    """
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=log_level,
    )
# Column pattern for per-GPU peak-memory readings, e.g.
# "cuda:0 (gpu:0)/memory_used (MiB)/max". Compiled once instead of
# re-matching the pattern string for every cell of every row.
_MAX_MEM_COLUMN = re.compile(r'cuda:\d+ \(gpu:\d+\)/memory_used \(MiB\)/max')


def find_max_memory_from_csv_by_tag(csv_path):
    """
    Find the maximum memory used for each tag in the CSV file.

    Args:
        csv_path (str): Path to the CSV file.

    Returns:
        dict: Mapping of tag -> maximum memory (float, MiB) observed in any
        per-GPU max-memory column across all rows carrying that tag.
    """
    max_memory_per_tag = {}
    with open(csv_path, newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            tag = row.get('tag')
            # Skip rows without a usable tag (missing, empty, or literal 'None').
            if not tag or tag.strip().lower() == 'none':
                continue
            for key, value in row.items():
                if not _MAX_MEM_COLUMN.match(key):
                    continue
                if value is None or not value.strip():
                    continue
                # Keep the try body minimal: only the conversion can raise.
                try:
                    memory = float(value)
                except ValueError:
                    logging.warning("Skipping invalid value '%s' in column '%s'", value, key)
                    continue
                max_memory_per_tag[tag] = max(memory, max_memory_per_tag.get(tag, 0))
    # Values are always floats at this point, so the original's final
    # "filter out None values" comprehension was dead code and is removed.
    return max_memory_per_tag
def process_folder(args):
    """
    Process a single job folder: find the maximum memory usage per tag
    across all of its per-node metric CSV files.

    Args:
        args (tuple): (base_dir, stage_folder, ub_folder, job_folder).
            Packed as one tuple so the function can be used with
            Pool.imap_unordered.

    Returns:
        tuple: (job_id (str), max_memory_per_tag (dict) or None when the
        folder has no 'metric' directory or no node CSV files).
    """
    base_dir, stage_folder, ub_folder, job_folder = args
    # Folder names end with "-<job id>"; take the trailing component.
    job_id = job_folder.split('-')[-1]
    job_path = os.path.join(base_dir, stage_folder, ub_folder, job_folder)
    metric_dir = os.path.join(job_path, 'metric')
    if not os.path.exists(metric_dir):
        # Lazy %-style arguments keep string formatting out of disabled log
        # levels and match the logging style used elsewhere in this file
        # (the original mixed f-strings and %-style within this function).
        logging.error("No 'metric' directory found in %s", job_path)
        return job_id, None
    csv_files = [f for f in os.listdir(metric_dir) if f.startswith('node-') and f.endswith('.csv')]
    logging.debug("CSV files found in %s: %s", metric_dir, csv_files)
    if not csv_files:
        logging.error("No CSV files found in %s", metric_dir)
        return job_id, None
    max_memory_per_tag = {}
    for csv_file in csv_files:
        csv_path = os.path.join(metric_dir, csv_file)
        logging.info("Processing CSV file: %s", csv_file)
        # Merge per-file maxima into the per-job maxima.
        for tag, memory in find_max_memory_from_csv_by_tag(csv_path).items():
            max_memory_per_tag[tag] = max(max_memory_per_tag.get(tag, 0), memory)
    return job_id, max_memory_per_tag
def process_folders_in_parallel(base_dir):
    """
    Walk base_dir for stage-*/llama-*/<job> folders and process every job
    folder in a worker pool, collecting per-tag max memory per job.

    Returns:
        dict: Job ID -> per-tag max memory, for jobs that yielded data.
    """
    def _subdirs(parent, prefix=''):
        # Immediate subdirectories of `parent` whose names start with `prefix`.
        return [entry for entry in os.listdir(parent)
                if os.path.isdir(os.path.join(parent, entry)) and entry.startswith(prefix)]

    stage_folders = _subdirs(base_dir, 'stage-')
    logging.debug(f"Discovered stage_folders: {stage_folders}")
    tasks = []
    for stage_folder in stage_folders:
        stage_path = os.path.join(base_dir, stage_folder)
        ub_folders = _subdirs(stage_path, 'llama-')
        logging.debug(f"Discovered llama-ub folders in {stage_folder}: {ub_folders}")
        for ub_folder in ub_folders:
            ub_path = os.path.join(stage_path, ub_folder)
            job_folders = _subdirs(ub_path)
            logging.debug(f"Discovered job folders in {ub_folder}: {job_folders}")
            tasks.extend((base_dir, stage_folder, ub_folder, job_folder)
                         for job_folder in job_folders)
    logging.info(f"Total tasks to process: {len(tasks)}")

    job_memory_map = {}
    with Pool() as pool, tqdm(total=len(tasks), desc="Processing folders") as progress_bar:
        for job_id, max_memory_per_tag in pool.imap_unordered(process_folder, tasks):
            if max_memory_per_tag:  # Only keep jobs that produced data.
                job_memory_map[job_id] = max_memory_per_tag
            progress_bar.update(1)
    return job_memory_map
def append_memory_to_csv(scaling_runtime_csv, job_memory_map):
    """
    Append or update per-tag max-memory columns in the scaling runtime CSV,
    matching rows on the 'Job ID' column.

    Args:
        scaling_runtime_csv (str): Path to the scaling_runtime.csv file.
        job_memory_map (dict): Job ID (str) -> {tag: max memory} (or None).
    """
    df = pd.read_csv(scaling_runtime_csv)

    # Ensure a "<tag>_max_memory" column exists for every tag we observed.
    all_tags = set(tag for memory_map in job_memory_map.values() if memory_map for tag in memory_map)
    for tag in all_tags:
        tag_col = f"{tag}_max_memory"
        if tag_col not in df.columns:
            df[tag_col] = None

    # Fill in the new values, matching on the numeric job ID.
    for job_id, memory_map in job_memory_map.items():
        if memory_map is None:
            continue
        try:
            job_id_num = int(job_id)
        except ValueError:
            # Previously a non-numeric job ID (e.g. an oddly named folder)
            # raised and aborted the entire update; skip it instead.
            logging.warning("Skipping non-numeric job ID '%s'", job_id)
            continue
        for tag, new_max_memory in memory_map.items():
            if new_max_memory is not None:
                df.loc[df['Job ID'] == job_id_num, f"{tag}_max_memory"] = new_max_memory

    # Drop the 'None' placeholder column if present, plus any max-memory
    # column that ended up entirely empty.
    if 'None_max_memory' in df.columns:
        df.drop(columns=['None_max_memory'], inplace=True)
    empty_columns = [col for col in df.columns if col.endswith('_max_memory') and df[col].isnull().all()]
    df.drop(columns=empty_columns, inplace=True)

    # Save the updated CSV file back in place.
    df.to_csv(scaling_runtime_csv, index=False)
    logging.info("Updated 'Max Memory' columns in %s", scaling_runtime_csv)
def remove_none_tags_from_json(job_memory_map):
    """
    Strip 'None' tags and missing memory values from a job memory map.

    Args:
        job_memory_map (dict): Job ID -> {tag: max memory}.

    Returns:
        dict: Same structure with 'None' tags (case-insensitive) and None
        memory values removed; every job ID is kept.
    """
    return {
        job_id: {
            tag: memory
            for tag, memory in memory_map.items()
            if tag.lower() != 'none' and memory is not None
        }
        for job_id, memory_map in job_memory_map.items()
    }
# Base directory path and output CSV path (edit before running).
BASE_DIR = "<path on top of the scaling path (ex: /scaling/wo-lora/weak/)>"
SCALING_RUNTIME_CSV = "<scaling-runtime.csv file path>"

if __name__ == "__main__":
    # The __main__ guard is required because this script uses
    # multiprocessing.Pool: on spawn-based platforms (Windows, macOS
    # default) every worker re-imports this module, and without the guard
    # each worker would recursively launch its own pool.
    setup_logging(log_level=logging.DEBUG)

    # Process folders in parallel and gather the job memory mapping.
    job_memory_map = process_folders_in_parallel(BASE_DIR)

    # Append the max memory columns to the scaling_runtime.csv.
    append_memory_to_csv(SCALING_RUNTIME_CSV, job_memory_map)
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Use this SLURM script to run it in parallel.