diff --git a/p2p/peer_pool.py b/p2p/peer_pool.py
index ab442b340c..5b60ae5616 100644
--- a/p2p/peer_pool.py
+++ b/p2p/peer_pool.py
@@ -242,12 +242,16 @@ def unsubscribe(self, subscriber: PeerSubscriber) -> None:
     async def start_peer(self, peer: BasePeer) -> None:
         self.run_child_service(peer)
         await self.wait(peer.events.started.wait(), timeout=1)
+        if peer.is_operational:
+            self._add_peer(peer, ())
+        else:
+            self.logger.debug("%s was cancelled immediately, not adding to pool", peer)
+
         try:
-            with peer.collect_sub_proto_messages() as buffer:
-                await self.wait(
-                    peer.boot_manager.events.finished.wait(),
-                    timeout=self._peer_boot_timeout
-                )
+            await self.wait(
+                peer.boot_manager.events.finished.wait(),
+                timeout=self._peer_boot_timeout
+            )
         except TimeoutError as err:
             self.logger.debug('Timout waiting for peer to boot: %s', err)
             await peer.disconnect(DisconnectReason.timeout)
@@ -256,10 +260,8 @@ async def start_peer(self, peer: BasePeer) -> None:
             await self.connection_tracker.record_failure(peer.remote, err)
             raise
         else:
-            if peer.is_operational:
-                self._add_peer(peer, buffer.get_messages())
-            else:
-                self.logger.debug('%s disconnected during boot-up, not adding to pool', peer)
+            if not peer.is_operational:
+                self.logger.debug('%s disconnected during boot-up, dropped from pool', peer)
 
     def _add_peer(self,
                   peer: BasePeer,
diff --git a/trinity/protocol/common/boot.py b/trinity/protocol/common/boot.py
index 0f6c95a09b..3e56fba452 100644
--- a/trinity/protocol/common/boot.py
+++ b/trinity/protocol/common/boot.py
@@ -2,6 +2,7 @@
 
 from eth_utils import ValidationError
 
+from eth.rlp.headers import BlockHeader
 from eth.vm.forks import HomesteadVM
 
 from p2p.exceptions import PeerConnectionLost
@@ -42,7 +43,8 @@ async def ensure_same_side_on_dao_fork(self) -> None:
                 # VM comes after the fork, so stop checking
                 break
 
-            start_block = vm_class.get_dao_fork_block_number() - 1
+            dao_fork_num = vm_class.get_dao_fork_block_number()
+            start_block = dao_fork_num - 1
 
             try:
                 headers = await self.peer.requests.get_block_headers(  # type: ignore
@@ -62,9 +64,18 @@ async def ensure_same_side_on_dao_fork(self) -> None:
                 ) from err
 
             if len(headers) != 2:
-                raise DAOForkCheckFailure(
-                    f"{self.peer} failed to return DAO fork check headers"
-                )
+                tip_header = await self._get_tip_header()
+                if tip_header.block_number < dao_fork_num:
+                    self.logger.debug(
+                        f"{self.peer} has tip {tip_header!r}, and returned {headers!r} "
+                        f"at DAO fork #{dao_fork_num}. Peer seems to be syncing..."
+                    )
+                    return
+                else:
+                    raise DAOForkCheckFailure(
+                        f"{self.peer} has tip {tip_header!r}, but only returned {headers!r} "
+                        f"at DAO fork #{dao_fork_num}. Peer seems to be withholding DAO headers..."
+                    )
             else:
                 parent, header = headers
 
@@ -72,3 +83,27 @@ async def ensure_same_side_on_dao_fork(self) -> None:
                     vm_class.validate_header(header, parent, check_seal=True)
                 except ValidationError as err:
                     raise DAOForkCheckFailure(f"{self.peer} failed DAO fork check validation: {err}")
+
+    async def _get_tip_header(self) -> BlockHeader:
+        try:
+            headers = await self.peer.requests.get_block_headers(  # type: ignore
+                self.peer.head_hash,
+                max_headers=1,
+                timeout=CHAIN_SPLIT_CHECK_TIMEOUT,
+            )
+
+        except (TimeoutError, PeerConnectionLost) as err:
+            raise DAOForkCheckFailure(
+                f"Timed out waiting for tip header from {self.peer}: {err}"
+            ) from err
+        except ValidationError as err:
+            raise DAOForkCheckFailure(
+                f"Invalid header response for tip header during DAO fork check: {err}"
+            ) from err
+        else:
+            if len(headers) != 1:
+                raise DAOForkCheckFailure(
+                    f"{self.peer} returned {headers!r} when asked for tip"
+                )
+            else:
+                return headers[0]