Skip to content

Commit

Permalink
add replicator retry
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Oct 7, 2024
1 parent 2bdbbd4 commit 56d1f43
Show file tree
Hide file tree
Showing 31 changed files with 817 additions and 319 deletions.
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type DB interface {
// Peerstore returns the peerstore where known host information is stored.
//
// It sits within the rootstore returned by [Root].
Peerstore() datastore.DSBatching
Peerstore() datastore.DSReaderWriter

// Headstore returns the headstore where the current heads of the database are stored.
//
Expand Down
12 changes: 6 additions & 6 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions client/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,26 @@

package client

import "github.com/libp2p/go-libp2p/core/peer"
import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
)

// Replicator is a peer that a set of local collections are replicated to.
type Replicator struct {
Info peer.AddrInfo
Schemas []string
Info peer.AddrInfo
Schemas []string
Status ReplicatorStatus
LastStatusChange time.Time
}

// ReplicatorStatus is the status of a Replicator.
type ReplicatorStatus uint8

const (
// ReplicatorStatusActive is the status of a Replicator that is actively replicating.
ReplicatorStatusActive ReplicatorStatus = iota
// ReplicatorStatusInactive is the status of a Replicator that is inactive/offline.
ReplicatorStatusInactive
)
12 changes: 6 additions & 6 deletions datastore/mocks/txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions datastore/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package datastore

import (
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
)

