diff --git a/.changelog/4713.feature.md b/.changelog/4713.feature.md new file mode 100644 index 00000000000..a27759d1c65 --- /dev/null +++ b/.changelog/4713.feature.md @@ -0,0 +1 @@ +go/worker/common/p2p: Add support for persistent and blocked peers diff --git a/go/worker/common/p2p/init.go b/go/worker/common/p2p/init.go index 33d3ed05014..ce3cf589ad9 100644 --- a/go/worker/common/p2p/init.go +++ b/go/worker/common/p2p/init.go @@ -30,6 +30,11 @@ const ( CfgP2PMaxNumPeers = "worker.p2p.max_num_peers" // CfgP2PPeerGracePeriod is the peer grace period. CfgP2PPeerGracePeriod = "worker.p2p.peer_grace_period" + + // CfgP2PBlockedPeerIPs is a list of blocked peer IP addresses. + CfgP2PBlockedPeerIPs = "worker.p2p.blocked_peers" + // CfgP2PPersistentPeers is a list of persistent peer node addresses in format P2Ppubkey@IP:port. + CfgP2PPersistentPeers = "worker.p2p.persistent_peers" ) // Flags has the configuration flags. @@ -45,6 +50,8 @@ func init() { Flags.Float64(CfgP2PConnectednessLowWater, 0.2, "Set the low water mark at which the peer manager will try to reconnect to peers") Flags.Uint32(CfgP2PMaxNumPeers, 100, "Set maximum number of P2P peers") Flags.Duration(CfgP2PPeerGracePeriod, 20*time.Second, "Time duration for new peer connections to be immune from pruning") + Flags.StringSlice(CfgP2PBlockedPeerIPs, []string{}, "List of blocked peer IPs") + Flags.StringSlice(CfgP2PPersistentPeers, []string{}, "List of persistent peer node addresses in format P2Ppubkey@IP:port") _ = viper.BindPFlags(Flags) } diff --git a/go/worker/common/p2p/p2p.go b/go/worker/common/p2p/p2p.go index d76011b937d..fb73b15bcab 100644 --- a/go/worker/common/p2p/p2p.go +++ b/go/worker/common/p2p/p2p.go @@ -5,11 +5,13 @@ import ( "context" "fmt" "net" + "strings" "sync" "time" "github.com/libp2p/go-libp2p" core "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/p2p/net/conngater" @@ -20,6 +22,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/crypto/tuplehash" "github.com/oasisprotocol/oasis-core/go/common/identity" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -362,6 +365,73 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) return nil, fmt.Errorf("worker/common/p2p: failed to create connection gater: %w", err) } + // Block peers specified in the blacklist. + blacklist := viper.GetStringSlice(CfgP2PBlockedPeerIPs) + for _, blockedIP := range blacklist { + parsedIP := net.ParseIP(blockedIP) + if parsedIP == nil { + return nil, fmt.Errorf("worker/common/p2p: malformed blocked IP: %s", blockedIP) + } + + if grr := cg.BlockAddr(parsedIP); grr != nil { + return nil, fmt.Errorf("worker/common/p2p: failed to block IP (%s): %w", blockedIP, err) + } + } + + // Maintain persistent peers. + persistentPeers := make(map[core.PeerID]bool) + persistentPeersAI := []peer.AddrInfo{} + for _, pp := range viper.GetStringSlice(CfgP2PPersistentPeers) { + // The persistent peer addresses are in the format P2Ppubkey@IP:port, + // because we use a similar format elsewhere and it's easier for users + // to understand than a multiaddr. + + if strings.Count(pp, "@") != 1 || strings.Count(pp, ":") != 1 { + return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address (expected P2Ppubkey@IP:port)") + } + + pkaddr := strings.Split(pp, "@") + pubkey := pkaddr[0] + addr := pkaddr[1] + + var pk signature.PublicKey + if grr := pk.UnmarshalText([]byte(pubkey)); grr != nil { + return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address: cannot unmarshal P2P public key (%s): %w", pubkey, grr) + } + pid, grr := PublicKeyToPeerID(pk) + if grr != nil { + return nil, fmt.Errorf("worker/common/p2p: invalid persistent peer public key (%s): %w", pubkey, grr) + } + + ipport := strings.Split(addr, ":") + ip := ipport[0] + port := ipport[1] + + ma, grr := multiaddr.NewMultiaddr("/ip4/" + ip + "/tcp/" + port) + if grr != nil { + return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer IP address and/or port (%s): %w", addr, grr) + } + + if _, exists := persistentPeers[pid]; exists { + // If we already have this peer ID, append to its addresses. + for _, p := range persistentPeersAI { + if p.ID == pid { + p.Addrs = append(p.Addrs, ma) + break + } + } + } else { + // Fresh entry. + ai := peer.AddrInfo{ + ID: pid, + Addrs: []multiaddr.Multiaddr{ma}, + } + persistentPeersAI = append(persistentPeersAI, ai) + } + persistentPeers[pid] = true + cm.Protect(pid, "") + } + // Create the P2P host. host, err := libp2p.New( libp2p.UserAgent(fmt.Sprintf("oasis-core/%s", version.SoftwareVersion)), @@ -387,6 +457,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)), pubsub.WithValidateThrottle(viper.GetInt(CfgP2PValidateThrottle)), pubsub.WithMessageIdFn(messageIdFn), + pubsub.WithDirectPeers(persistentPeersAI), ) if err != nil { ctxCancel() @@ -402,7 +473,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) } p := &P2P{ - PeerManager: newPeerManager(ctx, host, cg, consensus), + PeerManager: newPeerManager(ctx, host, cg, consensus, persistentPeers), ctxCancel: ctxCancel, quitCh: make(chan struct{}), chainContext: chainContext, @@ -417,5 +488,11 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) "address", fmt.Sprintf("%+v", host.Addrs()), ) + if len(blacklist) > 0 { + p.logger.Info("p2p blacklist initialized", + "num_blocked_peers", len(blacklist), + ) + } + return p, nil } diff --git a/go/worker/common/p2p/peermgmt.go b/go/worker/common/p2p/peermgmt.go index 65ee1ae3d9d..148e209d0ec 100644 --- a/go/worker/common/p2p/peermgmt.go +++ b/go/worker/common/p2p/peermgmt.go @@ -67,8 +67,9 @@ type PeerManager struct { host core.Host cg *conngater.BasicConnectionGater - peers map[core.PeerID]*p2pPeer - importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool + peers map[core.PeerID]*p2pPeer + importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool + persistentPeers map[core.PeerID]bool initCh chan struct{} initOnce sync.Once @@ -131,12 +132,15 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) { newNodes[peerID] = node } - // Remove existing peers that are not in the new node list. + // Remove existing peers that are not in the new node list + // and not in the persistent peer list. for peerID := range mgr.peers { node := newNodes[peerID] if node == nil { - mgr.removePeerLocked(peerID) - continue + if _, persistent := mgr.persistentPeers[peerID]; !persistent { + mgr.removePeerLocked(peerID) + continue + } } } @@ -361,17 +365,24 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) { } } -func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend) *PeerManager { +func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend, persistentPeers map[core.PeerID]bool) *PeerManager { + if persistentPeers == nil { + persistentPeers = make(map[core.PeerID]bool) + } + mgr := &PeerManager{ - ctx: ctx, - host: host, - cg: cg, - peers: make(map[core.PeerID]*p2pPeer), - importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool), - initCh: make(chan struct{}), - logger: logging.GetLogger("worker/common/p2p/peermgr"), + ctx: ctx, + host: host, + cg: cg, + peers: make(map[core.PeerID]*p2pPeer), + importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool), + persistentPeers: persistentPeers, + initCh: make(chan struct{}), + logger: logging.GetLogger("worker/common/p2p/peermgr"), } + go mgr.watchRegistryNodes(consensus) + return mgr }