Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wenet] use torchrun for distributed training #2020

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions examples/aishell/s0/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@
# Use this to control how many gpu you use, It's 1-gpu training if you specify
# just 1gpu, otherwise it's is multiple gpu training based on DDP in pytorch
export CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"
# The NCCL_SOCKET_IFNAME variable specifies which IP interface to use for nccl
# communication. More details can be found in
# https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/env.html
# export NCCL_SOCKET_IFNAME=ens4f1
export NCCL_DEBUG=INFO

stage=0 # start from 0 if you need to start from data preparation
stop_stage=5

# The num of machines(nodes) for multi-machine training, 1 is for one machine.
# NFS is required if num_nodes > 1.
# You should change the following two parameters for multiple machine training,
# see https://pytorch.org/docs/stable/elastic/run.html
HOST_NODE_ADDR="localhost:0"
num_nodes=1

# The rank of each node or machine, which ranges from 0 to `num_nodes - 1`.
# You should set the node_rank=0 on the first machine, set the node_rank=1
# on the second machine, and so on.
node_rank=0
# The aishell dataset location, please change this to your own path
# make sure of using absolute path. DO-NOT-USE relatvie path!
data=/export/data/asr-data/OpenSLR/33/
Expand Down Expand Up @@ -128,8 +121,6 @@ if [ ${stage} -le 4 ] && [ ${stop_stage} -ge 4 ]; then
num_gpus=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
# Use "nccl" if it works, otherwise use "gloo"
dist_backend="nccl"
world_size=`expr $num_gpus \* $num_nodes`
echo "total gpus is: $world_size"
cmvn_opts=
$cmvn && cp data/${train_set}/global_cmvn $dir
$cmvn && cmvn_opts="--cmvn ${dir}/global_cmvn"
Expand Down Expand Up @@ -165,13 +156,8 @@ if [ ${stage} -le 4 ] && [ ${stop_stage} -ge 4 ]; then
--pin_memory
else
echo "using torch ddp"
for ((i = 0; i < $num_gpus; ++i)); do
{
gpu_id=$(echo $CUDA_VISIBLE_DEVICES | cut -d',' -f$[$i+1])
# Rank of each gpu/process used for knowing whether it is
# the master of a worker.
rank=`expr $node_rank \* $num_gpus + $i`
python wenet/bin/train.py --gpu $gpu_id \
torchrun --nnodes=$num_nodes --nproc_per_node=$num_gpus --rdzv_endpoint=$HOST_NODE_ADDR \
wenet/bin/train.py \
--config $train_config \
--data_type $data_type \
--symbol_table $dict \
Expand All @@ -180,16 +166,11 @@ if [ ${stage} -le 4 ] && [ ${stop_stage} -ge 4 ]; then
${checkpoint:+--checkpoint $checkpoint} \
--model_dir $dir \
--ddp.init_method $init_method \
--ddp.world_size $world_size \
--ddp.rank $rank \
--ddp.dist_backend $dist_backend \
--num_workers ${num_workers} \
--prefetch ${prefetch} \
$cmvn_opts \
--pin_memory
} &
done
wait
fi
fi

Expand Down
65 changes: 21 additions & 44 deletions wenet/bin/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,11 @@ def get_args():
help='train and cv data type')
parser.add_argument('--train_data', required=True, help='train data file')
parser.add_argument('--cv_data', required=True, help='cv data file')
parser.add_argument('--gpu',
type=int,
default=-1,
help='gpu id for this local rank, -1 for cpu')
parser.add_argument('--model_dir', required=True, help='save model dir')
parser.add_argument('--checkpoint', help='checkpoint model')
parser.add_argument('--tensorboard_dir',
default='tensorboard',
help='tensorboard log dir')
parser.add_argument('--ddp.rank',
dest='rank',
default=0,
type=int,
help='global rank for distributed training')
parser.add_argument('--ddp.world_size',
dest='world_size',
default=-1,
type=int,
help='''number of total processes/gpus for
distributed training''')
parser.add_argument('--ddp.dist_backend',
dest='dist_backend',
default='nccl',
Expand Down Expand Up @@ -149,9 +134,6 @@ def main():
args = get_args()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s')
# NOTE(xcsong): deepspeed set CUDA_VISIBLE_DEVICES internally
os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu) if not args.deepspeed \
else os.environ['CUDA_VISIBLE_DEVICES']

# Set random seed
torch.manual_seed(777)
Expand All @@ -169,27 +151,22 @@ def main():
else:
configs["ds_dtype"] = "fp32"

# deepspeed read world_size from env
if args.deepspeed:
assert args.world_size == -1
# distributed means pytorch native ddp, it parse world_size from args
distributed = args.world_size > 1
local_rank = args.rank
world_size = args.world_size
world_size = int(os.environ.get('WORLD_SIZE', 1))
local_rank = int(os.environ.get('LOCAL_RANK', 0))
rank = int(os.environ.get('RANK', 0))
distributed = world_size > 1
if distributed:
logging.info('training on multiple gpus, this gpu {}'.format(args.gpu))
logging.info('training on multiple gpus, this gpu {}'.format(local_rank))
torch.cuda.set_device(local_rank)
dist.init_process_group(args.dist_backend,
init_method=args.init_method,
world_size=world_size,
rank=local_rank)
rank=rank)
xingchensong marked this conversation as resolved.
Show resolved Hide resolved
elif args.deepspeed:
# Update local_rank & world_size from enviroment variables
local_rank = int(os.environ['LOCAL_RANK'])
world_size = int(os.environ['WORLD_SIZE'])
deepspeed.init_distributed(dist_backend=args.dist_backend,
init_method=args.init_method,
rank=local_rank,
world_size=world_size)
world_size=world_size,
rank=rank)

