Skip to content

Commit

Permalink
Introduce network functions
Browse files Browse the repository at this point in the history
Generic network functions are implemented that allow for verification
of signed data against the network. Signed data can come from keys
with specific NFTs; stake against a specific CNT; and against alias
stake keys following the Orcfax aliasing protocol.
  • Loading branch information
ross-spencer committed Nov 26, 2024
1 parent f2d7541 commit 19e8a1e
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 40 deletions.
62 changes: 36 additions & 26 deletions src/simple_sign/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import logging
from dataclasses import dataclass
from typing import Callable, Final
from typing import Callable, Final, Optional

import cachetools.func
import pycardano as pyc
Expand Down Expand Up @@ -73,15 +73,15 @@ def retrieve_staked_holders(self, token_policy: str) -> list:
raise NotImplementedError()

def retrieve_nft_holders(
self, policy: str, deny_list: list, seek_addr: str = None
self, policy: str, deny_list: list, addr: str = None
) -> list:
"""Retrieve a list of NFT holders, e.g. a license to operate
a decentralized node.
"""
raise NotImplementedError()

def retrieve_metadata(
self, value: int, policy: str, tag: str, callback: Callable = None
self, value: int, policy: str, tag: str, callback: Optional[Callable | None]
) -> list:
"""Retrieve metadata from the backend."""
raise NotImplementedError()
Expand All @@ -100,20 +100,25 @@ def __init__(
self._port = port

@cachetools.func.ttl_cache(ttl=60)
def _retrieve_unspent_utxos(self, addr: str = "") -> dict:
def _retrieve_unspent_utxos(self, addr: str = None) -> dict:
"""Retrieve unspent utxos from Kupo.
NB. Kupo must be configured to capture sparingly.
NB. Kupo must be configured to capture sparingly, i.e. the
policies and addresses it is watching and slot from which it is
watching must be as specific as possible for this function to
perform well.
"""
if not addr:
resp = requests.get(
f"{self._base_url}:{self._port}/matches?unspent", timeout=30
)
return resp.json()
resp = requests.get(
f"{self._base_url}:{self._port}/matches/{addr}?unspent", timeout=30
)
return resp.json()
kupo_err: Final[str] = "hint"
request_string = f"{self._base_url}:{self._port}/matches?unspent"
if addr:
request_string = f"{self._base_url}:{self._port}/matches/{addr}?unspent"
logger.info("requesting unspent: '%s'", request_string)
resp = requests.get(request_string, timeout=30)
ret = resp.json()
if kupo_err in ret:
logger.error("unable to retrieve data due to Kupo request error: %s", ret)
return []
return ret

def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]):
"""Return metadata based on slot and transaction ID. This is
Expand Down Expand Up @@ -145,25 +150,26 @@ def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]):
md_list.append(md_dict[0])
return md_list

def retrieve_staked_holders(self, token_policy: str, seek_addr: str = None) -> list:
def retrieve_staked_holders(self, token_policy: str, addr: str = None) -> list:
"""Retrieve a list of staked holders against a given CNT."""
unspent = self._retrieve_unspent_utxos()
addresses_with_fact = {}
for item in unspent:
addr = item["address"]
if seek_addr and addr != seek_addr:
unspent_addr = item["address"]
unspent_staking = _get_staking_from_addr(unspent_addr)
if addr and addr not in (unspent_addr, unspent_staking):
# don't process further than we have to if we're only
# looking for a single address.
continue
staking = _get_staking_from_addr(addr)
staking = unspent_staking
assets = item["value"]["assets"]
for key, value in assets.items():
if token_policy in key:
addresses_with_fact = _sum_dict(staking, value, addresses_with_fact)
return addresses_with_fact

def retrieve_nft_holders(
self, policy: str, deny_list: list = None, seek_addr: str = None
self, policy: str, deny_list: list = None, addr: str = None
) -> list:
"""Retrieve a list of NFT holders, e.g. a license to operate
a decentralized node.
Expand All @@ -172,17 +178,21 @@ def retrieve_nft_holders(
to remove some results that are unhelpful, e.g. the minting
address if desired.
"""
unspent = self._retrieve_unspent_utxos()
if addr:
unspent = self._retrieve_unspent_utxos(addr)
else:
unspent = self._retrieve_unspent_utxos()
holders = {}
for item in unspent:
addr = item["address"]
if seek_addr and addr != seek_addr:
unspent_addr = item["address"]
unspent_staking = _get_staking_from_addr(unspent_addr)
if addr and addr not in (unspent_addr, unspent_staking):
# don't process further than we have to if we're only
# looking for a single address.
continue
staking = _get_staking_from_addr(addr)
if addr in deny_list:
if deny_list and unspent_addr in deny_list:
continue
staking = unspent_staking
assets = item["value"]["assets"]
for key, _ in assets.items():
if not key.startswith(policy):
Expand All @@ -195,6 +205,7 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx
"""Retrieve a list of valid transactions according to our
policy rules.
"""
logger.info("getting valid txs for policy: '%s'", policy)
valid_txs = []
if not unspent:
return valid_txs
Expand All @@ -206,7 +217,6 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx
for asset in assets:
if policy not in asset:
continue
logger.error(policy)
slot = item["created_at"]["slot_no"]
tx_id = item["transaction_id"]
address = item["address"]
Expand All @@ -225,7 +235,7 @@ def retrieve_metadata(
value: int,
policy: str,
tag: str,
callback: Callable = None,
callback: Optional[Callable | None],
) -> list:
"""Retrieve a list of aliased signing addresses. An aliased
signing address is an address that has been setup using a
Expand Down
86 changes: 72 additions & 14 deletions src/simple_sign/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@
# pylint: disable=W0613

import argparse
import binascii
import copy
import logging
import os
import sys
import time
from typing import Final
from typing import Callable, Final

import pycardano as pyc

try:
from src.simple_sign.backend import KupoContext
from src.simple_sign.version import get_version
except ModuleNotFoundError:
try:
from backend import KupoContext
from version import get_version
except ModuleNotFoundError:
from simple_sign.backend import KupoContext
from simple_sign.version import get_version

# Set up logging.
Expand Down Expand Up @@ -44,35 +49,88 @@ class UnknownSigningKey(Exception):
"""Exception to raise when the signing key is unknown."""


def retrieve_aliased(pkey: str) -> str:
def retrieve_aliased(
context: KupoContext,
policy_id: str,
tag: str,
value: int,
callback: Callable,
) -> str:
"""Retrieve another public key aliased by the given lookup.
The result might then be used to verify using one of the other
methods in this library, e.g.
methods in this library, e.g. given an staking key returned for an
alias, verify if the staking key also holds the correct amount
of stake for a given token.
1. lookup aliased staking key.
2. lookup staking key in license pool.
3. if not exists, raise exception, else, pass.
NB. to keep in mind, does aliasing already guarantee a license is
held? If the policy is supplied?
We want to do this on an address by address basis. The difficulty
is consistent parsing of metadata that allows this function to be
broadly applicable across functions.
Aliasing can potentially be a generic process, it exists in this
library by way of helping realize that. It could be removed in
future, and so any feedback is appreciated if it works for you.
For more information; https://docs.orcfax.io/signing-key-aliasing
"""
raise NotImplementedError("reading staked values is not yet implemented")
if not policy_id:
policy_id = ""
if not value or not tag:
raise NotImplementedError("function requires a lovelace value and metadata tag")
aliases = context.retrieve_metadata(
value=value,
tag=tag,
policy=policy_id,
callback=callback,
)
return aliases


def signature_in_staked_pool(pkey: str, token_policy_id: str, min_stake: int) -> bool:
def signature_in_staked_pool(
context: KupoContext, pkey: str, token_policy_id: str, min_stake: int
) -> bool:
"""Validate whether the signing key belongs to a someone who has
enough stake in a given token.
"""
raise NotImplementedError("reading staked values is not yet implemented")
staking = context.retrieve_staked_holders(
addr=pkey,
token_policy=token_policy_id,
)
for key, value in copy.deepcopy(staking).items():
if value > min_stake:
continue
del staking[key]
try:
staked = staking[pkey]
if not int(staked) >= min_stake:
raise UnknownSigningKey(
f"addr: '{pkey}', does not have enough stake: '{min_stake}'",
)
except IndexError:
raise UnknownSigningKey(
f"addr: '{pkey}', is not knonwn to the network",
) from IndexError
return True


def signature_in_license_pool(pkey: str, policy_id: str) -> bool:
def signature_in_license_pool(
context: KupoContext, pkey: str, policy_id: str, suffix: str = ""
) -> bool:
"""Validate whether signing key matches one of those in a pool of
licenses associated with the project and return True if so.
"""
raise NotImplementedError("reading from license pool is not yet implemented")
md = context.retrieve_nft_holders(
policy=policy_id,
addr=pkey,
)
holding = {}
for k, v in md.items():
license_name = k.replace(policy_id, "").replace(".", "").replace(suffix, "")
license_name = binascii.unhexlify(license_name).decode()
holding[license_name] = v
if not holding:
raise UnknownSigningKey(f"addr '{pkey}' is not in possession of a license")
logger.info("information in license pool: '%s'", holding)
return True


def signature_in_constitution_datum_utxo(pkey: str) -> bool:
Expand Down

0 comments on commit 19e8a1e

Please sign in to comment.