Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/common/p2p: Add support for persistent and blocked peers #4713

Merged
merged 1 commit into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4713.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/common/p2p: Add support for persistent and blocked peers
7 changes: 7 additions & 0 deletions go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const (
CfgP2PMaxNumPeers = "worker.p2p.max_num_peers"
// CfgP2PPeerGracePeriod is the peer grace period.
CfgP2PPeerGracePeriod = "worker.p2p.peer_grace_period"

// CfgP2PBlockedPeerIPs is a list of blocked peer IP addresses.
CfgP2PBlockedPeerIPs = "worker.p2p.blocked_peers"
// CfgP2PPersistentPeers is a list of persistent peer node addresses in format P2Ppubkey@IP:port.
CfgP2PPersistentPeers = "worker.p2p.persistent_peers"
)

// Flags has the configuration flags.
Expand All @@ -45,6 +50,8 @@ func init() {
Flags.Float64(CfgP2PConnectednessLowWater, 0.2, "Set the low water mark at which the peer manager will try to reconnect to peers")
Flags.Uint32(CfgP2PMaxNumPeers, 100, "Set maximum number of P2P peers")
Flags.Duration(CfgP2PPeerGracePeriod, 20*time.Second, "Time duration for new peer connections to be immune from pruning")
Flags.StringSlice(CfgP2PBlockedPeerIPs, []string{}, "List of blocked peer IPs")
Flags.StringSlice(CfgP2PPersistentPeers, []string{}, "List of persistent peer node addresses in format P2Ppubkey@IP:port")

_ = viper.BindPFlags(Flags)
}
80 changes: 79 additions & 1 deletion go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
Expand All @@ -20,6 +22,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/crypto/tuplehash"
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
Expand Down Expand Up @@ -362,6 +365,74 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
return nil, fmt.Errorf("worker/common/p2p: failed to create connection gater: %w", err)
}

// Block peers specified in the blacklist.
blacklist := viper.GetStringSlice(CfgP2PBlockedPeerIPs)
for _, blockedIP := range blacklist {
parsedIP := net.ParseIP(blockedIP)
if parsedIP == nil {
return nil, fmt.Errorf("worker/common/p2p: malformed blocked IP: %s", blockedIP)
}

if grr := cg.BlockAddr(parsedIP); grr != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to block IP (%s): %w", blockedIP, err)
}
}

// Maintain persistent peers.
persistentPeers := make(map[core.PeerID]bool)
persistentPeersAI := []peer.AddrInfo{}
for _, pp := range viper.GetStringSlice(CfgP2PPersistentPeers) {
// The persistent peer addresses are in the format P2Ppubkey@IP:port,
// because we use a similar format elsewhere and it's easier for users
// to understand than a multiaddr.

if strings.Count(pp, "@") != 1 || strings.Count(pp, ":") != 1 {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address (expected P2Ppubkey@IP:port)")
}

pkaddr := strings.Split(pp, "@")
pubkey := pkaddr[0]
addr := pkaddr[1]

var pk signature.PublicKey
if grr := pk.UnmarshalText([]byte(pubkey)); grr != nil {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address: cannot unmarshal P2P public key (%s): %w", pubkey, grr)
}
pid, grr := PublicKeyToPeerID(pk)
if grr != nil {
return nil, fmt.Errorf("worker/common/p2p: invalid persistent peer public key (%s): %w", pubkey, grr)
}

ip, port, grr := net.SplitHostPort(addr)
if grr != nil {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer IP address and/or port (%s): %w", addr, grr)
}

ma, grr := multiaddr.NewMultiaddr("/ip4/" + ip + "/tcp/" + port)
if grr != nil {
return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer IP address and/or port (%s): %w", addr, grr)
}

if _, exists := persistentPeers[pid]; exists {
// If we already have this peer ID, append to its addresses.
for _, p := range persistentPeersAI {
if p.ID == pid {
p.Addrs = append(p.Addrs, ma)
break
}
}
} else {
// Fresh entry.
ai := peer.AddrInfo{
ID: pid,
Addrs: []multiaddr.Multiaddr{ma},
}
persistentPeersAI = append(persistentPeersAI, ai)
}
persistentPeers[pid] = true
cm.Protect(pid, "")
}

// Create the P2P host.
host, err := libp2p.New(
libp2p.UserAgent(fmt.Sprintf("oasis-core/%s", version.SoftwareVersion)),
Expand All @@ -387,6 +458,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)),
pubsub.WithValidateThrottle(viper.GetInt(CfgP2PValidateThrottle)),
pubsub.WithMessageIdFn(messageIdFn),
pubsub.WithDirectPeers(persistentPeersAI),
)
if err != nil {
ctxCancel()
Expand All @@ -402,7 +474,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
}

p := &P2P{
PeerManager: newPeerManager(ctx, host, cg, consensus),
PeerManager: newPeerManager(ctx, host, cg, consensus, persistentPeers),
ctxCancel: ctxCancel,
quitCh: make(chan struct{}),
chainContext: chainContext,
Expand All @@ -417,5 +489,11 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error)
"address", fmt.Sprintf("%+v", host.Addrs()),
)

if len(blacklist) > 0 {
p.logger.Info("p2p blacklist initialized",
"num_blocked_peers", len(blacklist),
)
}

return p, nil
}
37 changes: 24 additions & 13 deletions go/worker/common/p2p/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type PeerManager struct {
host core.Host
cg *conngater.BasicConnectionGater

peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
peers map[core.PeerID]*p2pPeer
importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool
persistentPeers map[core.PeerID]bool

initCh chan struct{}
initOnce sync.Once
Expand Down Expand Up @@ -131,12 +132,15 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) {
newNodes[peerID] = node
}

// Remove existing peers that are not in the new node list.
// Remove existing peers that are not in the new node list
// and not in the persistent peer list.
for peerID := range mgr.peers {
node := newNodes[peerID]
if node == nil {
mgr.removePeerLocked(peerID)
continue
if _, persistent := mgr.persistentPeers[peerID]; !persistent {
mgr.removePeerLocked(peerID)
continue
}
}
}

Expand Down Expand Up @@ -361,17 +365,24 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) {
}
}

func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend) *PeerManager {
func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConnectionGater, consensus consensus.Backend, persistentPeers map[core.PeerID]bool) *PeerManager {
if persistentPeers == nil {
persistentPeers = make(map[core.PeerID]bool)
}

mgr := &PeerManager{
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
ctx: ctx,
host: host,
cg: cg,
peers: make(map[core.PeerID]*p2pPeer),
importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool),
persistentPeers: persistentPeers,
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/common/p2p/peermgr"),
}

go mgr.watchRegistryNodes(consensus)

return mgr
}

Expand Down