symbol_table = read_symbol_table(args.symbol_table)

Expand Down Expand Up @@ -264,7 +241,7 @@ def main():
configs['is_json_cmvn'] = True
configs['lfmmi_dir'] = args.lfmmi_dir

if local_rank == 0:
if rank == 0:
saved_config_path = os.path.join(args.model_dir, 'train.yaml')
with open(saved_config_path, 'w') as fout:
data = yaml.dump(configs)
Expand All @@ -279,7 +256,7 @@ def main():
# !!!IMPORTANT!!!
# Try to export the model by script, if fails, we should refine
# the code to satisfy the script export requirements
if local_rank == 0:
if rank == 0:
script_model = torch.jit.script(model)
script_model.save(os.path.join(args.model_dir, 'init.zip'))
executor = Executor()
Expand All @@ -298,7 +275,7 @@ def main():
num_epochs = configs.get('max_epoch', 100)
model_dir = args.model_dir
writer = None
if local_rank == 0:
if rank == 0:
os.makedirs(model_dir, exist_ok=True)
exp_id = os.path.basename(model_dir)
writer = SummaryWriter(os.path.join(args.tensorboard_dir, exp_id))
Expand All @@ -320,7 +297,7 @@ def main():
elif args.deepspeed: # deepspeed
# NOTE(xcsong): look in detail how the memory estimator API works:
# https://deepspeed.readthedocs.io/en/latest/memory.html#discussion
if local_rank == 0:
if rank == 0:
logging.info("Estimating model states memory needs (zero2)...")
estimate_zero2_model_states_mem_needs_all_live(
model, num_gpus_per_node=world_size, num_nodes=1)
Expand All @@ -330,7 +307,7 @@ def main():
device = None # Init device later
pass # Init DeepSpeed later
else:
use_cuda = args.gpu >= 0 and torch.cuda.is_available()
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
model = model.to(device)

Expand Down Expand Up @@ -370,7 +347,7 @@ def scheduler(opt):
lr_scheduler=scheduler, model_parameters=model.parameters())

final_epoch = None
configs['rank'] = local_rank
configs['rank'] = rank
configs['is_distributed'] = distributed # pytorch native ddp
configs['is_deepspeed'] = args.deepspeed # deepspeed
configs['use_amp'] = args.use_amp
Expand All @@ -380,11 +357,11 @@ def scheduler(opt):
# https://github.com/microsoft/DeepSpeed/issues/2993
with torch.no_grad():
model.save_checkpoint(save_dir=model_dir, tag='init')
if args.save_states == "model_only" and local_rank == 0:
if args.save_states == "model_only" and rank == 0:
convert_zero_checkpoint_to_fp32_state_dict(
model_dir, "{}/init.pt".format(model_dir), tag='init')
os.system("rm -rf {}/{}".format(model_dir, "init"))
elif not args.deepspeed and start_epoch == 0 and local_rank == 0:
elif not args.deepspeed and start_epoch == 0 and rank == 0:
save_model_path = os.path.join(model_dir, 'init.pt')
save_checkpoint(model, save_model_path)

Expand Down Expand Up @@ -413,7 +390,7 @@ def scheduler(opt):
'epoch': epoch, 'lr': lr, 'cv_loss': cv_loss, 'step': executor.step,
'save_time': datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
}
if local_rank == 0:
if rank == 0:
writer.add_scalar('epoch/cv_loss', cv_loss, epoch)
writer.add_scalar('epoch/lr', lr, epoch)
with open("{}/{}.yaml".format(model_dir, epoch), 'w') as fout:
Expand All @@ -427,17 +404,17 @@ def scheduler(opt):
model.save_checkpoint(save_dir=model_dir,
tag='{}'.format(epoch),
client_state=infos)
if args.save_states == "model_only" and local_rank == 0:
if args.save_states == "model_only" and rank == 0:
convert_zero_checkpoint_to_fp32_state_dict(
model_dir, "{}/{}.pt".format(model_dir, epoch),
tag='{}'.format(epoch))
os.system("rm -rf {}/{}".format(model_dir, epoch))
elif not args.deepspeed and local_rank == 0:
elif not args.deepspeed and rank == 0:
save_model_path = os.path.join(model_dir, '{}.pt'.format(epoch))
save_checkpoint(model, save_model_path, infos)
final_epoch = epoch

if final_epoch is not None and local_rank == 0:
if final_epoch is not None and rank == 0:
final_model_path = os.path.join(model_dir, 'final.pt')
os.remove(final_model_path) if os.path.exists(final_model_path) else None
os.symlink('{}.pt'.format(final_epoch), final_model_path)
Expand Down