Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing rng sync when using custom sampler and batch_sampler #696

Merged

Conversation

pacman100
Copy link
Contributor

@pacman100 pacman100 commented Sep 12, 2022

What does this PR do?

  1. Fixes Error in prepared DataLoader with BatchSampler #679. PR Fix DataLoader with samplers that are batch samplers #687 introduced support for custom samplers that are batch_samplers (e.g., my_awesome_sampler=BatchSampler()) but it had the following issues
    a. Default collating was failing, I had to tweak the collate_fn which isn't a good experience for the user.
    b. Duplicate samples were being yielded and gather_for_metrics wasn't removing ending excess samples.
    With the changes in this PR, I am running below dataloader_checks.py script with the following command. Results are as per expectations.
accelerate launch --multi_gpu --num_processes 2 --num_machines 1 dataloader_checks.py

dataloader_checks.py: Builds upon the nice MRE given by @etiennebeaulac

import torch
from torch.utils.data import default_collate, Dataset, DataLoader, BatchSampler, RandomSampler, SequentialSampler
import numpy
import accelerate
from accelerate import Accelerator
from accelerate.utils.random import synchronize_rng_state
from accelerate.utils.dataclasses import DistributedType, RNGType
import os
import sys
import platform

class MyDataset(Dataset):
    def __len__(self):
        return 22
    
    def __getitem__(self, index):
#         print("MyDataset __getitem__", index)

        squeeze = False

        if isinstance(index, int):
            index = [index]
            squeeze = True
        elif isinstance(index, slice):
            index = list(range(*index.indices(self.size)))
        else:
            index = list(index)

        batch = [{"index": i, "label": i % 2} for i in index]
#         print(batch)

        if squeeze:
            batch = batch[0]

        return batch
    

if __name__ == "__main__":
    dataset = MyDataset()
    accelerator = Accelerator()
    
    accelerator.print("Starting conventional Dataloader with shuffle=False. Eval mode in general")
    loader = DataLoader(dataset, shuffle=False, batch_size=4)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending conventional Dataloader with shuffle=False. Eval mode in general")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting conventional Dataloader with shuffle=True")
    loader = DataLoader(dataset, shuffle=True, batch_size=4)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending conventional Dataloader with shuffle=True")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting Dataloader with batch_sampler=BatchSampler()")
    sampler = BatchSampler(RandomSampler(dataset), batch_size=4, drop_last=False)
    loader = DataLoader(dataset, batch_sampler=sampler)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending Dataloader with batch_sampler=BatchSampler()")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting Dataloader with sampler=BatchSampler()")
    sampler = BatchSampler(RandomSampler(dataset), batch_size=4, drop_last=False)
    loader = DataLoader(dataset, sampler=sampler, batch_size=None, collate_fn=default_collate)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending Dataloader with sampler=BatchSampler()")

The output logs: Broken down in respective chunks for easier understanding

Starting conventional Dataloader with shuffle=False. Eval mode in general                                                      
0 | batch #0 = {'index': tensor([0, 1, 2, 3], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0')}                
1 | batch #0 = {'index': tensor([4, 5, 6, 7], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1')}                
0 | gathered batch #0 | index = tensor([0, 1, 2, 3, 4, 5, 6, 7], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1, 0, 1], dev
ice='cuda:0')                                                                                                                  
1 | batch #1 = {'index': tensor([12, 13, 14, 15], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1')}            
0 | batch #1 = {'index': tensor([ 8,  9, 10, 11], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0')}            
0 | gathered batch #1 | index = tensor([ 8,  9, 10, 11, 12, 13, 14, 15], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1, 0,
 1], device='cuda:0')                                                                                                          
1 | batch #2 = {'index': tensor([20, 21,  0,  1], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1')}            
0 | batch #2 = {'index': tensor([16, 17, 18, 19], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0')}            
0 | gathered batch #2 | index = tensor([16, 17, 18, 19, 20, 21], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1], device='c
uda:0')                                                                                                                        
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]                            
Ending conventional Dataloader with shuffle=False. Eval mode in general 
Starting conventional Dataloader with shuffle=True                                                                             
1 | batch #0 = {'index': tensor([19, 13, 18, 11], device='cuda:1'), 'label': tensor([1, 1, 0, 1], device='cuda:1')}0 | batch #0
 = {'index': tensor([17,  5, 12, 15], device='cuda:0'), 'label': tensor([1, 1, 0, 1], device='cuda:0')}                        
                                                                                                                               
0 | gathered batch #0 | index = tensor([17,  5, 12, 15, 19, 13, 18, 11], device='cuda:0'), label = tensor([1, 1, 0, 1, 1, 1, 0,
 1], device='cuda:0')                                                                                                          
