Skip to content

Commit

Permalink
Merge pull request #252 from nolar/peerings-not-found
Browse files Browse the repository at this point in the history
Peerings not found when zalando.org/v1 group is absent
  • Loading branch information
nolar authored Nov 21, 2019
2 parents 067ddab + 5597891 commit a2b077c
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 15 deletions.
18 changes: 17 additions & 1 deletion kopf/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ def reauthenticated_request(fn: _F) -> _F:
"""
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> Any:

# If a session is explicitly passed, make it a simple call without re-auth.
# Exceptions are escalated to a caller, which is probably wrapped itself.
if 'session' in kwargs:
return await fn(*args, **kwargs)

# Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s.
vault: credentials.Vault = vault_var.get()
async for key, info, session in vault.extended(APISession.from_connection_info, 'sessions'):
try:
Expand All @@ -58,6 +65,15 @@ def reauthenticated_stream(fn: _F) -> _F:
"""
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> Any:

# If a session is explicitly passed, make it a simple call without re-auth.
# Exceptions are escalated to a caller, which is probably wrapped itself.
if 'session' in kwargs:
async for item in fn(*args, **kwargs):
yield item
return

# Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s.
vault: credentials.Vault = vault_var.get()
async for key, info, session in vault.extended(APISession.from_connection_info, 'sessions'):
try:
Expand Down Expand Up @@ -90,7 +106,7 @@ class APISession(aiohttp.ClientSession):
default_namespace: Optional[str]
_tempfiles: "_TempFiles"
_discovery_lock: asyncio.Lock
_discovered_resources: Dict[resources.Resource, Dict[str, object]]
_discovered_resources: Dict[str, Dict[resources.Resource, Dict[str, object]]]

@classmethod
def from_connection_info(
Expand Down
47 changes: 33 additions & 14 deletions kopf/clients/discovery.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,54 @@
from typing import Dict, Optional, cast

import aiohttp

from kopf.clients import auth
from kopf.structs import resources


@auth.reauthenticated_request
async def discover(
*,
resource: resources.Resource,
session: auth.APISession,
session: Optional[auth.APISession] = None, # injected by the decorator
) -> Optional[Dict[str, object]]:
if resource not in session._discovered_resources:
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

if resource.api_version not in session._discovered_resources:
async with session._discovery_lock:
if resource not in session._discovered_resources:
if resource.api_version not in session._discovered_resources:
session._discovered_resources[resource.api_version] = {}

try:
response = await session.get(
url=resource.get_version_url(server=session.server),
)
response.raise_for_status()
respdata = await response.json()

response = await session.get(
url=resource.get_version_url(server=session.server),
)
response.raise_for_status()
respdata = await response.json()
session._discovered_resources[resource.api_version].update({
resources.Resource(resource.group, resource.version, info['name']): info
for info in respdata['resources']
})

session._discovered_resources.update({
resources.Resource(resource.group, resource.version, info['name']): info
for info in respdata['resources']
})
return session._discovered_resources.get(resource, None)
except aiohttp.ClientResponseError as e:
if e.status in [403, 404]:
pass
else:
raise

return session._discovered_resources[resource.api_version].get(resource, None)


@auth.reauthenticated_request
async def is_namespaced(
*,
resource: resources.Resource,
session: auth.APISession,
session: Optional[auth.APISession] = None, # injected by the decorator
) -> bool:
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

info = await discover(resource=resource, session=session)
return cast(bool, info['namespaced']) if info is not None else True # assume namespaced
82 changes: 82 additions & 0 deletions tests/authentication/test_reauthentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import aiohttp.web
from typing import Optional, AsyncIterator, Tuple

from kopf.clients.auth import APISession, reauthenticated_request, reauthenticated_stream


@reauthenticated_request
async def request_fn(
x: int,
*,
session: Optional[APISession],
) -> Tuple[APISession, int]:
return session, x + 100


@reauthenticated_stream
async def stream_fn(
x: int,
*,
session: Optional[APISession],
) -> AsyncIterator[Tuple[APISession, int]]:
yield session, x + 100


async def test_session_is_injected_to_request(
fake_vault, resp_mocker, aresponses, hostname, resource):

result = {}
get_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, resource.get_url(namespace=None, name='xyz'), 'get', get_mock)

session, result = await request_fn(1)

assert session is not None
assert result == 101


async def test_session_is_injected_to_stream(
fake_vault, resp_mocker, aresponses, hostname, resource):

result = {}
get_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, resource.get_url(namespace=None, name='xyz'), 'get', get_mock)

counter = 0
async for session, result in stream_fn(1):
counter += 1

assert session is not None
assert result == 101
assert counter == 1


async def test_session_is_passed_through_to_request(
fake_vault, resp_mocker, aresponses, hostname, resource):

result = {}
get_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, resource.get_url(namespace=None, name='xyz'), 'get', get_mock)

explicit_session = APISession()
session, result = await request_fn(1, session=explicit_session)

assert session is explicit_session
assert result == 101


async def test_session_is_passed_through_to_stream(
fake_vault, resp_mocker, aresponses, hostname, resource):

result = {}
get_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, resource.get_url(namespace=None, name='xyz'), 'get', get_mock)

explicit_session = APISession()
counter = 0
async for session, result in stream_fn(1, session=explicit_session):
counter += 1

assert session is explicit_session
assert result == 101
assert counter == 1
87 changes: 87 additions & 0 deletions tests/k8s/test_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import aiohttp.web
import pytest

from kopf.clients.discovery import discover, is_namespaced
from kopf.structs.resources import Resource


async def test_discovery_of_existing_resource(
resp_mocker, aresponses, hostname):

res1info = {'name': 'someresources', 'namespaced': True}
result = {'resources': [res1info]}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

resource = Resource('some-group.org', 'someversion', 'someresources')
info = await discover(resource=resource)

assert info == res1info


async def test_discovery_of_unexisting_resource(
resp_mocker, aresponses, hostname):

result = {'resources': []}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

resource = Resource('some-group.org', 'someversion', 'someresources')
info = await discover(resource=resource)

assert info is None


@pytest.mark.parametrize('status', [403, 404])
async def test_discovery_of_unexisting_group_or_version(
resp_mocker, aresponses, hostname, status):

list_mock = resp_mocker(return_value=aresponses.Response(status=status, reason="boo!"))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

resource = Resource('some-group.org', 'someversion', 'someresources')
info = await discover(resource=resource)

assert info is None


async def test_discovery_is_cached_per_session(
resp_mocker, aresponses, hostname):

res1info = {'name': 'someresources1', 'namespaced': True}
res2info = {'name': 'someresources2', 'namespaced': True}

result = {'resources': [res1info]}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

result = {'resources': [res2info]}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

resource = Resource('some-group.org', 'someversion', 'someresources1')
info = await discover(resource=resource)
assert info == res1info

resource = Resource('some-group.org', 'someversion', 'someresources2')
info = await discover(resource=resource)
assert info is None # cached as absent on the 1st call.

resource = Resource('some-group.org', 'someversion', 'someresources1')
info = await discover(resource=resource)
assert info == res1info


@pytest.mark.parametrize('namespaced', [True, False])
async def test_is_namespaced(
resp_mocker, aresponses, hostname, namespaced):

res1info = {'name': 'someresources', 'namespaced': namespaced}
result = {'resources': [res1info]}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, '/apis/some-group.org/someversion', 'get', list_mock)

resource = Resource('some-group.org', 'someversion', 'someresources')
result = await is_namespaced(resource=resource)

assert result == namespaced

0 comments on commit a2b077c

Please sign in to comment.