Skip to content

Instantly share code, notes, and snippets.

@Kentakoong
Last active September 30, 2024 02:52
Show Gist options
  • Select an option

  • Save Kentakoong/836bbf8d2cf3a1c42e203b69dbb8e828 to your computer and use it in GitHub Desktop.

Select an option

Save Kentakoong/836bbf8d2cf3a1c42e203b69dbb8e828 to your computer and use it in GitHub Desktop.
mtnlog - Map Max Memory from PerformanceLogger
import os
import re
import csv
import logging
from multiprocessing import Pool
from tqdm import tqdm
import pandas as pd
# Set up logging
def setup_logging(log_level=logging.INFO):
    """Configure the root logger with a timestamped message format.

    Args:
        log_level (int): Logging level for the root logger (default: INFO).
    """
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=log_level,
    )
def find_max_memory_from_csv_by_tag(csv_path):
    """
    Find the maximum GPU memory used for each tag in a metric CSV file.

    Scans every column whose header matches
    'cuda:<i> (gpu:<j>)/memory_used (MiB)/max' and keeps, per tag, the
    largest value seen across all rows and matching columns.

    Args:
        csv_path (str): Path to the CSV file.

    Returns:
        dict: Mapping of tag -> maximum memory usage (float, in MiB).
    """
    # Hoist the regex compile out of the per-cell loop; the pattern is invariant.
    memory_col = re.compile(r'cuda:\d+ \(gpu:\d+\)/memory_used \(MiB\)/max')
    max_memory_per_tag = {}
    with open(csv_path, newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            tag = row.get('tag')
            # Skip rows without a valid tag, or whose tag is the string 'None'.
            if not tag or tag.strip().lower() == 'none':
                continue
            for key, value in row.items():
                if not memory_col.match(key):
                    continue
                if value is None or not value.strip():
                    continue
                try:
                    max_memory_per_tag[tag] = max(
                        float(value), max_memory_per_tag.get(tag, 0))
                except ValueError:
                    logging.warning(
                        "Skipping invalid value '%s' in column '%s'", value, key)
    # Values are always floats here, so no post-filtering for None is needed
    # (the original's final dict comprehension was dead code).
    return max_memory_per_tag
def process_folder(args):
    """
    Process a single job folder: scan its 'metric' CSV files and collect
    the maximum memory usage per tag.

    Args:
        args (tuple): (base_dir, stage_folder, ub_folder, job_folder).
            Packed into one tuple so it can be mapped by multiprocessing.

    Returns:
        tuple: (job_id (str), max_memory_per_tag (dict)), where the dict is
        None when no 'metric' directory or node CSV files were found.
    """
    base_dir, stage_folder, ub_folder, job_folder = args
    # Folder names are expected to end with '-<job id>'.
    job_id = job_folder.split('-')[-1]
    job_path = os.path.join(base_dir, stage_folder, ub_folder, job_folder)
    metric_dir = os.path.join(job_path, 'metric')
    if not os.path.exists(metric_dir):
        # Lazy %-style logging args, consistent with the rest of the module.
        logging.error("No 'metric' directory found in %s", job_path)
        return job_id, None
    csv_files = [f for f in os.listdir(metric_dir)
                 if f.startswith('node-') and f.endswith('.csv')]
    logging.debug("CSV files found in %s: %s", metric_dir, csv_files)
    if not csv_files:
        logging.error("No CSV files found in %s", metric_dir)
        return job_id, None
    max_memory_per_tag = {}
    for csv_file in csv_files:
        csv_path = os.path.join(metric_dir, csv_file)
        logging.info("Processing CSV file: %s", csv_file)
        tag_max_memory = find_max_memory_from_csv_by_tag(csv_path)
        # Keep the per-tag maximum across all node CSVs of this job.
        for tag, memory in tag_max_memory.items():
            max_memory_per_tag[tag] = max(max_memory_per_tag.get(tag, 0), memory)
    return job_id, max_memory_per_tag
def process_folders_in_parallel(base_dir):
    """
    Discover every stage-*/llama-*/<job> folder under base_dir and process
    the job folders in parallel worker processes.

    Returns:
        dict: job ID -> {tag: max memory}, only for jobs that yielded data.
    """
    def _subdirs(path, prefix=''):
        # Immediate subdirectories of `path` whose name starts with `prefix`.
        return [d for d in os.listdir(path)
                if os.path.isdir(os.path.join(path, d)) and d.startswith(prefix)]

    stage_folders = _subdirs(base_dir, 'stage-')
    logging.debug(f"Discovered stage_folders: {stage_folders}")
    tasks = []
    for stage_folder in stage_folders:
        stage_path = os.path.join(base_dir, stage_folder)
        ub_folders = _subdirs(stage_path, 'llama-')
        logging.debug(f"Discovered llama-ub folders in {stage_folder}: {ub_folders}")
        for ub_folder in ub_folders:
            job_folders = _subdirs(os.path.join(stage_path, ub_folder))
            logging.debug(f"Discovered job folders in {ub_folder}: {job_folders}")
            tasks.extend((base_dir, stage_folder, ub_folder, job_folder)
                         for job_folder in job_folders)
    logging.info(f"Total tasks to process: {len(tasks)}")

    job_memory_map = {}
    with Pool() as pool, tqdm(total=len(tasks), desc="Processing folders") as progress_bar:
        for job_id, tag_memory in pool.imap_unordered(process_folder, tasks):
            if tag_memory:  # keep only jobs that produced valid data
                job_memory_map[job_id] = tag_memory
            progress_bar.update(1)
    return job_memory_map
def append_memory_to_csv(scaling_runtime_csv, job_memory_map):
    """
    Append or update per-tag '<tag>_max_memory' columns in the scaling
    runtime CSV, matching rows on the 'Job ID' column.

    Args:
        scaling_runtime_csv (str): Path to the scaling_runtime.csv file.
        job_memory_map (dict): job ID -> {tag: max memory} (map may be None).
    """
    df = pd.read_csv(scaling_runtime_csv)

    # Make sure a column exists for every tag we have data for.
    all_tags = {tag
                for memory_map in job_memory_map.values() if memory_map
                for tag in memory_map}
    for tag in all_tags:
        column = f"{tag}_max_memory"
        if column not in df.columns:
            df[column] = None

    # Write each job's per-tag maxima into the matching row.
    for job_id, memory_map in job_memory_map.items():
        if memory_map is None:
            continue
        for tag, new_max_memory in memory_map.items():
            if new_max_memory is None:
                continue
            # Job IDs arrive as strings but the CSV stores them numerically.
            df.loc[df['Job ID'] == int(job_id), f"{tag}_max_memory"] = new_max_memory

    # Remove the placeholder column for untagged rows, if present.
    if 'None_max_memory' in df.columns:
        df.drop(columns=['None_max_memory'], inplace=True)
    # Remove any max-memory column that ended up entirely empty.
    empty_columns = [col for col in df.columns
                     if col.endswith('_max_memory') and df[col].isnull().all()]
    df.drop(columns=empty_columns, inplace=True)

    df.to_csv(scaling_runtime_csv, index=False)
    logging.info(f"Updated 'Max Memory' columns in {scaling_runtime_csv}")
def remove_none_tags_from_json(job_memory_map):
    """
    Remove any 'None' tags or entries with None values from the job memory map.

    Args:
        job_memory_map (dict): job ID -> {tag: max memory}. A job's map may
            itself be None (e.g. when process_folder found no metric data).

    Returns:
        dict: Cleaned job memory map with no 'None' tags or None values.
        Jobs whose map was None yield an empty dict instead of crashing.
    """
    return {
        job_id: {
            tag: memory
            # `or {}` guards against None maps, which previously raised
            # AttributeError on .items().
            for tag, memory in (memory_map or {}).items()
            if tag.lower() != 'none' and memory is not None
        }
        for job_id, memory_map in job_memory_map.items()
    }
# Placeholder paths — edit these before running.
BASE_DIR = "<path on top of the scaling path (ex: /scaling/wo-lora/weak/)>"
SCALING_RUNTIME_CSV = "<scaling-runtime.csv file path>"

if __name__ == "__main__":
    # The main-module guard is REQUIRED here: multiprocessing.Pool on
    # spawn-start platforms (Windows, macOS default) re-imports this module
    # in every worker, and without the guard each worker would recursively
    # launch its own pool.
    setup_logging(log_level=logging.DEBUG)
    # Process folders in parallel and gather the job memory mapping.
    job_memory_map = process_folders_in_parallel(BASE_DIR)
    # Append the max memory columns to the scaling_runtime.csv.
    append_memory_to_csv(SCALING_RUNTIME_CSV, job_memory_map)
@Kentakoong
Copy link
Author

Use this SLURM script to run it in parallel

#!/bin/bash
# SLURM batch script: runs the memory-mapping Python script on a single
# exclusive compute node, with all cores available to the multiprocessing
# Pool inside the script.
#SBATCH -p compute --exclusive               # Specify partition [Compute/Memory/GPU]
#SBATCH -c 128                               # Specify number of processors per task -- max, can be reduced
#SBATCH --ntasks-per-node=1                  # Specify number of tasks per node
#SBATCH -A ltxxxxxx                         # Specify project name
#SBATCH -J map-missing-graphs                # Specify job name
#SBATCH -o ./map/map-memory-%j.out           # Specify output file

# Reset to the default module set, then load the Mamba/conda tooling.
module restore
module load Mamba

# Switch into the environment that has the script's dependencies installed.
conda deactivate
conda activate <your-env-name>

# Run the script

srun python <path-to-python-file>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment