diff --git a/beacon/syncmanager.go b/beacon/syncmanager.go index b83b22f..c56696d 100644 --- a/beacon/syncmanager.go +++ b/beacon/syncmanager.go @@ -2,6 +2,7 @@ package beacon import ( "bytes" + "github.com/golang/protobuf/proto" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -43,6 +44,8 @@ func (s *SyncManager) PeerConnected(id peer.ID, dir network.Direction) { if dir == network.DirInbound { genesisHash := s.blockchain.GenesisHash() + logrus.WithField("to", id).Info("sending version message") + err := s.protocol.SendMessage(id, &pb.BeaconVersionMessage{ Version: syncVersion, GenesisHash: genesisHash[:], @@ -485,15 +488,16 @@ func (s *SyncManager) handleReceivedBlock(block *primitives.Block, peerFrom peer "slotTrying": block.BlockHeader.SlotNumber, }).Info("requesting parent block") - // request all blocks up to this block - err := s.protocol.SendMessage(peerFrom, &pb.GetBlocksMessage{ - LocatorHashes: s.blockchain.View.Chain.GetChainLocator(), - HashStop: blockHash[:], - }) - if err != nil { - return err + for _, p := range s.hostNode.GetPeerList() { + // request all blocks up to this block + err := s.protocol.SendMessage(p, &pb.GetBlocksMessage{ + LocatorHashes: s.blockchain.View.Chain.GetChainLocator(), + HashStop: blockHash[:], + }) + if err != nil { + return err + } } - } else { logrus.WithField("slot", block.BlockHeader.SlotNumber).Debug("processing") output, newState, err := s.blockchain.ProcessBlock(block, true, verifySignature) diff --git a/go.sum b/go.sum index cc6ae2c..23ec7bf 100644 --- a/go.sum +++ b/go.sum @@ -399,6 +399,7 @@ github.com/libp2p/go-libp2p-autonat-svc v0.0.2/go.mod h1:j4iMiw0d3diRm5iB0noXumt github.com/libp2p/go-libp2p-autonat-svc v0.1.0/go.mod h1:fqi8Obl/z3R4PFVLm8xFtZ6PBL9MlV/xumymRFkKq5A= github.com/libp2p/go-libp2p-blankhost v0.0.1/go.mod h1:Ibpbw/7cPPYwFb7PACIWdvxxv0t0XCCI10t7czjAjTc= github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro= +github.com/libp2p/go-libp2p-blankhost v0.1.3 h1:0KycuXvPDhmehw0ASsg+s1o3IfXgCUDqfzAl94KEBOg= github.com/libp2p/go-libp2p-blankhost v0.1.3/go.mod h1:KML1//wiKR8vuuJO0y3LUd1uLv+tlkGTAr3jC0S5cLg= github.com/libp2p/go-libp2p-circuit v0.0.1/go.mod h1:Dqm0s/BiV63j8EEAs8hr1H5HudqvCAeXxDyic59lCwE= github.com/libp2p/go-libp2p-circuit v0.0.9/go.mod h1:uU+IBvEQzCu953/ps7bYzC/D/R0Ho2A9LfKVVCatlqU= @@ -456,6 +457,7 @@ github.com/libp2p/go-libp2p-net v0.0.0-20190226201932-e71fff5ba6e9/go.mod h1:8W6 github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c= github.com/libp2p/go-libp2p-net v0.0.2/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c= github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q= +github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ= github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU= github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo= github.com/libp2p/go-libp2p-peer v0.1.1/go.mod h1:jkF12jGB4Gk/IOo+yomm+7oLWxF278F7UnrYUQ1Q8es= @@ -502,6 +504,7 @@ github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MB github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= +github.com/libp2p/go-libp2p-testing v0.1.0 h1:WaFRj/t3HdMZGNZqnU2pS7pDRBmMeoDx7/HDNpeyT9U= github.com/libp2p/go-libp2p-testing v0.1.0/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0= github.com/libp2p/go-libp2p-tls v0.1.0/go.mod h1:VZdoSWQDeNpIIAFJFv+6uqTqpnIIDHcqZQSTC/A1TT0= github.com/libp2p/go-libp2p-transport v0.0.0-20190226201958-e8580c8a519d/go.mod h1:lcwgOszllbhvQXul37Kv5YbSYXPoUhRB2Z+Nr3jaBmo= diff --git a/integration/phore/framework/beaconnode.py b/integration/phore/framework/beaconnode.py index 050dae1..24aa633 100644 --- a/integration/phore/framework/beaconnode.py +++ b/integration/phore/framework/beaconnode.py @@ -20,7 +20,7 @@ def __init__(self, index: int, data_directory: str, p2p_port: int, rpc_port: int self.p2p_port = p2p_port self.rpc_port = rpc_port self.connect = [] - self.level = "debug" + self.level = "info" self.index = index self.beacon_executable = tester.get_phore_path("synapsebeacon") self.genesis_time = "+5" diff --git a/integration/phore/framework/relayernode.py b/integration/phore/framework/relayernode.py index 4dce6c7..b32e051 100644 --- a/integration/phore/framework/relayernode.py +++ b/integration/phore/framework/relayernode.py @@ -16,7 +16,7 @@ class RelayerConfig: def __init__(self, shard_port: str): self.p2p_port = -1 self.rpc_port = -1 - self.level = "debug" + self.level = "info" self.index = -1 self.shards = [] self.relayer_executable = tester.get_phore_path("synapserelayer") diff --git a/integration/phore/framework/shardnode.py b/integration/phore/framework/shardnode.py index 9222220..6a4fee1 100644 --- a/integration/phore/framework/shardnode.py +++ b/integration/phore/framework/shardnode.py @@ -16,7 +16,7 @@ class ShardConfig: def __init__(self, beacon_port: str): self.beacon_port = beacon_port - self.level = "trace" + self.level = "info" self.index = -1 self.rpc_port = -1 self.p2p_port = -1 diff --git a/integration/phore/framework/validatornode.py b/integration/phore/framework/validatornode.py index a7678f8..7c15e12 100644 --- a/integration/phore/framework/validatornode.py +++ b/integration/phore/framework/validatornode.py @@ -16,7 +16,7 @@ def __init__(self, beacon_port: int, shard_port: int, validators: str = "0-255") self.rootkey = "testnet" self.validators = validators self.validator_executable = tester.get_phore_path("synapsevalidator") - self.level = "trace" + self.level = "info" self.rpc_port = -1 def get_args(self) -> typing.List[str]: diff --git a/integration/phore/tests/synctestchain.py b/integration/phore/tests/synctestchain.py new file mode 100644 index 0000000..658e89b --- /dev/null +++ b/integration/phore/tests/synctestchain.py @@ -0,0 +1,54 @@ +from phore.framework import tester, validatornode, shardnode, beaconnode +from phore.pb import common_pb2 + +def connect_nodes(node1: beaconnode, node2: beaconnode): + addr = node1.get_listening_addresses().Addresses[0] + node2.connect(common_pb2.ConnectMessage(Address=addr)) + +class SyncTestChain(tester.Tester): + """ + This package connects 4 nodes in a chain and ensures that if the block + producer is on one end, the node on the other end stays in sync. + """ + def __init__(self): + super().__init__() + + def _do_run(self): + beacon_nodes = [self.create_beacon_node() for _ in range(4)] + + beacon_nodes[0].start() + beacon_nodes[1].start() + beacon_nodes[2].start() + beacon_nodes[3].start() + + beacon_nodes[0].wait_for_rpc() + beacon_nodes[1].wait_for_rpc() + beacon_nodes[2].wait_for_rpc() + beacon_nodes[3].wait_for_rpc() + + addrs = beacon_nodes[0].get_listening_addresses().Addresses + + shard_node = self.create_shard_node(shardnode.ShardConfig.from_beacon(beacon_nodes[0])) + shard_node.start() + shard_node.wait_for_rpc() + + validator_node = self.create_validator_node( + validatornode.ValidatorConfig.from_beacon_and_shard(beacon_nodes[0], shard_node, "0-255") + ) + validator_node.start() + validator_node.wait_for_rpc() + + beacon_nodes[0].wait_for_slot(8) + + connect_nodes(beacon_nodes[0], beacon_nodes[1]) + connect_nodes(beacon_nodes[1], beacon_nodes[2]) + connect_nodes(beacon_nodes[2], beacon_nodes[3]) + + beacon_nodes[3].wait_for_slot(16) + + self.reset() + + +ex = SyncTestChain() + +ex.run() diff --git a/p2p/connectionmanager.go b/p2p/connectionmanager.go index 141406d..b388c13 100644 --- a/p2p/connectionmanager.go +++ b/p2p/connectionmanager.go @@ -46,7 +46,7 @@ func NewConnectionManagerOptions() ConnectionManagerOptions { // ConnectionManager is the service to discover other peers. type ConnectionManager struct { host *HostNode - options ConnectionManagerOptions + Options ConnectionManagerOptions ctx context.Context p2pDiscovery *routingdiscovery.RoutingDiscovery @@ -86,7 +86,7 @@ func NewConnectionManager(ctx context.Context, host *HostNode, discoveryOptions return &ConnectionManager{ host: host, ctx: ctx, - options: discoveryOptions, + Options: discoveryOptions, p2pDiscovery: routingdiscovery.NewRoutingDiscovery(routing), protocolConfiguration: make(map[protocol.ID]*ProtocolHandler), lastConnect: make(map[peer.ID]time.Time), diff --git a/p2p/protocolhandler.go b/p2p/protocolhandler.go index f9313b2..cc04b91 100644 --- a/p2p/protocolhandler.go +++ b/p2p/protocolhandler.go @@ -133,12 +133,15 @@ func (p *ProtocolHandler) HandlePeerFound(pi peer.AddrInfo) { // findPeers looks for peers advertising our protocol ID and connects to them if needed. func (p *ProtocolHandler) findPeers() { - service, err := mdnsdiscovery.NewMdnsService(p.ctx, p.host.GetHost(), time.Minute*3, fmt.Sprintf("phore-%s-discovery._udp", p.ID)) - if err != nil { - logrus.Warn(err) - } - service.RegisterNotifee(p) + if p.connManager.Options.MDNS.Enabled { + service, err := mdnsdiscovery.NewMdnsService(p.ctx, p.host.GetHost(), p.connManager.Options.MDNS.Interval, fmt.Sprintf("phore-%s-discovery._udp", p.ID)) + if err != nil { + logrus.Warn(err) + } + + service.RegisterNotifee(p) + } for { findPeerCtx, cancel := context.WithTimeout(p.ctx, findPeerCycle) @@ -230,10 +233,12 @@ func (p *ProtocolHandler) sendMessages(id peer.ID, w io.Writer) { } func (p *ProtocolHandler) handleStream(s network.Stream) { - go p.receiveMessages(s.Conn().RemotePeer(), s) - p.sendMessages(s.Conn().RemotePeer(), s) + logrus.WithField("from", s.Conn().RemotePeer()).Info("handling messages") + + go p.receiveMessages(s.Conn().RemotePeer(), s) + p.notifeeLock.Lock() for _, n := range p.notifees { n.PeerConnected(s.Conn().RemotePeer(), s.Stat().Direction)