Skip to content

Commit

Permalink
go/worker/common/p2p: Add support for persistent and blocked peers
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Apr 28, 2022
1 parent 1e4123a commit ca9a83f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .changelog/4713.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go/worker/common/p2p: Add support for persistent and blocked peers

7 changes: 7 additions & 0 deletions go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const (
CfgP2PMaxNumPeers = "worker.p2p.max_num_peers"
// CfgP2PPeerGracePeriod is the peer grace period.
CfgP2PPeerGracePeriod = "worker.p2p.peer_grace_period"

// CfgP2PBlockedPeerIPs is a list of blocked peer IP addresses.
CfgP2PBlockedPeerIPs = "worker.p2p.blocked_peers"
// CfgP2PPersistentPeers is a list of persistent peer node P2P IDs.
CfgP2PPersistentPeers = "worker.p2p.persistent_peers"
)

// Flags has the configuration flags.
Expand All @@ -45,6 +50,8 @@ func init() {
Flags.Float64(CfgP2PConnectednessLowWater, 0.2, "Set the low water mark at which the peer manager will try to reconnect to peers")
Flags.Uint32(CfgP2PMaxNumPeers, 100, "Set maximum number of P2P peers")
Flags.Duration(CfgP2PPeerGracePeriod, 20*time.Second, "Time duration for new peer connections to be immune from pruning")
Flags.StringSlice(CfgP2PBlockedPeerIPs, []string{}, "List of blocked peer IPs")
Flags.StringSlice(CfgP2PPersistentPeers, []string{}, "List of persistent peer node P2P IDs")

_ = viper.BindPFlags(Flags)
}
28 changes: 27 additions & 1 deletion go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,19 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
return nil, fmt.Errorf("worker/common/p2p: failed to create connection gater: %w", err)
}

// Block peers specified in the blacklist.
blacklist := viper.GetStringSlice(CfgP2PBlockedPeerIPs)
for _, blockedIP := range blacklist {
parsedIP := net.ParseIP(blockedIP)
if parsedIP == nil {
return nil, fmt.Errorf("worker/common/p2p: malformed blocked IP: %s", blockedIP)
}

if grr := cg.BlockAddr(parsedIP); grr != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to block IP (%s): %w", blockedIP, err)
}
}

// Create the P2P host.
host, err := libp2p.New(
libp2p.UserAgent(fmt.Sprintf("oasis-core/%s", version.SoftwareVersion)),
Expand Down Expand Up @@ -396,8 +409,15 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
return nil, fmt.Errorf("worker/common/p2p: failed to get consensus chain context: %w", err)
}

pm, err := newPeerManager(ctx, host, cg, consensus)
if err != nil {
ctxCancel()
_ = host.Close()
return nil, fmt.Errorf("worker/common/p2p: cannot create peer manager: %w", err)
}

p := &P2P{
PeerManager: newPeerManager(ctx, host, cg, consensus),
PeerManager: pm,
ctxCancel: ctxCancel,
quitCh: make(chan struct{}),
chainContext: chainContext,
Expand All @@ -412,5 +432,11 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
"address", fmt.Sprintf("%+v", host.Addrs()),
)

if len(blacklist) > 0 {
p.logger.Info("p2p blacklist initialized",
"num_blocked_peers", len(blacklist),
)
}

return p, nil
}
47 changes: 33 additions & 14 deletions go/worker/common/p2p/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type PeerManager struct {
host core.Host
cg *conngater.BasicConnectionGater

peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
persistentPeers map[core.PeerID]bool

initCh chan struct{}
initOnce sync.Once
Expand Down Expand Up @@ -131,12 +132,15 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) {
newNodes[peerID] = node
}

// Remove existing peers that are not in the new node list.
// Remove existing peers that are not in the new node list
// and not in the persistent peer list.
for peerID := range mgr.peers {
node := newNodes[peerID]
if node == nil {
mgr.removePeerLocked(peerID)
continue
if _, persistent := mgr.persistentPeers[peerID]; !persistent {
mgr.removePeerLocked(peerID)
continue
}
}
}

Expand Down Expand Up @@ -361,18 +365,33 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) {
}
}

func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend) *PeerManager {
func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend) (*PeerManager, error) {
mgr := &PeerManager{
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
persistentPeers: make(map[core.PeerID]bool),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
}

for _, pp := range viper.GetStringSlice(CfgP2PPersistentPeers) {
var pk signature.PublicKey
if grr := pk.UnmarshalText([]byte(pp)); grr != nil {
return nil, fmt.Errorf("invalid persistent peer address (%s): %w", pp, grr)
}
pid, err := PublicKeyToPeerID(pk)
if err != nil {
return nil, fmt.Errorf("invalid persistent peer public key (%s): %w", pp, err)
}
mgr.persistentPeers[pid] = true
}

go mgr.watchRegistryNodes(consensus)
return mgr

return mgr, nil
}

func (p *p2pPeer) connectWorker(mgr *PeerManager, peerID core.PeerID) {
Expand Down

0 comments on commit ca9a83f

Please sign in to comment.