diff --git a/fetch_test.go b/fetch_test.go
index a8eae86..7db6cbe 100644
--- a/fetch_test.go
+++ b/fetch_test.go
@@ -46,8 +46,8 @@ func TestFetchProtocolTrip(t *testing.T) {
 	d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
 	h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
 
-	fetchCheck(t, ctx, h1, h2, "key", []byte("value2"))
-	fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
+	fetchCheck(ctx, t, h1, h2, "key", []byte("value2"))
+	fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
 }
 
 func TestFetchProtocolNotFound(t *testing.T) {
@@ -66,8 +66,8 @@ func TestFetchProtocolNotFound(t *testing.T) {
 	d2 := &datastore{make(map[string][]byte)}
 	h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
 
-	fetchCheck(t, ctx, h1, h2, "key", nil)
-	fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
+	fetchCheck(ctx, t, h1, h2, "key", nil)
+	fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
 }
 
 func TestFetchProtocolRepeated(t *testing.T) {
@@ -87,12 +87,12 @@ func TestFetchProtocolRepeated(t *testing.T) {
 	h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
 
 	for i := 0; i < 10; i++ {
-		fetchCheck(t, ctx, h1, h2, "key", nil)
-		fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
+		fetchCheck(ctx, t, h1, h2, "key", nil)
+		fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
 	}
 }
 
-func fetchCheck(t *testing.T, ctx context.Context,
+func fetchCheck(ctx context.Context, t *testing.T,
 	requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) {
 	data, err := requester.Fetch(ctx, responder.host.ID(), key)
 	if err != nil {
diff --git a/go.mod b/go.mod
index 8108e78..0a995f0 100644
--- a/go.mod
+++ b/go.mod
@@ -2,17 +2,15 @@ module github.com/libp2p/go-libp2p-pubsub-router
 
 require (
 	github.com/gogo/protobuf v1.3.1
-	github.com/ipfs/go-cid v0.0.3
 	github.com/ipfs/go-datastore v0.3.1
 	github.com/ipfs/go-ipfs-ds-help v0.0.1
-	github.com/ipfs/go-ipfs-util v0.0.1
 	github.com/ipfs/go-log v0.0.1
 	github.com/libp2p/go-libp2p-blankhost v0.1.4
 	github.com/libp2p/go-libp2p-core v0.2.5
-	github.com/libp2p/go-libp2p-pubsub v0.1.1
+	github.com/libp2p/go-libp2p-pubsub v0.2.4
 	github.com/libp2p/go-libp2p-record v0.1.2
-	github.com/libp2p/go-libp2p-routing-helpers v0.1.0
 	github.com/libp2p/go-libp2p-swarm v0.2.2
+	golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
 )
 
 go 1.12
diff --git a/go.sum b/go.sum
index c79b49f..6c4f8f8 100644
--- a/go.sum
+++ b/go.sum
@@ -61,10 +61,6 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
 github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
-github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
-github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
-github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
-github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@@ -137,6 +133,8 @@ github.com/libp2p/go-libp2p-core v0.2.5 h1:iP1PIiIrlRrGbE1fYq2918yBc5NlCH3pFuIPS
 github.com/libp2p/go-libp2p-core v0.2.5/go.mod h1:6+5zJmKhsf7yHn1RbmYDu08qDUpIUxGdqHuEZckmZOA=
 github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
 github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
+github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
+github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
 github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
 github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
 github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
@@ -148,14 +146,10 @@ github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpo
 github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
 github.com/libp2p/go-libp2p-peerstore v0.1.3 h1:wMgajt1uM2tMiqf4M+4qWKVyyFc8SfA+84VV9glZq1M=
 github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI=
-github.com/libp2p/go-libp2p-pubsub v0.1.1 h1:phDnQvO3H3hAgaEEQi6yt3LILqIYVXaw05bxzezrEwQ=
-github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
-github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
-github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
+github.com/libp2p/go-libp2p-pubsub v0.2.4 h1:O4BcaKpPQ9p82yTBtzIzgDFoOXkqhrQpfcVac3FAywU=
+github.com/libp2p/go-libp2p-pubsub v0.2.4/go.mod h1:1tJwAfySvZQ49R9uTVlkwtSTMVLeQQdrnLTJrr91gVc=
 github.com/libp2p/go-libp2p-record v0.1.2 h1:M50VKzWnmUrk/M5/Dz99qO9Xh4vs8ijsK+7HkJvRP+0=
 github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGdsU/9W//C8dqjQDk=
-github.com/libp2p/go-libp2p-routing-helpers v0.1.0 h1:BaFvpyv8TyhCN7TihawTiKuzeu8/Pyw7ZnMA4IvqIN8=
-github.com/libp2p/go-libp2p-routing-helpers v0.1.0/go.mod h1:oUs0h39vNwYtYXnQWOTU5BaafbedSyWCCal3gqHuoOQ=
 github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ=
 github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
 github.com/libp2p/go-libp2p-secio v0.2.0 h1:ywzZBsWEEz2KNTn5RtzauEDq5RFEefPsttXYwAWqHng=
@@ -236,6 +230,8 @@ github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lg
 github.com/multiformats/go-multiaddr v0.1.0/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
 github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE=
 github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
+github.com/multiformats/go-multiaddr v0.1.2 h1:HWYHNSyyllbQopmVIF5K7JKJugiah+L9/kuZKHbmNdQ=
+github.com/multiformats/go-multiaddr v0.1.2/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
 github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
 github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8=
 github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
@@ -258,6 +254,8 @@ github.com/multiformats/go-multihash v0.0.9 h1:aoijQXYYl7Xtb2pUUP68R+ys1TlnlR3eX
 github.com/multiformats/go-multihash v0.0.9/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
 github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ=
 github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
+github.com/multiformats/go-varint v0.0.1 h1:TR/0rdQtnNxuN2IhiB639xC3tWM4IUi7DkTBVTdGW/M=
+github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
@@ -342,6 +340,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI=
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/pubsub.go b/pubsub.go
index 27d556b..6535f9a 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -4,8 +4,8 @@ import (
 	"bytes"
 	"context"
 	"encoding/base64"
+	"errors"
 	"fmt"
-	"github.com/ipfs/go-cid"
 	"sync"
 	"time"
 
@@ -19,7 +19,6 @@ import (
 	ds "github.com/ipfs/go-datastore"
 	dssync "github.com/ipfs/go-datastore/sync"
 	dshelp "github.com/ipfs/go-ipfs-ds-help"
-	u "github.com/ipfs/go-ipfs-util"
 	logging "github.com/ipfs/go-log"
 )
 
@@ -33,7 +32,6 @@ type watchGroup struct {
 type PubsubValueStore struct {
 	ctx context.Context
 	ds  ds.Datastore
-	cr  routing.ContentRouting
 	ps  *pubsub.PubSub
 
 	host host.Host
@@ -42,14 +40,9 @@ type PubsubValueStore struct {
 	rebroadcastInitialDelay time.Duration
 	rebroadcastInterval     time.Duration
 
-	// Map of keys to subscriptions.
-	//
-	// If a key is present but the subscription is nil, we've bootstrapped
-	// but haven't subscribed.
-	mx   sync.Mutex
-	subs map[string]*pubsub.Subscription
-
-	cancels map[string]context.CancelFunc
+	// Map of keys to topics
+	mx     sync.Mutex
+	topics map[string]*topicInfo
 
 	watchLk  sync.Mutex
 	watching map[string]*watchGroup
@@ -57,6 +50,17 @@ type PubsubValueStore struct {
 	Validator record.Validator
 }
 
+type topicInfo struct {
+	topic *pubsub.Topic
+	evts  *pubsub.TopicEventHandler
+	sub   *pubsub.Subscription
+
+	cancel   context.CancelFunc
+	finished chan struct{}
+
+	dbWriteMx sync.Mutex
+}
+
 // KeyToTopic converts a binary record key to a pubsub topic key.
 func KeyToTopic(key string) string {
 	// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs.
@@ -64,49 +68,67 @@
 	return "/record/" + base64.RawURLEncoding.EncodeToString([]byte(key))
 }
 
-// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
-// The constructor interface is complicated by the need to bootstrap the pubsub topic.
-// This could be greatly simplified if the pubsub implementation handled bootstrap itself
-func NewPubsubValueStore(ctx context.Context, host host.Host, cr routing.ContentRouting, ps *pubsub.PubSub, validator record.Validator) *PubsubValueStore {
+// Option is a function that configures a PubsubValueStore during initialization
+type Option func(*PubsubValueStore) error
+
+// NewPubsubValueStore constructs a new ValueStore that publishes and retrieves records through pubsub.
+func NewPubsubValueStore(ctx context.Context, host host.Host, ps *pubsub.PubSub, validator record.Validator, opts ...Option) (*PubsubValueStore, error) {
 	psValueStore := &PubsubValueStore{
 		ctx: ctx,
-		cr:  cr, // needed for pubsub bootstrap
-
 		ds:   dssync.MutexWrap(ds.NewMapDatastore()),
 		ps:   ps,
 		host: host,
 
 		rebroadcastInitialDelay: 100 * time.Millisecond,
 		rebroadcastInterval:     time.Minute * 10,
 
-		subs:    make(map[string]*pubsub.Subscription),
-		cancels: make(map[string]context.CancelFunc),
+		topics: make(map[string]*topicInfo),
 
 		watching: make(map[string]*watchGroup),
 
 		Validator: validator,
 	}
 
+	for _, opt := range opts {
+		err := opt(psValueStore)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	psValueStore.fetch = newFetchProtocol(ctx, host, psValueStore.getLocal)
 
 	go psValueStore.rebroadcast(ctx)
-	return psValueStore
+	return psValueStore, nil
 }
 
-// Publish publishes an IPNS record through pubsub with default TTL
+// PutValue publishes a record through pubsub
 func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
-	// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs.
-	// Encode to "/record/base64url(key)"
-	topic := KeyToTopic(key)
-
 	if err := p.Subscribe(key); err != nil {
 		return err
 	}
 
 	log.Debugf("PubsubPublish: publish value for key", key)
 
+	p.mx.Lock()
+	ti, ok := p.topics[key]
+	p.mx.Unlock()
+	if !ok {
+		return errors.New("could not find topic handle")
+	}
+
+	ti.dbWriteMx.Lock()
+	defer ti.dbWriteMx.Unlock()
+	recCmp, err := p.putLocal(ti, key, value)
+	if err != nil {
+		return err
+	}
+	if recCmp < 0 {
+		return nil
+	}
+
 	select {
-	case err := <-p.psPublishChannel(topic, value):
+	case err := <-p.psPublishChannel(ctx, ti.topic, value):
 		return err
 	case <-ctx.Done():
 		return ctx.Err()
@@ -134,18 +156,17 @@ func (p *PubsubValueStore) compare(key string, val []byte) int {
 	i, err := p.Validator.Select(key, [][]byte{val, old})
 	if err == nil && i == 0 {
 		return 1
-	} else {
-		return -1
 	}
+	return -1
 }
 
 func (p *PubsubValueStore) Subscribe(key string) error {
 	p.mx.Lock()
-	// see if we already have a pubsub subscription; if not, subscribe
-	sub := p.subs[key]
-	p.mx.Unlock()
+	defer p.mx.Unlock()
 
-	if sub != nil {
+	// see if we already have a pubsub subscription; if not, subscribe
+	ti, ok := p.topics[key]
+	if ok {
 		return nil
 	}
 
@@ -162,34 +183,50 @@ func (p *PubsubValueStore) Subscribe(key string) error {
 		return cmp > 0 || cmp == 0 && src == myID
 	})
 
-	sub, err := p.ps.Subscribe(topic)
+	ti, err := p.createTopicHandler(topic)
 	if err != nil {
 		return err
 	}
 
-	p.mx.Lock()
-	existingSub, bootstrapped := p.subs[key]
-	if existingSub != nil {
-		p.mx.Unlock()
-		sub.Cancel()
-		return nil
-	}
-
+	p.topics[key] = ti
 	ctx, cancel := context.WithCancel(p.ctx)
-	p.cancels[key] = cancel
+	ti.cancel = cancel
 
-	p.subs[key] = sub
-	go p.handleSubscription(ctx, sub, key, cancel)
-	p.mx.Unlock()
+	go p.handleSubscription(ctx, ti, key)
 
 	log.Debugf("PubsubResolve: subscribed to %s", key)
 
-	if !bootstrapped {
-		// TODO: Deal with publish then resolve case? Cancel behaviour changes.
-		go bootstrapPubsub(ctx, p.cr, p.host, topic)
+	return nil
+}
+
+// createTopicHandler creates an internal topic object. Must be called with p.mx held
+func (p *PubsubValueStore) createTopicHandler(topic string) (*topicInfo, error) {
+	t, err := p.ps.Join(topic)
+	if err != nil {
+		return nil, err
 	}
 
-	return nil
+	sub, err := t.Subscribe()
+	if err != nil {
+		_ = t.Close()
+		return nil, err
+	}
+
+	evts, err := t.EventHandler()
+	if err != nil {
+		sub.Cancel()
+		_ = t.Close()
+		return nil, err
+	}
+
+	ti := &topicInfo{
+		topic:    t,
+		evts:     evts,
+		sub:      sub,
+		finished: make(chan struct{}, 1),
+	}
+
+	return ti, nil
 }
 
 func (p *PubsubValueStore) rebroadcast(ctx context.Context) {
@@ -205,20 +242,21 @@
 	for {
 		select {
 		case <-ticker.C:
-			var keys []string
 			p.mx.Lock()
-			keys = make([]string, 0, len(p.subs))
-			for p, _ := range p.subs {
-				keys = append(keys, p)
+			keys := make([]string, 0, len(p.topics))
+			topics := make([]*topicInfo, 0, len(p.topics))
+			for k, ti := range p.topics {
+				keys = append(keys, k)
+				topics = append(topics, ti)
 			}
 			p.mx.Unlock()
-			if len(keys) > 0 {
-				for _, k := range keys {
+			if len(topics) > 0 {
+				for i, k := range keys {
 					val, err := p.getLocal(k)
 					if err == nil {
-						topic := KeyToTopic(k)
+						topic := topics[i].topic
 						select {
-						case <-p.psPublishChannel(topic, val):
+						case <-p.psPublishChannel(ctx, topic, val):
 						case <-ctx.Done():
 							return
 						}
@@ -231,14 +269,26 @@
 	}
 }
 
-func (p *PubsubValueStore) psPublishChannel(topic string, value []byte) chan error {
+func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.Topic, value []byte) chan error {
 	done := make(chan error, 1)
 	go func() {
-		done <- p.ps.Publish(topic, value)
+		done <- topic.Publish(ctx, value)
 	}()
 	return done
 }
 
+// putLocal tries to put the key-value pair into the local datastore.
+// Requires that ti.dbWriteMx be held when called.
+// Returns the comparison result: positive when the value is better than what is currently in the datastore.
+// Returns any error from putting the data in the datastore.
+func (p *PubsubValueStore) putLocal(ti *topicInfo, key string, value []byte) (int, error) {
+	cmp := p.compare(key, value)
+	if cmp > 0 {
+		return cmp, p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), value)
+	}
+	return cmp, nil
+}
+
 func (p *PubsubValueStore) getLocal(key string) ([]byte, error) {
 	val, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(key)))
 	if err != nil {
@@ -339,7 +389,7 @@ func (p *PubsubValueStore) GetSubscriptions() []string {
 	defer p.mx.Unlock()
 
 	var res []string
-	for sub := range p.subs {
+	for sub := range p.topics {
 		res = append(res, sub)
 	}
 
@@ -359,29 +409,39 @@ func (p *PubsubValueStore) Cancel(name string) (bool, error) {
 	}
 	p.watchLk.Unlock()
 
-	sub, ok := p.subs[name]
+	ti, ok := p.topics[name]
 	if ok {
-		sub.Cancel()
-		delete(p.subs, name)
-	}
-
-	if cancel, ok := p.cancels[name]; ok {
-		cancel()
-		delete(p.cancels, name)
+		p.closeTopic(name, ti)
+		<-ti.finished
 	}
 
 	return ok, nil
 }
 
-func (p *PubsubValueStore) handleSubscription(ctx context.Context, sub *pubsub.Subscription, key string, cancel func()) {
-	defer sub.Cancel()
-	defer cancel()
+// closeTopic must be called under the PubsubValueStore's mutex
+func (p *PubsubValueStore) closeTopic(key string, ti *topicInfo) {
+	ti.cancel()
+	ti.sub.Cancel()
+	ti.evts.Cancel()
+	_ = ti.topic.Close()
+	delete(p.topics, key)
+}
+
+func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo, key string) {
+	defer func() {
+		close(ti.finished)
+
+		p.mx.Lock()
+		defer p.mx.Unlock()
+
+		p.closeTopic(key, ti)
+	}()
 
 	newMsg := make(chan []byte)
 	go func() {
 		defer close(newMsg)
 		for {
-			data, err := p.handleNewMsgs(ctx, sub, key)
+			data, err := p.handleNewMsgs(ctx, ti.sub, key)
 			if err != nil {
 				return
 			}
@@ -397,7 +457,7 @@
 	go func() {
 		defer close(newPeerData)
 		for {
-			data, err := p.handleNewPeer(ctx, sub, key)
+			data, err := p.handleNewPeer(ctx, ti.evts, key)
 			if err == nil {
 				if data != nil {
 					select {
@@ -433,8 +493,10 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, sub *pubsub.S
 				return
 			}
 
-			if p.compare(key, data) > 0 {
-				err := p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), data)
+			ti.dbWriteMx.Lock()
+			recCmp, err := p.putLocal(ti, key, data)
+			ti.dbWriteMx.Unlock()
+			if recCmp > 0 {
 				if err != nil {
 					log.Warningf("PubsubResolve: error writing update for %s: %s", key, err)
 				}
@@ -454,7 +516,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
 	return msg.GetData(), nil
 }
 
-func (p *PubsubValueStore) handleNewPeer(ctx context.Context, sub *pubsub.Subscription, key string) ([]byte, error) {
+func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) {
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -464,7 +526,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, sub *pubsub.Subscr
 
 	var pid peer.ID
 	for {
-		peerEvt, err := sub.NextPeerEvent(ctx)
+		peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
 		if err != nil {
 			if err != context.Canceled {
 				log.Warningf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
@@ -497,60 +559,16 @@ func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
 	}
 }
 
-// rendezvous with peers in the name topic through provider records
-// Note: rendezvous/boostrap should really be handled by the pubsub implementation itself!
-func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host host.Host, name string) {
-	// TODO: consider changing this to `pubsub:...`
-	topic := "floodsub:" + name
-	hash := u.Hash([]byte(topic))
-	rz := cid.NewCidV1(cid.Raw, hash)
-
-	go func() {
-		err := cr.Provide(ctx, rz, true)
-		if err != nil {
-			log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
-		}
-
-		for {
-			select {
-			case <-time.After(8 * time.Hour):
-				err := cr.Provide(ctx, rz, true)
-				if err != nil {
-					log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
-				}
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
-
-	rzctx, cancel := context.WithTimeout(ctx, time.Second*10)
-	defer cancel()
-
-	wg := &sync.WaitGroup{}
-	for pi := range cr.FindProvidersAsync(rzctx, rz, 10) {
-		if pi.ID == host.ID() {
-			continue
-		}
-		wg.Add(1)
-		go func(pi peer.AddrInfo) {
-			defer wg.Done()
-
-			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
-			defer cancel()
-
-			err := host.Connect(ctx, pi)
-			if err != nil {
-				log.Debugf("Error connecting to pubsub peer %s: %s", pi.ID, err.Error())
-				return
-			}
-
-			// delay to let pubsub perform its handshake
-			time.Sleep(time.Millisecond * 250)
-
-			log.Debugf("Connected to pubsub peer %s", pi.ID)
-		}(pi)
+func WithRebroadcastInterval(duration time.Duration) Option {
+	return func(store *PubsubValueStore) error {
+		store.rebroadcastInterval = duration
+		return nil
 	}
+}
 
-	wg.Wait()
+func WithRebroadcastInitialDelay(duration time.Duration) Option {
+	return func(store *PubsubValueStore) error {
+		store.rebroadcastInitialDelay = duration
+		return nil
+	}
 }
diff --git a/pubsub_test.go b/pubsub_test.go
index 4c9dd39..b632548 100644
--- a/pubsub_test.go
+++ b/pubsub_test.go
@@ -3,16 +3,18 @@ package namesys
 
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/routing"
 
 	bhost "github.com/libp2p/go-libp2p-blankhost"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	record "github.com/libp2p/go-libp2p-record"
-	rhelper "github.com/libp2p/go-libp2p-routing-helpers"
 	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
 )
 
@@ -79,7 +81,10 @@ func setupTest(ctx context.Context, t *testing.T) (*PubsubValueStore, []*PubsubV
 			t.Fatal(err)
 		}
 
-		vss[i] = NewPubsubValueStore(ctx, hosts[i], rhelper.Null{}, fs, testValidator{})
+		vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
+		if err != nil {
+			t.Fatal(err)
+		}
 	}
 	pub := vss[0]
 	vss = vss[1:]
@@ -121,7 +126,10 @@ func TestEarlyPublish(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		vss[i] = NewPubsubValueStore(ctx, hosts[i], rhelper.Null{}, fs, testValidator{})
+		vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
+		if err != nil {
+			t.Fatal(err)
+		}
 	}
 
 	pub := vss[0]
@@ -309,6 +317,65 @@ func TestWatch(t *testing.T) {
 	}
 }
 
+func TestPutMany(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hosts := newNetHosts(ctx, t, 5)
+	vss := make([]*PubsubValueStore, len(hosts))
+	for i := 0; i < len(vss); i++ {
+		fs, err := pubsub.NewFloodSub(ctx, hosts[i])
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 1; i < len(hosts); i++ {
+		connect(t, hosts[0], hosts[i])
+	}
+
+	const numRuns = 10
+	const numRoutines = 1000 // Note: if changing numRoutines, also change the number of digits in fmtString
+	const fmtString = "%s-%04d"
+	const baseKey = "/namespace/key"
+
+	for i := 0; i < numRuns; i++ {
+		key := fmt.Sprintf("%s/%d", baseKey, i)
+		var eg errgroup.Group
+		for j := 0; j < numRoutines; j++ {
+			rtNum := j
+			eg.Go(func() error {
+				return vss[0].PutValue(ctx, key, []byte(fmt.Sprintf(fmtString, key, rtNum)))
+			})
+		}
+
+		if err := eg.Wait(); err != nil {
+			t.Fatal(err)
+		}
+
+		finalValue := []byte(fmt.Sprintf(fmtString, key, numRoutines-1))
+		for j := 0; j < len(hosts); j++ {
+			for {
+				v, err := vss[j].GetValue(ctx, key)
+				if err != routing.ErrNotFound {
+					if err != nil {
+						t.Fatal(err)
+					}
+					if bytes.Equal(v, finalValue) {
+						break
+					}
+				}
+				time.Sleep(time.Millisecond * 100)
+			}
+		}
+	}
+}
+
 func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
 	t.Helper()
 	_, err := vs.GetValue(ctx, key)
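
Usage note (illustrative, not part of the patch): this change replaces the old NewPubsubValueStore(ctx, host, contentRouting, ps, validator) constructor with a variadic-options form that returns an error, and it removes the rendezvous bootstrap entirely, so connecting peers is now the caller's responsibility (or pubsub's own discovery machinery). A minimal sketch of a migrated caller, assuming a libp2p host is already constructed; the helper name buildValueStore and the specific option values are hypothetical:

package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	record "github.com/libp2p/go-libp2p-record"

	namesys "github.com/libp2p/go-libp2p-pubsub-router"
)

// buildValueStore is a hypothetical helper showing the new constructor shape:
// the ContentRouting argument is gone, options are passed variadically, and
// the error return must now be checked.
func buildValueStore(ctx context.Context, h host.Host, v record.Validator) (*namesys.PubsubValueStore, error) {
	// Any pubsub router works here; GossipSub is used as an example.
	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		return nil, err
	}

	return namesys.NewPubsubValueStore(ctx, h, ps, v,
		namesys.WithRebroadcastInterval(5*time.Minute),            // republish locally stored records every 5 minutes
		namesys.WithRebroadcastInitialDelay(500*time.Millisecond), // first rebroadcast shortly after startup
	)
}

The two options shown are exactly the ones added by this diff: WithRebroadcastInterval tunes how often locally stored records are republished, and WithRebroadcastInitialDelay tunes how soon the first rebroadcast fires after the store is created.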