1 | batch #1 = {'index': tensor([8, 9, 2, 0], device='cuda:1'), 'label': tensor([0, 1, 0, 0], device='cuda:1')}                
0 | batch #1 = {'index': tensor([ 7, 14, 10,  1], device='cuda:0'), 'label': tensor([1, 0, 0, 1], device='cuda:0')}            
0 | gathered batch #1 | index = tensor([ 7, 14, 10,  1,  8,  9,  2,  0], device='cuda:0'), label = tensor([1, 0, 0, 1, 0, 1, 0,
 0], device='cuda:0')                                                                                                          
1 | batch #2 = {'index': tensor([16,  3, 17,  5], device='cuda:1'), 'label': tensor([0, 1, 1, 1], device='cuda:1')}            
0 | batch #2 = {'index': tensor([20,  6, 21,  4], device='cuda:0'), 'label': tensor([0, 0, 1, 0], device='cuda:0')}            
0 | gathered batch #2 | index = tensor([20,  6, 21,  4, 16,  3], device='cuda:0'), label = tensor([0, 0, 1, 0, 0, 1], device='c
uda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending conventional Dataloader with shuffle=True
Starting Dataloader with batch_sampler=BatchSampler()                                                                          
1 | batch #0 = {'index': tensor([18, 12, 10,  2], device='cuda:1'), 'label': tensor([0, 0, 0, 0], device='cuda:1')}
0 | batch #0 = {'index': tensor([ 4, 16,  3, 19], device='cuda:0'), 'label': tensor([0, 0, 1, 1], device='cuda:0')}
0 | gathered batch #0 | index = tensor([ 4, 16,  3, 19, 18, 12, 10,  2], device='cuda:0'), label = tensor([0, 0, 1, 1, 0, 0, 0,
 0], device='cuda:0')
1 | batch #1 = {'index': tensor([ 6,  9,  0, 14], device='cuda:1'), 'label': tensor([0, 1, 0, 0], device='cuda:1')}
0 | batch #1 = {'index': tensor([ 8, 17, 15, 20], device='cuda:0'), 'label': tensor([0, 1, 1, 0], device='cuda:0')}
0 | gathered batch #1 | index = tensor([ 8, 17, 15, 20,  6,  9,  0, 14], device='cuda:0'), label = tensor([0, 1, 1, 0, 0, 1, 0,
 0], device='cuda:0')
1 | batch #2 = {'index': tensor([ 7, 11,  4, 16], device='cuda:1'), 'label': tensor([1, 1, 0, 0], device='cuda:1')}
0 | batch #2 = {'index': tensor([13, 21,  1,  5], device='cuda:0'), 'label': tensor([1, 1, 1, 1], device='cuda:0')}
0 | gathered batch #2 | index = tensor([13, 21,  1,  5,  7, 11], device='cuda:0'), label = tensor([1, 1, 1, 1, 1, 1], device='c
uda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending Dataloader with batch_sampler=BatchSampler()
Starting Dataloader with sampler=BatchSampler()
1 | batch #0 = {'index': tensor([ 2, 21,  0,  5], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1')}
0 | batch #0 = {'index': tensor([13, 15, 18,  4], device='cuda:0'), 'label': tensor([1, 1, 0, 0], device='cuda:0')}
0 | gathered batch #0 | index = tensor([13, 15, 18,  4,  2, 21,  0,  5], device='cuda:0'), label = tensor([1, 1, 0, 0, 0, 1, 0, 1], device='cuda:0')
1 | batch #1 = {'index': tensor([12,  1, 11,  6], device='cuda:1'), 'label': tensor([0, 1, 1, 0], device='cuda:1')}
0 | batch #1 = {'index': tensor([20, 19,  8,  9], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0')}
0 | gathered batch #1 | index = tensor([20, 19,  8,  9, 12,  1, 11,  6], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1, 1, 0], device='cuda:0')
1 | batch #2 = {'index': tensor([14, 10, 13, 15], device='cuda:1'), 'label': tensor([0, 0, 1, 1], device='cuda:1')}
0 | batch #2 = {'index': tensor([16,  3, 17,  7], device='cuda:0'), 'label': tensor([0, 1, 1, 1], device='cuda:0')}
0 | gathered batch #2 | index = tensor([16,  3, 17,  7, 14, 10], device='cuda:0'), label = tensor([0, 1, 1, 1, 0, 0], device='cuda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending Dataloader with sampler=BatchSampler()

@pacman100 pacman100 requested a review from sgugger September 12, 2022 09:34
@HuggingFaceDocBuilderDev
Copy link

HuggingFaceDocBuilderDev commented Sep 12, 2022

The documentation is not available anymore as the PR was closed or merged.

Copy link
Collaborator

@sgugger sgugger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing those!

src/accelerate/data_loader.py Outdated Show resolved Hide resolved
src/accelerate/data_loader.py Outdated Show resolved Hide resolved
@pacman100 pacman100 requested a review from sgugger September 12, 2022 13:50
@pacman100 pacman100 merged commit 8d27597 into huggingface:main Sep 12, 2022
@pacman100 pacman100 deleted the smangrul/fix-rng-sync-batch-sampler branch September 12, 2022 14:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Error in prepared DataLoader with BatchSampler
4 participants