fix: Remove potential P2P deadlock (#1056)
Relevant issue(s)
Resolves #1028
Resolves #470
Resolves #1053

Description
The main purpose of this PR is to resolve a potential deadlock. The deadlock can happen if a PushLog cycle fails before it has received the whole missing block history. This leaves the DAG incomplete, and Defra currently has no clear way to detect the gap or to fill it in later.

To solve this, John and I discussed wrapping the whole PushLog cycle in a single transaction. Any error occurring during the cycle discards the transaction and thus leaves the DAG unaffected.

As side effects, we add thread safety to the badger transactions and manage DAG workers on a per-PushLog-cycle basis.
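
Roughly, the new flow can be pictured as follows. This is a hedged sketch only, not the code in this PR: `handlePushLog`, `pushLogRequest`, `processMissingBlocks`, and `p.db` are made-up names used for illustration.

```go
// Sketch: the whole PushLog cycle runs over one transaction, so a failure
// part-way through leaves the stored DAG untouched.
func (p *Peer) handlePushLog(ctx context.Context, req *pushLogRequest) error {
	txn, err := p.db.NewConcurrentTxn(ctx, false) // thread-safe txn shared by all DAG workers
	if err != nil {
		return err
	}
	defer txn.Discard(ctx) // a no-op once the transaction has been committed

	if err := p.processMissingBlocks(ctx, txn, req); err != nil {
		return err // nothing was persisted; the peer can retry the PushLog later
	}
	return txn.Commit(ctx) // persist the complete block history atomically
}
```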
fredcarle authored and shahzadlone committed Apr 13, 2023
1 parent fd54554 commit a572a9c
Showing 9 changed files with 279 additions and 110 deletions.
1 change: 1 addition & 0 deletions client/db.go
@@ -31,6 +31,7 @@ type DB interface {
Blockstore() blockstore.Blockstore

NewTxn(context.Context, bool) (datastore.Txn, error)
NewConcurrentTxn(context.Context, bool) (datastore.Txn, error)
ExecRequest(context.Context, string) *RequestResult
ExecTransactionalRequest(context.Context, string, datastore.Txn) *RequestResult
Close(context.Context)
5 changes: 4 additions & 1 deletion datastore/badger/v3/datastore.go
@@ -24,7 +24,10 @@ import (

var log = logger.Logger("badger")

var ErrClosed = errors.New("datastore closed")
var (
ErrClosed = errors.New("datastore closed")
ErrTxnConflict = badger.ErrConflict
)

type Datastore struct {
DB *badger.DB
99 changes: 99 additions & 0 deletions datastore/concurrent_txn.go
@@ -0,0 +1,99 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package datastore

import (
"context"
"sync"

ds "github.com/ipfs/go-datastore"

"github.com/sourcenetwork/defradb/datastore/iterable"
)

type concurrentTxn struct {
ds.Txn

// Some datastores don't support concurrent operations within a single transaction. `concurrentTxn`, with its
// mutex, enables those concurrent operations. This was implemented for DefraDB's DAG sync process.
// Since the DAG sync process is highly concurrent and has been made to operate on a single transaction
// to eliminate the potential for deadlock (the DAG being left in an incomplete state without an obvious
// way to detect it), we need a mutex to ensure thread safety.
mu sync.Mutex
}

// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
var rootTxn ds.Txn
var err error

// check if our datastore natively supports iterable transactions
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err = iterableTxnStore.NewIterableTransaction(ctx, readonly)
if err != nil {
return nil, err
}
} else {
rootTxn, err = rootstore.NewTransaction(ctx, readonly)
if err != nil {
return nil, err
}
}

rootConcurrentTxn := &concurrentTxn{Txn: rootTxn}
root := AsDSReaderWriter(rootConcurrentTxn)
multistore := MultiStoreFrom(root)
return &txn{
rootConcurrentTxn,
multistore,
[]func(){},
[]func(){},
}, nil
}

// Delete implements ds.Delete
func (t *concurrentTxn) Delete(ctx context.Context, key ds.Key) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.Txn.Delete(ctx, key)
}

// Get implements ds.Get
func (t *concurrentTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.Txn.Get(ctx, key)
}

// Has implements ds.Has
func (t *concurrentTxn) Has(ctx context.Context, key ds.Key) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.Txn.Has(ctx, key)
}

// Put implements ds.Put
func (t *concurrentTxn) Put(ctx context.Context, key ds.Key, value []byte) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.Txn.Put(ctx, key, value)
}

// Sync commits the underlying transaction.
func (t *concurrentTxn) Sync(ctx context.Context, prefix ds.Key) error {
return t.Txn.Commit(ctx)
}

// Close discards the transaction.
func (t *concurrentTxn) Close() error {
t.Discard(context.TODO())
return nil
}
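
For context, here is a minimal usage sketch of the concurrent transaction. It is hypothetical: `rootstore` stands for any `ds.TxnDatastore`, and `Rootstore()`, `Put()`, `Commit()`, and `Discard()` are assumed to be available on DefraDB's `datastore.Txn` as used in this PR.

```go
package example

import (
	"context"
	"fmt"
	"sync"

	ds "github.com/ipfs/go-datastore"

	"github.com/sourcenetwork/defradb/datastore"
)

// writeConcurrently shows several goroutines writing through one shared transaction.
func writeConcurrently(ctx context.Context, rootstore ds.TxnDatastore) error {
	txn, err := datastore.NewConcurrentTxnFrom(ctx, rootstore, false)
	if err != nil {
		return err
	}
	defer txn.Discard(ctx)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Writes on the shared txn are serialized by concurrentTxn's mutex.
			key := ds.NewKey(fmt.Sprintf("/example/%d", i))
			_ = txn.Rootstore().Put(ctx, key, []byte("value"))
		}(i)
	}
	wg.Wait()

	return txn.Commit(ctx) // all writes land atomically, or not at all
}
```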
5 changes: 5 additions & 0 deletions db/db.go
@@ -136,6 +136,11 @@ func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error)
return datastore.NewTxnFrom(ctx, db.rootstore, readonly)
}

// NewConcurrentTxn creates a new transaction that supports concurrent API calls.
func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) {
return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, readonly)
}

// Root returns the root datastore.
func (db *db) Root() datastore.RootStore {
return db.rootstore
46 changes: 40 additions & 6 deletions net/dag.go
@@ -22,6 +22,7 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/logging"
)

@@ -57,6 +58,10 @@ type dagJob struct {
dockey core.DataStoreKey // dockey of our document
fieldName string // field of the subgraph our node belongs to

// Transaction common to a PushLog event. It is passed along to processLog
// and handleChildBlocks within the dagWorker.
txn datastore.Txn

// OLD FIELDS
// root cid.Cid // the root of the branch we are walking down
// rootPrio uint64 // the priority of the root delta
@@ -67,21 +72,43 @@
// workers without races by becoming the only sender for the store.jobQueue
// channel.
func (p *Peer) sendJobWorker() {
// The DAG sync process for a document is handled over a single transaction, so it is possible that a single
// document ends up using all workers. Since the transaction uses a mutex to guarantee thread safety, some
// operations in those workers may be temporarily blocked, which would leave a concurrent document sync process
// hanging while it waits for workers to free up. To alleviate this problem, we add new workers dedicated to a
// document and discard them once the process is completed.
docWorkerQueue := make(map[string]chan *dagJob)
for {
select {
case <-p.ctx.Done():
close(p.jobQueue)
for _, job := range docWorkerQueue {
close(job)
}
return
case j := <-p.sendJobs:
p.jobQueue <- j

case newJob := <-p.sendJobs:
jobs, ok := docWorkerQueue[newJob.dockey.DocKey]
if !ok {
jobs = make(chan *dagJob, numWorkers)
for i := 0; i < numWorkers; i++ {
go p.dagWorker(jobs)
}
docWorkerQueue[newJob.dockey.DocKey] = jobs
}
jobs <- newJob

case dockey := <-p.closeJob:
if jobs, ok := docWorkerQueue[dockey]; ok {
close(jobs)
delete(docWorkerQueue, dockey)
}
}
}
}

// dagWorker should run in its own goroutine. Workers are launched in
// sendJobWorker, one pool per document being synced.
func (p *Peer) dagWorker() {
for job := range p.jobQueue {
func (p *Peer) dagWorker(jobs chan *dagJob) {
for job := range jobs {
log.Debug(
p.ctx,
"Starting new job from DAG queue",
@@ -99,14 +126,14 @@

children, err := p.processLog(
p.ctx,
job.txn,
job.collection,
job.dockey,
job.node.Cid(),
job.fieldName,
job.node,
job.nodeGetter,
)

if err != nil {
log.ErrorE(
p.ctx,
@@ -118,9 +145,16 @@
job.session.Done()
continue
}

if len(children) == 0 {
job.session.Done()
continue
}

go func(j *dagJob) {
p.handleChildBlocks(
j.session,
j.txn,
j.collection,
j.dockey,
j.fieldName,
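
To tie the worker-pool change together, here is a rough sketch of how a caller might feed and then tear down a document's worker pool. It is hypothetical: only `sendJobs`, `closeJob`, and the `dagJob` fields visible in this diff are taken from the code; `syncDocumentDAG` and its parameters are made up.

```go
// syncDocumentDAG queues a root node for processing, waits for the
// document's DAG workers to finish, then discards that worker pool.
func (p *Peer) syncDocumentDAG(
	txn datastore.Txn,
	col client.Collection,
	dockey core.DataStoreKey,
	nd ipld.Node,
	getter ipld.NodeGetter,
) {
	var session sync.WaitGroup
	session.Add(1)
	p.sendJobs <- &dagJob{
		session:    &session,
		txn:        txn, // the transaction shared by the whole PushLog cycle
		collection: col,
		dockey:     dockey,
		node:       nd,
		nodeGetter: getter,
	}
	session.Wait()              // block until the document's DAG is fully processed
	p.closeJob <- dockey.DocKey // discard the workers dedicated to this document
}
```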
24 changes: 18 additions & 6 deletions net/peer.go
@@ -64,7 +64,9 @@ type Peer struct {
server *server
p2pRPC *grpc.Server // rpc server over the p2p network

jobQueue chan *dagJob
// Used to close the dagWorker pool for a given document.
// The string represents a dockey.
closeJob chan string
sendJobs chan *dagJob

// outstanding log request currently being processed
@@ -107,7 +109,7 @@ func NewPeer(
p2pRPC: grpc.NewServer(serverOptions...),
ctx: ctx,
cancel: cancel,
jobQueue: make(chan *dagJob, numWorkers),
closeJob: make(chan string),
sendJobs: make(chan *dagJob),
replicators: make(map[string]map[peer.ID]struct{}),
queuedChildren: newCidSafeSet(),
@@ -159,11 +161,8 @@ func (p *Peer) Start() error {
}
}()

// start sendJobWorker + NumWorkers goroutines
// start sendJobWorker
go p.sendJobWorker()
for i := 0; i < numWorkers; i++ {
go p.dagWorker()
}

return nil
}
@@ -601,8 +600,17 @@ func (p *Peer) handleDocUpdateLog(evt events.Update) error {

func (p *Peer) pushLogToReplicators(ctx context.Context, lg events.Update) {
// push to each peer (replicator)
peers := make(map[string]struct{})
for _, peer := range p.ps.ListPeers(lg.DocKey) {
peers[peer.String()] = struct{}{}
}
if reps, exists := p.replicators[lg.SchemaID]; exists {
for pid := range reps {
// Don't push if pid is in the list of peers for the topic.
// It will be handled by the pubsub system.
if _, ok := peers[pid.String()]; ok {
continue
}
go func(peerID peer.ID) {
if err := p.server.pushLog(p.ctx, lg, peerID); err != nil {
log.ErrorE(
Expand All @@ -629,6 +637,10 @@ func (p *Peer) setupDAGService() {
p.DAGService = dag.NewDAGService(p.bserv)
}

func (p *Peer) newDAGSyncerTxn(txn datastore.Txn) ipld.DAGService {
return dag.NewDAGService(blockservice.New(txn.DAGstore(), p.exch))
}

// Session returns a session-based NodeGetter.
func (p *Peer) Session(ctx context.Context) ipld.NodeGetter {
ng := dag.NewSession(ctx, p.DAGService)