-
Notifications
You must be signed in to change notification settings - Fork 53
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Remove potential P2P deadlock (#1056)
Relevant issue(s) Resolves #1028 Resolves #470 Resolves #1053 Description The main purpose of this PR is to resolve the potential deadlock. The deadlock can happen if a PushLog cycle doesn't receive the whole missing block history before an error occurs. This leaves the DAG incomplete but there is no clear way, at the moment, for Defra to be aware of that and for it to try to fill in the gap at some point. To solve the deadlock situation, John and I had discussed implementing a transaction that would cover the whole PushLog cycle. This means that any error occurring during the cycle will discard the transaction and thus leave the DAG unaffected. As side effects, we add thread safety to the badger transactions and we manage DAG workers on a per PushLog cycle basis.
- Loading branch information
Showing
9 changed files
with
279 additions
and
110 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright 2022 Democratized Data Foundation | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package datastore | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
ds "github.com/ipfs/go-datastore" | ||
|
||
"github.com/sourcenetwork/defradb/datastore/iterable" | ||
) | ||
|
||
type concurrentTxn struct { | ||
ds.Txn | ||
|
||
// Some datastore don't support concurrent operation within a single transaction. `concurrentTxn` with its | ||
// mutex enable those concurrent operations. This was implemented for DefraDB's DAG sync process. | ||
// Since the DAG sync process is highly concurrent and has been made to operate on a single transaction | ||
// to eliminate the potential for deadlock (DAG being left in an incomplete state without a way to obviously | ||
// detect it), we need to add a mutex to ensure thread safety. | ||
mu sync.Mutex | ||
} | ||
|
||
// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls | ||
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) { | ||
var rootTxn ds.Txn | ||
var err error | ||
|
||
// check if our datastore natively supports iterable transaction, transactions or batching | ||
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { | ||
rootTxn, err = iterableTxnStore.NewIterableTransaction(ctx, readonly) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
rootTxn, err = rootstore.NewTransaction(ctx, readonly) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
rootConcurentTxn := &concurrentTxn{Txn: rootTxn} | ||
root := AsDSReaderWriter(rootConcurentTxn) | ||
multistore := MultiStoreFrom(root) | ||
return &txn{ | ||
rootConcurentTxn, | ||
multistore, | ||
[]func(){}, | ||
[]func(){}, | ||
}, nil | ||
} | ||
|
||
// Delete implements ds.Delete | ||
func (t *concurrentTxn) Delete(ctx context.Context, key ds.Key) error { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
return t.Txn.Delete(ctx, key) | ||
} | ||
|
||
// Get implements ds.Get | ||
func (t *concurrentTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
return t.Txn.Get(ctx, key) | ||
} | ||
|
||
// Has implements ds.Has | ||
func (t *concurrentTxn) Has(ctx context.Context, key ds.Key) (bool, error) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
return t.Txn.Has(ctx, key) | ||
} | ||
|
||
// Put implements ds.Put | ||
func (t *concurrentTxn) Put(ctx context.Context, key ds.Key, value []byte) error { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
return t.Txn.Put(ctx, key, value) | ||
} | ||
|
||
// Sync executes the transaction. | ||
func (t *concurrentTxn) Sync(ctx context.Context, prefix ds.Key) error { | ||
return t.Txn.Commit(ctx) | ||
} | ||
|
||
// Close discards the transaction. | ||
func (t *concurrentTxn) Close() error { | ||
t.Discard(context.TODO()) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.