Skip to content

Commit

Permalink
Add sync chain test
Browse files Browse the repository at this point in the history
  • Loading branch information
meyer9 committed Mar 16, 2020
1 parent cae0067 commit 4754a13
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 21 deletions.
20 changes: 12 additions & 8 deletions beacon/syncmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package beacon

import (
"bytes"

"github.com/golang/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -43,6 +44,8 @@ func (s *SyncManager) PeerConnected(id peer.ID, dir network.Direction) {
if dir == network.DirInbound {
genesisHash := s.blockchain.GenesisHash()

logrus.WithField("to", id).Info("sending version message")

err := s.protocol.SendMessage(id, &pb.BeaconVersionMessage{
Version: syncVersion,
GenesisHash: genesisHash[:],
Expand Down Expand Up @@ -485,15 +488,16 @@ func (s *SyncManager) handleReceivedBlock(block *primitives.Block, peerFrom peer
"slotTrying": block.BlockHeader.SlotNumber,
}).Info("requesting parent block")

// request all blocks up to this block
err := s.protocol.SendMessage(peerFrom, &pb.GetBlocksMessage{
LocatorHashes: s.blockchain.View.Chain.GetChainLocator(),
HashStop: blockHash[:],
})
if err != nil {
return err
for _, p := range s.hostNode.GetPeerList() {
// request all blocks up to this block
err := s.protocol.SendMessage(p, &pb.GetBlocksMessage{
LocatorHashes: s.blockchain.View.Chain.GetChainLocator(),
HashStop: blockHash[:],
})
if err != nil {
return err
}
}

} else {
logrus.WithField("slot", block.BlockHeader.SlotNumber).Debug("processing")
output, newState, err := s.blockchain.ProcessBlock(block, true, verifySignature)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ github.com/libp2p/go-libp2p-autonat-svc v0.0.2/go.mod h1:j4iMiw0d3diRm5iB0noXumt
github.com/libp2p/go-libp2p-autonat-svc v0.1.0/go.mod h1:fqi8Obl/z3R4PFVLm8xFtZ6PBL9MlV/xumymRFkKq5A=
github.com/libp2p/go-libp2p-blankhost v0.0.1/go.mod h1:Ibpbw/7cPPYwFb7PACIWdvxxv0t0XCCI10t7czjAjTc=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
github.com/libp2p/go-libp2p-blankhost v0.1.3 h1:0KycuXvPDhmehw0ASsg+s1o3IfXgCUDqfzAl94KEBOg=
github.com/libp2p/go-libp2p-blankhost v0.1.3/go.mod h1:KML1//wiKR8vuuJO0y3LUd1uLv+tlkGTAr3jC0S5cLg=
github.com/libp2p/go-libp2p-circuit v0.0.1/go.mod h1:Dqm0s/BiV63j8EEAs8hr1H5HudqvCAeXxDyic59lCwE=
github.com/libp2p/go-libp2p-circuit v0.0.9/go.mod h1:uU+IBvEQzCu953/ps7bYzC/D/R0Ho2A9LfKVVCatlqU=
Expand Down Expand Up @@ -456,6 +457,7 @@ github.com/libp2p/go-libp2p-net v0.0.0-20190226201932-e71fff5ba6e9/go.mod h1:8W6
github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
github.com/libp2p/go-libp2p-net v0.0.2/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q=
github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ=
github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU=
github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo=
github.com/libp2p/go-libp2p-peer v0.1.1/go.mod h1:jkF12jGB4Gk/IOo+yomm+7oLWxF278F7UnrYUQ1Q8es=
Expand Down Expand Up @@ -502,6 +504,7 @@ github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MB
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.1.0 h1:WaFRj/t3HdMZGNZqnU2pS7pDRBmMeoDx7/HDNpeyT9U=
github.com/libp2p/go-libp2p-testing v0.1.0/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0=
github.com/libp2p/go-libp2p-tls v0.1.0/go.mod h1:VZdoSWQDeNpIIAFJFv+6uqTqpnIIDHcqZQSTC/A1TT0=
github.com/libp2p/go-libp2p-transport v0.0.0-20190226201958-e8580c8a519d/go.mod h1:lcwgOszllbhvQXul37Kv5YbSYXPoUhRB2Z+Nr3jaBmo=
Expand Down
2 changes: 1 addition & 1 deletion integration/phore/framework/beaconnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, index: int, data_directory: str, p2p_port: int, rpc_port: int
self.p2p_port = p2p_port
self.rpc_port = rpc_port
self.connect = []
self.level = "debug"
self.level = "info"
self.index = index
self.beacon_executable = tester.get_phore_path("synapsebeacon")
self.genesis_time = "+5"
Expand Down
2 changes: 1 addition & 1 deletion integration/phore/framework/relayernode.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class RelayerConfig:
def __init__(self, shard_port: str):
self.p2p_port = -1
self.rpc_port = -1
self.level = "debug"
self.level = "info"
self.index = -1
self.shards = []
self.relayer_executable = tester.get_phore_path("synapserelayer")
Expand Down
2 changes: 1 addition & 1 deletion integration/phore/framework/shardnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class ShardConfig:
def __init__(self, beacon_port: str):
self.beacon_port = beacon_port
self.level = "trace"
self.level = "info"
self.index = -1
self.rpc_port = -1
self.p2p_port = -1
Expand Down
2 changes: 1 addition & 1 deletion integration/phore/framework/validatornode.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, beacon_port: int, shard_port: int, validators: str = "0-255")
self.rootkey = "testnet"
self.validators = validators
self.validator_executable = tester.get_phore_path("synapsevalidator")
self.level = "trace"
self.level = "info"
self.rpc_port = -1

def get_args(self) -> typing.List[str]:
Expand Down
54 changes: 54 additions & 0 deletions integration/phore/tests/synctestchain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from phore.framework import tester, validatornode, shardnode, beaconnode
from phore.pb import common_pb2

def connect_nodes(node1: beaconnode, node2: beaconnode):
addr = node1.get_listening_addresses().Addresses[0]
node2.connect(common_pb2.ConnectMessage(Address=addr))

class SyncTestChain(tester.Tester):
"""
This package connects 4 nodes in a chain and ensures that if the block
producer is on one end, the node on the other end stays in sync.
"""
def __init__(self):
super().__init__()

def _do_run(self):
beacon_nodes = [self.create_beacon_node() for _ in range(4)]

beacon_nodes[0].start()
beacon_nodes[1].start()
beacon_nodes[2].start()
beacon_nodes[3].start()

beacon_nodes[0].wait_for_rpc()
beacon_nodes[1].wait_for_rpc()
beacon_nodes[2].wait_for_rpc()
beacon_nodes[3].wait_for_rpc()

addrs = beacon_nodes[0].get_listening_addresses().Addresses

shard_node = self.create_shard_node(shardnode.ShardConfig.from_beacon(beacon_nodes[0]))
shard_node.start()
shard_node.wait_for_rpc()

validator_node = self.create_validator_node(
validatornode.ValidatorConfig.from_beacon_and_shard(beacon_nodes[0], shard_node, "0-255")
)
validator_node.start()
validator_node.wait_for_rpc()

beacon_nodes[0].wait_for_slot(8)

connect_nodes(beacon_nodes[0], beacon_nodes[1])
connect_nodes(beacon_nodes[1], beacon_nodes[2])
connect_nodes(beacon_nodes[2], beacon_nodes[3])

beacon_nodes[3].wait_for_slot(16)

self.reset()


ex = SyncTestChain()

ex.run()
4 changes: 2 additions & 2 deletions p2p/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewConnectionManagerOptions() ConnectionManagerOptions {
// ConnectionManager is the service to discover other peers.
type ConnectionManager struct {
host *HostNode
options ConnectionManagerOptions
Options ConnectionManagerOptions
ctx context.Context
p2pDiscovery *routingdiscovery.RoutingDiscovery

Expand Down Expand Up @@ -86,7 +86,7 @@ func NewConnectionManager(ctx context.Context, host *HostNode, discoveryOptions
return &ConnectionManager{
host: host,
ctx: ctx,
options: discoveryOptions,
Options: discoveryOptions,
p2pDiscovery: routingdiscovery.NewRoutingDiscovery(routing),
protocolConfiguration: make(map[protocol.ID]*ProtocolHandler),
lastConnect: make(map[peer.ID]time.Time),
Expand Down
19 changes: 12 additions & 7 deletions p2p/protocolhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,15 @@ func (p *ProtocolHandler) HandlePeerFound(pi peer.AddrInfo) {

// findPeers looks for peers advertising our protocol ID and connects to them if needed.
func (p *ProtocolHandler) findPeers() {
service, err := mdnsdiscovery.NewMdnsService(p.ctx, p.host.GetHost(), time.Minute*3, fmt.Sprintf("phore-%s-discovery._udp", p.ID))
if err != nil {
logrus.Warn(err)
}

service.RegisterNotifee(p)
if p.connManager.Options.MDNS.Enabled {
service, err := mdnsdiscovery.NewMdnsService(p.ctx, p.host.GetHost(), p.connManager.Options.MDNS.Interval, fmt.Sprintf("phore-%s-discovery._udp", p.ID))
if err != nil {
logrus.Warn(err)
}

service.RegisterNotifee(p)
}

for {
findPeerCtx, cancel := context.WithTimeout(p.ctx, findPeerCycle)
Expand Down Expand Up @@ -230,10 +233,12 @@ func (p *ProtocolHandler) sendMessages(id peer.ID, w io.Writer) {
}

func (p *ProtocolHandler) handleStream(s network.Stream) {
go p.receiveMessages(s.Conn().RemotePeer(), s)

p.sendMessages(s.Conn().RemotePeer(), s)

logrus.WithField("from", s.Conn().RemotePeer()).Info("handling messages")

go p.receiveMessages(s.Conn().RemotePeer(), s)

p.notifeeLock.Lock()
for _, n := range p.notifees {
n.PeerConnected(s.Conn().RemotePeer(), s.Stat().Direction)
Expand Down

0 comments on commit 4754a13

Please sign in to comment.