Skip to content

Commit

Permalink
add missing from_descriptor methods,
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Jul 29, 2024
1 parent 149545c commit 19d35ab
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 96 deletions.
12 changes: 12 additions & 0 deletions smartsim/_core/mli/comm/channel/dragonchannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import sys
import typing as t

Expand Down Expand Up @@ -59,3 +60,14 @@ def recv(self) -> t.List[bytes]:
with self._channel.recvh(timeout=None) as recvh:
message_bytes: bytes = recvh.recv_bytes(timeout=None)
return [message_bytes]

@classmethod
def from_descriptor(
cls,
descriptor: str,
) -> "DragonCommChannel":
try:
return DragonCommChannel(base64.b64decode(descriptor))
except:
print(f"failed to create dragon comm channel: {descriptor}")
raise
12 changes: 8 additions & 4 deletions smartsim/_core/mli/comm/channel/dragonfli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ def from_descriptor(
cls,
descriptor: str,
) -> "DragonFLIChannel":
return DragonFLIChannel(
fli_desc=base64.b64decode(descriptor),
sender_supplied=True,
)
try:
return DragonFLIChannel(
fli_desc=base64.b64decode(descriptor),
sender_supplied=True,
)
except:
logger.error(f"Error while creating DragonFLIChannel: {descriptor}")
raise
7 changes: 1 addition & 6 deletions smartsim/_core/mli/infrastructure/environmentloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ def get_queue(self) -> t.Optional[CommChannelBase]:
logger.warning("No queue factory is configured")
return None

if descriptor is not None:
# , sender_supplied: bool = True
# self.queue = DragonFLIChannel(
# fli_desc=base64.b64decode(descriptor),
# sender_supplied=sender_supplied,
# )
if descriptor is not None and descriptor:
self.queue = self._queue_factory(descriptor)
self._queue_descriptor = descriptor
return self.queue
19 changes: 5 additions & 14 deletions smartsim/_core/mli/infrastructure/storage/dragonfeaturestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,9 @@ def from_descriptor(
descriptor: str,
# b64encoded: bool = False,
) -> "DragonFeatureStore":
# import dragon.data.ddict.ddict as dragon_ddict # pylint: disable=import-outside-toplevel

# # if b64encoded:
# # descriptor = base64.b64decode(descriptor).encode("utf-8")
# # ddict = DDict.attach(descriptor)
# # ddict.attach(descriptor)

# storage = dragon_ddict.DDict()
# storage.attach(descriptor)
# return DragonFeatureStore(storage)

if descriptor is None:
print("foo")
return None
return DragonFeatureStore({"tmp": "here"})
try:
return DragonFeatureStore(dragon_ddict.DDict.attach(descriptor))
except:
print(f"error creating dragon feature store: {descriptor}")
raise
4 changes: 4 additions & 0 deletions smartsim/_core/mli/infrastructure/storage/featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

from pydantic import BaseModel, Field

from smartsim.log import get_logger

logger = get_logger(__name__)


class FeatureStoreKey(BaseModel):
"""A key,descriptor pair enabling retrieval of an item from a feature store"""
Expand Down
48 changes: 18 additions & 30 deletions tests/dragon/featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import smartsim.error as sse
from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore
from smartsim.log import get_logger

logger = get_logger(__name__)


class MemoryFeatureStore(FeatureStore):
Expand Down Expand Up @@ -69,9 +72,13 @@ class FileSystemFeatureStore(FeatureStore):
"""Alternative feature store implementation for testing. Stores all
data on the file system"""

def __init__(self, storage_dir: t.Optional[pathlib.Path] = None) -> None:
def __init__(
self, storage_dir: t.Optional[t.Union[pathlib.Path, str]] = None
) -> None:
"""Initialize the FileSystemFeatureStore instance
:param storage_dir: (optional) root directory to store all data relative to"""
if isinstance(storage_dir, str):
storage_dir = pathlib.Path(storage_dir)
self._storage_dir = storage_dir

def __getitem__(self, key: str) -> bytes:
Expand Down Expand Up @@ -127,33 +134,14 @@ def from_descriptor(
) -> "FileSystemFeatureStore":
# if b64encoded:
# descriptor = base64.b64decode(descriptor).encode("utf-8")
path = pathlib.Path(descriptor)
if not path.is_dir():
raise ValueError("FileSystemFeatureStore requires a directory path")
if not path.exists():
try:
path = pathlib.Path(descriptor)
path.mkdir(parents=True, exist_ok=True)
return FileSystemFeatureStore(path)


class DragonDict:
"""Mock implementation of a dragon dictionary"""

def __init__(self) -> None:
"""Initialize the mock DragonDict instance"""
self._storage: t.Dict[bytes, t.Any] = {}

def __getitem__(self, key: bytes) -> t.Any:
"""Retrieve an item using key
:param key: Unique key of an item to retrieve from the feature store"""
return self._storage[key]

def __setitem__(self, key: bytes, value: t.Any) -> None:
"""Assign a value using key
:param key: Unique key of an item to set in the feature store
:param value: Value to persist in the feature store"""
self._storage[key] = value

def __contains__(self, key: bytes) -> bool:
"""Return `True` if the key is found, `False` otherwise
:param key: Unique key of an item to retrieve from the feature store"""
return key in self._storage
if not path.is_dir():
raise ValueError("FileSystemFeatureStore requires a directory path")
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return FileSystemFeatureStore(path)
except:
logger.error(f"Error while creating FileSystemFeatureStore: {descriptor}")
raise
25 changes: 18 additions & 7 deletions tests/dragon/utils/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pathlib
import threading
import typing as t

