Skip to content

Commit

Permalink
add unicast blocklist
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jul 16, 2021
1 parent f1dee32 commit 1b4944e
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 41 deletions.
72 changes: 72 additions & 0 deletions count_blocklist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package p2p

import (
"time"

"github.com/iotexproject/go-pkgs/cache"
"github.com/libp2p/go-libp2p-core"
)

// CountBlocklist is a count-based blacklist implementation. Entry is being added
// to an LRU cache, once the count reaches a threshold the entry is blocked for a
// certain timeout. The entry is then unblocked once the timeout expires
type CountBlocklist struct {
count int // number of times an entry is added before being blocked
ttl time.Duration // timeout an entry will be blocked
counter *cache.ThreadSafeLruCache
timeout *cache.ThreadSafeLruCache
}

// NewCountBlocklist creates a new CountBlocklist
func NewCountBlocklist(cap, count int, ttl time.Duration) *CountBlocklist {
return &CountBlocklist{
count: count,
ttl: ttl,
counter: cache.NewThreadSafeLruCache(cap),
timeout: cache.NewThreadSafeLruCache(cap),
}
}

// Blocked returns true if the name is blocked
func (b *CountBlocklist) Blocked(name core.PeerID, t time.Time) bool {
v, ok := b.timeout.Get(name)
if !ok {
return false
}

if v.(time.Time).After(t) {
return true
}

// timeout passed, remove name off the blocklist
b.remove(name)
return false
}

// Add tries to add the name to blocklist
func (b *CountBlocklist) Add(name core.PeerID, t time.Time) {
var count int
if v, ok := b.counter.Get(name); !ok {
count = 1
} else {
count = v.(int) + 1
}
b.counter.Add(name, count)

// add to blocklist once reaching count
if count >= b.count {
b.timeout.Add(name, t.Add(b.ttl))
}
}

// Clear clears the blocklist
func (b *CountBlocklist) Clear() {
b.timeout.Clear()
b.counter.Clear()
}

// remove takes name off the blocklist
func (b *CountBlocklist) remove(name core.PeerID) {
b.counter.Remove(name)
b.timeout.Remove(name)
}
48 changes: 48 additions & 0 deletions count_blocklist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package p2p

import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core"
"github.com/stretchr/testify/require"
)

