Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watcher keep alive even get 304 not modified #36

Merged
merged 15 commits into from
Sep 6, 2022
4 changes: 3 additions & 1 deletion centraldogma/dogma.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def repository_watcher(
repo_name: str,
path_pattern: str,
function: Callable[[Revision], T] = lambda x: x,
timeout_millis: int = _DEFAULT_WATCH_TIMEOUT_MILLIS,
) -> Watcher[T]:
"""
Returns a ``Watcher`` which notifies its listeners when the specified repository has a new commit
Expand All @@ -267,6 +268,7 @@ def listener(revision: Revision, contents: List[Content]) -> None:

:param path_pattern: the path pattern to match files in the repository.
:param function: the function to convert the given `Revision` into another.
:param timeout_millis: the timeout millis for the watching request.

.. _a known issue:
https://github.com/line/centraldogma/issues/40
Expand All @@ -276,7 +278,7 @@ def listener(revision: Revision, contents: List[Content]) -> None:
project_name,
repo_name,
path_pattern,
_DEFAULT_WATCH_TIMEOUT_MILLIS,
timeout_millis,
function,
)
watcher.start()
Expand Down
32 changes: 16 additions & 16 deletions centraldogma/repository_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,24 @@ def watch(self, listener: Callable[[Revision, T], None]) -> None:
listener(self._latest.revision, self._latest.value)

def _schedule_watch(self, num_attempts_so_far: int) -> None:
if self._is_stopped():
return
num_attempts = num_attempts_so_far
while num_attempts >= 0:
if self._is_stopped():
break

if num_attempts_so_far == 0:
delay = _DELAY_ON_SUCCESS_MILLIS if self._latest is None else 0
else:
delay = self._next_delay_millis(num_attempts_so_far)
if num_attempts == 0:
delay = _DELAY_ON_SUCCESS_MILLIS if self._latest is None else 0
else:
delay = self._next_delay_millis(num_attempts)

# FIXME(ikhoon): Replace asyncio.sleep() after AsyncClient is implemented.
time.sleep(delay / 1000)
self._watch(num_attempts_so_far)
# FIXME(ikhoon): Replace asyncio.sleep() after AsyncClient is implemented.
time.sleep(delay / 1000)
num_attempts = self._watch(num_attempts)
return None

def _watch(self, num_attempts_so_far: int) -> None:
def _watch(self, num_attempts_so_far: int) -> int:
if self._is_stopped():
return
return -1

last_known_revision = self._latest.revision if self._latest else Revision.init()
try:
Expand All @@ -146,9 +148,8 @@ def _watch(self, num_attempts_so_far: int) -> None:
self.notify_listeners()
if not old_latest:
self._initial_value_future.set_result(new_latest)

# Watch again for the next change.
self._schedule_watch(0)
# Watch again for the next change.
return 0
except Exception as ex:
if isinstance(ex, EntryNotFoundException):
logging.info(
Expand All @@ -173,8 +174,7 @@ def _watch(self, num_attempts_so_far: int) -> None:
self.path_pattern,
ex,
)
self._schedule_watch(num_attempts_so_far + 1)
return
return num_attempts_so_far + 1

def _do_watch(self, last_known_revision: Revision) -> Optional[Latest[T]]:
pass
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ black
codecov
pytest
pytest-cov
pytest-mock
respx
setuptools
23 changes: 23 additions & 0 deletions tests/integration/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# under the License.
import json
import os
import time
from concurrent.futures import Future
from concurrent.futures import TimeoutError

Expand Down Expand Up @@ -207,3 +208,25 @@ def test_await_init_value(self, run_around_test):
latest: Latest[str] = future.result()
assert latest.value == '{"a": 1}'
assert watcher.latest() == latest

def test_not_modified_repository_watcher(self, run_around_test):
"""It verifies that a watcher keep watching well even after `NOT_MODIFIED`."""
timeout_millis = 1000
timeout_second = timeout_millis / 1000

# pass short timeout millis for testing purpose.
watcher: Watcher[Revision] = dogma.repository_watcher(
project_name, repo_name, "/**", timeout_millis=timeout_millis
)

# wait until watcher get `NOT_MODIFIED` at least once.
time.sleep(4 * timeout_second)

commit = Commit("Upsert modify.txt")
upsert_text = Change("/path/modify.txt", ChangeType.UPSERT_TEXT, "modified")
result = dogma.push(project_name, repo_name, commit, [upsert_text])

# wait until watcher watch latest.
time.sleep(4 * timeout_second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


assert result.revision == watcher.latest().revision.major
100 changes: 100 additions & 0 deletions tests/test_repository_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2022 LINE Corporation
#
# LINE Corporation licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from centraldogma.data.revision import Revision
from centraldogma.query import Query
from centraldogma.repository_watcher import RepositoryWatcher, FileWatcher
from centraldogma.watcher import Latest

import pytest


@pytest.fixture()
def repo_watcher(mocker):
return RepositoryWatcher(
content_service=mocker.MagicMock(),
project_name="project",
repo_name="repo",
path_pattern="/test",
timeout_millis=1 * 60 * 1000,
function=lambda x: x,
)


@pytest.fixture()
def file_watcher(mocker):
return FileWatcher(
content_service=mocker.MagicMock(),
project_name="project",
repo_name="repo",
query=Query.text("test.txt"),
timeout_millis=5000,
function=lambda x: x,
)


def test_repository_watch(repo_watcher, mocker):
revision = Revision.init()
latest = Latest(revision, repo_watcher.function(revision))
mocker.patch.object(repo_watcher, "_do_watch", return_value=latest)

response = repo_watcher._watch(0)
assert response == 0
assert repo_watcher.latest() is latest


def test_repository_watch_with_none_revision(repo_watcher, mocker):
mocker.patch.object(repo_watcher, "_do_watch", return_value=None)

response = repo_watcher._watch(0)
assert response == 0
assert repo_watcher.latest() is None


def test_repository_watch_with_exception(repo_watcher, mocker):
mocker.patch.object(
repo_watcher, "_do_watch", side_effect=Exception("test exception")
)

response = repo_watcher._watch(0)
assert response == 1
assert repo_watcher.latest() is None


def test_file_watch(file_watcher, mocker):
revision = Revision.init()
latest = Latest(revision, file_watcher.function(revision))
mocker.patch.object(file_watcher, "_do_watch", return_value=latest)

response = file_watcher._watch(0)
assert response == 0
assert file_watcher.latest() is latest


def test_file_watch_with_none_revision(file_watcher, mocker):
mocker.patch.object(file_watcher, "_do_watch", return_value=None)

response = file_watcher._watch(0)
assert response == 0
assert file_watcher.latest() is None


def test_file_watch_with_exception(file_watcher, mocker):
mocker.patch.object(
file_watcher, "_do_watch", side_effect=Exception("test exception")
)

response = file_watcher._watch(0)
assert response == 1
assert file_watcher.latest() is None