Skip to content

Commit

Permalink
Prototype ray task (#29)
Browse files Browse the repository at this point in the history
## Overview
Introduces Ray framework for distributed task execution and
multiprocessing support in Datamimic. This change moves us from
single-process execution to a distributed model where:

- Parent process handles model parsing and orchestration -> the current
parent process handles model parsing but does little orchestration beyond
gathering the generated results
- Ray workers handle parallel data generation and processing -> done
- Improved resource utilization across multiple cores -> currently matches
the performance of the previous pure-multiprocessing approach; will check
whether utilizing Ray clusters improves it

## Key Features
- New GenerateWorker class using Ray for distributed execution -> done
- Parent/worker process separation for better resource management ->
done
- Automatic scaling across available CPU cores -> done
- Proper context isolation between workers -> done
- Efficient cleanup of resources -> done

## Technical Implementation
- Ray workers handle chunks of data generation tasks -> done
- Parent process manages task distribution and result aggregation ->
done
- Context copying ensures proper isolation between workers -> done
- Resource cleanup in finally blocks -> done
- Error propagation from workers to parent process -> needs further verification

## Architecture

Parent Process -> done
- Parse models
- Initialize Ray
- Distribute tasks to workers
- Aggregate results

Ray Workers (@ray.remote) -> use ray task (ray.remote on function)
instead of ray actor (class object) to reduce unnecessary overhead of
creating and managing ray actor.
- Handle data generation
- Process data chunks
- Manage database connections -> not needed
- Clean up resources

## Notes
This is our first implementation using Ray for distributed processing.
The architecture separates concerns between parent orchestration and
worker execution while maintaining our existing model parsing and
validation.

---------

Co-authored-by: Dang Ly <[email protected]>
  • Loading branch information
dangsg and Dang Ly authored Jan 24, 2025
1 parent 7394fa7 commit ab77e5c
Show file tree
Hide file tree
Showing 50 changed files with 1,387 additions and 1,704 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
#WARNING: Please make sure this is set correctly for desired deployment behavior
RUNTIME_ENVIRONMENT=development
SQLALCHEMY_WARN_20=1
RAY_DEBUG=False

2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
- '[0-9]+.[0-9]+.[0-9]+'
pull_request:
branches:
- development
- "*"

env:
RUNTIME_ENVIRONMENT: production
Expand Down
2 changes: 2 additions & 0 deletions datamimic_ce/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class Settings(BaseSettings):
DEFAULT_LOGGER: str = "DATAMIMIC"

LIB_EDITION: str = "CE"

RAY_DEBUG: bool = False
model_config = SettingsConfigDict(env_file=".env", case_sensitive=True, extra="ignore")


Expand Down
19 changes: 15 additions & 4 deletions datamimic_ce/contexts/geniter_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, parent: Context, current_name: str):
self._current_name = current_name
self._current_product: dict = {}
self._current_variables: dict = {}
self._namespace: dict = {}
self._worker_id: int | None = None

@property
def current_name(self) -> str:
Expand All @@ -42,6 +42,20 @@ def current_variables(self) -> dict:
def parent(self) -> Context:
return self._parent

@property
def worker_id(self) -> int:
if self._worker_id is not None:
return self._worker_id

if isinstance(self._parent, GenIterContext):
return self._parent.worker_id

raise ValueError("Worker ID not found in context hierarchy.")

@worker_id.setter
def worker_id(self, value: int) -> None:
self._worker_id = value

def add_current_product_field(self, key_path, value):
"""
Add field to current product using string key path (i.e. "data.people.name")
Expand All @@ -50,6 +64,3 @@ def add_current_product_field(self, key_path, value):
:return:
"""
dict_nested_update(self.current_product, key_path, value)

def get_namespace(self):
return self._namespace
4 changes: 2 additions & 2 deletions datamimic_ce/contexts/setup_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
memstore_manager: MemstoreManager,
task_id: str,
test_mode: bool,
test_result_exporter: TestResultExporter | None,
test_result_exporter: TestResultExporter,
default_separator: str,
default_locale: str,
default_dataset: str,
Expand Down Expand Up @@ -291,7 +291,7 @@ def test_mode(self) -> bool:
return self._test_mode

@property
def test_result_exporter(self) -> TestResultExporter | None:
def test_result_exporter(self) -> TestResultExporter:
return self._test_result_exporter

@property
Expand Down
26 changes: 17 additions & 9 deletions datamimic_ce/datamimic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
# For questions and support, contact: [email protected]
import argparse
import logging
import os
import traceback
import uuid
from pathlib import Path

# Avoid deduplication of logs in Ray, MUST be set before importing ray
os.environ["RAY_DEDUP_LOGS"] = "0"

import ray

from datamimic_ce.config import settings
from datamimic_ce.exporters.test_result_exporter import TestResultExporter
from datamimic_ce.logger import logger, setup_logger
Expand All @@ -20,30 +26,32 @@

LOG_FILE = "datamimic.log"

ray.init(ignore_reinit_error=True, local_mode=settings.RAY_DEBUG, include_dashboard=False)


class DataMimic:
def __init__(
self,
descriptor_path: Path,
task_id: str | None = None,
platform_props: dict[str, str] | None = None,
platform_configs: dict | None = None,
test_mode: bool = False,
args: argparse.Namespace | None = None,
self,
descriptor_path: Path,
task_id: str | None = None,
platform_props: dict[str, str] | None = None,
platform_configs: dict | None = None,
test_mode: bool = False,
args: argparse.Namespace | None = None,
):
"""
Initialize DataMimic with descriptor_path.
"""
# Set up logger
log_level = getattr(logging, args.log_level.upper(), logging.INFO) if args else logging.INFO
setup_logger(logger_name=settings.DEFAULT_LOGGER, task_id=task_id, level=log_level)
setup_logger(logger_name=settings.DEFAULT_LOGGER, worker_name="MAIN", level=log_level)