func TestBlockList(t *testing.T) {
r := require.New(t)

cfg := DefaultConfig
now := time.Now()
name := core.PeerID("alfa")
withinBlockTTL := now.Add(cfg.BlockListCleanupInterval / 2)
pastBlockTTL := now.Add(cfg.BlockListCleanupInterval * 2)

blockTests := []struct {
curTime time.Time
blocked bool
}{
{now, false},
{now, false},
{withinBlockTTL, true},
{withinBlockTTL, true},
{pastBlockTTL, false},
{withinBlockTTL, false},
{withinBlockTTL, false},
{pastBlockTTL, false},
{now, false},
{now, false},
{withinBlockTTL, true},
}

list := NewCountBlocklist(cfg.BlockListLRUSize, cfg.BlockListErrorThreshold, cfg.BlockListCleanupInterval)
for _, v := range blockTests {
list.Add(name, now)
r.Equal(v.blocked, list.Blocked(name, v.curTime))
}
r.Equal(1, list.counter.Len())
r.Equal(1, list.timeout.Len())

list.Clear()
r.Zero(list.counter.Len())
r.Zero(list.timeout.Len())
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/iotexproject/go-pkgs v0.1.5
github.com/ipfs/go-cid v0.0.7
github.com/libp2p/go-libp2p v0.14.3
github.com/libp2p/go-libp2p-circuit v0.4.0
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/aristanetworks/goarista v0.0.0-20190429220743-799535f6f364/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand All @@ -48,6 +49,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
Expand All @@ -69,6 +71,7 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
Expand Down Expand Up @@ -97,6 +100,7 @@ github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018 h1:6xT9KW8zLC
github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4=
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU=
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
Expand All @@ -108,6 +112,7 @@ github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUn
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustinxie/gmsm v1.2.1-0.20200206225615-ad1978e2c91f/go.mod h1:WqZ5qDGL/A1PfaK1yAAKkIxhNxXCbB0iSZ1XpsyfjMg=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
Expand All @@ -117,6 +122,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/go-ethereum v1.8.27/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as=
Expand Down Expand Up @@ -265,6 +271,9 @@ github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/iotexproject/go-pkgs v0.1.5 h1:j4+Yr0E+3bPQgNK0NDrHAKdBjCm0tU3BbQuCayrXyP8=
github.com/iotexproject/go-pkgs v0.1.5/go.mod h1:ttXhcwrtODyh7JozpJlCml09CjP0pcKqTe2B0MbTGc8=
github.com/iotexproject/iotex-address v0.2.4/go.mod h1:K78yPSMB4K7gF/iQ7djT1amph0RBrP3rSkFfH7gNG70=
github.com/ipfs/go-cid v0.0.1 h1:GBjWPktLnNyX0JiQCNFpUuUSoMw5KMyqrsejHYlILBE=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down Expand Up @@ -910,6 +919,7 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down Expand Up @@ -1072,6 +1082,7 @@ golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU=
Expand Down Expand Up @@ -1127,6 +1138,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -1156,6 +1168,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
102 changes: 61 additions & 41 deletions p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type (
ConnLowWater int `yaml:"connLowWater"`
ConnHighWater int `yaml:"connHighWater"`
RateLimiterLRUSize int `yaml:"rateLimiterLRUSize"`
BlackListLRUSize int `yaml:"blackListLRUSize"`
BlackListCleanupInterval time.Duration `yaml:"blackListCleanupInterval"`
BlockListLRUSize int `yaml:"blockListLRUSize"`
BlockListErrorThreshold int `yaml:"blockListErrorThreshold"`
BlockListCleanupInterval time.Duration `yaml:"blockListCleanupInterval"`
ConnGracePeriod time.Duration `yaml:"connGracePeriod"`
EnableRateLimit bool `yaml:"enableRateLimit"`
RateLimit RateLimitConfig `yaml:"rateLimit"`
Expand Down Expand Up @@ -92,8 +93,9 @@ var (
ConnLowWater: 200,
ConnHighWater: 500,
RateLimiterLRUSize: 1000,
BlackListLRUSize: 1000,
BlackListCleanupInterval: 600 * time.Second,
BlockListLRUSize: 1000,
BlockListErrorThreshold: 3,
BlockListCleanupInterval: 600 * time.Second,
ConnGracePeriod: 0,
EnableRateLimit: false,
RateLimit: DefaultRatelimitConfig,
Expand Down Expand Up @@ -213,19 +215,20 @@ func PrivateNetworkPSK(privateNetworkPSK string) Option {

// Host is the main struct that represents a host that communicating with the rest of the P2P networks
type Host struct {
host core.Host
cfg Config
topics map[string]bool
kad *dht.IpfsDHT
kadKey cid.Cid
newPubSub func(ctx context.Context, h core.Host, opts ...pubsub.Option) (*pubsub.PubSub, error)
pubs map[string]*pubsub.Topic
blacklists map[string]*LRUBlacklist
subs map[string]*pubsub.Subscription
close chan interface{}
ctx context.Context
peersLimiters *lru.Cache
unicastLimiter *rate.Limiter
host core.Host
cfg Config
topics map[string]bool
kad *dht.IpfsDHT
kadKey cid.Cid
newPubSub func(ctx context.Context, h core.Host, opts ...pubsub.Option) (*pubsub.PubSub, error)
pubs map[string]*pubsub.Topic
blacklists map[string]*LRUBlacklist
subs map[string]*pubsub.Subscription
close chan interface{}
ctx context.Context
peersLimiters *lru.Cache
unicastLimiter *rate.Limiter
unicastBlocklist *CountBlocklist
}

// NewHost constructs a host struct
Expand Down Expand Up @@ -337,19 +340,20 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
return nil, err
}
myHost := Host{
host: host,
cfg: cfg,
topics: make(map[string]bool),
kad: kad,
kadKey: cid,
newPubSub: newPubSub,
pubs: make(map[string]*pubsub.Topic),
blacklists: make(map[string]*LRUBlacklist),
subs: make(map[string]*pubsub.Subscription),
close: make(chan interface{}),
ctx: ctx,
peersLimiters: limiters,
unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst),
host: host,
cfg: cfg,
topics: make(map[string]bool),
kad: kad,
kadKey: cid,
newPubSub: newPubSub,
pubs: make(map[string]*pubsub.Topic),
blacklists: make(map[string]*LRUBlacklist),
subs: make(map[string]*pubsub.Subscription),
close: make(chan interface{}),
ctx: ctx,
peersLimiters: limiters,
unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst),
unicastBlocklist: NewCountBlocklist(cfg.BlockListLRUSize, cfg.BlockListErrorThreshold, cfg.BlockListCleanupInterval),
}

addrs := make([]string, 0)
Expand Down Expand Up @@ -416,7 +420,7 @@ func (h *Host) AddBroadcastPubSub(topic string, callback HandleBroadcast) error
if _, ok := h.pubs[topic]; ok {
return nil
}
blacklist, err := NewLRUBlacklist(h.cfg.BlackListLRUSize)
blacklist, err := NewLRUBlacklist(h.cfg.BlockListLRUSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -476,7 +480,7 @@ func (h *Host) AddBroadcastPubSub(topic string, callback HandleBroadcast) error
case <-h.close:
return
default:
time.Sleep(h.cfg.BlackListCleanupInterval)
time.Sleep(h.cfg.BlockListCleanupInterval)
h.blacklists[topic].RemoveOldest()
}
}
Expand Down Expand Up @@ -523,20 +527,36 @@ func (h *Host) Broadcast(ctx context.Context, topic string, data []byte) error {

// Unicast sends a message to a peer on the given address
func (h *Host) Unicast(ctx context.Context, target core.PeerAddrInfo, topic string, data []byte) error {
if err := h.Connect(ctx, target); err != nil {
now := time.Now()
if h.unicastBlocklist.Blocked(target.ID, now) {
return errors.New("peer is in blocklist at this moment")
}

if err := h.unicast(ctx, target, topic, data); err != nil {
Logger().Error("Error when sending a unicast message.", zap.Error(err))
h.unicastBlocklist.Add(target.ID, now)
return err
}
return nil
}

func (h *Host) unicast(ctx context.Context, target core.PeerAddrInfo, topic string, data []byte) error {
if err := h.host.Connect(ctx, target); err != nil {
return err
}
stream, err := h.host.NewStream(ctx, target.ID, protocol.ID(topic))
if err != nil {
return err
}
defer func() {
if err := stream.Close(); err != nil {
Logger().Error("Error when closing a unicast stream.", zap.Error(err))
}
}()
_, err = stream.Write(data)
return err
if _, err = stream.Write(data); err != nil {
return err
}
return stream.Close()
}

// ClearBlocklist clears the blocklist
func (h *Host) ClearBlocklist() {
h.unicastBlocklist.Clear()
}

// HostIdentity returns the host identity string
Expand Down Expand Up @@ -574,7 +594,7 @@ func (h *Host) Neighbors(ctx context.Context) []core.PeerAddrInfo {
neighbors := make([]core.PeerAddrInfo, 0)
for _, p := range dedupedPeers {
peer := h.kad.FindLocal(p)
if peer.ID != "" && len(peer.Addrs) > 0 {
if peer.ID != "" && len(peer.Addrs) > 0 && !h.unicastBlocklist.Blocked(peer.ID, time.Now()) {
neighbors = append(neighbors, peer)
}
}
Expand Down

0 comments on commit 1b4944e

Please sign in to comment.