Experiments

import itertools
import pandas as pd
import numpy as np
from transformers import AutoConfig, AutoModelForCausalLM
import gspread
from gspread_dataframe import get_as_dataframe, set_with_dataframe
import os
gc = gspread.oauth()
models = [
    {"model_name":"meta-llama/Llama-2-7b-hf", "model_size":7}, 
    {"model_name":"meta-llama/Llama-2-13b-hf", "model_size":13},
    {"model_name":"codellama/CodeLlama-34b-hf", "model_size":34},
    {"model_name":"meta-llama/Llama-2-70b-hf", "model_size":70}
]
# for m in models:
#     cfg = AutoConfig.from_pretrained(m['model_name'])
#     m['config'] = cfg
seqlen = [{"seqlen":256}]
max_bs = [{"max_bs":None}]
bs = [{"bs":None}]
cpu_offloading = [{"cpu_offloading":False}, {"cpu_offloading":True}]
distrib_type = [{"distrib_type":"FSDP"}, {"distrib_type":"DDP"}]
ft_type = [{"ft_type":"LoRA"}, {"ft_type":"QLoRA"}]
# RTX 3090 is not available from cloud providers; the A5000 also has 24GB of memory
gpus = [{"gpu_model":"A5000", "num_gpus":2, "gpu_mem":24, "total_gpu_mem":48, "nvlink":"False"},
        {"gpu_model":"A100-40", "num_gpus":8, "gpu_mem":40, "total_gpu_mem":320, "nvlink":"True"}]
wandb = [{"wandb_link":None,
          "memory_peak":None, 
          "memory_after_model_creation":None,
          "memory_after_model_wrap":None,          
          "memory_before_forward":None,
          "memory_after_forward":None,
          "memory_before_backward":None,
          "memory_after_backward":None, 
          "time_taken":None}]
grad_ckpt = [{"use_gradient_checkpointing":True}, {"use_gradient_checkpointing":False}]
iters = [models, seqlen, max_bs, bs, grad_ckpt, cpu_offloading, distrib_type, ft_type, gpus, wandb]
experiments = list(itertools.product(*iters))
len(experiments)  # 4 models x 2 grad_ckpt x 2 cpu_offloading x 2 distrib_type x 2 ft_type x 2 gpu configs = 128
def flatten_list_of_dicts(l):
    final_d = {}
    for d in l: 
        for k,v in d.items():
            if k in final_d:
                raise ValueError(f"Key {k} exists.")
            final_d[k] = v
    return final_d
experiments_flat = [flatten_list_of_dicts(exp) for exp in experiments]
df = pd.DataFrame(experiments_flat)
# exclude lora ddp
mask = ~((df['ft_type'] == 'LoRA') & (df['distrib_type'] == 'DDP'))
# no cpu-offloading with ddp
mask = np.logical_and(mask, ~((df['cpu_offloading'] == True) & (df['distrib_type'] == 'DDP')))

df = df[mask].reset_index(drop=True)
df.shape
df.head()
# !pip install gspread
# !pip install gspread-dataframe
url = "https://docs.google.com/spreadsheets/d/1JSQbnkwtqPgc-_wqI3LTCJI6jWCaWafubK0ontWR2_Y"
sheet = gc.open_by_url(url)
# this will overwrite the existing sheet!
# use other utils from gspread to add data to specific cells.
worksheet = sheet.get_worksheet_by_id(0)
set_with_dataframe(worksheet, df)
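
If a later pass only needs to patch a few cells rather than overwrite the whole sheet, gspread's cell-level helpers can be used instead; a minimal sketch (the cell address is illustrative):

# update a single cell instead of rewriting the full sheet
worksheet.update_acell("A1", "model_name")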

Modify Experiments

Flag experiments based on theoretical memory limits, excluding activations.

Note: In the DDP script, cast all model params to bfloat16 except for the RoPE layers.
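
A minimal sketch of what that cast could look like (module and buffer names assume the Hugging Face Llama implementation; this is not the benchmarking script's exact code):

import torch

def cast_to_bf16_except_rope(model):
    # cast every floating-point param and buffer to bfloat16 first...
    model.to(torch.bfloat16)
    # ...then restore the RoPE buffers (e.g. LlamaRotaryEmbedding's inv_freq and any
    # cached cos/sin) to float32, since rotary embeddings degrade in low precision
    for name, buf in model.named_buffers():
        if "rotary_emb" in name:
            buf.data = buf.data.float()
    return model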

  1. DDP requires all params, optimizer states, and activations to fit into a single GPU.

  2. Compute the approximate memory requirement per GPU with FSDP full sharding, considering cases with and without CPU offloading; a worked example follows below.
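
For intuition, a worked example of this rule using the largest model and the A100-40 node from the grid above (weights only, ignoring optimizer states and activations as the code below does; numbers are approximate):

# Llama-2-70b on 8 x A100-40, bf16 weights at ~2 bytes/param
weights_gb = 70 * 2            # ~140 GB of weights
ddp_per_gpu = weights_gb       # DDP replicates the full model on every GPU -> far above 40 GB
fsdp_per_gpu = weights_gb / 8  # ~17.5 GB per GPU with FSDP full sharding -> fits in 40 GB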

url = "https://docs.google.com/spreadsheets/d/1JSQbnkwtqPgc-_wqI3LTCJI6jWCaWafubK0ontWR2_Y"
sheet = gc.open_by_url(url)
worksheet = sheet.get_worksheet_by_id(0)
vals = worksheet.get_all_values()
df = pd.DataFrame(vals[1:], columns=vals[0])
df.shape
df.columns
df.head()
# activation memory per layer: https://arxiv.org/pdf/2205.05198.pdf
bs = 1 
sl = 256
h = 4096
a = 32
(bs * sl * h * (34 + 5 * (a * sl / h))) / 1e9
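
The same estimate wrapped in a small helper, with the Llama-2-7b shape (h=4096, 32 heads, 32 layers) plugged in for a rough per-model total; a quick sketch, not used by the scripts below:

def act_mem_per_layer_gb(bs, sl, h, a):
    # per-layer activation memory in GB with no recomputation, per the formula above:
    # bytes ~= sl * bs * h * (34 + 5 * a * sl / h)
    return bs * sl * h * (34 + 5 * a * sl / h) / 1e9

act_mem_per_layer_gb(1, 256, 4096, 32) * 32  # ~1.5 GB in total across 32 layers
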
# exclude optimizer states since lora updates a small fraction of weights
# exclude activations 
oom_ignored = []
for row in df.itertuples():
    if row.cpu_offloading != 'TRUE':
        # weights only: ~2 bytes/param in bf16; FSDP full sharding splits them across GPUs
        approx_mem_req = int(row.model_size) * 2 / (int(row.num_gpus) if row.distrib_type == 'FSDP' else 1)
        # compare the per-GPU requirement against a single GPU's memory
        oom_ignored.append(approx_mem_req > int(row.gpu_mem))
    else:
        oom_ignored.append(False)
df['oom_ignored'] = oom_ignored
df['oom_ignored'].mean(), df['oom_ignored'].sum()
set_with_dataframe(worksheet, df)

Create Training Commands

sub_df = df.query("oom_ignored == 'FALSE' or not oom_ignored")
df.shape, sub_df.shape
small_gpu_commands = []
large_gpu_commands = []

for _, row in sub_df.iterrows():
    cmd_args = ["python train.py",
                "--batch_size 128", # divide by 2 every retry
                "--num_epochs 1",
                "--dataset alpaca_sample",
                "--use_flash_attention",
                "--precision bf16_buffers_autocast",
                "--log_to wandb",
    ]

    if row.distrib_type == "DDP":
        cmd_args.append("--use_dpp")
    elif row.distrib_type == "FSDP":
        pass
    else:
        raise ValueError(f"Unknown distrib_type {distrib_type}")

    cmd_args.append(f"--model_name {row.model_name}")

    cmd_args.append(f"--context_length {row.seqlen}")
    
    if row.use_gradient_checkpointing == "TRUE":
        cmd_args.append("--use_gradient_checkpointing True")
    else:
        cmd_args.append("--use_gradient_checkpointing False")
    
    if row.cpu_offloading == "TRUE":
        cmd_args.append("--use_cpu_offload")

    if row.ft_type == "LoRA":
        cmd_args.append("--train_type lora")
    elif row.ft_type == "QLoRA":
        cmd_args.append("--train_type qlora")
    else:
        raise ValueError(f"Unknown ft_type {ft_type}")
        
    if row.gpu_model == "A100-40":
        large_gpu_commands.append(" ".join(cmd_args))
    elif row.gpu_model == "A5000":
        small_gpu_commands.append(" ".join(cmd_args))
    else:
        ValueError("Unknown gpu model.")
os.makedirs("../benchmarking", exist_ok=True)
with open("../benchmarking/small_gpu_benchmarking.sh", "w") as f: 
    f.write("\n".join(small_gpu_commands))

with open("../benchmarking/large_gpu_benchmarking.sh", "w") as f: 
    f.write("\n".join(large_gpu_commands))    

Update Sheet with Results

import wandb
api = wandb.Api()
url = "https://docs.google.com/spreadsheets/d/1JSQbnkwtqPgc-_wqI3LTCJI6jWCaWafubK0ontWR2_Y"
sheet = gc.open_by_url(url)
empty_worksheet = sheet.get_worksheet_by_id(0)
filled_worksheet = sheet.get_worksheet_by_id(74399953)
vals = empty_worksheet.get_all_values()
df = pd.DataFrame(vals[1:], columns=vals[0])
df.shape
df.columns
wandb_project = "answerdotai/fsdp-benchmarking"

wandb_cols = ['memory_peak', 'memory_after_model_creation',
              'memory_after_model_wrap', 'memory_before_forward',
              'memory_after_forward', 'memory_before_backward',
              'memory_after_backward', 'time_taken']

empty_logs = pd.Series({c:None for c in wandb_cols})
wandb_logs = []
for row in df.itertuples():
    if row.wandb_link == "": 
        wandb_logs.append(empty_logs)
    else:
        # extract the run id from a URL like https://wandb.ai/<entity>/<project>/runs/<run_id>?workspace=...
        expid = row.wandb_link.split("runs/")[-1].split("/")[0].split("?")[0]
        print(row.wandb_link, expid)
        run = api.run(wandb_project + "/" + expid)
        history_df = run.history()
        existing_cols = list(set(history_df.columns).intersection(wandb_cols))
        wandb_logs.append(history_df[existing_cols].fillna(-1e30).max(axis=0))
wandb_logs_df = pd.concat(wandb_logs, axis=1).T
for c in wandb_logs_df.columns:
    if c.startswith("memory"):
        wandb_logs_df[c] = wandb_logs_df[c] / 1e9
df[wandb_logs_df.columns] = wandb_logs_df
df.head()
set_with_dataframe(filled_worksheet, df, 1, 1)