Source code for beamds.beam.config.deepspeed

import os
from collections import defaultdict
from .core_config import BeamConfig, BeamParam
from ..base import base_paths
from ..path import beam_path


def deepspeed_dtype_mapper(dtype):
    # Translate PyTorch-style dtype names to the section names used in DeepSpeed config files
    model_mapping = {'float32': 'fp32', 'float16': 'fp16', 'bfloat16': 'bf16'}
    return model_mapping[dtype]
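
The mapper is a direct lookup from PyTorch-style dtype strings to the keys DeepSpeed uses in its JSON config; a short illustrative sketch:

    # Illustrative only: rename torch-style dtype strings to DeepSpeed config keys.
    assert deepspeed_dtype_mapper('float16') == 'fp16'
    assert deepspeed_dtype_mapper('bfloat16') == 'bf16'
    # Unlisted dtypes (e.g. 'int8') raise a KeyError.
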
class DeepspeedConfig(BeamConfig):

    parameters = [
        BeamParam('deepspeed_config', str, None, 'Deepspeed configuration JSON file.'),

        # Optimizer Parameters
        BeamParam('deepspeed_optimizer', str, 'AdamW',
                  'Optimizer type (currently used for deepspeed configuration only). '
                  'Supported optimizers: [Adam, AdamW, Lamb, OneBitAdam, OneBitLamb]'),

        # Scheduler Parameters

        # Automatic mixed precision (AMP) training options

        # ZeRO Optimizations for FP16 Training
        BeamParam('zero_stage', int, 2, 'The ZeRO training stage to use.'),
        BeamParam('stage3_gather_16bit_weights_on_model_save', bool, False,
                  'Whether to gather 16-bit weights on model save in ZeRO stage 3'),

        # Parameter offloading
        BeamParam('offload_param_device', str, None,
                  'Device to offload parameters to in ZeRO stage 3: [cpu, nvme, none]'),
        BeamParam('offload_param_pin_memory', bool, True,
                  'Whether to pin memory for offloaded parameters'),
        BeamParam('offload_param_nvme_path', str, base_paths.deepspeed_data,
                  'Path to NVMe device for offloaded parameters'),

        # Optimizer offloading
        BeamParam('offload_optimizer_device', str, None,
                  'Device to offload optimizer states to in ZeRO stages 1/2/3: [cpu, nvme, none]'),
        BeamParam('offload_optimizer_pin_memory', bool, True,
                  'Whether to pin memory for offloaded optimizer states'),

        BeamParam('autotuning', bool, False, 'Whether to use the deepspeed autotuning feature.'),

        # Activation Checkpointing
        BeamParam('partition_activations', bool, False,
                  'Enables partition activation when used with model parallelism'),
        BeamParam('cpu_checkpointing', bool, False,
                  'Offloads partitioned activations to CPU if partition_activations is enabled'),
        BeamParam('contiguous_memory_optimization', bool, False,
                  'Copies partitioned activations so that they are contiguous in memory'),
        BeamParam('number_checkpoints', int, None,
                  'Total number of activation checkpoints used to allocate memory buffer '
                  'for contiguous_memory_optimization'),
        BeamParam('synchronize_checkpoint_boundary', bool, False,
                  'Inserts get_accelerator().synchronize() at each checkpoint boundary'),
        BeamParam('profile', bool, False,
                  'Logs the forward and backward time for each checkpoint function'),

        # Sparse Attention

        # Data Efficiency

        # Data Type options
        BeamParam('grad_accum_dtype', str, None,
                  'The data type for gradient accumulation. '
                  'Supported types: [float32, float16, bfloat16]'),
    ]
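
A minimal usage sketch. It assumes BeamConfig accepts keyword overrides at construction and resolves declared parameter defaults through .get; both are assumptions about the Beam API, not shown in this module (only the .get accessor is actually exercised below):

    # Assumption: BeamConfig-derived classes accept keyword overrides and
    # fall back to the declared BeamParam defaults -- verify against your Beam version.
    conf = DeepspeedConfig(zero_stage=3, offload_param_device='cpu')
    assert conf.get('zero_stage') == 3
    assert conf.get('offload_param_device') == 'cpu'
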
def recursive_dict_update(d, u):
    '''
    Merge two dicts recursively, updating the values of the first one with the values of the second one.

    @param d: the dict to update in place
    @param u: the dict whose values take precedence
    @return: the updated dict d
    '''
    for k, v in u.items():
        if isinstance(v, dict):
            d[k] = recursive_dict_update(d.get(k, {}), v)
        else:
            d[k] = v
    return d
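
For example, merging a partial override into a base config replaces only the overlapping leaves and keeps sibling keys intact:

    base = {'zero_optimization': {'stage': 2, 'offload_param': {'device': 'cpu'}}}
    override = {'zero_optimization': {'stage': 3}}
    merged = recursive_dict_update(base, override)
    # merged == {'zero_optimization': {'stage': 3, 'offload_param': {'device': 'cpu'}}}
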
def recursive_to_dict(obj):
    # Convert a (possibly nested) defaultdict tree into plain dicts
    if isinstance(obj, (defaultdict, dict)):
        return {k: recursive_to_dict(v) for k, v in obj.items()}
    return obj
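
This is what turns the defaultdict built by the generator below into an ordinary dict tree, so consumers never see the defaultdict's auto-creating behavior:

    from collections import defaultdict

    cfg = defaultdict(dict)
    cfg['fp16']['enabled'] = True
    plain = recursive_to_dict(cfg)
    assert plain == {'fp16': {'enabled': True}}
    assert type(plain) is dict  # no longer a defaultdict
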
def deepspeed_config_generator(hparams):

    config = defaultdict(dict)
    target = hparams.get('training_framework', 'deepspeed')

    config["train_micro_batch_size_per_gpu"] = hparams.get('batch_size_train') or hparams.get('batch_size')
    config["gradient_accumulation_steps"] = hparams.get('accumulate')

    if hparams.get('zero_stage', 2) is not None:
        config['zero_optimization']['stage'] = hparams.get('zero_stage', 2)
        # DeepSpeed expects this key inside the zero_optimization section
        config['zero_optimization']['stage3_gather_16bit_weights_on_model_save'] = (
            hparams.get('stage3_gather_16bit_weights_on_model_save', False))

    if hparams.get('offload_param_device', None) is not None:
        config['zero_optimization']['offload_param'] = {
            'device': hparams.get('offload_param_device'),
            'pin_memory': hparams.get('offload_param_pin_memory', False),
        }

    if hparams.get('offload_optimizer_device', None) is not None:
        config['zero_optimization']['offload_optimizer'] = {
            'device': hparams.get('offload_optimizer_device'),
            'pin_memory': hparams.get('offload_optimizer_pin_memory', False),
        }

    # optimizer
    if target == 'deepspeed':
        config['optimizer']['type'] = hparams.get('deepspeed_optimizer', 'AdamW')
        config['optimizer']['params'] = {'lr': hparams.get('lr-dense')}
        if hparams.get('weight_decay', None) is not None:
            config['optimizer']['params']['weight_decay'] = hparams.get('weight_decay')
        if 'adam' in config['optimizer']['type'].lower():
            config['optimizer']['params']['betas'] = [hparams.get('momentum', 0.8),
                                                      hparams.get('beta2', 0.999)]
            config['optimizer']['params']['eps'] = hparams.get('eps', 1e-8)

    # activation_checkpointing
    config['activation_checkpointing']['partition_activations'] = hparams.get('partition_activations', False)
    config['activation_checkpointing']['cpu_checkpointing'] = hparams.get('cpu_checkpointing', False)
    config['activation_checkpointing']['contiguous_memory_optimization'] = (
        hparams.get('contiguous_memory_optimization', False))
    config['activation_checkpointing']['number_checkpoints'] = hparams.get('number_checkpoints', None)
    config['activation_checkpointing']['synchronize_checkpoint_boundary'] = (
        hparams.get('synchronize_checkpoint_boundary', False))
    config['activation_checkpointing']['profile'] = hparams.get('profile', False)

    # autotuning
    if hparams.get('autotuning', False):
        config['autotuning']['enabled'] = True

    model_dtype = deepspeed_dtype_mapper(hparams.get('model_dtype', 'float32'))
    config['fp16']['enabled'] = model_dtype == 'fp16'
    config['bf16']['enabled'] = model_dtype == 'bf16'

    # steps_per_print
    if target != 'accelerate':
        epoch_length_train = hparams.get('epoch_length_train', None)
        epoch_length_eval = hparams.get('epoch_length_eval', None)
        if epoch_length_train is not None and epoch_length_eval is not None:
            config['steps_per_print'] = epoch_length_train + epoch_length_eval

    # merge with an external deepspeed_config file (the file overrides all other parameters)
    hparams_dict = recursive_to_dict(config)
    if hparams.get('deepspeed_config', None) is not None:
        config_file_dict = beam_path(hparams.get('deepspeed_config')).read()
        recursive_dict_update(hparams_dict, config_file_dict)

    return hparams_dict
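
A usage sketch: the generator only ever calls .get on hparams, so a plain dict can stand in for a BeamConfig when experimenting. The values below are arbitrary illustrations, not recommended settings:

    # Hypothetical hyperparameters; any mapping with a .get method works here.
    hparams = {
        'batch_size': 16,
        'accumulate': 2,
        'zero_stage': 3,
        'model_dtype': 'bfloat16',
        'lr-dense': 1e-3,
    }
    ds_config = deepspeed_config_generator(hparams)
    assert ds_config['zero_optimization']['stage'] == 3
    assert ds_config['bf16']['enabled'] and not ds_config['fp16']['enabled']
    # ds_config is a plain dict, ready to be serialized as a DeepSpeed JSON config.
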