From 854187391b7dc7e0991aa9f614199b1fa0d06ca5 Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Fri, 25 Feb 2022 09:36:49 +0100 Subject: [PATCH 1/9] feat: add peer block filter option This feature lets a user configure a function that will allow / deny request for a block coming from a peer. --- bitswap.go | 11 +++++ internal/decision/engine.go | 22 ++++++++- internal/decision/engine_test.go | 85 ++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/bitswap.go b/bitswap.go index c7875307..73ca266e 100644 --- a/bitswap.go +++ b/bitswap.go @@ -154,8 +154,15 @@ func WithTargetMessageSize(tms int) Option { } } +func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option { + return func(bs *Bitswap) { + bs.peerBlockRequestFilter = pbrf + } +} + type TaskInfo = decision.TaskInfo type TaskComparator = decision.TaskComparator +type PeerBlockRequestFilter = decision.PeerBlockRequestFilter // WithTaskComparator configures custom task prioritization logic. func WithTaskComparator(comparator TaskComparator) Option { @@ -291,6 +298,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, activeBlocksGauge, decision.WithTaskComparator(bs.taskComparator), decision.WithTargetMessageSize(bs.engineTargetMessageSize), + decision.WithPeerBlockRequestFilter(bs.peerBlockRequestFilter), ) bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves) @@ -399,6 +407,9 @@ type Bitswap struct { simulateDontHavesOnTimeout bool taskComparator TaskComparator + + // an optional feature to accept / deny requests for blocks + peerBlockRequestFilter PeerBlockRequestFilter } type counters struct { diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 24e45f16..d1dff684 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -180,6 +180,8 @@ type Engine struct { metricUpdateCounter int taskComparator TaskComparator + + peerBlockRequestFilter PeerBlockRequestFilter } // TaskInfo represents the details of a request from a peer. @@ -201,6 +203,10 @@ type TaskInfo struct { // It should return true if task 'ta' has higher priority than task 'tb' type TaskComparator func(ta, tb *TaskInfo) bool +// PeerBlockRequestFilter is used to accept / deny requests for a CID coming from a PeerID +// It should return true if the request should be fullfilled. +type PeerBlockRequestFilter func(p peer.ID, c cid.Cid) bool + type Option func(*Engine) func WithTaskComparator(comparator TaskComparator) Option { @@ -209,6 +215,12 @@ func WithTaskComparator(comparator TaskComparator) Option { } } +func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option { + return func(e *Engine) { + e.peerBlockRequestFilter = pbrf + } +} + func WithTargetMessageSize(size int) Option { return func(e *Engine) { e.targetMessageSize = size @@ -647,8 +659,14 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Add each want-have / want-block to the ledger l.Wants(c, entry.Priority, entry.WantType) - // If the block was not found - if !found { + // Check if the peer is allowed to retrieve this block + passFilter := true + if e.peerBlockRequestFilter != nil { + passFilter = e.peerBlockRequestFilter(p, c) + } + + // If the block was not found or the peer doesn't pass the policy + if !found || !passFilter { log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) // Only add the task to the queue if the requester wants a DONT_HAVE diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 315604aa..2fdafda8 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1112,6 +1112,91 @@ func TestTaskComparator(t *testing.T) { } } +func TestPeerBlockFilter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // Generate a few keys + keys := []string{"a", "b", "c"} + cids := make(map[cid.Cid]int) + blks := make([]blocks.Block, 0, len(keys)) + for i, letter := range keys { + block := blocks.NewBlock([]byte(letter)) + blks = append(blks, block) + cids[block.Cid()] = i + } + + // Generate a few peers + peerIDs := make([]peer.ID, len(keys)) + for _, i := range cids { + peerID := libp2ptest.RandPeerIDFatal(t) + peerIDs[i] = peerID + } + + // Setup the peer + fpt := &fakePeerTagger{} + sl := NewTestScoreLedger(shortTerm, nil, clock.New()) + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + if err := bs.PutMany(ctx, blks); err != nil { + t.Fatal(err) + } + + // use a single task worker so that the order of outgoing messages is deterministic + engineTaskWorkerCount := 1 + e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { + // peer 0 has access to everything + if p == peerIDs[0] { + return true + } + // peer 1 has access to key b and c + if p == peerIDs[1] { + return blks[1].Cid().Equals(c) || blks[2].Cid().Equals(c) + } + // peer 2 and other have access to key c + return blks[2].Cid().Equals(c) + }), + ) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + + // Create wants requests + for _, peerID := range peerIDs { + partnerWantBlocks(e, keys, peerID) + } + + // check that outgoing messages are sent with the correct content + checkPeer := func(peerIndex int, expectedBlocks []blocks.Block) { + next := <-e.Outbox() + envelope := <-next + + peerID := peerIDs[peerIndex] + responseBlocks := envelope.Message.Blocks() + + if peerID != envelope.Peer { + t.Errorf("(Peer%v) expected message for peer ID %#v but instead got message for peer ID %#v", peerIndex, peerID, envelope.Peer) + } + + if len(responseBlocks) != len(expectedBlocks) { + t.Errorf("(Peer%v) expected %v block in response but instead got %v", peerIndex, len(expectedBlocks), len(responseBlocks)) + } + + responseBlockSet := make(map[cid.Cid]bool) + for _, b := range responseBlocks { + responseBlockSet[b.Cid()] = true + } + + for _, b := range expectedBlocks { + if !responseBlockSet[b.Cid()] { + t.Errorf("(Peer%v) expected block with CID %v", peerIndex, b.Cid()) + } + } + } + + checkPeer(0, blks[0:3]) + checkPeer(1, blks[1:3]) + checkPeer(2, blks[2:3]) +} + func TestTaggingPeers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() From f3d887a8b18de90de870b15d7dac6c35964157da Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Wed, 2 Mar 2022 08:44:12 +0100 Subject: [PATCH 2/9] fix: apply peer block filter earlier Split the different cases (wants, cancel, denies) early to prevent calling blocksize on rejected blocks. --- internal/decision/engine.go | 62 +++++++++++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index d1dff684..d28cba92 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -610,8 +610,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } }() - // Get block sizes + // Dispatch entries wants, cancels := e.splitWantsCancels(entries) + wants, denials := e.splitWantsDenials(p, wants) + + // Get block sizes wantKs := cid.NewSet() for _, entry := range wants { wantKs.Add(entry.Cid) @@ -651,6 +654,33 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } } + // Deny access to blocks + for _, entry := range denials { + c := entry.Cid + log.Debugw("Bitswap engine: block denied access", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) + + // Only add the task to the queue if the requester wants a DONT_HAVE + if e.sendDontHaves && entry.SendDontHave { + newWorkExists = true + isWantBlock := false + if entry.WantType == pb.Message_Wantlist_Block { + isWantBlock = true + } + + activeEntries = append(activeEntries, peertask.Task{ + Topic: c, + Priority: int(entry.Priority), + Work: bsmsg.BlockPresenceSize(c), + Data: &taskData{ + BlockSize: 0, + HaveBlock: false, + IsWantBlock: isWantBlock, + SendDontHave: entry.SendDontHave, + }, + }) + } + } + // For each want-have / want-block for _, entry := range wants { c := entry.Cid @@ -659,14 +689,8 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Add each want-have / want-block to the ledger l.Wants(c, entry.Priority, entry.WantType) - // Check if the peer is allowed to retrieve this block - passFilter := true - if e.peerBlockRequestFilter != nil { - passFilter = e.peerBlockRequestFilter(p, c) - } - - // If the block was not found or the peer doesn't pass the policy - if !found || !passFilter { + // If the block was not found + if !found { log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) // Only add the task to the queue if the requester wants a DONT_HAVE @@ -740,6 +764,26 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent return wants, cancels } +// Split the want-have / want-block entries from the block that will be denied access +func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { + if e.peerBlockRequestFilter == nil { + return allWants, nil + } + + wants := make([]bsmsg.Entry, 0, len(allWants)) + denied := make([]bsmsg.Entry, 0, len(allWants)) + + for _, et := range allWants { + if e.peerBlockRequestFilter(p, et.Cid) { + wants = append(wants, et) + } else { + denied = append(denied, et) + } + } + + return wants, denied +} + // ReceiveFrom is called when new blocks are received and added to the block // store, meaning there may be peers who want those blocks, so we should send // the blocks to them. From 2806f45a370a5bd3eadbde3376b64fdbddb871db Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Wed, 2 Mar 2022 08:50:51 +0100 Subject: [PATCH 3/9] refactor: factor the sendDontHave operation in message processing --- internal/decision/engine.go | 37 +++++++++++-------------------------- 1 file changed, 11 insertions(+), 26 deletions(-) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index d28cba92..c8c33097 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -654,13 +654,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } } - // Deny access to blocks - for _, entry := range denials { - c := entry.Cid - log.Debugw("Bitswap engine: block denied access", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) - + // Cancel a block operation + sendDontHave := func(entry bsmsg.Entry) { // Only add the task to the queue if the requester wants a DONT_HAVE if e.sendDontHaves && entry.SendDontHave { + c := entry.Cid + newWorkExists = true isWantBlock := false if entry.WantType == pb.Message_Wantlist_Block { @@ -681,6 +680,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } } + // Deny access to blocks + for _, entry := range denials { + log.Debugw("Bitswap engine: block denied access", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) + sendDontHave(entry) + } + // For each want-have / want-block for _, entry := range wants { c := entry.Cid @@ -692,27 +697,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // If the block was not found if !found { log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) - - // Only add the task to the queue if the requester wants a DONT_HAVE - if e.sendDontHaves && entry.SendDontHave { - newWorkExists = true - isWantBlock := false - if entry.WantType == pb.Message_Wantlist_Block { - isWantBlock = true - } - - activeEntries = append(activeEntries, peertask.Task{ - Topic: c, - Priority: int(entry.Priority), - Work: bsmsg.BlockPresenceSize(c), - Data: &taskData{ - BlockSize: 0, - HaveBlock: false, - IsWantBlock: isWantBlock, - SendDontHave: entry.SendDontHave, - }, - }) - } + sendDontHave(entry) } else { // The block was found, add it to the queue newWorkExists = true From 1f6a08a9ea81adac8c33d1a0c356b80e74cbc986 Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 08:38:30 +0100 Subject: [PATCH 4/9] test: use explicit partnerWantBlocksHaves parameters in test --- internal/decision/engine_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 2fdafda8..f4363a7d 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1284,24 +1284,24 @@ func TestTaggingUseful(t *testing.T) { } } -func partnerWantBlocks(e *Engine, keys []string, partner peer.ID) { +func partnerWantBlocks(e *Engine, wantBlocks []string, partner peer.ID) { add := message.New(false) - for i, letter := range keys { + for i, letter := range wantBlocks { block := blocks.NewBlock([]byte(letter)) - add.AddEntry(block.Cid(), int32(len(keys)-i), pb.Message_Wantlist_Block, true) + add.AddEntry(block.Cid(), int32(len(wantBlocks)-i), pb.Message_Wantlist_Block, true) } e.MessageReceived(context.Background(), partner, add) } -func partnerWantBlocksHaves(e *Engine, keys []string, wantHaves []string, sendDontHave bool, partner peer.ID) { +func partnerWantBlocksHaves(e *Engine, wantBlocks []string, wantHaves []string, sendDontHave bool, partner peer.ID) { add := message.New(false) - priority := int32(len(wantHaves) + len(keys)) + priority := int32(len(wantHaves) + len(wantBlocks)) for _, letter := range wantHaves { block := blocks.NewBlock([]byte(letter)) add.AddEntry(block.Cid(), priority, pb.Message_Wantlist_Have, sendDontHave) priority-- } - for _, letter := range keys { + for _, letter := range wantBlocks { block := blocks.NewBlock([]byte(letter)) add.AddEntry(block.Cid(), priority, pb.Message_Wantlist_Block, sendDontHave) priority-- From 96c125ac5aeed2340c383b102ae1bca22a85a7b3 Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 08:50:09 +0100 Subject: [PATCH 5/9] test: simplify TestPeerBlockFilter --- internal/decision/engine_test.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index f4363a7d..d123141c 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1118,22 +1118,19 @@ func TestPeerBlockFilter(t *testing.T) { // Generate a few keys keys := []string{"a", "b", "c"} - cids := make(map[cid.Cid]int) blks := make([]blocks.Block, 0, len(keys)) - for i, letter := range keys { + for _, letter := range keys { block := blocks.NewBlock([]byte(letter)) blks = append(blks, block) - cids[block.Cid()] = i } - // Generate a few peers - peerIDs := make([]peer.ID, len(keys)) - for _, i := range cids { - peerID := libp2ptest.RandPeerIDFatal(t) - peerIDs[i] = peerID - } + // Generate a few partner peers + peerIDs := make([]peer.ID, 3) + peerIDs[0] = libp2ptest.RandPeerIDFatal(t) + peerIDs[1] = libp2ptest.RandPeerIDFatal(t) + peerIDs[2] = libp2ptest.RandPeerIDFatal(t) - // Setup the peer + // Setup the main peer fpt := &fakePeerTagger{} sl := NewTestScoreLedger(shortTerm, nil, clock.New()) bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) From 1d8994e741d063e7cd287dbe0699e6ff26edd1ac Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 09:20:33 +0100 Subject: [PATCH 6/9] test: try WANT_HAVE and WANT_BLOCK in TestPeerBlockFilter --- internal/decision/engine_test.go | 147 ++++++++++++++++++++++++------- 1 file changed, 116 insertions(+), 31 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index d123141c..e0d3de7d 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1117,7 +1117,7 @@ func TestPeerBlockFilter(t *testing.T) { defer cancel() // Generate a few keys - keys := []string{"a", "b", "c"} + keys := []string{"a", "b", "c", "d"} blks := make([]blocks.Block, 0, len(keys)) for _, letter := range keys { block := blocks.NewBlock([]byte(letter)) @@ -1146,52 +1146,137 @@ func TestPeerBlockFilter(t *testing.T) { if p == peerIDs[0] { return true } - // peer 1 has access to key b and c + // peer 1 can only access key c and d if p == peerIDs[1] { - return blks[1].Cid().Equals(c) || blks[2].Cid().Equals(c) + return blks[2].Cid().Equals(c) || blks[3].Cid().Equals(c) } - // peer 2 and other have access to key c - return blks[2].Cid().Equals(c) + // peer 2 and other can only access key d + return blks[3].Cid().Equals(c) }), ) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) - // Create wants requests - for _, peerID := range peerIDs { - partnerWantBlocks(e, keys, peerID) + // Setup the test + type testCaseEntry struct { + peerIndex int + wantBlks string + wantHaves string + sendDontHave bool } - // check that outgoing messages are sent with the correct content - checkPeer := func(peerIndex int, expectedBlocks []blocks.Block) { - next := <-e.Outbox() - envelope := <-next + type testCaseExp struct { + blks string + haves string + dontHaves string + } - peerID := peerIDs[peerIndex] - responseBlocks := envelope.Message.Blocks() + type testCase struct { + only bool + wl testCaseEntry + exp testCaseExp + } - if peerID != envelope.Peer { - t.Errorf("(Peer%v) expected message for peer ID %#v but instead got message for peer ID %#v", peerIndex, peerID, envelope.Peer) - } + testCases := []testCase{ + // Peer 0 has access to everything: want-block `a` succeeds. + { + wl: testCaseEntry{ + peerIndex: 0, + wantBlks: "a", + sendDontHave: true, + }, + exp: testCaseExp{ + blks: "a", + }, + }, + // Peer 0 has access to everything: want-have `b` succeeds. + { + wl: testCaseEntry{ + peerIndex: 0, + wantHaves: "b1", + sendDontHave: true, + }, + exp: testCaseExp{ + haves: "b", + dontHaves: "1", + }, + }, + // Peer 1 has access to [c, d]: want-have `a` result in dont-have. + { + wl: testCaseEntry{ + peerIndex: 1, + wantHaves: "ac", + sendDontHave: true, + }, + exp: testCaseExp{ + haves: "c", + dontHaves: "a", + }, + }, + // Peer 1 has access to [c, d]: want-block `b` result in dont-have. + { + wl: testCaseEntry{ + peerIndex: 1, + wantBlks: "bd", + sendDontHave: true, + }, + exp: testCaseExp{ + blks: "d", + dontHaves: "b", + }, + }, + // Peer 2 has access to [d]: want-have `a` and want-block `b` result in dont-have. + { + wl: testCaseEntry{ + peerIndex: 2, + wantHaves: "a", + wantBlks: "bcd1", + sendDontHave: true, + }, + exp: testCaseExp{ + haves: "", + blks: "d", + dontHaves: "abc1", + }, + }, + } - if len(responseBlocks) != len(expectedBlocks) { - t.Errorf("(Peer%v) expected %v block in response but instead got %v", peerIndex, len(expectedBlocks), len(responseBlocks)) + var onlyTestCases []testCase + for _, testCase := range testCases { + if testCase.only { + onlyTestCases = append(onlyTestCases, testCase) } + } + if len(onlyTestCases) > 0 { + testCases = onlyTestCases + } - responseBlockSet := make(map[cid.Cid]bool) - for _, b := range responseBlocks { - responseBlockSet[b.Cid()] = true - } + for i, testCase := range testCases { + // Create wants requests + wl := testCase.wl - for _, b := range expectedBlocks { - if !responseBlockSet[b.Cid()] { - t.Errorf("(Peer%v) expected block with CID %v", peerIndex, b.Cid()) - } + t.Logf("test case %v: Peer%v / want-blocks '%s' / want-haves '%s' / sendDontHave %t", + i, wl.peerIndex, wl.wantBlks, wl.wantHaves, wl.sendDontHave) + + wantBlks := strings.Split(wl.wantBlks, "") + wantHaves := strings.Split(wl.wantHaves, "") + + partnerWantBlocksHaves(e, wantBlks, wantHaves, wl.sendDontHave, peerIDs[wl.peerIndex]) + + // Check result + exp := testCase.exp + + next := <-e.Outbox() + envelope := <-next + + expBlks := strings.Split(exp.blks, "") + expHaves := strings.Split(exp.haves, "") + expDontHaves := strings.Split(exp.dontHaves, "") + + err := checkOutput(t, e, envelope, expBlks, expHaves, expDontHaves) + if err != nil { + t.Fatal(err) } } - - checkPeer(0, blks[0:3]) - checkPeer(1, blks[1:3]) - checkPeer(2, blks[2:3]) } func TestTaggingPeers(t *testing.T) { From f98f71b914dc9da72d7055c960902b2d6e3a400f Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 10:10:30 +0100 Subject: [PATCH 7/9] test: add TestPeerBlockFilterMutability --- internal/decision/engine_test.go | 176 +++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index e0d3de7d..8f0e5db3 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1279,6 +1279,182 @@ func TestPeerBlockFilter(t *testing.T) { } } +func TestPeerBlockFilterMutability(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // Generate a few keys + keys := []string{"a", "b", "c", "d"} + blks := make([]blocks.Block, 0, len(keys)) + for _, letter := range keys { + block := blocks.NewBlock([]byte(letter)) + blks = append(blks, block) + } + + partnerID := libp2ptest.RandPeerIDFatal(t) + + // Setup the main peer + fpt := &fakePeerTagger{} + sl := NewTestScoreLedger(shortTerm, nil, clock.New()) + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + if err := bs.PutMany(ctx, blks); err != nil { + t.Fatal(err) + } + + filterAllowList := make(map[cid.Cid]bool) + + // use a single task worker so that the order of outgoing messages is deterministic + engineTaskWorkerCount := 1 + e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { + return filterAllowList[c] + }), + ) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + + // Setup the test + type testCaseEntry struct { + allowList string + wantBlks string + wantHaves string + sendDontHave bool + } + + type testCaseExp struct { + blks string + haves string + dontHaves string + } + + type testCase struct { + only bool + wls []testCaseEntry + exps []testCaseExp + } + + testCases := []testCase{ + { + wls: []testCaseEntry{ + { + // Peer has no accesses & request a want-block + allowList: "", + wantBlks: "a", + sendDontHave: true, + }, + { + // Then Peer is allowed access to a + allowList: "a", + wantBlks: "a", + sendDontHave: true, + }, + }, + exps: []testCaseExp{ + { + dontHaves: "a", + }, + { + blks: "a", + }, + }, + }, + { + wls: []testCaseEntry{ + { + // Peer has access to bc + allowList: "bc", + wantHaves: "bc", + sendDontHave: true, + }, + { + // Then Peer loses access to b + allowList: "c", + wantBlks: "bc", // Note: We request a block here to force a response from the node + sendDontHave: true, + }, + }, + exps: []testCaseExp{ + { + haves: "bc", + }, + { + blks: "c", + dontHaves: "b", + }, + }, + }, + { + wls: []testCaseEntry{ + { + // Peer has no accesses & request a want-have + allowList: "", + wantHaves: "d", + }, + { + // Then Peer gains access to d + allowList: "d", + wantHaves: "d", + }, + }, + exps: []testCaseExp{ + { + dontHaves: "d", + }, + { + haves: "d", + }, + }, + }, + } + + var onlyTestCases []testCase + for _, testCase := range testCases { + if testCase.only { + onlyTestCases = append(onlyTestCases, testCase) + } + } + if len(onlyTestCases) > 0 { + testCases = onlyTestCases + } + + for i, testCase := range testCases { + for j := range testCase.wls { + wl := testCase.wls[j] + exp := testCase.exps[j] + + // Create wants requests + t.Logf("test case %v, %v: allow-list '%s' / want-blocks '%s' / want-haves '%s' / sendDontHave %t", + i, j, wl.allowList, wl.wantBlks, wl.wantHaves, wl.sendDontHave) + + allowList := strings.Split(wl.allowList, "") + wantBlks := strings.Split(wl.wantBlks, "") + wantHaves := strings.Split(wl.wantHaves, "") + + // Update the allow list + filterAllowList = make(map[cid.Cid]bool) + for _, letter := range allowList { + block := blocks.NewBlock([]byte(letter)) + filterAllowList[block.Cid()] = true + } + + // Send the request + partnerWantBlocksHaves(e, wantBlks, wantHaves, wl.sendDontHave, partnerID) + + // Check result + next := <-e.Outbox() + envelope := <-next + + expBlks := strings.Split(exp.blks, "") + expHaves := strings.Split(exp.haves, "") + expDontHaves := strings.Split(exp.dontHaves, "") + + err := checkOutput(t, e, envelope, expBlks, expHaves, expDontHaves) + if err != nil { + t.Fatal(err) + } + } + } +} + func TestTaggingPeers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() From 91c3a8313cfa918396a242b77719e89fe65790f0 Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 10:18:37 +0100 Subject: [PATCH 8/9] test: make sendDontHave implicit in filter tests --- internal/decision/engine_test.go | 73 ++++++++++++++------------------ 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 8f0e5db3..df64fd63 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1158,10 +1158,9 @@ func TestPeerBlockFilter(t *testing.T) { // Setup the test type testCaseEntry struct { - peerIndex int - wantBlks string - wantHaves string - sendDontHave bool + peerIndex int + wantBlks string + wantHaves string } type testCaseExp struct { @@ -1180,9 +1179,8 @@ func TestPeerBlockFilter(t *testing.T) { // Peer 0 has access to everything: want-block `a` succeeds. { wl: testCaseEntry{ - peerIndex: 0, - wantBlks: "a", - sendDontHave: true, + peerIndex: 0, + wantBlks: "a", }, exp: testCaseExp{ blks: "a", @@ -1191,9 +1189,8 @@ func TestPeerBlockFilter(t *testing.T) { // Peer 0 has access to everything: want-have `b` succeeds. { wl: testCaseEntry{ - peerIndex: 0, - wantHaves: "b1", - sendDontHave: true, + peerIndex: 0, + wantHaves: "b1", }, exp: testCaseExp{ haves: "b", @@ -1203,9 +1200,8 @@ func TestPeerBlockFilter(t *testing.T) { // Peer 1 has access to [c, d]: want-have `a` result in dont-have. { wl: testCaseEntry{ - peerIndex: 1, - wantHaves: "ac", - sendDontHave: true, + peerIndex: 1, + wantHaves: "ac", }, exp: testCaseExp{ haves: "c", @@ -1215,9 +1211,8 @@ func TestPeerBlockFilter(t *testing.T) { // Peer 1 has access to [c, d]: want-block `b` result in dont-have. { wl: testCaseEntry{ - peerIndex: 1, - wantBlks: "bd", - sendDontHave: true, + peerIndex: 1, + wantBlks: "bd", }, exp: testCaseExp{ blks: "d", @@ -1227,10 +1222,9 @@ func TestPeerBlockFilter(t *testing.T) { // Peer 2 has access to [d]: want-have `a` and want-block `b` result in dont-have. { wl: testCaseEntry{ - peerIndex: 2, - wantHaves: "a", - wantBlks: "bcd1", - sendDontHave: true, + peerIndex: 2, + wantHaves: "a", + wantBlks: "bcd1", }, exp: testCaseExp{ haves: "", @@ -1254,13 +1248,13 @@ func TestPeerBlockFilter(t *testing.T) { // Create wants requests wl := testCase.wl - t.Logf("test case %v: Peer%v / want-blocks '%s' / want-haves '%s' / sendDontHave %t", - i, wl.peerIndex, wl.wantBlks, wl.wantHaves, wl.sendDontHave) + t.Logf("test case %v: Peer%v / want-blocks '%s' / want-haves '%s'", + i, wl.peerIndex, wl.wantBlks, wl.wantHaves) wantBlks := strings.Split(wl.wantBlks, "") wantHaves := strings.Split(wl.wantHaves, "") - partnerWantBlocksHaves(e, wantBlks, wantHaves, wl.sendDontHave, peerIDs[wl.peerIndex]) + partnerWantBlocksHaves(e, wantBlks, wantHaves, true, peerIDs[wl.peerIndex]) // Check result exp := testCase.exp @@ -1314,10 +1308,9 @@ func TestPeerBlockFilterMutability(t *testing.T) { // Setup the test type testCaseEntry struct { - allowList string - wantBlks string - wantHaves string - sendDontHave bool + allowList string + wantBlks string + wantHaves string } type testCaseExp struct { @@ -1337,15 +1330,13 @@ func TestPeerBlockFilterMutability(t *testing.T) { wls: []testCaseEntry{ { // Peer has no accesses & request a want-block - allowList: "", - wantBlks: "a", - sendDontHave: true, + allowList: "", + wantBlks: "a", }, { // Then Peer is allowed access to a - allowList: "a", - wantBlks: "a", - sendDontHave: true, + allowList: "a", + wantBlks: "a", }, }, exps: []testCaseExp{ @@ -1361,15 +1352,13 @@ func TestPeerBlockFilterMutability(t *testing.T) { wls: []testCaseEntry{ { // Peer has access to bc - allowList: "bc", - wantHaves: "bc", - sendDontHave: true, + allowList: "bc", + wantHaves: "bc", }, { // Then Peer loses access to b - allowList: "c", - wantBlks: "bc", // Note: We request a block here to force a response from the node - sendDontHave: true, + allowList: "c", + wantBlks: "bc", // Note: We request a block here to force a response from the node }, }, exps: []testCaseExp{ @@ -1422,8 +1411,8 @@ func TestPeerBlockFilterMutability(t *testing.T) { exp := testCase.exps[j] // Create wants requests - t.Logf("test case %v, %v: allow-list '%s' / want-blocks '%s' / want-haves '%s' / sendDontHave %t", - i, j, wl.allowList, wl.wantBlks, wl.wantHaves, wl.sendDontHave) + t.Logf("test case %v, %v: allow-list '%s' / want-blocks '%s' / want-haves '%s'", + i, j, wl.allowList, wl.wantBlks, wl.wantHaves) allowList := strings.Split(wl.allowList, "") wantBlks := strings.Split(wl.wantBlks, "") @@ -1437,7 +1426,7 @@ func TestPeerBlockFilterMutability(t *testing.T) { } // Send the request - partnerWantBlocksHaves(e, wantBlks, wantHaves, wl.sendDontHave, partnerID) + partnerWantBlocksHaves(e, wantBlks, wantHaves, true, partnerID) // Check result next := <-e.Outbox() From 44432bc3448a50ff3a013b2b367513c6f1203a18 Mon Sep 17 00:00:00 2001 From: Laurent Senta Date: Thu, 3 Mar 2022 10:21:52 +0100 Subject: [PATCH 9/9] test: simplify test filter --- internal/decision/engine_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index df64fd63..c4dc5348 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1138,9 +1138,7 @@ func TestPeerBlockFilter(t *testing.T) { t.Fatal(err) } - // use a single task worker so that the order of outgoing messages is deterministic - engineTaskWorkerCount := 1 - e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { // peer 0 has access to everything if p == peerIDs[0] { @@ -1297,9 +1295,7 @@ func TestPeerBlockFilterMutability(t *testing.T) { filterAllowList := make(map[cid.Cid]bool) - // use a single task worker so that the order of outgoing messages is deterministic - engineTaskWorkerCount := 1 - e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { return filterAllowList[c] }),