Skip to content

Commit

Permalink
Merge pull request #17 from libp2p/fix/encoding
Browse files Browse the repository at this point in the history
encode record-store keys in pubsub
  • Loading branch information
Stebalien authored Nov 16, 2018
2 parents 49aaeee + 87ee770 commit 3af4f46
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package namesys
import (
"bytes"
"context"
"encoding/base64"
"errors"
"sync"
"time"
Expand Down Expand Up @@ -48,6 +49,13 @@ type PubsubValueStore struct {
Validator record.Validator
}

// KeyToTopic converts a binary record key to a pubsub topic key.
func KeyToTopic(key string) string {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs.
// Encodes to "/record/base64url(key)"
return "/record/" + base64.RawURLEncoding.EncodeToString([]byte(key))
}

// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
// The constructor interface is complicated by the need to bootstrap the pubsub topic.
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
Expand All @@ -69,20 +77,24 @@ func NewPubsubValueStore(ctx context.Context, host p2phost.Host, cr routing.Cont

// Publish publishes an IPNS record through pubsub with default TTL
func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) error {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs.
// Encode to "/record/base64url(key)"
topic := KeyToTopic(key)

p.mx.Lock()
_, bootstraped := p.subs[key]

if !bootstraped {
p.subs[key] = nil
p.mx.Unlock()

bootstrapPubsub(p.ctx, p.cr, p.host, key)
bootstrapPubsub(p.ctx, p.cr, p.host, topic)
} else {
p.mx.Unlock()
}

log.Debugf("PubsubPublish: publish value for key", key)
return p.ps.Publish(key, value)
return p.ps.Publish(topic, value)
}

func (p *PubsubValueStore) isBetter(key string, val []byte) bool {
Expand Down Expand Up @@ -115,15 +127,17 @@ func (p *PubsubValueStore) Subscribe(key string) error {
return nil
}

topic := KeyToTopic(key)

// Ignore the error. We have to check again anyways to make sure the
// record hasn't expired.
//
// Also, make sure to do this *before* subscribing.
p.ps.RegisterTopicValidator(key, func(ctx context.Context, msg *pubsub.Message) bool {
p.ps.RegisterTopicValidator(topic, func(ctx context.Context, msg *pubsub.Message) bool {
return p.isBetter(key, msg.GetData())
})

sub, err := p.ps.Subscribe(key)
sub, err := p.ps.Subscribe(topic)
if err != nil {
p.mx.Unlock()
return err
Expand All @@ -146,7 +160,7 @@ func (p *PubsubValueStore) Subscribe(key string) error {

if !bootstraped {
// TODO: Deal with publish then resolve case? Cancel behaviour changes.
go bootstrapPubsub(ctx, p.cr, p.host, key)
go bootstrapPubsub(ctx, p.cr, p.host, topic)
}
return nil
}
Expand Down

0 comments on commit 3af4f46

Please sign in to comment.