Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace retry with tenacity, bump patch version #65

Merged
merged 5 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions nsl/stac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import storage as gcp_storage
from google.oauth2 import service_account
from retry import retry
from tenacity import retry, stop_after_delay, wait_fixed

from epl.protobuf.v1 import stac_service_pb2_grpc
from epl.protobuf.v1.geometry_pb2 import GeometryData, ProjectionData, EnvelopeData
Expand All @@ -45,13 +45,12 @@
LandsatRequest, Mosaic, MosaicRequest, DatetimeRange, View, ViewRequest, Extent, Interval, Provider

__all__ = [
'stac_service', 'url_to_channel', 'STAC_SERVICE',
'Collection', 'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
'GeometryData', 'ProjectionData', 'EnvelopeData',
'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
'StacItem', 'Asset', 'Eo', 'View', 'Mosaic', 'DatetimeRange', 'Extent', 'Interval', 'Provider',
'gcs_storage_client',
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'AuthInfo', 'bearer_auth'
'bearer_auth', 'gcs_storage_client', 'stac_service', 'url_to_channel',
'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
'Collection', 'Eo', 'StacItem', 'Mosaic', 'View', 'Asset',
'GeometryData', 'ProjectionData', 'EnvelopeData', 'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
'DatetimeRange', 'Extent', 'Interval', 'Provider',
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'STAC_SERVICE', 'AuthInfo',
]

CLOUD_PROJECT = os.getenv("CLOUD_PROJECT")
Expand Down Expand Up @@ -79,10 +78,11 @@
MAX_BACKOFF_MS = int(os.getenv('MAX_BACKOFF_MS', 4))
MULTIPLIER = int(os.getenv('MULTIPLIER', 4))