from smartsim._core.mli.comm.channel.channel import CommChannelBase
Expand All @@ -38,6 +39,8 @@ class FileSystemCommChannel(CommChannelBase):

def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None:
"""Initialize the FileSystemCommChannel instance"""
self._lock = threading.RLock()

if not isinstance(key, bytes):
super().__init__(key.as_posix().encode("utf-8"))
self._file_path = key
Expand All @@ -56,20 +59,28 @@ def send(self, value: bytes) -> None:
logger.debug(
f"Channel {self.descriptor.decode('utf-8')} sending message to {self._file_path}"
)
self._file_path.write_bytes(value)
with self._lock:
self._file_path.write_bytes(value)

def recv(self) -> bytes:
"""Receieve a message through the underlying communication channel
:returns: the received message"""
...
with self._lock:
if self._file_path.exists():
incoming = self._file_path.read_bytes()
self._file_path.unlink()
return incoming

@classmethod
def from_descriptor(
cls,
descriptor: t.Union[str, bytes],
) -> "FileSystemCommChannel":
if isinstance(descriptor, str):
path = pathlib.Path(descriptor)
else:
path = pathlib.Path(descriptor.decode("utf-8"))
return FileSystemCommChannel(path)
try:
if isinstance(descriptor, str):
path = pathlib.Path(descriptor)
else:
path = pathlib.Path(descriptor.decode("utf-8"))
return FileSystemCommChannel(path)
except:
print("failed to create FS comm channel: {descriptor}")
21 changes: 16 additions & 5 deletions tests/mli/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pathlib
import threading
import typing as t

from smartsim._core.mli.comm.channel.channel import CommChannelBase
Expand All @@ -38,6 +39,7 @@ class FileSystemCommChannel(CommChannelBase):

def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None:
"""Initialize the FileSystemCommChannel instance"""
self._lock = threading.RLock()
if not isinstance(key, bytes):
super().__init__(key.as_posix().encode("utf-8"))
self._file_path = key
Expand All @@ -56,17 +58,26 @@ def send(self, value: bytes) -> None:
logger.debug(
f"Channel {self.descriptor.decode('utf-8')} sending message to {self._file_path}"
)
self._file_path.write_bytes(value)
with self._lock:
self._file_path.write_bytes(value)

def recv(self) -> t.List[bytes]:
def recv(self) -> bytes:
"""Receieve a message through the underlying communication channel
:returns: the received message"""
self._file_path.read_bytes()
with self._lock:
if self._file_path.exists():
incoming = self._file_path.read_bytes()
self._file_path.unlink()
return incoming

@classmethod
def from_descriptor(
cls,
descriptor: str,
) -> "FileSystemCommChannel":
path = pathlib.Path(descriptor)
return FileSystemCommChannel(path)
try:
path = pathlib.Path(descriptor)
return FileSystemCommChannel(path)
except:
print(f"failed to create fs comm channel: {descriptor}")
raise
48 changes: 18 additions & 30 deletions tests/mli/featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import smartsim.error as sse
from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore
from smartsim.log import get_logger

logger = get_logger(__name__)


class MemoryFeatureStore(FeatureStore):
Expand Down Expand Up @@ -69,9 +72,13 @@ class FileSystemFeatureStore(FeatureStore):
"""Alternative feature store implementation for testing. Stores all
data on the file system"""

def __init__(self, storage_dir: t.Optional[pathlib.Path] = None) -> None:
def __init__(
self, storage_dir: t.Optional[t.Union[pathlib.Path, str]] = None
) -> None:
"""Initialize the FileSystemFeatureStore instance
:param storage_dir: (optional) root directory to store all data relative to"""
if isinstance(storage_dir, str):
storage_dir = pathlib.Path(storage_dir)
self._storage_dir = storage_dir

def __getitem__(self, key: str) -> bytes:
Expand Down Expand Up @@ -127,33 +134,14 @@ def from_descriptor(
) -> "FileSystemFeatureStore":
# if b64encoded:
# descriptor = base64.b64decode(descriptor).encode("utf-8")
path = pathlib.Path(descriptor)
if not path.is_dir():
raise ValueError("FileSystemFeatureStore requires a directory path")
if not path.exists():
try:
path = pathlib.Path(descriptor)
path.mkdir(parents=True, exist_ok=True)
return FileSystemFeatureStore(path)


class DragonDict:
"""Mock implementation of a dragon dictionary"""

def __init__(self) -> None:
"""Initialize the mock DragonDict instance"""
self._storage: t.Dict[bytes, t.Any] = {}

def __getitem__(self, key: bytes) -> t.Any:
"""Retrieve an item using key
:param key: Unique key of an item to retrieve from the feature store"""
return self._storage[key]

def __setitem__(self, key: bytes, value: t.Any) -> None:
"""Assign a value using key
:param key: Unique key of an item to set in the feature store
:param value: Value to persist in the feature store"""
self._storage[key] = value

def __contains__(self, key: bytes) -> bool:
"""Return `True` if the key is found, `False` otherwise
:param key: Unique key of an item to retrieve from the feature store"""
return key in self._storage
if not path.is_dir():
raise ValueError("FileSystemFeatureStore requires a directory path")
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return FileSystemFeatureStore(path)
except:
logger.error(f"Error while creating FileSystemFeatureStore: {descriptor}")
raise

0 comments on commit 19d35ab

Please sign in to comment.