diff --git a/count_blocklist.go b/count_blocklist.go new file mode 100644 index 0000000..99b5668 --- /dev/null +++ b/count_blocklist.go @@ -0,0 +1,72 @@ +package p2p + +import ( + "time" + + "github.com/iotexproject/go-pkgs/cache" + "github.com/libp2p/go-libp2p-core" +) + +// CountBlocklist is a count-based blacklist implementation. Entry is being added +// to an LRU cache, once the count reaches a threshold the entry is blocked for a +// certain timeout. The entry is then unblocked once the timeout expires +type CountBlocklist struct { + count int // number of times an entry is added before being blocked + ttl time.Duration // timeout an entry will be blocked + counter *cache.ThreadSafeLruCache + timeout *cache.ThreadSafeLruCache +} + +// NewCountBlocklist creates a new CountBlocklist +func NewCountBlocklist(cap, count int, ttl time.Duration) *CountBlocklist { + return &CountBlocklist{ + count: count, + ttl: ttl, + counter: cache.NewThreadSafeLruCache(cap), + timeout: cache.NewThreadSafeLruCache(cap), + } +} + +// Blocked returns true if the name is blocked +func (b *CountBlocklist) Blocked(name core.PeerID, t time.Time) bool { + v, ok := b.timeout.Get(name) + if !ok { + return false + } + + if v.(time.Time).After(t) { + return true + } + + // timeout passed, remove name off the blocklist + b.remove(name) + return false +} + +// Add tries to add the name to blocklist +func (b *CountBlocklist) Add(name core.PeerID, t time.Time) { + var count int + if v, ok := b.counter.Get(name); !ok { + count = 1 + } else { + count = v.(int) + 1 + } + b.counter.Add(name, count) + + // add to blocklist once reaching count + if count >= b.count { + b.timeout.Add(name, t.Add(b.ttl)) + } +} + +// Clear clears the blocklist +func (b *CountBlocklist) Clear() { + b.timeout.Clear() + b.counter.Clear() +} + +// remove takes name off the blocklist +func (b *CountBlocklist) remove(name core.PeerID) { + b.counter.Remove(name) + b.timeout.Remove(name) +} diff --git a/count_blocklist_test.go b/count_blocklist_test.go new file mode 100644 index 0000000..b81a2f8 --- /dev/null +++ b/count_blocklist_test.go @@ -0,0 +1,48 @@ +package p2p + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p-core" + "github.com/stretchr/testify/require" +) + +func TestBlockList(t *testing.T) { + r := require.New(t) + + cfg := DefaultConfig + now := time.Now() + name := core.PeerID("alfa") + withinBlockTTL := now.Add(cfg.BlockListCleanupInterval / 2) + pastBlockTTL := now.Add(cfg.BlockListCleanupInterval * 2) + + blockTests := []struct { + curTime time.Time + blocked bool + }{ + {now, false}, + {now, false}, + {withinBlockTTL, true}, + {withinBlockTTL, true}, + {pastBlockTTL, false}, + {withinBlockTTL, false}, + {withinBlockTTL, false}, + {pastBlockTTL, false}, + {now, false}, + {now, false}, + {withinBlockTTL, true}, + } + + list := NewCountBlocklist(cfg.BlockListLRUSize, cfg.BlockListErrorThreshold, cfg.BlockListCleanupInterval) + for _, v := range blockTests { + list.Add(name, now) + r.Equal(v.blocked, list.Blocked(name, v.curTime)) + } + r.Equal(1, list.counter.Len()) + r.Equal(1, list.timeout.Len()) + + list.Clear() + r.Zero(list.counter.Len()) + r.Zero(list.timeout.Len()) +} diff --git a/go.mod b/go.mod index 0742222..6663125 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/hashicorp/golang-lru v0.5.4 + github.com/iotexproject/go-pkgs v0.1.5 github.com/ipfs/go-cid v0.0.7 github.com/libp2p/go-libp2p v0.14.3 github.com/libp2p/go-libp2p-circuit v0.4.0 diff --git a/go.sum b/go.sum index 5891cb5..d039e1f 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,7 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/aristanetworks/goarista v0.0.0-20190429220743-799535f6f364/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -48,6 +49,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= +github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw= @@ -69,6 +71,7 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= @@ -97,6 +100,7 @@ github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018 h1:6xT9KW8zLC github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= +github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= @@ -108,6 +112,7 @@ github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUn github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustinxie/gmsm v1.2.1-0.20200206225615-ad1978e2c91f/go.mod h1:WqZ5qDGL/A1PfaK1yAAKkIxhNxXCbB0iSZ1XpsyfjMg= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= @@ -117,6 +122,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ethereum/go-ethereum v1.8.27/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= @@ -265,6 +271,9 @@ github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7 github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/iotexproject/go-pkgs v0.1.5 h1:j4+Yr0E+3bPQgNK0NDrHAKdBjCm0tU3BbQuCayrXyP8= +github.com/iotexproject/go-pkgs v0.1.5/go.mod h1:ttXhcwrtODyh7JozpJlCml09CjP0pcKqTe2B0MbTGc8= +github.com/iotexproject/iotex-address v0.2.4/go.mod h1:K78yPSMB4K7gF/iQ7djT1amph0RBrP3rSkFfH7gNG70= github.com/ipfs/go-cid v0.0.1 h1:GBjWPktLnNyX0JiQCNFpUuUSoMw5KMyqrsejHYlILBE= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -910,6 +919,7 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -1072,6 +1082,7 @@ golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU= @@ -1127,6 +1138,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -1156,6 +1168,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/p2p.go b/p2p.go index f064ca7..a83c0ee 100644 --- a/p2p.go +++ b/p2p.go @@ -60,8 +60,9 @@ type ( ConnLowWater int `yaml:"connLowWater"` ConnHighWater int `yaml:"connHighWater"` RateLimiterLRUSize int `yaml:"rateLimiterLRUSize"` - BlackListLRUSize int `yaml:"blackListLRUSize"` - BlackListCleanupInterval time.Duration `yaml:"blackListCleanupInterval"` + BlockListLRUSize int `yaml:"blockListLRUSize"` + BlockListErrorThreshold int `yaml:"blockListErrorThreshold"` + BlockListCleanupInterval time.Duration `yaml:"blockListCleanupInterval"` ConnGracePeriod time.Duration `yaml:"connGracePeriod"` EnableRateLimit bool `yaml:"enableRateLimit"` RateLimit RateLimitConfig `yaml:"rateLimit"` @@ -92,8 +93,9 @@ var ( ConnLowWater: 200, ConnHighWater: 500, RateLimiterLRUSize: 1000, - BlackListLRUSize: 1000, - BlackListCleanupInterval: 600 * time.Second, + BlockListLRUSize: 1000, + BlockListErrorThreshold: 3, + BlockListCleanupInterval: 600 * time.Second, ConnGracePeriod: 0, EnableRateLimit: false, RateLimit: DefaultRatelimitConfig, @@ -213,19 +215,20 @@ func PrivateNetworkPSK(privateNetworkPSK string) Option { // Host is the main struct that represents a host that communicating with the rest of the P2P networks type Host struct { - host core.Host - cfg Config - topics map[string]bool - kad *dht.IpfsDHT - kadKey cid.Cid - newPubSub func(ctx context.Context, h core.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) - pubs map[string]*pubsub.Topic - blacklists map[string]*LRUBlacklist - subs map[string]*pubsub.Subscription - close chan interface{} - ctx context.Context - peersLimiters *lru.Cache - unicastLimiter *rate.Limiter + host core.Host + cfg Config + topics map[string]bool + kad *dht.IpfsDHT + kadKey cid.Cid + newPubSub func(ctx context.Context, h core.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) + pubs map[string]*pubsub.Topic + blacklists map[string]*LRUBlacklist + subs map[string]*pubsub.Subscription + close chan interface{} + ctx context.Context + peersLimiters *lru.Cache + unicastLimiter *rate.Limiter + unicastBlocklist *CountBlocklist } // NewHost constructs a host struct @@ -337,19 +340,20 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { return nil, err } myHost := Host{ - host: host, - cfg: cfg, - topics: make(map[string]bool), - kad: kad, - kadKey: cid, - newPubSub: newPubSub, - pubs: make(map[string]*pubsub.Topic), - blacklists: make(map[string]*LRUBlacklist), - subs: make(map[string]*pubsub.Subscription), - close: make(chan interface{}), - ctx: ctx, - peersLimiters: limiters, - unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst), + host: host, + cfg: cfg, + topics: make(map[string]bool), + kad: kad, + kadKey: cid, + newPubSub: newPubSub, + pubs: make(map[string]*pubsub.Topic), + blacklists: make(map[string]*LRUBlacklist), + subs: make(map[string]*pubsub.Subscription), + close: make(chan interface{}), + ctx: ctx, + peersLimiters: limiters, + unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst), + unicastBlocklist: NewCountBlocklist(cfg.BlockListLRUSize, cfg.BlockListErrorThreshold, cfg.BlockListCleanupInterval), } addrs := make([]string, 0) @@ -416,7 +420,7 @@ func (h *Host) AddBroadcastPubSub(topic string, callback HandleBroadcast) error if _, ok := h.pubs[topic]; ok { return nil } - blacklist, err := NewLRUBlacklist(h.cfg.BlackListLRUSize) + blacklist, err := NewLRUBlacklist(h.cfg.BlockListLRUSize) if err != nil { return err } @@ -476,7 +480,7 @@ func (h *Host) AddBroadcastPubSub(topic string, callback HandleBroadcast) error case <-h.close: return default: - time.Sleep(h.cfg.BlackListCleanupInterval) + time.Sleep(h.cfg.BlockListCleanupInterval) h.blacklists[topic].RemoveOldest() } } @@ -523,20 +527,36 @@ func (h *Host) Broadcast(ctx context.Context, topic string, data []byte) error { // Unicast sends a message to a peer on the given address func (h *Host) Unicast(ctx context.Context, target core.PeerAddrInfo, topic string, data []byte) error { - if err := h.Connect(ctx, target); err != nil { + now := time.Now() + if h.unicastBlocklist.Blocked(target.ID, now) { + return errors.New("peer is in blocklist at this moment") + } + + if err := h.unicast(ctx, target, topic, data); err != nil { + Logger().Error("Error when sending a unicast message.", zap.Error(err)) + h.unicastBlocklist.Add(target.ID, now) + return err + } + return nil +} + +func (h *Host) unicast(ctx context.Context, target core.PeerAddrInfo, topic string, data []byte) error { + if err := h.host.Connect(ctx, target); err != nil { return err } stream, err := h.host.NewStream(ctx, target.ID, protocol.ID(topic)) if err != nil { return err } - defer func() { - if err := stream.Close(); err != nil { - Logger().Error("Error when closing a unicast stream.", zap.Error(err)) - } - }() - _, err = stream.Write(data) - return err + if _, err = stream.Write(data); err != nil { + return err + } + return stream.Close() +} + +// ClearBlocklist clears the blocklist +func (h *Host) ClearBlocklist() { + h.unicastBlocklist.Clear() } // HostIdentity returns the host identity string @@ -574,7 +594,7 @@ func (h *Host) Neighbors(ctx context.Context) []core.PeerAddrInfo { neighbors := make([]core.PeerAddrInfo, 0) for _, p := range dedupedPeers { peer := h.kad.FindLocal(p) - if peer.ID != "" && len(peer.Addrs) > 0 { + if peer.ID != "" && len(peer.Addrs) > 0 && !h.unicastBlocklist.Blocked(peer.ID, time.Now()) { neighbors = append(neighbors, peer) } }