STAC_SERVICE = os.getenv('STAC_SERVICE', 'api.nearspacelabs.net:9090')
STAC_SERVICE_HOST = os.getenv('STAC_SERVICE_HOST', 'api.nearspacelabs.net')
STAC_SERVICE = os.getenv('STAC_SERVICE', f'{STAC_SERVICE_HOST}:9090')
BYTES_IN_MB = 1024 * 1024
# at this point only allowing 4 MB or smaller messages
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 20))
# at this point only allowing 10 MB or smaller messages
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 10))
GRPC_CHANNEL_OPTIONS = [('grpc.max_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB),
('grpc.max_receive_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB)]

Expand Down Expand Up @@ -299,7 +299,7 @@ def __init__(self, nsl_id: str, nsl_secret: str):
self.nsl_secret = nsl_secret

# this only retries if there's a timeout error
@retry(exceptions=requests.Timeout, delay=1, backoff=2, tries=4)
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def authorize(self):
if self.skip_authorization:
return
Expand Down Expand Up @@ -388,7 +388,7 @@ def __init__(self, init=False):
def default_nsl_id(self):
return self._default_nsl_id

def auth_header(self, nsl_id: str = None, profile_name: str = None):
def auth_header(self, nsl_id: str = None, profile_name: str = None) -> str:
auth_info = self._get_auth_info(nsl_id, profile_name)
if not auth_info.skip_authorization and (auth_info.expiry - time.time()) < TOKEN_REFRESH_THRESHOLD:
print(f'authorizing NSL_ID: `{auth_info.nsl_id}`')
Expand All @@ -398,15 +398,19 @@ def auth_header(self, nsl_id: str = None, profile_name: str = None):
print(f"will attempt re-authorization in {ttl} minutes")
return f"Bearer {auth_info.token}"

def get_credentials(self, nsl_id: str = None) -> Optional[AuthInfo]:
def get_credentials(self, nsl_id: str = None, profile_name: str = None) -> Optional[AuthInfo]:
if profile_name is not None:
nsl_id = self._profile_map.get(profile_name, None)
return self._auth_info_map.get(nsl_id if nsl_id is not None else self.default_nsl_id, None)

def set_credentials(self, nsl_id: str, nsl_secret: str):
def set_credentials(self, nsl_id: str, nsl_secret: str, profile_name: str = None):
if len(self._auth_info_map) == 0:
self._default_nsl_id = nsl_id

self._auth_info_map[nsl_id] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
self._auth_info_map[nsl_id].authorize()
if profile_name is not None:
self._profile_map[profile_name] = nsl_id

def unset_credentials(self, profile_name: str):
nsl_id = self._profile_map.pop(profile_name)
Expand All @@ -431,14 +435,14 @@ def loads(self) -> Dict[str, AuthInfo]:
if not lines[i + 1].startswith('NSL_ID') or not lines[i + 2].startswith('NSL_SECRET'):
raise ValueError("credentials should be of the format:\n[named profile]\nNSL_ID={your "
"nsl id}\nNSL_SECRET={your nsl secret}")
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline, and optionally the leading quote
# .strip(), now we now [6:] starts after 'NSL_ID' .strip()[6:], strip potential whitespace
# between NSL_ID and '=' with .strip()[6:].strip(), start one after equal
# .strip()[6:].strip()[1:], strip potential whitespace
# after equal .strip()[6:].strip()[1:].strip()
profile_name = line.strip().lstrip('[').rstrip(']')
nsl_id = lines[i + 1].strip()[6:].strip()[1:].strip()
nsl_secret = lines[i + 2].strip()[10:].strip()[1:].strip()
nsl_id = lines[i + 1].strip()[6:].strip().strip('"')[1:].strip().strip('"')
nsl_secret = lines[i + 2].strip()[10:].strip().strip('"')[1:].strip().strip('"')

output[profile_name] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
return output
Expand All @@ -448,8 +452,8 @@ def dumps(self):
for profile_name, nsl_id in self._profile_map.items():
creds = self.get_credentials(nsl_id)
file_obj.write(f'[{profile_name}]\n')
file_obj.write(f'NSL_ID={creds.nsl_id}\n')
file_obj.write(f'NSL_SECRET={creds.nsl_secret}\n')
file_obj.write(f'NSL_ID="{creds.nsl_id}"\n')
file_obj.write(f'NSL_SECRET="{creds.nsl_secret}"\n')
file_obj.write('\n')
file_obj.close()

Expand Down
97 changes: 65 additions & 32 deletions nsl/stac/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
#
# for additional information, contact:
# [email protected]
import uuid

import requests

from typing import Iterator, List
from typing import Iterator, List, Optional, Tuple
from warnings import warn

from epl.protobuf.v1 import stac_pb2
Expand All @@ -29,14 +30,18 @@


class NSLClient:
def __init__(self, nsl_only=True):
def __init__(self, nsl_only=True, nsl_id=None, profile_name=None):
"""
Create a client connection to a gRPC STAC service. nsl_only limits all queries to only return data from Near
Space Labs.
:param nsl_only:
"""
self._stac_service = stac_singleton
self._nsl_only = nsl_only
if profile_name:
nsl_id = bearer_auth._get_auth_info(profile_name=profile_name).nsl_id
if nsl_id:
bearer_auth._default_nsl_id = nsl_id

@property
def default_nsl_id(self):
Expand Down Expand Up @@ -67,7 +72,8 @@ def search_one(self,
stac_request: stac_pb2.StacRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> stac_pb2.StacItem:
profile_name: str = None,
correlation_id: str = None) -> stac_pb2.StacItem:
"""
search for one item from the db that matches the stac request
:param timeout: timeout for request
Expand All @@ -77,20 +83,23 @@ def search_one(self,
NSLClient object's set_credentials to set credentials
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
using a different profile name
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
to identify the context and is passed to all components that are involved in the transaction flow
:return: StacItem
"""
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
return self._stac_service.stub.SearchOneItem(stac_request, timeout=timeout, metadata=metadata)

def count(self,
stac_request: stac_pb2.StacRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> int:
profile_name: str = None,
correlation_id: str = None) -> int:
"""
count all the items in the database that match the stac request
:param timeout: timeout for request
Expand All @@ -100,13 +109,15 @@ def count(self,
NSLClient object's set_credentials to set credentials
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
using a different profile name
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
to identify the context and is passed to all components that are involved in the transaction flow
:return: int
"""
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
db_result = self._stac_service.stub.CountItems(stac_request, timeout=timeout, metadata=metadata)
if db_result.status:
# print db_result
Expand All @@ -119,7 +130,9 @@ def search(self,
nsl_id: str = None,
profile_name: str = None,
auto_paginate: bool = False,
only_accessible: bool = False) -> Iterator[stac_pb2.StacItem]:
only_accessible: bool = False,
page_size: int = 50,
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
"""
search for stac items by using StacRequest. return a stream of StacItems
:param timeout: timeout for request
Expand All @@ -136,13 +149,16 @@ def search(self,
- If set to `False` (the default), `stac_request.limit` and `stac_request.offset` can be used to manually
page through StacItems.
:param only_accessible: limits results to only StacItems downloadable by your level of sample/paid access
:param page_size: how many results to page at a time
:return: stream of StacItems
"""
for item in self._search_all(stac_request,
timeout,
nsl_id=nsl_id,
profile_name=profile_name,
auto_paginate=auto_paginate):
auto_paginate=auto_paginate,
page_size=page_size,
correlation_id=correlation_id):
if not only_accessible or \
bearer_auth.is_valid_for(item_region(item), nsl_id=nsl_id, profile_name=profile_name):
yield item
Expand All @@ -151,8 +167,10 @@ def search_collections(self,
collection_request: stac_pb2.CollectionRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> Iterator[stac_pb2.Collection]:
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
profile_name: str = None,
correlation_id: str = None) -> Iterator[stac_pb2.Collection]:

metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
for item in self._stac_service.stub.SearchCollections(collection_request, timeout=timeout, metadata=metadata):
yield item

Expand All @@ -173,7 +191,7 @@ def subscribe(self,
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT
res = requests.post(f'{AUTH0_TENANT}/subscription',
headers=NSLClient._json_headers(nsl_id, profile_name),
headers=self._json_headers(nsl_id, profile_name),
json=dict(stac_request=utils.stac_request_to_b64(stac_request),
destination=destination.to_json_str(),
is_active=is_active))
Expand All @@ -186,7 +204,7 @@ def subscribe(self,
def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
"""Reactivates a subscription with the given `sub_id`."""
res = requests.put(f'{AUTH0_TENANT}/subscription/{sub_id}',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 200)
print(f'reactivated subscription with id: {sub_id}')
Expand All @@ -195,7 +213,7 @@ def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
"""Deactivates a subscription with the given `sub_id`."""
res = requests.delete(f'{AUTH0_TENANT}/subscription/{sub_id}',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 202)
print(f'deactivated subscription with id: {sub_id}')
Expand All @@ -204,7 +222,7 @@ def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
def subscriptions(self, nsl_id: str = None, profile_name: str = None) -> List[Subscription]:
"""Fetches all subscriptions."""
res = requests.get(f'{AUTH0_TENANT}/subscription',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 200)
return list(Subscription(response_dict) for response_dict in res.json()['results'])
Expand All @@ -214,48 +232,63 @@ def _search_all(self,
timeout=15,
nsl_id: str = None,
profile_name: str = None,
auto_paginate: bool = False) -> Iterator[stac_pb2.StacItem]:
auto_paginate: bool = False,
page_size: int = 50,
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

if not auto_paginate:
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
for item in self._stac_service.stub.SearchItems(stac_request, timeout=timeout, metadata=metadata):
if not item.id:
warn("STAC item missing STAC id; ending search")
warn(f"STAC item missing STAC id: \n{item};\n ending search")
return
else:
yield item
else:
limit = stac_request.limit if stac_request.limit > 0 else None
original_limit = stac_request.limit if stac_request.limit > 0 else None
offset = stac_request.offset
page_size = 500
count = 0

stac_request.limit = page_size
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
stac_request.limit = page_size if original_limit is None else max(original_limit, page_size)
items = list(self._search_all(stac_request, timeout=timeout,
nsl_id=nsl_id, profile_name=profile_name,
page_size=page_size, correlation_id=correlation_id))
while len(items) > 0:
for item in items:
if limit is None or (limit is not None and count < limit):
if original_limit is None or (original_limit is not None and count < original_limit):
yield item
count += 1
if limit is not None and count >= limit:
if original_limit is not None and count >= original_limit:
break

if limit is not None and count >= limit:
if original_limit is not None and count >= original_limit:
break

stac_request.offset += page_size
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
stac_request.offset += len(items)
items = list(self._search_all(stac_request, timeout=timeout,
nsl_id=nsl_id, profile_name=profile_name,
page_size=page_size, correlation_id=correlation_id))

stac_request.offset = offset
stac_request.limit = limit if limit is not None else 0

@staticmethod
def _json_headers(nsl_id: str = None, profile_name: str = None) -> dict:
return {'content-type': 'application/json',
'Authorization': bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)}
stac_request.limit = original_limit if original_limit is not None else 0

def _json_headers(self,
nsl_id: str = None,
profile_name: str = None,
correlation_id: str = None) -> dict:
headers = {k: v for (k, v) in self._grpc_headers(nsl_id, profile_name, correlation_id)}
return {'content-type': 'application/json', **headers}

def _grpc_headers(self,
nsl_id: str = None,
profile_name: str = None,
correlation_id: str = None) -> Tuple[Tuple[str, str], ...]:
correlation_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
return (('x-correlation-id', correlation_id),
('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)))

@staticmethod
def _handle_json_response(res, status_code: int):
Expand Down
Loading