diff --git a/src/simple_sign/backend.py b/src/simple_sign/backend.py index c50b817..d07b9f0 100644 --- a/src/simple_sign/backend.py +++ b/src/simple_sign/backend.py @@ -9,7 +9,7 @@ import logging from dataclasses import dataclass -from typing import Callable, Final +from typing import Callable, Final, Optional import cachetools.func import pycardano as pyc @@ -73,7 +73,7 @@ def retrieve_staked_holders(self, token_policy: str) -> list: raise NotImplementedError() def retrieve_nft_holders( - self, policy: str, deny_list: list, seek_addr: str = None + self, policy: str, deny_list: list, addr: str = None ) -> list: """Retrieve a list of NFT holders, e.g. a license to operate a decentralized node. @@ -81,7 +81,7 @@ def retrieve_nft_holders( raise NotImplementedError() def retrieve_metadata( - self, value: int, policy: str, tag: str, callback: Callable = None + self, value: int, policy: str, tag: str, callback: Optional[Callable | None] ) -> list: """Retrieve metadata from the backend.""" raise NotImplementedError() @@ -100,20 +100,25 @@ def __init__( self._port = port @cachetools.func.ttl_cache(ttl=60) - def _retrieve_unspent_utxos(self, addr: str = "") -> dict: + def _retrieve_unspent_utxos(self, addr: str = None) -> dict: """Retrieve unspent utxos from Kupo. - NB. Kupo must be configured to capture sparingly. + NB. Kupo must be configured to capture sparingly, i.e. the + policies and addresses it is watching and slot from which it is + watching must be as specific as possible for this function to + perform well. """ - if not addr: - resp = requests.get( - f"{self._base_url}:{self._port}/matches?unspent", timeout=30 - ) - return resp.json() - resp = requests.get( - f"{self._base_url}:{self._port}/matches/{addr}?unspent", timeout=30 - ) - return resp.json() + kupo_err: Final[str] = "hint" + request_string = f"{self._base_url}:{self._port}/matches?unspent" + if addr: + request_string = f"{self._base_url}:{self._port}/matches/{addr}?unspent" + logger.info("requesting unspent: '%s'", request_string) + resp = requests.get(request_string, timeout=30) + ret = resp.json() + if kupo_err in ret: + logger.error("unable to retrieve data due to Kupo request error: %s", ret) + return [] + return ret def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]): """Return metadata based on slot and transaction ID. This is @@ -145,17 +150,18 @@ def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]): md_list.append(md_dict[0]) return md_list - def retrieve_staked_holders(self, token_policy: str, seek_addr: str = None) -> list: + def retrieve_staked_holders(self, token_policy: str, addr: str = None) -> list: """Retrieve a list of staked holders against a given CNT.""" unspent = self._retrieve_unspent_utxos() addresses_with_fact = {} for item in unspent: - addr = item["address"] - if seek_addr and addr != seek_addr: + unspent_addr = item["address"] + unspent_staking = _get_staking_from_addr(unspent_addr) + if addr and addr not in (unspent_addr, unspent_staking): # don't process further than we have to if we're only # looking for a single address. continue - staking = _get_staking_from_addr(addr) + staking = unspent_staking assets = item["value"]["assets"] for key, value in assets.items(): if token_policy in key: @@ -163,7 +169,7 @@ def retrieve_staked_holders(self, token_policy: str, seek_addr: str = None) -> l return addresses_with_fact def retrieve_nft_holders( - self, policy: str, deny_list: list = None, seek_addr: str = None + self, policy: str, deny_list: list = None, addr: str = None ) -> list: """Retrieve a list of NFT holders, e.g. a license to operate a decentralized node. @@ -172,17 +178,21 @@ def retrieve_nft_holders( to remove some results that are unhelpful, e.g. the minting address if desired. """ - unspent = self._retrieve_unspent_utxos() + if addr: + unspent = self._retrieve_unspent_utxos(addr) + else: + unspent = self._retrieve_unspent_utxos() holders = {} for item in unspent: - addr = item["address"] - if seek_addr and addr != seek_addr: + unspent_addr = item["address"] + unspent_staking = _get_staking_from_addr(unspent_addr) + if addr and addr not in (unspent_addr, unspent_staking): # don't process further than we have to if we're only # looking for a single address. continue - staking = _get_staking_from_addr(addr) - if addr in deny_list: + if deny_list and unspent_addr in deny_list: continue + staking = unspent_staking assets = item["value"]["assets"] for key, _ in assets.items(): if not key.startswith(policy): @@ -195,6 +205,7 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx """Retrieve a list of valid transactions according to our policy rules. """ + logger.info("getting valid txs for policy: '%s'", policy) valid_txs = [] if not unspent: return valid_txs @@ -206,7 +217,6 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx for asset in assets: if policy not in asset: continue - logger.error(policy) slot = item["created_at"]["slot_no"] tx_id = item["transaction_id"] address = item["address"] @@ -225,7 +235,7 @@ def retrieve_metadata( value: int, policy: str, tag: str, - callback: Callable = None, + callback: Optional[Callable | None], ) -> list: """Retrieve a list of aliased signing addresses. An aliased signing address is an address that has been setup using a diff --git a/src/simple_sign/sign.py b/src/simple_sign/sign.py index 105ce66..64eddde 100644 --- a/src/simple_sign/sign.py +++ b/src/simple_sign/sign.py @@ -3,20 +3,25 @@ # pylint: disable=W0613 import argparse +import binascii +import copy import logging import os import sys import time -from typing import Final +from typing import Callable, Final import pycardano as pyc try: + from src.simple_sign.backend import KupoContext from src.simple_sign.version import get_version except ModuleNotFoundError: try: + from backend import KupoContext from version import get_version except ModuleNotFoundError: + from simple_sign.backend import KupoContext from simple_sign.version import get_version # Set up logging. @@ -44,35 +49,88 @@ class UnknownSigningKey(Exception): """Exception to raise when the signing key is unknown.""" -def retrieve_aliased(pkey: str) -> str: +def retrieve_aliased( + context: KupoContext, + policy_id: str, + tag: str, + value: int, + callback: Callable, +) -> str: """Retrieve another public key aliased by the given lookup. The result might then be used to verify using one of the other - methods in this library, e.g. + methods in this library, e.g. given an staking key returned for an + alias, verify if the staking key also holds the correct amount + of stake for a given token. - 1. lookup aliased staking key. - 2. lookup staking key in license pool. - 3. if not exists, raise exception, else, pass. + NB. to keep in mind, does aliasing already guarantee a license is + held? If the policy is supplied? - We want to do this on an address by address basis. The difficulty - is consistent parsing of metadata that allows this function to be - broadly applicable across functions. + Aliasing can potentially be a generic process, it exists in this + library by way of helping realize that. It could be removed in + future, and so any feedback is appreciated if it works for you. + + For more information; https://docs.orcfax.io/signing-key-aliasing """ - raise NotImplementedError("reading staked values is not yet implemented") + if not policy_id: + policy_id = "" + if not value or not tag: + raise NotImplementedError("function requires a lovelace value and metadata tag") + aliases = context.retrieve_metadata( + value=value, + tag=tag, + policy=policy_id, + callback=callback, + ) + return aliases -def signature_in_staked_pool(pkey: str, token_policy_id: str, min_stake: int) -> bool: +def signature_in_staked_pool( + context: KupoContext, pkey: str, token_policy_id: str, min_stake: int +) -> bool: """Validate whether the signing key belongs to a someone who has enough stake in a given token. """ - raise NotImplementedError("reading staked values is not yet implemented") + staking = context.retrieve_staked_holders( + addr=pkey, + token_policy=token_policy_id, + ) + for key, value in copy.deepcopy(staking).items(): + if value > min_stake: + continue + del staking[key] + try: + staked = staking[pkey] + if not int(staked) >= min_stake: + raise UnknownSigningKey( + f"addr: '{pkey}', does not have enough stake: '{min_stake}'", + ) + except IndexError: + raise UnknownSigningKey( + f"addr: '{pkey}', is not knonwn to the network", + ) from IndexError + return True -def signature_in_license_pool(pkey: str, policy_id: str) -> bool: +def signature_in_license_pool( + context: KupoContext, pkey: str, policy_id: str, suffix: str = "" +) -> bool: """Validate whether signing key matches one of those in a pool of licenses associated with the project and return True if so. """ - raise NotImplementedError("reading from license pool is not yet implemented") + md = context.retrieve_nft_holders( + policy=policy_id, + addr=pkey, + ) + holding = {} + for k, v in md.items(): + license_name = k.replace(policy_id, "").replace(".", "").replace(suffix, "") + license_name = binascii.unhexlify(license_name).decode() + holding[license_name] = v + if not holding: + raise UnknownSigningKey(f"addr '{pkey}' is not in possession of a license") + logger.info("information in license pool: '%s'", holding) + return True def signature_in_constitution_datum_utxo(pkey: str) -> bool: