Skip to content

Commit

Permalink
Issue #2: build ZkMemoizer from config
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 19, 2022
1 parent 1bef5ea commit 2263a71
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
43 changes: 36 additions & 7 deletions src/openeo_aggregator/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import json
import logging
import time
from typing import Callable, Tuple, Union
from typing import Callable, Tuple, Union, Optional

import kazoo.exceptions
import kazoo.protocol.paths
from kazoo.client import KazooClient

from openeo.util import TimingLogger
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.utils import strip_join, Clock


Expand Down Expand Up @@ -90,7 +91,7 @@ def flush_all(self):


@contextlib.contextmanager
def zk_connected(client: KazooClient, timeout: int = 5) -> KazooClient:
def zk_connected(client: KazooClient, timeout: float = 5) -> KazooClient:
"""
Context manager to automatically start and stop ZooKeeper connection.
"""
Expand All @@ -111,13 +112,41 @@ class ZkMemoizer:
count = zk_cache.get_or_call("count", callback=calculate_count)
"""
DEFAULT_NAME = "default"
DEFAULT_TTL = 5 * 60
DEFAULT_ZK_TIMEOUT = 5

def __init__(self, client: KazooClient, prefix: str, default_ttl: int = 60, name: str = None, zk_timeout: int = 5):
def __init__(
self,
client: KazooClient,
prefix: str,
name: str = DEFAULT_NAME,
default_ttl: float = DEFAULT_TTL,
zk_timeout: float = DEFAULT_ZK_TIMEOUT,
):
self._client = client
self._prefix = prefix
self._default_ttl = default_ttl
self._name = name or self.__class__.__name__
self._zk_timeout = zk_timeout
self._prefix = kazoo.protocol.paths.normpath(prefix)
self._name = name
self._default_ttl = float(default_ttl)
self._zk_timeout = float(zk_timeout)

@classmethod
def from_config(
cls,
config: AggregatorConfig,
name: str = DEFAULT_NAME,
prefix: Optional[str] = None,
) -> "ZkMemoizer":
"""Factory to create `ZkMemoizer` instance from config values."""
if prefix is None:
prefix = f"cache/{name.lower()}"
return cls(
client=KazooClient(hosts=config.zk_memoizer.get("zk_hosts", "localhost:2181")),
prefix=f"{config.zookeeper_prefix}/{prefix}",
name=name,
default_ttl=config.zk_memoizer.get("default_ttl", cls.DEFAULT_TTL),
zk_timeout=config.zk_memoizer.get("zk_timeout", cls.DEFAULT_ZK_TIMEOUT),
)

def get_or_call(
self,
Expand Down
3 changes: 3 additions & 0 deletions src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class AggregatorConfig(dict):
partitioned_job_tracking = dict_item(default=None)
zookeeper_prefix = dict_item(default="/openeo-aggregator/")

zk_memoizer = dict_item(default={})


@staticmethod
def from_py_file(path: Union[str, Path]) -> 'AggregatorConfig':
"""Load config from Python file."""
Expand Down
31 changes: 31 additions & 0 deletions tests/test_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import kazoo.exceptions
import pytest

import openeo_aggregator.caching
from openeo_aggregator.caching import TtlCache, CacheMissException, ZkMemoizer
from openeo_aggregator.testing import clock_mock
from openeo_aggregator.utils import Clock
Expand Down Expand Up @@ -251,3 +252,33 @@ def test_corrupted_cache(self, zk_client):
assert zk_cache.get_or_call(key="count", callback=callback) == 101
assert zk_cache.get_or_call(key="count", callback=callback) == 101
assert zk_client.get("/test/count")[0] == b"101"

@pytest.mark.parametrize(["kwargs", "expected_path"], [
({"name": "Tezt"}, "/o-a/cache/tezt/count"),
({"name": "Tezt", "prefix": "tezt/_ch"}, "/o-a/tezt/_ch/count"),
])
def test_from_config(self, config, zk_client, kwargs, expected_path):
config.zk_memoizer = {
"zk_hosts": "zk1.test:2181,zk2.test:2181",
"default_ttl": 123,
"zk_timeout": 7.25,
}
with mock.patch.object(openeo_aggregator.caching, "KazooClient", return_value=zk_client) as KazooClient:
zk_cache = ZkMemoizer.from_config(config, **kwargs)

KazooClient.assert_called_with(hosts="zk1.test:2181,zk2.test:2181")

callback = self._build_callback()
with clock_mock(1000):
assert zk_cache.get_or_call(key="count", callback=callback) == 100
assert zk_cache.get_or_call(key="count", callback=callback) == 100
with clock_mock(1122):
assert zk_cache.get_or_call(key="count", callback=callback) == 100
with clock_mock(1123):
assert zk_cache.get_or_call(key="count", callback=callback) == 101
assert zk_cache.get_or_call(key="count", callback=callback) == 101

zk_client.start.assert_called_with(timeout=7.25)

paths = set(c[2]["path"] for c in zk_client.mock_calls if c[0] in {"get", "set"})
assert paths == {expected_path}

0 comments on commit 2263a71

Please sign in to comment.