self._task_id = task_id or uuid.uuid4().hex
self._descriptor_path = descriptor_path
self._platform_props = platform_props
self._platform_configs = platform_configs
self._test_mode = test_mode
self._test_result_storage = TestResultExporter() if test_mode else None
self._test_result_storage = TestResultExporter()

# Initialize logging
log_system_info()
Expand Down
28 changes: 12 additions & 16 deletions datamimic_ce/exporters/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from datamimic_ce.contexts.setup_context import SetupContext
from datamimic_ce.exporters.unified_buffered_exporter import UnifiedBufferedExporter
from datamimic_ce.logger import logger
from datamimic_ce.utils.multiprocessing_page_info import MultiprocessingPageInfo


class CSVExporter(UnifiedBufferedExporter):
Expand All @@ -19,17 +18,16 @@ class CSVExporter(UnifiedBufferedExporter):
"""

def __init__(
self,
setup_context: SetupContext,
product_name: str,
page_info: MultiprocessingPageInfo,
chunk_size: int | None,
fieldnames: list[str] | None,
delimiter: str | None,
quotechar: str | None,
quoting: int | None,
line_terminator: str | None,
encoding: str | None,
self,
setup_context: SetupContext,
product_name: str,
chunk_size: int | None,
fieldnames: list[str] | None,
delimiter: str | None,
quotechar: str | None,
quoting: int | None,
line_terminator: str | None,
encoding: str | None,
):
# Remove singleton pattern and initialize instance variables
self.fieldnames = fieldnames or []
Expand All @@ -46,18 +44,17 @@ def __init__(
setup_context=setup_context,
product_name=product_name,
chunk_size=chunk_size,
page_info=page_info,
encoding=encoding,
)
logger.info(
f"CSVExporter initialized with chunk size {chunk_size}, fieldnames '{fieldnames}', "
f"encoding '{self._encoding}', delimiter '{self.delimiter}'"
)

def _write_data_to_buffer(self, data: list[dict]) -> None:
def _write_data_to_buffer(self, data: list[dict], worker_id: int, chunk_idx: int) -> None:
"""Writes data to the current buffer file in CSV format."""
try:
buffer_file = self._get_buffer_file()
buffer_file = self._get_buffer_file(worker_id, chunk_idx)
if buffer_file is None:
return
write_header = not buffer_file.exists()
Expand All @@ -77,7 +74,6 @@ def _write_data_to_buffer(self, data: list[dict]) -> None:
for record in data:
writer.writerow(record)
logger.debug(f"Wrote {len(data)} records to buffer file: {buffer_file}")
self._is_first_write = False
except Exception as e:
logger.error(f"Error writing data to buffer: {e}")
raise
Expand Down
8 changes: 2 additions & 6 deletions datamimic_ce/exporters/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
# See LICENSE file for the full text of the license.
# For questions and support, contact: [email protected]

from abc import ABC, abstractmethod


class Exporter(ABC):
@abstractmethod
def consume(self, product: tuple):
pass
class Exporter:
pass
84 changes: 84 additions & 0 deletions datamimic_ce/exporters/exporter_state_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# DATAMIMIC
# Copyright (c) 2023-2024 Rapiddweller Asia Co., Ltd.
# This software is licensed under the MIT License.
# See LICENSE file for the full text of the license.
# For questions and support, contact: [email protected]


class ExporterStateManager:
    """
    Tracks per-exporter state (counters and chunk bookkeeping) for one worker,
    keyed by exporter key.
    """

    def __init__(self, worker_id):
        self._worker_id = worker_id
        # Maps exporter key -> ExporterStateStorage owned by this worker.
        self._storage_dict = {}

    @property
    def worker_id(self):
        """ID of the worker this manager belongs to."""
        return self._worker_id

    def add_storage(self, key: str, chunk_size: int):
        """Register a storage for *key* with an explicit chunk size."""
        self._storage_dict[key] = ExporterStateStorage(chunk_size)

    def get_storage(self, key: str):
        """Return the storage for *key*, lazily creating one (no chunk size) on first access."""
        try:
            return self._storage_dict[key]
        except KeyError:
            created = ExporterStateStorage(None)
            self._storage_dict[key] = created
            return created

    def load_exporter_state(self, key: str):
        """Return the state tuple (global_counter, current_counter, chunk_index, chunk_size)."""
        state = self.get_storage(key)
        return (
            state.global_counter,
            state.current_counter,
            state.chunk_index,
            state.chunk_size,
        )

    def rotate_chunk(self, key: str):
        """Advance *key* to the next chunk and reset its in-chunk counter."""
        state = self.get_storage(key)
        state.chunk_index += 1
        state.current_counter = 0

    def save_state(self, key: str, global_counter: int, current_counter: int):
        """Persist the given counter values into the storage for *key*."""
        state = self.get_storage(key)
        state.global_counter = global_counter
        state.current_counter = current_counter


class ExporterStateStorage:
"""
Stores the state of an exporter for a worker.
"""

def __init__(self, chunk_size: int | None):
self._global_counter = 0
self._current_counter = 0
self._chunk_index = 0
self._chunk_size = chunk_size

@property
def global_counter(self):
return self._global_counter

@global_counter.setter
def global_counter(self, value):
self._global_counter = value

@property
def current_counter(self):
return self._current_counter

@current_counter.setter
def current_counter(self, value):
self._current_counter = value

@property
def chunk_size(self):
return self._chunk_size

@property
def chunk_index(self):
return self._chunk_index

@chunk_index.setter
def chunk_index(self, value):
self._chunk_index = value
Loading

0 comments on commit ab77e5c

Please sign in to comment.