diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 90bb3673951..629af22fde6 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -71,6 +71,10 @@ { "ImportPath": "github.com/syndtr/goleveldb/leveldb", "Rev": "99056d50e56252fbe0021d5c893defca5a76baf8" + }, + { + "ImportPath": "github.com/tuxychandru/pubsub", + "Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e" } ] } diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md new file mode 100644 index 00000000000..c1aab80b5d8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md @@ -0,0 +1,30 @@ +Install pubsub with, + + go get github.com/tuxychandru/pubsub + +View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub). + +## License + +Copyright (c) 2013, Chandra Sekar S +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go new file mode 100644 index 00000000000..9cbf9cffad2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go @@ -0,0 +1,208 @@ +// Copyright 2013, Chandra Sekar S. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the README.md file. + +// Package pubsub implements a simple multi-topic pub-sub +// library. +// +// Topics must be strings and messages of any type can be +// published. A topic can have any number of subcribers and +// all of them receive messages published on the topic. +package pubsub + +type operation int + +const ( + sub operation = iota + subOnce + pub + unsub + unsubAll + closeTopic + shutdown +) + +// PubSub is a collection of topics. +type PubSub struct { + cmdChan chan cmd + capacity int +} + +type cmd struct { + op operation + topics []string + ch chan interface{} + msg interface{} +} + +// New creates a new PubSub and starts a goroutine for handling operations. +// The capacity of the channels created by Sub and SubOnce will be as specified. +func New(capacity int) *PubSub { + ps := &PubSub{make(chan cmd), capacity} + go ps.start() + return ps +} + +// Sub returns a channel on which messages published on any of +// the specified topics can be received. +func (ps *PubSub) Sub(topics ...string) chan interface{} { + return ps.sub(sub, topics...) +} + +// SubOnce is similar to Sub, but only the first message published, after subscription, +// on any of the specified topics can be received. +func (ps *PubSub) SubOnce(topics ...string) chan interface{} { + return ps.sub(subOnce, topics...) +} + +func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { + ch := make(chan interface{}, ps.capacity) + ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} + return ch +} + +// AddSub adds subscriptions to an existing channel. +func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { + ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} +} + +// Pub publishes the given message to all subscribers of +// the specified topics. +func (ps *PubSub) Pub(msg interface{}, topics ...string) { + ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg} +} + +// Unsub unsubscribes the given channel from the specified +// topics. If no topic is specified, it is unsubscribed +// from all topics. +func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) { + if len(topics) == 0 { + ps.cmdChan <- cmd{op: unsubAll, ch: ch} + return + } + + ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch} +} + +// Close closes all channels currently subscribed to the specified topics. +// If a channel is subscribed to multiple topics, some of which is +// not specified, it is not closed. +func (ps *PubSub) Close(topics ...string) { + ps.cmdChan <- cmd{op: closeTopic, topics: topics} +} + +// Shutdown closes all subscribed channels and terminates the goroutine. +func (ps *PubSub) Shutdown() { + ps.cmdChan <- cmd{op: shutdown} +} + +func (ps *PubSub) start() { + reg := registry{ + topics: make(map[string]map[chan interface{}]bool), + revTopics: make(map[chan interface{}]map[string]bool), + } + +loop: + for cmd := range ps.cmdChan { + if cmd.topics == nil { + switch cmd.op { + case unsubAll: + reg.removeChannel(cmd.ch) + + case shutdown: + break loop + } + + continue loop + } + + for _, topic := range cmd.topics { + switch cmd.op { + case sub: + reg.add(topic, cmd.ch, false) + + case subOnce: + reg.add(topic, cmd.ch, true) + + case pub: + reg.send(topic, cmd.msg) + + case unsub: + reg.remove(topic, cmd.ch) + + case closeTopic: + reg.removeTopic(topic) + } + } + } + + for topic, chans := range reg.topics { + for ch, _ := range chans { + reg.remove(topic, ch) + } + } +} + +// registry maintains the current subscription state. It's not +// safe to access a registry from multiple goroutines simultaneously. +type registry struct { + topics map[string]map[chan interface{}]bool + revTopics map[chan interface{}]map[string]bool +} + +func (reg *registry) add(topic string, ch chan interface{}, once bool) { + if reg.topics[topic] == nil { + reg.topics[topic] = make(map[chan interface{}]bool) + } + reg.topics[topic][ch] = once + + if reg.revTopics[ch] == nil { + reg.revTopics[ch] = make(map[string]bool) + } + reg.revTopics[ch][topic] = true +} + +func (reg *registry) send(topic string, msg interface{}) { + for ch, once := range reg.topics[topic] { + ch <- msg + if once { + for topic := range reg.revTopics[ch] { + reg.remove(topic, ch) + } + } + } +} + +func (reg *registry) removeTopic(topic string) { + for ch := range reg.topics[topic] { + reg.remove(topic, ch) + } +} + +func (reg *registry) removeChannel(ch chan interface{}) { + for topic := range reg.revTopics[ch] { + reg.remove(topic, ch) + } +} + +func (reg *registry) remove(topic string, ch chan interface{}) { + if _, ok := reg.topics[topic]; !ok { + return + } + + if _, ok := reg.topics[topic][ch]; !ok { + return + } + + delete(reg.topics[topic], ch) + delete(reg.revTopics[ch], topic) + + if len(reg.topics[topic]) == 0 { + delete(reg.topics, topic) + } + + if len(reg.revTopics[ch]) == 0 { + close(ch) + delete(reg.revTopics, ch) + } +} diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go new file mode 100644 index 00000000000..16392d33bc7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go @@ -0,0 +1,230 @@ +// Copyright 2013, Chandra Sekar S. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the README.md file. + +package pubsub + +import ( + check "launchpad.net/gocheck" + "runtime" + "testing" + "time" +) + +var _ = check.Suite(new(Suite)) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type Suite struct{} + +func (s *Suite) TestSub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t1") + ch3 := ps.Sub("t2") + + ps.Pub("hi", "t1") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + + ps.Pub("hello", "t2") + c.Check(<-ch3, check.Equals, "hello") + + ps.Shutdown() + _, ok := <-ch1 + c.Check(ok, check.Equals, false) + _, ok = <-ch2 + c.Check(ok, check.Equals, false) + _, ok = <-ch3 + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestSubOnce(c *check.C) { + ps := New(1) + ch := ps.SubOnce("t1") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestAddSub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t2") + + ps.Pub("hi1", "t1") + c.Check(<-ch1, check.Equals, "hi1") + + ps.Pub("hi2", "t2") + c.Check(<-ch2, check.Equals, "hi2") + + ps.AddSub(ch1, "t2", "t3") + ps.Pub("hi3", "t2") + c.Check(<-ch1, check.Equals, "hi3") + c.Check(<-ch2, check.Equals, "hi3") + + ps.Pub("hi4", "t3") + c.Check(<-ch1, check.Equals, "hi4") + + ps.Shutdown() +} + +func (s *Suite) TestUnsub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Unsub(ch, "t1") + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestUnsubAll(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1", "t2", "t3") + ch2 := ps.Sub("t1", "t3") + + ps.Unsub(ch1) + + m, ok := <-ch1 + c.Check(ok, check.Equals, false) + + ps.Pub("hi", "t1") + m, ok = <-ch2 + c.Check(m, check.Equals, "hi") + + ps.Shutdown() +} + +func (s *Suite) TestClose(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t1") + ch3 := ps.Sub("t2") + ch4 := ps.Sub("t3") + + ps.Pub("hi", "t1") + ps.Pub("hello", "t2") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + c.Check(<-ch3, check.Equals, "hello") + + ps.Close("t1", "t2") + _, ok := <-ch1 + c.Check(ok, check.Equals, false) + _, ok = <-ch2 + c.Check(ok, check.Equals, false) + _, ok = <-ch3 + c.Check(ok, check.Equals, false) + + ps.Pub("welcome", "t3") + c.Check(<-ch4, check.Equals, "welcome") + + ps.Shutdown() +} + +func (s *Suite) TestUnsubAfterClose(c *check.C) { + ps := New(1) + ch := ps.Sub("t1") + defer func() { + ps.Unsub(ch, "t1") + ps.Shutdown() + }() + + ps.Close("t1") + _, ok := <-ch + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestShutdown(c *check.C) { + start := runtime.NumGoroutine() + New(10).Shutdown() + time.Sleep(1) + c.Check(runtime.NumGoroutine()-start, check.Equals, 1) +} + +func (s *Suite) TestMultiSub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Shutdown() + _, ok := <-ch + c.Check(ok, check.Equals, false) +} + +func (s *Suite) TestMultiSubOnce(c *check.C) { + ps := New(1) + ch := ps.SubOnce("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hello", "t2") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + +func (s *Suite) TestMultiPub(c *check.C) { + ps := New(1) + ch1 := ps.Sub("t1") + ch2 := ps.Sub("t2") + + ps.Pub("hi", "t1", "t2") + c.Check(<-ch1, check.Equals, "hi") + c.Check(<-ch2, check.Equals, "hi") + + ps.Shutdown() +} + +func (s *Suite) TestMultiUnsub(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2", "t3") + + ps.Unsub(ch, "t1") + + ps.Pub("hi", "t1") + + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Unsub(ch, "t2", "t3") + _, ok := <-ch + c.Check(ok, check.Equals, false) + + ps.Shutdown() +} + +func (s *Suite) TestMultiClose(c *check.C) { + ps := New(1) + ch := ps.Sub("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Close("t1") + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + ps.Close("t2") + _, ok := <-ch + c.Check(ok, check.Equals, false) + + ps.Shutdown() +} diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 37876fc95c2..dadf306c9df 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -1,11 +1,15 @@ package bitswap import ( + "errors" "time" - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + notifications "github.com/jbenet/go-ipfs/bitswap/notifications" + tx "github.com/jbenet/go-ipfs/bitswap/transmission" blocks "github.com/jbenet/go-ipfs/blocks" swarm "github.com/jbenet/go-ipfs/net/swarm" peer "github.com/jbenet/go-ipfs/peer" @@ -29,6 +33,7 @@ type BitSwap struct { peer *peer.Peer // net holds the connections to all peers. + sender tx.Sender net swarm.Network meschan *swarm.Chan @@ -38,7 +43,7 @@ type BitSwap struct { // routing interface for communication routing *dht.IpfsDHT - listener *swarm.MessageListener + notifications notifications.PubSub // partners is a map of currently active bitswap relationships. // The Ledger has the peer.ID, and the peer connection works through net. @@ -59,6 +64,8 @@ type BitSwap struct { // NewBitSwap creates a new BitSwap instance. It does not check its parameters. func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { + receiver := tx.Forwarder{} + sender := tx.NewBSNetService(context.Background(), &receiver) bs := &BitSwap{ peer: p, net: net, @@ -66,10 +73,13 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR partners: LedgerMap{}, wantList: KeySet{}, routing: r.(*dht.IpfsDHT), - meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), - haltChan: make(chan struct{}), - listener: swarm.NewMessageListener(), + // TODO(brian): replace |meschan| with |sender| in BitSwap impl + meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), + sender: sender, + haltChan: make(chan struct{}), + notifications: notifications.New(), } + receiver.Delegate(bs) go bs.handleMessages() return bs @@ -83,7 +93,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( tleft := timeout - time.Now().Sub(begin) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) - valchan := make(chan []byte) + blockChannel := make(chan blocks.Block) after := time.After(tleft) // TODO: when the data is received, shut down this for loop ASAP @@ -96,7 +106,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( return } select { - case valchan <- blk: + case blockChannel <- *blk: default: } }(p) @@ -104,31 +114,30 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( }() select { - case blkdata := <-valchan: - close(valchan) - return blocks.NewBlock(blkdata) + case block := <-blockChannel: + close(blockChannel) + return &block, nil case <-after: return nil, u.ErrTimeout } } -func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { +func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) { u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) - message := newMessage() - message.AppendWanted(k) + ctx, _ := context.WithTimeout(context.Background(), timeout) + blockChannel := bs.notifications.Subscribe(ctx, k) - after := time.After(timeout) - resp := bs.listener.Listen(string(k), 1, timeout) + message := bsmsg.New() + message.AppendWanted(k) bs.meschan.Outgoing <- message.ToSwarm(p) - select { - case resp_mes := <-resp: - return resp_mes.Data, nil - case <-after: + block, ok := <-blockChannel + if !ok { u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) return nil, u.ErrTimeout } + return &block, nil } // HaveBlock announces the existance of a block to BitSwap, potentially sending @@ -148,7 +157,7 @@ func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { } func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) { - message := newMessage() + message := bsmsg.New() message.AppendBlock(b) bs.meschan.Outgoing <- message.ToSwarm(p) } @@ -157,29 +166,25 @@ func (bs *BitSwap) handleMessages() { for { select { case mes := <-bs.meschan.Incoming: - pmes := new(PBMessage) - err := proto.Unmarshal(mes.Data, pmes) + bsmsg, err := bsmsg.FromSwarm(*mes) if err != nil { u.PErr("%v\n", err) continue } - if pmes.Blocks != nil { - for _, blkData := range pmes.Blocks { - blk, err := blocks.NewBlock(blkData) - if err != nil { - u.PErr("%v\n", err) - continue - } + + if bsmsg.Blocks() != nil { + for _, blk := range bsmsg.Blocks() { go bs.blockReceive(mes.Peer, blk) } } - if pmes.Wantlist != nil { - for _, want := range pmes.Wantlist { + if bsmsg.Wantlist() != nil { + for _, want := range bsmsg.Wantlist() { go bs.peerWantsBlock(mes.Peer, want) } } case <-bs.haltChan: + bs.notifications.Shutdown() return } } @@ -187,15 +192,14 @@ func (bs *BitSwap) handleMessages() { // peerWantsBlock will check if we have the block in question, // and then if we do, check the ledger for whether or not we should send it. -func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { - u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty()) +func (bs *BitSwap) peerWantsBlock(p *peer.Peer, wanted u.Key) { + u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty()) ledger := bs.getLedger(p) - dsk := ds.NewKey(want) - blk_i, err := bs.datastore.Get(dsk) + blk_i, err := bs.datastore.Get(wanted.DatastoreKey()) if err != nil { if err == ds.ErrNotFound { - ledger.Wants(u.Key(want)) + ledger.Wants(wanted) } u.PErr("datastore get error: %v\n", err) return @@ -221,7 +225,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { } } -func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { +func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) { u.DOut("blockReceive: %s\n", blk.Key().Pretty()) err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data) if err != nil { @@ -229,11 +233,7 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { return } - mes := &swarm.Message{ - Peer: p, - Data: blk.Data, - } - bs.listener.Respond(string(blk.Key()), mes) + bs.notifications.Publish(blk) ledger := bs.getLedger(p) ledger.ReceivedBytes(len(blk.Data)) @@ -253,7 +253,7 @@ func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger { } func (bs *BitSwap) SendWantList(wl KeySet) error { - message := newMessage() + message := bsmsg.New() for k, _ := range wl { message.AppendWanted(k) } @@ -276,3 +276,20 @@ func (bs *BitSwap) SetStrategy(sf StrategyFunc) { ledger.Strategy = sf } } + +func (bs *BitSwap) ReceiveMessage( + ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( + bsmsg.BitSwapMessage, *peer.Peer, error) { + if incoming.Blocks() != nil { + for _, block := range incoming.Blocks() { + go bs.blockReceive(sender, block) + } + } + + if incoming.Wantlist() != nil { + for _, want := range incoming.Wantlist() { + go bs.peerWantsBlock(sender, want) + } + } + return nil, nil, errors.New("TODO implement") +} diff --git a/bitswap/message.go b/bitswap/message.go deleted file mode 100644 index a0be726b72a..00000000000 --- a/bitswap/message.go +++ /dev/null @@ -1,38 +0,0 @@ -package bitswap - -import ( - blocks "github.com/jbenet/go-ipfs/blocks" - swarm "github.com/jbenet/go-ipfs/net/swarm" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -// message wraps a proto message for convenience -type message struct { - pb PBMessage -} - -func newMessageFromProto(pb PBMessage) *message { - return &message{pb: pb} -} - -func newMessage() *message { - return new(message) -} - -func (m *message) AppendWanted(k u.Key) { - m.pb.Wantlist = append(m.pb.Wantlist, string(k)) -} - -func (m *message) AppendBlock(b *blocks.Block) { - m.pb.Blocks = append(m.pb.Blocks, b.Data) -} - -func (m *message) ToProto() *PBMessage { - cp := m.pb - return &cp -} - -func (m *message) ToSwarm(p *peer.Peer) *swarm.Message { - return swarm.NewMessage(p, m.ToProto()) -} diff --git a/bitswap/message/Makefile b/bitswap/message/Makefile new file mode 100644 index 00000000000..5bbebea075a --- /dev/null +++ b/bitswap/message/Makefile @@ -0,0 +1,8 @@ +# TODO(brian): add proto tasks +all: message.pb.go + +message.pb.go: message.proto + protoc --gogo_out=. --proto_path=../../../../../:/usr/local/opt/protobuf/include:. $< + +clean: + rm message.pb.go diff --git a/bitswap/message/message.go b/bitswap/message/message.go new file mode 100644 index 00000000000..01ef402534b --- /dev/null +++ b/bitswap/message/message.go @@ -0,0 +1,81 @@ +package message + +import ( + "errors" + + netmsg "github.com/jbenet/go-ipfs/net/message" + + blocks "github.com/jbenet/go-ipfs/blocks" + nm "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +type BitSwapMessage interface { + Wantlist() []u.Key + Blocks() []blocks.Block + AppendWanted(k u.Key) + AppendBlock(b *blocks.Block) + Exportable +} + +type Exportable interface { + ToProto() *PBMessage + ToNet(p *peer.Peer) (nm.NetMessage, error) +} + +// message wraps a proto message for convenience +type message struct { + pb PBMessage +} + +func newMessageFromProto(pb PBMessage) *message { + return &message{pb: pb} +} + +func New() *message { + return new(message) +} + +// TODO(brian): convert these into keys +func (m *message) Wantlist() []u.Key { + wl := make([]u.Key, len(m.pb.Wantlist)) + for _, str := range m.pb.Wantlist { + wl = append(wl, u.Key(str)) + } + return wl +} + +// TODO(brian): convert these into blocks +func (m *message) Blocks() []blocks.Block { + bs := make([]blocks.Block, len(m.pb.Blocks)) + for _, data := range m.pb.Blocks { + b, err := blocks.NewBlock(data) + if err != nil { + continue + } + bs = append(bs, *b) + } + return bs +} + +func (m *message) AppendWanted(k u.Key) { + m.pb.Wantlist = append(m.pb.Wantlist, string(k)) +} + +func (m *message) AppendBlock(b *blocks.Block) { + m.pb.Blocks = append(m.pb.Blocks, b.Data) +} + +func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) { + return nil, errors.New("TODO implement") +} + +func (m *message) ToProto() *PBMessage { + cp := m.pb + return &cp +} + +func (m *message) ToNet(p *peer.Peer) (nm.NetMessage, error) { + return nm.FromObject(p, m.ToProto()) +} diff --git a/bitswap/message.pb.go b/bitswap/message/message.pb.go similarity index 98% rename from bitswap/message.pb.go rename to bitswap/message/message.pb.go index a340ca0733d..d1089f5c94a 100644 --- a/bitswap/message.pb.go +++ b/bitswap/message/message.pb.go @@ -11,7 +11,7 @@ It is generated from these files: It has these top-level messages: PBMessage */ -package bitswap +package message import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" import math "math" diff --git a/bitswap/message.proto b/bitswap/message/message.proto similarity index 82% rename from bitswap/message.proto rename to bitswap/message/message.proto index b025ac3c3c7..a0e4d19972c 100644 --- a/bitswap/message.proto +++ b/bitswap/message/message.proto @@ -1,4 +1,4 @@ -package bitswap; +package message; message PBMessage { repeated string wantlist = 1; diff --git a/bitswap/message_test.go b/bitswap/message/message_test.go similarity index 94% rename from bitswap/message_test.go rename to bitswap/message/message_test.go index bc52b5aa9ed..87a36cea105 100644 --- a/bitswap/message_test.go +++ b/bitswap/message/message_test.go @@ -1,4 +1,4 @@ -package bitswap +package message import ( "bytes" @@ -10,7 +10,7 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" - m := newMessage() + m := New() m.AppendWanted(u.Key(str)) if !contains(m.ToProto().GetWantlist(), str) { @@ -37,7 +37,7 @@ func TestAppendBlock(t *testing.T) { strs = append(strs, "Celeritas") strs = append(strs, "Incendia") - m := newMessage() + m := New() for _, str := range strs { block, err := blocks.NewBlock([]byte(str)) if err != nil { @@ -57,7 +57,7 @@ func TestAppendBlock(t *testing.T) { func TestCopyProtoByValue(t *testing.T) { const str = "foo" - m := newMessage() + m := New() protoBeforeAppend := m.ToProto() m.AppendWanted(u.Key(str)) if contains(protoBeforeAppend.GetWantlist(), str) { diff --git a/bitswap/notifications/notifications.go b/bitswap/notifications/notifications.go new file mode 100644 index 00000000000..2da2b7fadcb --- /dev/null +++ b/bitswap/notifications/notifications.go @@ -0,0 +1,55 @@ +package notifications + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub" + + blocks "github.com/jbenet/go-ipfs/blocks" + u "github.com/jbenet/go-ipfs/util" +) + +type PubSub interface { + Publish(block blocks.Block) + Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block + Shutdown() +} + +func New() PubSub { + const bufferSize = 16 + return &impl{*pubsub.New(bufferSize)} +} + +type impl struct { + wrapped pubsub.PubSub +} + +func (ps *impl) Publish(block blocks.Block) { + topic := string(block.Key()) + ps.wrapped.Pub(block, topic) +} + +// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil +// if the |ctx| times out or is cancelled. Then channel is closed after the +// block given by |k| is sent. +func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block { + topic := string(k) + subChan := ps.wrapped.SubOnce(topic) + blockChannel := make(chan blocks.Block) + go func() { + defer close(blockChannel) + select { + case val := <-subChan: + block, ok := val.(blocks.Block) + if ok { + blockChannel <- block + } + case <-ctx.Done(): + ps.wrapped.Unsub(subChan, topic) + } + }() + return blockChannel +} + +func (ps *impl) Shutdown() { + ps.wrapped.Shutdown() +} diff --git a/bitswap/notifications/notifications_test.go b/bitswap/notifications/notifications_test.go new file mode 100644 index 00000000000..487474e2dbe --- /dev/null +++ b/bitswap/notifications/notifications_test.go @@ -0,0 +1,65 @@ +package notifications + +import ( + "bytes" + "testing" + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + blocks "github.com/jbenet/go-ipfs/blocks" +) + +func TestPublishSubscribe(t *testing.T) { + blockSent := getBlockOrFail(t, "Greetings from The Interval") + + n := New() + defer n.Shutdown() + ch := n.Subscribe(context.Background(), blockSent.Key()) + + n.Publish(blockSent) + blockRecvd, ok := <-ch + if !ok { + t.Fail() + } + + assertBlocksEqual(t, blockRecvd, blockSent) + +} + +func TestCarryOnWhenDeadlineExpires(t *testing.T) { + + impossibleDeadline := time.Nanosecond + fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline) + + n := New() + defer n.Shutdown() + block := getBlockOrFail(t, "A Missed Connection") + blockChannel := n.Subscribe(fastExpiringCtx, block.Key()) + + assertBlockChannelNil(t, blockChannel) +} + +func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { + _, ok := <-blockChannel + if ok { + t.Fail() + } +} + +func assertBlocksEqual(t *testing.T, a, b blocks.Block) { + if !bytes.Equal(a.Data, b.Data) { + t.Fail() + } + if a.Key() != b.Key() { + t.Fail() + } +} + +func getBlockOrFail(t *testing.T, msg string) blocks.Block { + block, blockCreationErr := blocks.NewBlock([]byte(msg)) + if blockCreationErr != nil { + t.Fail() + } + return *block +} diff --git a/bitswap/transmission/forwarder.go b/bitswap/transmission/forwarder.go new file mode 100644 index 00000000000..ab2fc6a0850 --- /dev/null +++ b/bitswap/transmission/forwarder.go @@ -0,0 +1,28 @@ +package transmission + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + peer "github.com/jbenet/go-ipfs/peer" +) + +// Forwarder breaks the circular dependency between bitswap and its sender +// NB: A sender is instantiated with a handler and this sender is then passed +// as a constructor argument to BitSwap. However, the handler is BitSwap! +// Hence, this receiver. +type Forwarder struct { + delegate Receiver +} + +func (r *Forwarder) ReceiveMessage( + ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( + bsmsg.BitSwapMessage, *peer.Peer, error) { + if r.delegate == nil { + return nil, nil, nil + } + return r.delegate.ReceiveMessage(ctx, sender, incoming) +} + +func (r *Forwarder) Delegate(delegate Receiver) { + r.delegate = delegate +} diff --git a/bitswap/transmission/forwarder_test.go b/bitswap/transmission/forwarder_test.go new file mode 100644 index 00000000000..f17ebb1473f --- /dev/null +++ b/bitswap/transmission/forwarder_test.go @@ -0,0 +1,16 @@ +package transmission + +import ( + "testing" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + peer "github.com/jbenet/go-ipfs/peer" +) + +func TestDoesntPanicIfDelegateNotPresent(t *testing.T) { + fwdr := Forwarder{} + fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New()) +} + +// TODO(brian): func TestForwardsMessageToDelegate(t *testing.T) diff --git a/bitswap/transmission/interface.go b/bitswap/transmission/interface.go new file mode 100644 index 00000000000..080c9b85142 --- /dev/null +++ b/bitswap/transmission/interface.go @@ -0,0 +1,21 @@ +package transmission + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + peer "github.com/jbenet/go-ipfs/peer" +) + +type Sender interface { + SendMessage(ctx context.Context, destination *peer.Peer, message bsmsg.Exportable) error + SendRequest(ctx context.Context, destination *peer.Peer, outgoing bsmsg.Exportable) ( + incoming bsmsg.BitSwapMessage, err error) +} + +// TODO(brian): consider returning a NetMessage +type Receiver interface { + ReceiveMessage( + ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( + outgoing bsmsg.BitSwapMessage, destination *peer.Peer, err error) +} diff --git a/bitswap/transmission/service_wrapper.go b/bitswap/transmission/service_wrapper.go new file mode 100644 index 00000000000..04d3f21f884 --- /dev/null +++ b/bitswap/transmission/service_wrapper.go @@ -0,0 +1,76 @@ +package transmission + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + netmsg "github.com/jbenet/go-ipfs/net/message" + netservice "github.com/jbenet/go-ipfs/net/service" + peer "github.com/jbenet/go-ipfs/peer" +) + +func NewBSNetService(ctx context.Context, r Receiver) Sender { + h := &handlerWrapper{r} + s := netservice.NewService(ctx, h) + return &serviceWrapper{*s} +} + +// handlerWrapper is responsible for marshaling/unmarshaling NetMessages. It +// delegates calls to the BitSwap delegate. +type handlerWrapper struct { + bitswapDelegate Receiver +} + +// HandleMessage marshals and unmarshals net messages, forwarding them to the +// BitSwapMessage receiver +func (wrapper *handlerWrapper) HandleMessage( + ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { + + received, err := bsmsg.FromNet(incoming) + if err != nil { + return nil, err + } + + bsmsg, p, err := wrapper.bitswapDelegate.ReceiveMessage(ctx, incoming.Peer(), received) + if err != nil { + return nil, err + } + if bsmsg == nil { + return nil, nil + } + + outgoing, err := bsmsg.ToNet(p) + if err != nil { + return nil, err + } + + return outgoing, nil +} + +type serviceWrapper struct { + serviceDelegate netservice.Service +} + +func (wrapper *serviceWrapper) SendMessage( + ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error { + nmsg, err := outgoing.ToNet(p) + if err != nil { + return err + } + req, err := netservice.NewRequest(p.ID) + return wrapper.serviceDelegate.SendMessage(ctx, nmsg, req.ID) +} + +func (wrapper *serviceWrapper) SendRequest(ctx context.Context, + p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) { + + outgoingMsg, err := outgoing.ToNet(p) + if err != nil { + return nil, err + } + incomingMsg, err := wrapper.serviceDelegate.SendRequest(ctx, outgoingMsg) + if err != nil { + return nil, err + } + return bsmsg.FromNet(incomingMsg) +} diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index ef5b0d18c48..a94c36a4d52 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -9,7 +9,7 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander" config "github.com/jbenet/go-ipfs/config" ci "github.com/jbenet/go-ipfs/crypto" - identify "github.com/jbenet/go-ipfs/identify" + spipe "github.com/jbenet/go-ipfs/crypto/spipe" u "github.com/jbenet/go-ipfs/util" ) @@ -90,7 +90,7 @@ func initCmd(c *commander.Command, inp []string) error { } cfg.Identity.PrivKey = base64.StdEncoding.EncodeToString(skbytes) - id, err := identify.IDFromPubKey(pk) + id, err := spipe.IDFromPubKey(pk) if err != nil { return err } diff --git a/core/core.go b/core/core.go index 2c010b4ac62..9c17f2a4587 100644 --- a/core/core.go +++ b/core/core.go @@ -95,11 +95,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { } route = dht.NewDHT(local, net, d) + // TODO(brian): pass a context to DHT for its async operations route.Start() + // TODO(brian): pass a context to bs for its async operations swap = bitswap.NewBitSwap(local, net, d, route) swap.SetStrategy(bitswap.YesManStrategy) + // TODO(brian): pass a context to initConnections go initConnections(cfg, route) } diff --git a/net/interface.go b/net/interface.go index f5934a7e1d0..6f153983633 100644 --- a/net/interface.go +++ b/net/interface.go @@ -26,7 +26,7 @@ type Network interface { GetProtocols() *mux.ProtocolMap // SendMessage sends given Message out - SendMessage(*msg.Message) error + SendMessage(msg.NetMessage) error // Close terminates all network operation Close() error diff --git a/net/message/message.go b/net/message/message.go index e847539d8b4..11053e423cc 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -6,38 +6,52 @@ import ( proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ) -// Message represents a packet of information sent to or received from a +type NetMessage interface { + Peer() *peer.Peer + Data() []byte +} + +func New(p *peer.Peer, data []byte) NetMessage { + return &message{peer: p, data: data} +} + +// message represents a packet of information sent to or received from a // particular Peer. -type Message struct { +type message struct { // To or from, depending on direction. - Peer *peer.Peer + peer *peer.Peer // Opaque data - Data []byte + data []byte +} + +func (m *message) Peer() *peer.Peer { + return m.peer +} + +func (m *message) Data() []byte { + return m.data } // FromObject creates a message from a protobuf-marshallable message. -func FromObject(p *peer.Peer, data proto.Message) (*Message, error) { +func FromObject(p *peer.Peer, data proto.Message) (NetMessage, error) { bytes, err := proto.Marshal(data) if err != nil { return nil, err } - return &Message{ - Peer: p, - Data: bytes, - }, nil + return New(p, bytes), nil } // Pipe objects represent a bi-directional message channel. type Pipe struct { - Incoming chan *Message - Outgoing chan *Message + Incoming chan NetMessage + Outgoing chan NetMessage } // NewPipe constructs a pipe with channels of a given buffer size. func NewPipe(bufsize int) *Pipe { return &Pipe{ - Incoming: make(chan *Message, bufsize), - Outgoing: make(chan *Message, bufsize), + Incoming: make(chan NetMessage, bufsize), + Outgoing: make(chan NetMessage, bufsize), } } diff --git a/net/mux/mux.go b/net/mux/mux.go index a73e9a209b7..e6cf0651fbe 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -87,15 +87,15 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) { } // handleIncomingMessage routes message to the appropriate protocol. -func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 *msg.Message) { +func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) { - data, pid, err := unwrapData(m1.Data) + data, pid, err := unwrapData(m1.Data()) if err != nil { u.PErr("muxer de-serializing error: %v\n", err) return } - m2 := &msg.Message{Peer: m1.Peer, Data: data} + m2 := msg.New(m1.Peer(), data) proto, found := m.Protocols[pid] if !found { u.PErr("muxer unknown protocol %v\n", pid) @@ -125,14 +125,14 @@ func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, prot } // handleOutgoingMessage wraps out a message and sends it out the -func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 *msg.Message) { - data, err := wrapData(m1.Data, pid) +func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) { + data, err := wrapData(m1.Data(), pid) if err != nil { u.PErr("muxer serializing error: %v\n", err) return } - m2 := &msg.Message{Peer: m1.Peer, Data: data} + m2 := msg.New(m1.Peer(), data) select { case m.GetPipe().Outgoing <- m2: case <-ctx.Done(): diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go index 3bbbf784313..d28c3aa6cb6 100644 --- a/net/mux/mux_test.go +++ b/net/mux/mux_test.go @@ -32,14 +32,14 @@ func newPeer(t *testing.T, id string) *peer.Peer { return &peer.Peer{ID: peer.ID(mh)} } -func testMsg(t *testing.T, m *msg.Message, data []byte) { - if !bytes.Equal(data, m.Data) { - t.Errorf("Data does not match: %v != %v", data, m.Data) +func testMsg(t *testing.T, m msg.NetMessage, data []byte) { + if !bytes.Equal(data, m.Data()) { + t.Errorf("Data does not match: %v != %v", data, m.Data()) } } -func testWrappedMsg(t *testing.T, m *msg.Message, pid ProtocolID, data []byte) { - data2, pid2, err := unwrapData(m.Data) +func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) { + data2, pid2, err := unwrapData(m.Data()) if err != nil { t.Error(err) } @@ -76,7 +76,7 @@ func TestSimpleMuxer(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s)) } @@ -86,13 +86,13 @@ func TestSimpleMuxer(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p1.Incoming, []byte(s)) } // test outgoing p2 for _, s := range []string{"foo", "bar", "baz"} { - p2.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p2.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s)) } @@ -102,7 +102,7 @@ func TestSimpleMuxer(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p2.Incoming, []byte(s)) } } @@ -139,7 +139,7 @@ func TestSimultMuxer(t *testing.T) { for i := 0; i < size; i++ { <-limiter s := fmt.Sprintf("proto %v out %v", pid, i) - m := &msg.Message{Peer: peer1, Data: []byte(s)} + m := msg.New(peer1, []byte(s)) mux1.Protocols[pid].GetPipe().Outgoing <- m counts[pid][0][0]++ u.DOut("sent %v\n", s) @@ -156,7 +156,7 @@ func TestSimultMuxer(t *testing.T) { t.Error(err) } - m := &msg.Message{Peer: peer1, Data: d} + m := msg.New(peer1, d) mux1.Incoming <- m counts[pid][1][0]++ u.DOut("sent %v\n", s) @@ -167,7 +167,7 @@ func TestSimultMuxer(t *testing.T) { for { select { case m := <-mux1.Outgoing: - data, pid, err := unwrapData(m.Data) + data, pid, err := unwrapData(m.Data()) if err != nil { t.Error(err) } @@ -186,7 +186,7 @@ func TestSimultMuxer(t *testing.T) { select { case m := <-mux1.Protocols[pid].GetPipe().Incoming: counts[pid][0][1]++ - u.DOut("got %v\n", string(m.Data)) + u.DOut("got %v\n", string(m.Data())) case <-ctx.Done(): return } @@ -239,7 +239,7 @@ func TestStopping(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s)) } @@ -249,7 +249,7 @@ func TestStopping(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p1.Incoming, []byte(s)) } @@ -260,7 +260,7 @@ func TestStopping(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) select { case <-mux1.Outgoing: t.Error("should not have received anything.") @@ -274,7 +274,7 @@ func TestStopping(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) select { case <-p1.Incoming: t.Error("should not have received anything.") diff --git a/net/net.go b/net/net.go index a6361d9b65b..e080ff97c04 100644 --- a/net/net.go +++ b/net/net.go @@ -86,7 +86,7 @@ func (n *IpfsNetwork) GetProtocols() *mux.ProtocolMap { } // SendMessage sends given Message out -func (n *IpfsNetwork) SendMessage(m *msg.Message) error { +func (n *IpfsNetwork) SendMessage(m msg.NetMessage) error { n.swarm.Outgoing <- m return nil } diff --git a/net/service/request.go b/net/service/request.go index 44e856955e1..0905e3a635b 100644 --- a/net/service/request.go +++ b/net/service/request.go @@ -75,7 +75,7 @@ type Request struct { PeerID peer.ID // Response is the channel of incoming responses. - Response chan *msg.Message + Response chan msg.NetMessage } // NewRequest creates a request for given peer.ID @@ -88,7 +88,7 @@ func NewRequest(pid peer.ID) (*Request, error) { return &Request{ ID: id, PeerID: pid, - Response: make(chan *msg.Message, 1), + Response: make(chan msg.NetMessage, 1), }, nil } diff --git a/net/service/service.go b/net/service/service.go index d6735552636..d6e32a5014f 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -16,7 +16,7 @@ type Handler interface { // HandleMessage receives an incoming message, and potentially returns // a response message to send back. - HandleMessage(context.Context, *msg.Message) (*msg.Message, error) + HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error) } // Service is a networking component that protocols can use to multiplex @@ -74,16 +74,16 @@ func (s *Service) GetPipe() *msg.Pipe { } // SendMessage sends a message out -func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error { +func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error { // serialize ServiceMessage wrapper - data, err := wrapData(m.Data, rid) + data, err := wrapData(m.Data(), rid) if err != nil { return err } // send message - m2 := &msg.Message{Peer: m.Peer, Data: data} + m2 := msg.New(m.Peer(), data) select { case s.Outgoing <- m2: case <-ctx.Done(): @@ -94,10 +94,10 @@ func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID } // SendRequest sends a request message out and awaits a response. -func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) { +func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { // create a request - r, err := NewRequest(m.Peer.ID) + r, err := NewRequest(m.Peer().ID) if err != nil { return nil, err } @@ -150,14 +150,14 @@ func (s *Service) handleIncomingMessages(ctx context.Context) { } } -func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) { +func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { // unwrap the incoming message - data, rid, err := unwrapData(m.Data) + data, rid, err := unwrapData(m.Data()) if err != nil { u.PErr("de-serializing error: %v\n", err) } - m2 := &msg.Message{Peer: m.Peer, Data: data} + m2 := msg.New(m.Peer(), data) // if it's a request (or has no RequestID), handle it if rid == nil || rid.IsRequest() { @@ -182,7 +182,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) { u.PErr("RequestID should identify a response here.\n") } - key := RequestKey(m.Peer.ID, RequestID(rid)) + key := RequestKey(m.Peer().ID, RequestID(rid)) s.RequestsLock.RLock() r, found := s.Requests[key] s.RequestsLock.RUnlock() diff --git a/net/service/service_test.go b/net/service/service_test.go index 96b5a1cdc43..0e798bb7889 100644 --- a/net/service/service_test.go +++ b/net/service/service_test.go @@ -15,15 +15,15 @@ import ( // ReverseHandler reverses all Data it receives and sends it back. type ReverseHandler struct{} -func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) ( - *msg.Message, error) { +func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) ( + msg.NetMessage, error) { - d := m.Data + d := m.Data() for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 { d[i], d[j] = d[j], d[i] } - return &msg.Message{Peer: m.Peer, Data: d}, nil + return msg.New(m.Peer(), d), nil } func newPeer(t *testing.T, id string) *peer.Peer { @@ -47,11 +47,11 @@ func TestServiceHandler(t *testing.T) { t.Error(err) } - m1 := &msg.Message{Peer: peer1, Data: d} + m1 := msg.New(peer1, d) s.Incoming <- m1 m2 := <-s.Outgoing - d, rid, err := unwrapData(m2.Data) + d, rid, err := unwrapData(m2.Data()) if err != nil { t.Error(err) } @@ -85,14 +85,14 @@ func TestServiceRequest(t *testing.T) { } }() - m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m1 := msg.New(peer1, []byte("beep")) m2, err := s1.SendRequest(ctx, m1) if err != nil { t.Error(err) } - if !bytes.Equal(m2.Data, []byte("peeb")) { - t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof") + if !bytes.Equal(m2.Data(), []byte("peeb")) { + t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof") } } @@ -117,7 +117,7 @@ func TestServiceRequestTimeout(t *testing.T) { } }() - m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m1 := msg.New(peer1, []byte("beep")) m2, err := s1.SendRequest(ctx, m1) if err == nil || m2 != nil { t.Error("should've timed out") diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 9be46fd70a9..93bee663d78 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -153,7 +153,7 @@ func (s *Swarm) fanOut() { } s.connsLock.RLock() - conn, found := s.conns[msg.Peer.Key()] + conn, found := s.conns[msg.Peer().Key()] s.connsLock.RUnlock() if !found { @@ -164,7 +164,7 @@ func (s *Swarm) fanOut() { } // queue it in the connection's buffer - conn.Outgoing.MsgChan <- msg.Data + conn.Outgoing.MsgChan <- msg.Data() } } } @@ -189,7 +189,7 @@ func (s *Swarm) fanIn(c *conn.Conn) { goto out } - msg := &msg.Message{Peer: c.Peer, Data: data} + msg := msg.New(c.Peer, data) s.Incoming <- msg } } diff --git a/util/util.go b/util/util.go index 9c17fe0e62e..d2a43278963 100644 --- a/util/util.go +++ b/util/util.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" ) @@ -31,6 +32,10 @@ var ErrNotFound = errors.New("Error: Not Found.") // Key is a string representation of multihash for use with maps. type Key string +func (k Key) DatastoreKey() ds.Key { + return ds.NewKey(string(k)) +} + func (k Key) Pretty() string { return b58.Encode([]byte(k)) }