Skip to content

Commit

Permalink
Add new API endpoint for AlchemyClient (#54)
Browse files Browse the repository at this point in the history
alchemy_getAssetTransfers
  • Loading branch information
aflament authored Feb 8, 2025
1 parent 7ba866c commit 76839e1
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 14 deletions.
135 changes: 121 additions & 14 deletions alphaswarm/services/alchemy/alchemy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import logging
import os
import time
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, Final, List
from typing import Annotated, Dict, Final, List

import requests
from alphaswarm.services.api_exception import ApiException
from pydantic import Field, field_validator
from pydantic.dataclasses import dataclass

logger = logging.getLogger(__name__)
Expand All @@ -32,13 +34,60 @@ class HistoricalPriceByAddress:
data: List[HistoricalPrice]


@dataclass
class Metadata:
block_timestamp: Annotated[str, Field(alias="blockTimestamp")]


@dataclass
class Transfer:
"""Represents a token transfer transaction.
A Transfer object captures details about a single token transfer on the blockchain,
including the block number, transaction hash, addresses involved, token amount,
asset type, and timestamp.
Attributes:
block_number (int): Block number when transfer occurred
tx_hash (str): Transaction hash
from_address (str): Address that sent the tokens
to_address (str): Address that received the tokens
value (Decimal): Amount of tokens transferred
asset (str): Token symbol (e.g. "USDC", "WETH")
category (str): Token category (e.g. "erc20")
block_timestamp (Optional[str]): ISO timestamp of block, if available
"""

block_number: Annotated[int, Field(validation_alias="blockNum", default=0)]
tx_hash: Annotated[str, Field(validation_alias="hash")]
from_address: Annotated[str, Field(validation_alias="from")]
to_address: Annotated[str, Field(validation_alias="to")]
value: Annotated[Decimal, Field(default=Decimal(0))]
metadata: Metadata
asset: str = "UNKNOWN"
category: str = "UNKNOWN"

@field_validator("block_number", mode="before")
def convert_hex_block_number(cls, value: str | int) -> int:
if isinstance(value, str) and value.startswith("0x"):
return int(value, 16)
return int(value)

@field_validator("value", mode="before")
def convert_to_decimal(cls, value: str | int | float | Decimal) -> Decimal:
if isinstance(value, Decimal):
return value
return Decimal(str(value))


NETWORKS = ["eth-mainnet", "base-mainnet", "solana-mainnet", "eth-sepolia", "base-sepolia", "solana-devnet"]


class AlchemyClient:
"""Alchemy API data source for historical token prices"""

DEFAULT_BASE_URL: Final[str] = "https://api.g.alchemy.com"
DEFAULT_NETWORK_URL: Final[str] = "https://{network}.g.alchemy.com/v2/{api_key}"
ENDPOINT_TOKENS_HISTORICAL: Final[str] = "/prices/v1/{api_key}/tokens/historical"

def __init__(self, *, api_key: str, base_url: str = DEFAULT_BASE_URL) -> None:
Expand All @@ -47,21 +96,31 @@ def __init__(self, *, api_key: str, base_url: str = DEFAULT_BASE_URL) -> None:
self.api_key = api_key
self.headers = {"accept": "application/json", "content-type": "application/json"}

def _make_request(self, endpoint: str, data: Dict) -> Dict:
"""Make API request to Alchemy"""
url = f"{self.base_url}{endpoint.format(api_key=self.api_key)}"
def _make_request(self, url: str, data: Dict) -> Dict:
"""Make API request to Alchemy with exponential backoff for rate limits."""
max_retries = 3
base_delay = 1

try:
response = requests.post(url, json=data, headers=self.headers)
for attempt in range(max_retries + 1):
try:
response = requests.post(url, json=data, headers=self.headers)

if response.status_code >= 400:
raise ApiException(response)
if response.status_code != 429:
if response.status_code >= 400:
raise ApiException(response)
return response.json()

return response.json()
if attempt == max_retries:
raise ApiException(response)

except Exception:
logger.exception("Error fetching data from Alchemy")
raise
delay = base_delay * (2**attempt)
time.sleep(delay)

except Exception:
if attempt < max_retries:
continue

raise RuntimeError("Max retries exceeded for Alchemy API request")

def get_historical_prices_by_symbol(
self, symbol: str, start_time: datetime, end_time: datetime, interval: str
Expand All @@ -80,7 +139,9 @@ def get_historical_prices_by_symbol(

# Prepare request data
data = {"symbol": symbol, "startTime": start_iso, "endTime": end_iso, "interval": interval}
response = self._make_request(self.ENDPOINT_TOKENS_HISTORICAL, data)
url = f"{self.base_url}{self.ENDPOINT_TOKENS_HISTORICAL.format(api_key=self.api_key)}"
response = self._make_request(url, data)

return HistoricalPriceBySymbol(**response)

def get_historical_prices_by_address(
Expand Down Expand Up @@ -114,9 +175,55 @@ def get_historical_prices_by_address(
"endTime": end_iso,
"interval": interval,
}
response = self._make_request(self.ENDPOINT_TOKENS_HISTORICAL, data)
url = f"{self.base_url}{self.ENDPOINT_TOKENS_HISTORICAL.format(api_key=self.api_key)}"
response = self._make_request(url, data)
return HistoricalPriceByAddress(**response)

def get_transfers(self, wallet: str, chain: str, incoming: bool = False) -> List[Transfer]:
"""Fetch raw ERC20 token transfer data from Alchemy API for a given wallet and chain."""

address_key = "toAddress" if incoming else "fromAddress"
payload = {
"id": 1,
"jsonrpc": "2.0",
"method": "alchemy_getAssetTransfers",
"params": [
{
"fromBlock": "0x0",
"toBlock": "latest",
address_key: wallet.lower(),
"category": ["erc20"],
"order": "asc",
"withMetadata": True,
"excludeZeroValue": True,
"maxCount": "0x3e8",
}
],
}

data = self._make_request(url=self.network_url(chain=chain), data=payload)
# Validation of the response structure
result = data.get("result")
if result is None or not isinstance(result, dict):
raise RuntimeError("Alchemy response JSON does not contain a 'result' object.")

transfers = result.get("transfers")
if transfers is None or not isinstance(transfers, list):
raise RuntimeError("Alchemy response JSON does not contain a 'result.transfers' list.")

parsed_transfers = [Transfer(**transfer) for transfer in transfers]
return parsed_transfers

def network_url(self, chain: str) -> str:
if chain == "ethereum":
return self.DEFAULT_NETWORK_URL.format(network="eth-mainnet", api_key=self.api_key)
elif chain == "base":
return self.DEFAULT_NETWORK_URL.format(network="base-mainnet", api_key=self.api_key)
elif chain == "ethereum_sepolia":
return self.DEFAULT_NETWORK_URL.format(network="eth-sepolia", api_key=self.api_key)
else:
raise ValueError(f"Unsupported chain {chain}")

@staticmethod
def from_env() -> AlchemyClient:
api_key = os.getenv("ALCHEMY_API_KEY")
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/services/alchemy/test_alchemy_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
from datetime import datetime, timedelta, timezone

import pytest

from alphaswarm.config import ChainConfig, Config
from alphaswarm.services.alchemy.alchemy_client import AlchemyClient


@pytest.fixture
def eth_sepolia_config(default_config: Config) -> ChainConfig:
return default_config.get_chain_config(chain="ethereum_sepolia")


def test_historical_prices_by_symbol(alchemy_client: AlchemyClient) -> None:
end = datetime.now(timezone.utc)
start = end - timedelta(days=1)
Expand Down Expand Up @@ -31,3 +40,25 @@ def test_historical_prices_by_address(alchemy_client: AlchemyClient) -> None:
assert 24 <= len(result.data) <= 25
assert result.data[0].value > 0.1
assert result.data[0].timestamp >= start


@pytest.mark.skip("Needs a wallet")
def test_get_incoming_transfer(alchemy_client: AlchemyClient, eth_sepolia_config: ChainConfig) -> None:
# Test outgoing transfers
transfers = alchemy_client.get_transfers(
wallet=eth_sepolia_config.wallet_address, chain=eth_sepolia_config.chain, incoming=False
)

assert len(transfers) > 0
assert transfers[0].from_address.lower() == eth_sepolia_config.wallet_address.lower()


@pytest.mark.skip("Needs a wallet")
def test_get_outcoming_transfer(alchemy_client: AlchemyClient, eth_sepolia_config: ChainConfig) -> None:
# Test outgoing transfers
transfers = alchemy_client.get_transfers(
wallet=eth_sepolia_config.wallet_address, chain=eth_sepolia_config.chain, incoming=True
)

assert len(transfers) > 0
assert transfers[0].to_address.lower() == eth_sepolia_config.wallet_address.lower()

0 comments on commit 76839e1

Please sign in to comment.