Skip to content

Commit

Permalink
feat(bitswap:network) propagate errors up the stack
Browse files Browse the repository at this point in the history
Rather than pushing errors back down to lower layers, propagate the
errors upward.

This commit adds a `ReceiveError` method to BitSwap's network receiver.

Still TODO: rm the error return value from:

    net.service.handler.HandleMessage

This is inspired by delegation patterns in found in the wild.
  • Loading branch information
Brian Tiger Chow committed Sep 22, 2014
1 parent 676d9c5 commit 146217b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 46 deletions.
17 changes: 11 additions & 6 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package bitswap

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Expand Down Expand Up @@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())

if p == nil {
return nil, nil, errors.New("Received nil Peer")
// TODO propagate the error upward
return nil, nil
}
if incoming == nil {
return nil, nil, errors.New("Received nil Message")
// TODO propagate the error upward
return nil, nil
}

bs.strategy.MessageReceived(p, incoming) // FIRST
Expand Down Expand Up @@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}
}
defer bs.strategy.MessageSent(p, message)
return p, message, nil
return p, message
}

func (bs *bitswap) ReceiveError(err error) {
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

// send strives to ensure that accounting is always performed when a message is
Expand Down
4 changes: 3 additions & 1 deletion exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type Adapter interface {
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
destination *peer.Peer, outgoing bsmsg.BitSwapMessage)

ReceiveError(error)
}

// TODO(brian): move this to go-ipfs/net package
Expand Down
15 changes: 6 additions & 9 deletions exchange/bitswap/network/net_message_adapter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package network

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Expand Down Expand Up @@ -34,18 +32,16 @@ func (adapter *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {

if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped")
return nil, nil
}

received, err := bsmsg.FromNet(incoming)
if err != nil {
return nil, err
adapter.receiver.ReceiveError(err)
return nil, nil
}

p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil {
return nil, err
}
p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)

// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
Expand All @@ -54,7 +50,8 @@ func (adapter *impl) HandleMessage(

outgoing, err := bsmsg.ToNet(p)
if err != nil {
return nil, err
adapter.receiver.ReceiveError(err)
return nil, nil
}

return outgoing, nil
Expand Down
24 changes: 5 additions & 19 deletions exchange/bitswap/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,7 @@ func (n *network) deliver(
return errors.New("Invalid input")
}

nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {

// TODO should this error be returned across network boundary?

// TODO this raises an interesting question about network contract. How
// can the network be expected to behave under different failure
// conditions? What if peer is unreachable? Will we know if messages
// aren't delivered?

return err
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return errors.New("Malformed client request")
Expand Down Expand Up @@ -119,15 +108,12 @@ func (n *network) SendRequest(
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
return nil, err
// TODO return nil, NoResponse
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

// TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return nil, errors.New("Malformed client request")
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}

// TODO dedupe code
Expand All @@ -144,7 +130,7 @@ func (n *network) SendRequest(
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, NoResponse
return nil, nil
}
return nextMsg, nil
}
Expand Down
25 changes: 14 additions & 11 deletions exchange/bitswap/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
ctx context.Context,
from *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

t.Log("Recipient received a message from the network")

Expand All @@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
m := bsmsg.New()
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return from, m, nil
return from, m
}))

t.Log("Build a message and send a synchronous request to recipient")
Expand Down Expand Up @@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
ctx context.Context,
fromWaiter *peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return fromWaiter, msgToWaiter, nil
return fromWaiter, msgToWaiter
}))

waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder *peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
Expand All @@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")

}
return nil, nil, nil
return nil, nil
}))

messageSentAsync := bsmsg.New()
Expand All @@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}

type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error)
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
Expand All @@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver {
}

type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error)
f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage)
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming)
}

func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error
}

0 comments on commit 146217b

Please sign in to comment.