This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Connect to unsynced peers, and trinity peers #618

Merged 2 commits on May 20, 2019
20 changes: 11 additions & 9 deletions p2p/peer_pool.py
@@ -242,12 +242,16 @@ def unsubscribe(self, subscriber: PeerSubscriber) -> None:
     async def start_peer(self, peer: BasePeer) -> None:
         self.run_child_service(peer)
         await self.wait(peer.events.started.wait(), timeout=1)
+        if peer.is_operational:
+            self._add_peer(peer, ())
+        else:
+            self.logger.debug("%s was cancelled immediately, not adding to pool", peer)
+
         try:
-            with peer.collect_sub_proto_messages() as buffer:
-                await self.wait(
-                    peer.boot_manager.events.finished.wait(),
-                    timeout=self._peer_boot_timeout
-                )
+            await self.wait(
+                peer.boot_manager.events.finished.wait(),
+                timeout=self._peer_boot_timeout
+            )
         except TimeoutError as err:
             self.logger.debug('Timeout waiting for peer to boot: %s', err)
             await peer.disconnect(DisconnectReason.timeout)
@@ -256,10 +260,8 @@ async def start_peer(self, peer: BasePeer) -> None:
             await self.connection_tracker.record_failure(peer.remote, err)
             raise
         else:
-            if peer.is_operational:
-                self._add_peer(peer, buffer.get_messages())
-            else:
-                self.logger.debug('%s disconnected during boot-up, not adding to pool', peer)
+            if not peer.is_operational:
+                self.logger.debug('%s disconnected during boot-up, dropped from pool', peer)
 
     def _add_peer(self,
                   peer: BasePeer,
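Taken together, the two hunks above invert the order of operations in start_peer: a freshly started peer is added to the pool immediately, and the boot sequence runs afterwards, only logging or disconnecting if boot fails. Below is a rough sketch of how the method reads once both hunks are applied; the exception handling between the hunks is not shown in this diff and is assumed unchanged.

    async def start_peer(self, peer: BasePeer) -> None:
        self.run_child_service(peer)
        await self.wait(peer.events.started.wait(), timeout=1)
        # New behaviour: the peer joins the pool as soon as it is running,
        # before its boot manager has finished, so unsynced peers are usable.
        if peer.is_operational:
            self._add_peer(peer, ())
        else:
            self.logger.debug("%s was cancelled immediately, not adding to pool", peer)

        try:
            await self.wait(
                peer.boot_manager.events.finished.wait(),
                timeout=self._peer_boot_timeout,
            )
        except TimeoutError as err:
            self.logger.debug('Timeout waiting for peer to boot: %s', err)
            await peer.disconnect(DisconnectReason.timeout)
            # ... further failure handling elided in this diff (the surrounding
            # except clauses record the failure with the connection tracker and
            # re-raise) ...
        else:
            if not peer.is_operational:
                self.logger.debug('%s disconnected during boot-up, dropped from pool', peer)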
43 changes: 39 additions & 4 deletions trinity/protocol/common/boot.py
@@ -2,6 +2,7 @@
 
 from eth_utils import ValidationError
 
+from eth.rlp.headers import BlockHeader
 from eth.vm.forks import HomesteadVM
 
 from p2p.exceptions import PeerConnectionLost
@@ -42,7 +43,8 @@ async def ensure_same_side_on_dao_fork(self) -> None:
                 # VM comes after the fork, so stop checking
                 break
 
-            start_block = vm_class.get_dao_fork_block_number() - 1
+            dao_fork_num = vm_class.get_dao_fork_block_number()
+            start_block = dao_fork_num - 1
 
             try:
                 headers = await self.peer.requests.get_block_headers(  # type: ignore
@@ -62,13 +64,46 @@ async def ensure_same_side_on_dao_fork(self) -> None:
                 ) from err
 
             if len(headers) != 2:
-                raise DAOForkCheckFailure(
-                    f"{self.peer} failed to return DAO fork check headers"
-                )
+                tip_header = await self._get_tip_header()
+                if tip_header.block_number < dao_fork_num:
+                    self.logger.debug(
+                        f"{self.peer} has tip {tip_header!r}, and returned {headers!r} "
+                        f"at DAO fork #{dao_fork_num}. Peer seems to be syncing..."
+                    )
+                    return
+                else:
+                    raise DAOForkCheckFailure(
+                        f"{self.peer} has tip {tip_header!r}, but only returned {headers!r} "
+                        f"at DAO fork #{dao_fork_num}. Peer seems to be withholding DAO headers..."
+                    )
             else:
                 parent, header = headers
 
             try:
                 vm_class.validate_header(header, parent, check_seal=True)
             except ValidationError as err:
                 raise DAOForkCheckFailure(f"{self.peer} failed DAO fork check validation: {err}")
+
+    async def _get_tip_header(self) -> BlockHeader:
+        try:
+            headers = await self.peer.requests.get_block_headers(  # type: ignore
+                self.peer.head_hash,
+                max_headers=1,
+                timeout=CHAIN_SPLIT_CHECK_TIMEOUT,
+            )
+
+        except (TimeoutError, PeerConnectionLost) as err:
+            raise DAOForkCheckFailure(
+                f"Timed out waiting for tip header from {self.peer}: {err}"
+            ) from err
+        except ValidationError as err:
+            raise DAOForkCheckFailure(
+                f"Invalid header response for tip header during DAO fork check: {err}"
+            ) from err
+        else:
+            if len(headers) != 1:
+                raise DAOForkCheckFailure(
+                    f"{self.peer} returned {headers!r} when asked for tip"
+                )
+            else:
+                return headers[0]
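The effect of the new branch is to distinguish a peer that is still syncing (its tip has not reached the DAO fork block, so it genuinely cannot serve the two check headers) from one that is withholding them. Here is a minimal, self-contained sketch of that decision; the SyncStatus enum and classify_missing_dao_headers function are illustrative names, not part of trinity:

from enum import Enum, auto


class SyncStatus(Enum):
    STILL_SYNCING = auto()  # tip below the DAO fork block: keep the peer, skip the check
    WITHHOLDING = auto()    # tip at or past the fork but headers missing: fail the check


def classify_missing_dao_headers(tip_block_number: int, dao_fork_block_number: int) -> SyncStatus:
    # Mirrors the tip_header.block_number < dao_fork_num comparison above.
    if tip_block_number < dao_fork_block_number:
        return SyncStatus.STILL_SYNCING
    return SyncStatus.WITHHOLDING


# Mainnet example: the DAO fork activated at block 1,920,000, so a peer whose
# tip is block 1,500,000 is treated as syncing rather than dropped.
assert classify_missing_dao_headers(1_500_000, 1_920_000) is SyncStatus.STILL_SYNCING
assert classify_missing_dao_headers(2_000_000, 1_920_000) is SyncStatus.WITHHOLDING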