Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait only one oracle during exit rotation #183

Merged
merged 1 commit into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 63 additions & 82 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
import logging
import random
import time
from urllib.parse import urljoin

import aiohttp
from eth_typing import BlockNumber, BLSPubkey
from sw_utils.decorators import retry_aiohttp_errors
from web3 import Web3
from web3.types import HexStr

Expand All @@ -17,16 +14,17 @@
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp, wait_block_finalization
from src.config.settings import (
DEFAULT_RETRY_TIME,
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY,
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
OUTDATED_SIGNATURES_URL_PATH,
settings,
)
from src.exits.consensus import get_validator_public_keys
from src.exits.execution import submit_exit_signatures
from src.exits.typings import SignatureRotationRequest
from src.exits.utils import send_signature_rotation_requests
from src.exits.utils import (
get_oracle_outdated_signatures_response,
send_signature_rotation_requests,
)
from src.validators.signing.local import get_exit_signature_shards
from src.validators.signing.remote import (
RemoteSignerConfiguration,
Expand All @@ -37,15 +35,52 @@
logger = logging.getLogger(__name__)


async def fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await _get_oracle_outdated_signatures_response(oracle_endpoint)
async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait oracles sync.
oracles = await get_oracles()
await _wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await _fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait oracles sync.
await _wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)


async def _fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
outdated_indexes = [val['index'] for val in response['validators']]

metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes


async def wait_oracles_signature_update(oracles: Oracles) -> None:
async def _wait_oracles_signature_update(oracles: Oracles) -> None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if not last_event:
return
Expand All @@ -54,20 +89,26 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None:
logger.info('Waiting for block %d finalization...', update_block)
await wait_block_finalization(update_block)

oracle_tasks = (
wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
oracle_tasks = {
asyncio.create_task(
_wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
)
)
for replicas in oracles.endpoints
for endpoint in replicas
)
await asyncio.gather(*oracle_tasks)
logger.info('All the oracles have fetched exit signatures update')
}
while oracle_tasks:
done, oracle_tasks = await asyncio.wait(oracle_tasks, return_when=asyncio.FIRST_COMPLETED)
if done:
for pending_task in oracle_tasks:
pending_task.cancel()
logger.info('Oracles have fetched exit signatures update')


async def wait_oracle_signature_update(
async def _wait_oracle_signature_update(
exit_signature_update_block: BlockNumber, oracle_endpoint: str, max_time: int | float = 0
) -> None:
"""
Expand All @@ -93,7 +134,7 @@ async def wait_oracle_signature_update(
)


async def update_exit_signatures(
async def _update_exit_signatures(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
oracles: Oracles,
Expand All @@ -104,15 +145,14 @@ async def update_exit_signatures(
outdated_indexes = outdated_indexes[:exit_rotation_batch_limit]

logger.info('Starting exit signature rotation for %d validators', len(outdated_indexes))

# pylint: disable=duplicate-code
validators = await get_validator_public_keys(outdated_indexes)
deadline = None
while True:
current_timestamp = get_current_timestamp()
if not deadline or deadline <= current_timestamp:
deadline = current_timestamp + oracles.signature_validity_period
oracles_request = await get_oracles_request(
oracles_request = await _get_oracles_request(
oracles=oracles,
keystores=keystores,
remote_signer_config=remote_signer_config,
Expand All @@ -134,37 +174,15 @@ async def update_exit_signatures(
return tx_hash


@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def _get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict:
"""
:param oracle_endpoint:
:return: Example response
```
{
"exit_signature_block_number": 100,
"validators": [{"index": 1}, ...]
}
```
"""
path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault)
url = urljoin(oracle_endpoint, path)

async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response:
response.raise_for_status()
data = await response.json()
return data


async def _fetch_exit_signature_block(oracle_endpoint: str) -> BlockNumber | None:
data = await _get_oracle_outdated_signatures_response(oracle_endpoint)
data = await get_oracle_outdated_signatures_response(oracle_endpoint)
block_number = data['exit_signature_block_number']
if block_number is None:
return None
return BlockNumber(block_number)


async def get_oracles_request(
async def _get_oracles_request(
oracles: Oracles,
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
Expand Down Expand Up @@ -209,40 +227,3 @@ async def get_oracles_request(
request.exit_signature_shards.append(shards.exit_signatures)

return request


async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait all oracles sync.
oracles = await get_oracles()
await wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait all oracles sync.
await wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)
10 changes: 5 additions & 5 deletions src/exits/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp
from src.config.settings import settings
from src.exits.tasks import get_oracles_request, wait_oracle_signature_update
from src.exits.tasks import _get_oracles_request, _wait_oracle_signature_update
from src.validators.signing.remote import RemoteSignerConfiguration
from src.validators.typings import ExitSignatureShards, Keystores

Expand All @@ -27,7 +27,7 @@ async def test_normal(self):
'src.exits.tasks._fetch_exit_signature_block', side_effect=[None, 1, 2, 3]
) as fetch_mock,
):
await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 4

Expand All @@ -41,7 +41,7 @@ async def test_timeout(self):
) as fetch_mock,
pytest.raises(asyncio.TimeoutError),
):
await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 2

Expand All @@ -66,7 +66,7 @@ async def test_local_keystores(
),
),
):
request = await get_oracles_request(
request = await _get_oracles_request(
oracles=oracles,
keystores=Keystores({test_validator_pubkey: test_validator_privkey}),
remote_signer_config=None,
Expand Down Expand Up @@ -108,7 +108,7 @@ async def test_remote_signer(
randint(0, int(1e6)): pubkey
for pubkey in remote_signer_config.pubkeys_to_shares.keys()
}
request = await get_oracles_request(
request = await _get_oracles_request(
oracles=oracles,
keystores=Keystores(dict()),
remote_signer_config=remote_signer_config,
Expand Down
29 changes: 28 additions & 1 deletion src/exits/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@

from src.common.typings import OracleApproval, Oracles, OraclesApproval
from src.common.utils import process_oracles_approvals
from src.config.settings import DEFAULT_RETRY_TIME, UPDATE_SIGNATURES_URL_PATH
from src.config.settings import (
DEFAULT_RETRY_TIME,
OUTDATED_SIGNATURES_URL_PATH,
UPDATE_SIGNATURES_URL_PATH,
settings,
)
from src.exits.typings import SignatureRotationRequest

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,3 +86,25 @@ async def send_signature_rotation_request(
signature=Web3.to_bytes(hexstr=data['signature']),
deadline=data['deadline'],
)


@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict:
"""
:param oracle_endpoint:
:return: Example response
```
{
"exit_signature_block_number": 100,
"validators": [{"index": 1}, ...]
}
```
"""
path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault)
url = urljoin(oracle_endpoint, path)

async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response:
response.raise_for_status()
data = await response.json()
return data