Skip to content

Commit

Permalink
go/worker/common/p2p: Add support for persistent and blocked peers
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed May 18, 2022
1 parent 6ef8411 commit db8fd41
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 14 deletions.
1 change: 1 addition & 0 deletions .changelog/4713.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/common/p2p: Add support for persistent and blocked peers
7 changes: 7 additions & 0 deletions go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const (
CfgP2PMaxNumPeers = "worker.p2p.max_num_peers"
// CfgP2PPeerGracePeriod is the peer grace period.
CfgP2PPeerGracePeriod = "worker.p2p.peer_grace_period"

// CfgP2PBlockedPeerIPs is a list of blocked peer IP addresses.
CfgP2PBlockedPeerIPs = "worker.p2p.blocked_peers"
// CfgP2PPersistentPeers is a list of persistent peer node addresses in format P2Ppubkey@IP:port.
CfgP2PPersistentPeers = "worker.p2p.persistent_peers"
)

// Flags has the configuration flags.
Expand All @@ -45,6 +50,8 @@ func init() {
Flags.Float64(CfgP2PConnectednessLowWater, 0.2, "Set the low water mark at which the peer manager will try to reconnect to peers")
Flags.Uint32(CfgP2PMaxNumPeers, 100, "Set maximum number of P2P peers")
Flags.Duration(CfgP2PPeerGracePeriod, 20*time.Second, "Time duration for new peer connections to be immune from pruning")
Flags.StringSlice(CfgP2PBlockedPeerIPs, []string{}, "List of blocked peer IPs")
Flags.StringSlice(CfgP2PPersistentPeers, []string{}, "List of persistent peer node addresses in format P2Ppubkey@IP:port")

_ = viper.BindPFlags(Flags)
}
79 changes: 78 additions & 1 deletion go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
Expand All @@ -20,6 +22,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/crypto/tuplehash"
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
Expand Down Expand Up @@ -362,6 +365,73 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
return nil, fmt.Errorf("worker/common/p2p: failed to create connection gater: %w", err)
}

// Block peers specified in the blacklist.
blacklist := viper.GetStringSlice(CfgP2PBlockedPeerIPs)
for _, blockedIP := range blacklist {
parsedIP := net.ParseIP(blockedIP)
if parsedIP == nil {
return nil, fmt.Errorf("worker/common/p2p: malformed blocked IP: %s", blockedIP)
}

if grr := cg.BlockAddr(parsedIP); grr != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to block IP (%s): %w", blockedIP, err)
}
}

// Maintain persistent peers.
persistentPeers := make(map[core.PeerID]bool)
persistentPeersAI := []peer.AddrInfo{}
for _, pp := range viper.GetStringSlice(CfgP2PPersistentPeers) {
// The persistent peer addresses are in the format P2Ppubkey@IP:port,
// because we use a similar format elsewhere and it's easier for users
// to understand than a multiaddr.

if strings.Count(pp, "@") != 1 || strings.Count(pp, ":") != 1 {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address (expected P2Ppubkey@IP:port)")
}

pkaddr := strings.Split(pp, "@")
pubkey := pkaddr[0]
addr := pkaddr[1]

var pk signature.PublicKey
if grr := pk.UnmarshalText([]byte(pubkey)); grr != nil {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address: cannot unmarshal P2P public key (%s): %w", pubkey, grr)
}
pid, grr := PublicKeyToPeerID(pk)
if grr != nil {
return nil, fmt.Errorf("worker/common/p2p: invalid persistent peer public key (%s): %w", pubkey, grr)
}

ipport := strings.Split(addr, ":")
ip := ipport[0]
port := ipport[1]

ma, grr := multiaddr.NewMultiaddr("/ip4/" + ip + "/tcp/" + port)
if grr != nil {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer IP address and/or port (%s): %w", addr, grr)
}

if _, exists := persistentPeers[pid]; exists {
// If we already have this peer ID, append to its addresses.
for _, p := range persistentPeersAI {
if p.ID == pid {
p.Addrs = append(p.Addrs, ma)
break
}
}
} else {
// Fresh entry.
ai := peer.AddrInfo{
ID: pid,
Addrs: []multiaddr.Multiaddr{ma},
}
persistentPeersAI = append(persistentPeersAI, ai)
}
persistentPeers[pid] = true
cm.Protect(pid, "")
}

// Create the P2P host.
host, err := libp2p.New(
libp2p.UserAgent(fmt.Sprintf("oasis-core/%s", version.SoftwareVersion)),
Expand All @@ -387,6 +457,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)),
pubsub.WithValidateThrottle(viper.GetInt(CfgP2PValidateThrottle)),
pubsub.WithMessageIdFn(messageIdFn),
pubsub.WithDirectPeers(persistentPeersAI),
)
if err != nil {
ctxCancel()
Expand All @@ -402,7 +473,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
}

p := &P2P{
PeerManager: newPeerManager(ctx, host, cg, consensus),
PeerManager: newPeerManager(ctx, host, cg, consensus, persistentPeers),
ctxCancel: ctxCancel,
quitCh: make(chan struct{}),
chainContext: chainContext,
Expand All @@ -417,5 +488,11 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
"address", fmt.Sprintf("%+v", host.Addrs()),
)

if len(blacklist) > 0 {
p.logger.Info("p2p blacklist initialized",
"num_blocked_peers", len(blacklist),
)
}

return p, nil
}
37 changes: 24 additions & 13 deletions go/worker/common/p2p/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type PeerManager struct {
host core.Host
cg *conngater.BasicConnectionGater

peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
persistentPeers map[core.PeerID]bool

initCh chan struct{}
initOnce sync.Once
Expand Down Expand Up @@ -131,12 +132,15 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) {
newNodes[peerID] = node
}

// Remove existing peers that are not in the new node list.
// Remove existing peers that are not in the new node list
// and not in the persistent peer list.
for peerID := range mgr.peers {
node := newNodes[peerID]
if node == nil {
mgr.removePeerLocked(peerID)
continue
if _, persistent := mgr.persistentPeers[peerID]; !persistent {
mgr.removePeerLocked(peerID)
continue
}
}
}

Expand Down Expand Up @@ -361,17 +365,24 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) {
}
}

func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend) *PeerManager {
func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend, persistentPeers map[core.PeerID]bool) *PeerManager {
if persistentPeers == nil {
persistentPeers = make(map[core.PeerID]bool)
}

mgr := &PeerManager{
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
persistentPeers: persistentPeers,
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
}

go mgr.watchRegistryNodes(consensus)

return mgr
}

Expand Down

0 comments on commit db8fd41

Please sign in to comment.