Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NeMo-UX] Adding file-lock to Connector #9400

Merged
merged 7 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nemo/collections/llm/gpt/model/mistral_7b.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Mistral7BConfig(GPTConfig):

class Mistral7BModel(GPTModel):
def __init__(self, config: Optional[Mistral7BConfig] = None, tokenizer=None):
_tokenizer = tokenizer or HFMistral7BImporter().tokenizer
_tokenizer = tokenizer or HFMistral7BImporter("mistralai/Mistral-7B-v0.1").tokenizer

super().__init__(config or Mistral7BConfig(), _tokenizer)

Expand All @@ -56,6 +56,8 @@ def apply(self, output_path: Path) -> Path:
self.convert_state(source, target)
self.nemo_save(output_path, trainer)

print(f"Converted Mistral 7B model to Nemo, model saved to {output_path}")

teardown(trainer, target)
del trainer, target

Expand Down
29 changes: 24 additions & 5 deletions nemo/lightning/io/connector.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
import shutil
from pathlib import Path, PosixPath, WindowsPath
from typing import Generic, Optional, Tuple, TypeVar

import pytorch_lightning as pl
from filelock import FileLock, Timeout

# Dynamically inherit from the correct Path subclass based on the operating system.
if os.name == 'nt':
Expand Down Expand Up @@ -47,6 +49,7 @@ class Connector(BasePath, Generic[SourceT, TargetT]):
"""

default_path = None
LOCK_TIMEOUT = 1200

def init(self) -> TargetT:
raise NotImplementedError()
Expand All @@ -63,13 +66,29 @@ def __new__(cls, *args, **kwargs):

def __call__(self, output_path: Optional[Path] = None, overwrite: bool = False) -> Path:
_output_path = output_path or self.local_path()
lock_path = _output_path.with_suffix(_output_path.suffix + '.lock')
lock = FileLock(lock_path)

if overwrite and _output_path.exists():
shutil.rmtree(_output_path)
# Check if the lock file exists and set overwrite to False if it does
if lock_path.exists():
overwrite = False

if not _output_path.exists():
to_return = self.apply(_output_path)
_output_path = to_return or _output_path
try:
with lock.acquire(timeout=self.LOCK_TIMEOUT):
if overwrite and _output_path.exists():
shutil.rmtree(_output_path)

if not _output_path.exists():
to_return = self.apply(_output_path)
_output_path = to_return or _output_path

except Timeout:
logging.error(f"Timeout occurred while trying to acquire the lock for {_output_path}")
raise

except Exception as e:
logging.error(f"An error occurred: {e}")
raise

return _output_path

Expand Down
Loading