var (
Expand All @@ -31,7 +30,7 @@ type multistore struct {
data DSReaderWriter
enc Blockstore
head DSReaderWriter
peer DSBatching
peer DSReaderWriter
system DSReaderWriter
dag Blockstore
}
Expand All @@ -46,7 +45,7 @@ func MultiStoreFrom(rootstore ds.Datastore) MultiStore {
data: prefix(rootRW, dataStoreKey),
enc: newBlockstore(prefix(rootRW, encStoreKey)),
head: prefix(rootRW, headStoreKey),
peer: namespace.Wrap(rootstore, peerStoreKey),
peer: prefix(rootRW, peerStoreKey),
system: prefix(rootRW, systemStoreKey),
dag: newBlockstore(prefix(rootRW, blockStoreKey)),
}
Expand All @@ -70,7 +69,7 @@ func (ms multistore) Headstore() DSReaderWriter {
}

// Peerstore implements MultiStore.
func (ms multistore) Peerstore() DSBatching {
func (ms multistore) Peerstore() DSReaderWriter {
return ms.peer
}

Expand Down
7 changes: 1 addition & 6 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type MultiStore interface {

// Peerstore is a wrapped root DSReaderWriter as a ds.Batching, embedded into a DSBatching
// under the /peers namespace
Peerstore() DSBatching
Peerstore() DSReaderWriter

// Blockstore is a wrapped root DSReaderWriter as a Blockstore, embedded into a Blockstore
// under the /blocks namespace
Expand Down Expand Up @@ -81,8 +81,3 @@ type IPLDStorage interface {
storage.ReadableStorage
storage.WritableStorage
}

// DSBatching wraps the Batching interface from go-datastore
type DSBatching interface {
ds.Batching
}
18 changes: 16 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
PeerInfoName = Name("peer-info")
// ReplicatorName is the name of the replicator event.
ReplicatorName = Name("replicator")
// ReplicatorFailureName is the name of the replicator failure event.
ReplicatorFailureName = Name("replicator-failure")
// P2PTopicCompletedName is the name of the network p2p topic update completed event.
P2PTopicCompletedName = Name("p2p-topic-completed")
// ReplicatorCompletedName is the name of the replicator completed event.
Expand Down Expand Up @@ -68,8 +70,12 @@ type Update struct {
// also formed this update.
Block []byte

// IsCreate is true if this update is the creation of a new document.
IsCreate bool
// IsRetry is true if this update is a retry of a previously failed update.
IsRetry bool

// Success is a channel that will receive a boolean value indicating if the update was successful.
// It is used during retries.
Success chan bool
}

// Merge is a notification that a merge can be performed up to the provided CID.
Expand Down Expand Up @@ -137,3 +143,11 @@ type Replicator struct {
// and those collections have documents to be replicated.
Docs <-chan Update
}

// ReplicatorFailure is an event that is published when a replicator fails to replicate a document.
type ReplicatorFailure struct {
// PeerID is the id of the peer that failed to replicate the document.
PeerID peer.ID
// DocID is the unique immutable identifier of the document that failed to replicate.
DocID string
}
2 changes: 1 addition & 1 deletion http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (c *Client) Encstore() datastore.Blockstore {
panic("client side database")
}

func (c *Client) Peerstore() datastore.DSBatching {
func (c *Client) Peerstore() datastore.DSReaderWriter {
panic("client side database")
}

Expand Down
2 changes: 1 addition & 1 deletion http/client_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Transaction) Headstore() datastore.DSReaderWriter {
panic("client side transaction")
}

func (c *Transaction) Peerstore() datastore.DSBatching {
func (c *Transaction) Peerstore() datastore.DSReaderWriter {
panic("client side transaction")
}

Expand Down
77 changes: 76 additions & 1 deletion internal/core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ const (
FIELD_ID_SEQ = "/seq/field"
PRIMARY_KEY = "/pk"
DATASTORE_DOC_VERSION_FIELD_ID = "v"
REPLICATOR = "/replicator/id"
P2P_COLLECTION = "/p2p/collection"
REPLICATOR = "/rep/id"
REPLICATOR_RETRY_ID = "/rep/retry/id"
REPLICATOR_RETRY_DOC = "/rep/retry/doc"
)

// rep/retry/1234 -> {nextRetry: 1234, bit: 1}
// rep/retry/bit -> {peerID: 1234}
// rep/retry/doc -> {bitset: 10101010}
// Key is an interface that represents a key in the database.
type Key interface {
ToString() string
Expand Down Expand Up @@ -946,3 +951,73 @@ func bytesPrefixEnd(b []byte) []byte {
// maximal byte string (i.e. already \xff...).
return b
}

type ReplicatorRetryIDKey struct {
PeerID string
}

var _ Key = (*ReplicatorRetryIDKey)(nil)

func NewReplicatorRetryIDKey(peerID string) ReplicatorRetryIDKey {
return ReplicatorRetryIDKey{
PeerID: peerID,
}
}

func NewReplicatorRetryIDKeyFromString(key string) (ReplicatorRetryIDKey, error) {
keyArr := strings.Split(key, "/")
if len(keyArr) != 5 {
return ReplicatorRetryIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key))
}
return NewReplicatorRetryIDKey(keyArr[4]), nil
}

func (k ReplicatorRetryIDKey) ToString() string {
return REPLICATOR_RETRY_ID + "/" + k.PeerID
}

func (k ReplicatorRetryIDKey) Bytes() []byte {
return []byte(k.ToString())
}

func (k ReplicatorRetryIDKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

type ReplicatorRetryDocIDKey struct {
PeerID string
DocID string
}

var _ Key = (*ReplicatorRetryDocIDKey)(nil)

func NewReplicatorRetryDocIDKey(peerID, docID string) ReplicatorRetryDocIDKey {
return ReplicatorRetryDocIDKey{
PeerID: peerID,
DocID: docID,
}
}

func NewReplicatorRetryDocIDKeyFromString(key string) (ReplicatorRetryDocIDKey, error) {
keyArr := strings.Split(key, "/")
if len(keyArr) != 6 {
return ReplicatorRetryDocIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key))
}
return NewReplicatorRetryDocIDKey(keyArr[4], keyArr[5]), nil
}

func (k ReplicatorRetryDocIDKey) ToString() string {
keyString := REPLICATOR_RETRY_DOC + "/" + k.PeerID
if k.DocID != "" {
keyString += "/" + k.DocID
}
return keyString
}

func (k ReplicatorRetryDocIDKey) Bytes() []byte {
return []byte(k.ToString())
}

func (k ReplicatorRetryDocIDKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}
1 change: 0 additions & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,6 @@ func (c *collection) save(
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
IsCreate: isCreate,
}
txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
Expand Down
29 changes: 28 additions & 1 deletion internal/db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package db

import (
"time"

"github.com/sourcenetwork/immutable"
)

Expand All @@ -20,7 +22,24 @@ const (
)

type dbOptions struct {
maxTxnRetries immutable.Option[int]
maxTxnRetries immutable.Option[int]
RetryIntervals []time.Duration
}

// defaultOptions returns the default db options.
func defaultOptions() *dbOptions {
return &dbOptions{
RetryIntervals: []time.Duration{
// exponential backoff retry intervals
time.Second * 30,
time.Minute,
time.Minute * 2,
time.Minute * 4,
time.Minute * 8,
time.Minute * 16,
time.Minute * 32,
},
}
}

// Option is a funtion that sets a config value on the db.
Expand All @@ -32,3 +51,11 @@ func WithMaxRetries(num int) Option {
opts.maxTxnRetries = immutable.Some(num)
}
}

func WithRetryInterval(interval []time.Duration) Option {
return func(opt *dbOptions) {
if len(interval) > 0 {
opt.RetryIntervals = interval
}
}
}
7 changes: 7 additions & 0 deletions internal/db/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package db

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -22,3 +23,9 @@ func TestWithMaxRetries(t *testing.T) {
assert.True(t, d.maxTxnRetries.HasValue())
assert.Equal(t, 10, d.maxTxnRetries.Value())
}

func TestWithRetryInterval(t *testing.T) {
d := dbOptions{}
WithRetryInterval([]time.Duration{time.Minute, time.Hour})(&d)
assert.Equal(t, []time.Duration{time.Minute, time.Hour}, d.RetryIntervals)
}
Loading

0 comments on commit 56d1f43

Please sign in to comment.