feat: Add replicator retry #3107
56d1f43 to 453f002
// exponential backoff retry intervals
time.Second * 30,
time.Minute,
time.Minute * 2,
time.Minute * 4,
time.Minute * 8,
time.Minute * 16,
time.Minute * 32,
question: Interesting strat! Do we want the retry intervals to be configurable?
I have a ticket to make the retry intervals configurable (#3073). They are already configurable for devs who use the Go API, though.
453f002 to a64e3dc
nodeIndex := i
if action.NodeID.HasValue() {
	nodeIndex = action.NodeID.Value()
}
info: When I fix getNodes (#3076) we won't have to do this hack anymore. I am guessing this is because the nodeIndex (0) is wrong when there is a non-zero nodeID specified, correct?
yes that's pretty much it.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## develop #3107 +/- ##
===========================================
- Coverage 80.12% 80.00% -0.12%
===========================================
Files 353 353
Lines 28175 28466 +291
===========================================
+ Hits 22574 22772 +198
- Misses 4019 4081 +62
- Partials 1582 1613 +31
... and 12 files with indirect coverage changes
It looks good, but I have a handful of important todos for you whilst I continue my review.
func (s *server) pushLog(evt event.Update, pid peer.ID) error {
func (s *server) pushLog(evt event.Update, pid peer.ID) (err error) {
	defer func() {
		if err != nil && !evt.IsRetry {
todo: Please document why we are not publishing if it is a retry event.
thought: the failure event could be used to queue another retry instead of using a channel on the event
The retry interval is set per peer, not per update. That is why the channel on the event is used to report whether the retry succeeded.
	Creator: p.host.ID().String(),
	Block:   evt.Block,
}
if !evt.IsRetry {
todo: Please document why we are not publishing if it is a retry event.
p.server.mu.Lock()
reps, exists := p.server.replicators[lg.SchemaRoot]
p.server.mu.Unlock()

if exists {
	for pid := range reps {
		// Don't push if pid is in the list of peers for the topic.
		// It will be handled by the pubsub system.
		if _, ok := peers[pid.String()]; ok {
question: Why has this been removed?
Because the pubsub system offers fewer guarantees than the direct-to-peer replicator system. There are no real downsides to having both reach the receiving peer. Also, if we rely on the pubsub system for updates, it becomes harder to keep track of failures for retry.
Makes sense, thanks for the explanation :)
net/server.go
Outdated
if s.peer.ps == nil { // skip if we aren't running with a pubsub net
	return nil
}
s.mu.Lock()
t, ok := s.topics[topic]
s.mu.Unlock()
if !ok {
	err := s.addPubSubTopic(topic, false, nil)
	subscribe := false
suggestion: The below is a little simpler IMO.
subscribe := topic != req.SchemaRoot && !s.hasPubSubTopic(req.SchemaRoot)
I agree. It's left over from a change I had made and didn't clean up. Thanks for pointing it out.
@@ -585,3 +585,73 @@ func TestP2POneToOneReplicatorOrderIndependentDirectCreate(t *testing.T) {

	testUtils.ExecuteTestCase(t, test)
}

func TestP2POneToOneReplicator_ManyDocsWithTargetNodeTemporarilyOffline_ShouldSucceed(t *testing.T) {
praise: Thanks for this test, and the new utils stuff required to get it working, it is very easy to read.
internal/db/db.go
Outdated

retryIntervals []time.Duration
retryChan      chan event.ReplicatorFailure
retryDone      chan retryStatus
suggestion: db is quite a busy type; it might be worth rolling these 3 props up into a new type.
✅
internal/db/p2p_replicator.go
Outdated
	"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

const (
	retryLoopInterval = 2 * time.Second
	retryTimeout      = 10 * time.Second
todo: Without chasing down the usage of these properties it is quite hard to guess their difference. Please add some documentation to them.
✅
internal/db/p2p_replicator.go
Outdated
	}
}

func (db *db) handleReplicatorFailure(ctx context.Context, r event.ReplicatorFailure) error {
todo: It looks like the stuff done in this function (and child calls) should be protected by an internal transaction, otherwise we will have partial successes.
suggestion: When introducing the txn, I suggest not hosting the child functions on db, as it makes it quite easy to accidentally bypass the txn, especially in the short term when we have no tests protecting against this.
Same comments apply to retryReplicators, and the r.Success half of handleCompletedReplicatorRetry.
internal/db/p2p_replicator.go
Outdated
if err != nil {
	return err
}
if !exists {
suggestion: Inverting this if would remove a level of indentation (and complexity) from the bulk of this function:
if exists {
return
}
r := ...
✅
rInfo := retryInfo{}
err = cbor.Unmarshal(result.Value, &rInfo)
if err != nil {
	log.ErrorContextE(ctx, "Failed to unmarshal replicator retry info", err)
thought: If this error is ever hit, it seems likely that it will not be the only record (programming error). I worry that if we do not delete the record in this block we will have a very rapidly growing database and log file.
✅
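The concern above can be sketched as follows: when a stored retry record cannot be decoded, log it once and delete it so it is not re-read (and re-logged) on every loop. This is an assumption-laden illustration, not the PR's code: it uses `encoding/json` and an in-memory map in place of cbor and the peerstore, and `retryInfo`'s fields and `loadRetryInfos` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// retryInfo is a stand-in for the PR's record; the fields are hypothetical.
type retryInfo struct {
	NextRetry  int64 `json:"nextRetry"`
	NumRetries int   `json:"numRetries"`
}

// loadRetryInfos decodes every record in store. Records that fail to decode
// are logged and removed from store so they cannot pile up or be logged again.
func loadRetryInfos(store map[string][]byte) map[string]retryInfo {
	out := make(map[string]retryInfo)
	for key, raw := range store {
		var info retryInfo
		if err := json.Unmarshal(raw, &info); err != nil {
			fmt.Printf("dropping undecodable retry record %q: %v\n", key, err)
			delete(store, key) // deleting during range is safe for Go maps
			continue
		}
		out[key] = info
	}
	return out
}

func main() {
	store := map[string][]byte{
		"peerA": []byte(`{"nextRetry":100,"numRetries":2}`),
		"peerB": []byte("not json"),
	}
	infos := loadRetryInfos(store)
	fmt.Println(len(infos), len(store))
}
```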
Sorry about the bother, overall the PR looks good but I think the retry logic needs a little bit of work/documentation before it can be merged.
internal/db/db.go
Outdated

retryIntervals []time.Duration
retryChan      chan event.ReplicatorFailure
retryDone      chan retryStatus
todo: Please document the retry channels, it is not easy to understand how they work atm.
✅
	return db.Peerstore().Put(ctx, key.ToDS(), b)
}

func (db *db) retryReplicator(ctx context.Context, peerID string) {
todo: The stuff within this function should be protected by a transaction scoped to this function, otherwise it can partially succeed.
thought: Partial success in this function may be desirable, in which case my preference is still to make that explicit through code, if not please document it.
internal/db/p2p_replicator.go
Outdated
		log.ErrorContextE(ctx, "Failed to delete retry docID", err)
	}
}
db.retryDone <- retryStatus{
todo: I cannot guess why retryDone was handled via a channel - it looks like it is only ever written to from this function and atm could be changed to a simple function call.
If it has a good reason to be done like this please document it.
todo: It looks like you have a concurrency bug here. Because 'done' is processed in the same loop that also reads from the retryChan channel, retryDone may be written to, then a new retryChan record for the same peer may be written and processed (overwriting the retryInfo record), and only then is the retryDone item processed, deleting the rewritten retryInfo and preventing it from being queried and its docs retried.
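One reading of the interleaving described above can be replayed deterministically. The sketch below is not the PR's code: the `event` type, the in-memory `store`, and `process` are hypothetical, and the channels are flattened into an ordered slice so the problematic ordering is explicit.

```go
package main

import "fmt"

// event is a hypothetical stand-in for items arriving on retryChan ("failure")
// and retryDone ("done").
type event struct {
	kind   string
	peerID string
	docID  string
}

// process applies events in order. A failure overwrites the peer's retry
// record; a done deletes it, as in the behaviour the reviewer describes.
func process(store map[string][]string, events []event) {
	for _, e := range events {
		switch e.kind {
		case "failure":
			store[e.peerID] = []string{e.docID}
		case "done":
			delete(store, e.peerID)
		}
	}
}

func main() {
	store := map[string][]string{}
	process(store, []event{
		{kind: "failure", peerID: "peerA", docID: "doc1"}, // first failure is recorded
		// the retry of doc1 succeeds here; its done item is queued but not yet handled
		{kind: "failure", peerID: "peerA", docID: "doc2"}, // new failure overwrites the record
		{kind: "done", peerID: "peerA"},                   // stale done deletes the doc2 record
	})
	_, pending := store["peerA"]
	fmt.Println("doc2 still pending:", pending)
}
```

In this ordering the record for doc2 is gone by the time the loop finishes, so nothing remains to trigger its retry.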
switch active {
case true:
	rep.Status = client.ReplicatorStatusActive
	if rep.Status == client.ReplicatorStatusInactive {
todo: this will never evaluate to true
Good catch.
✅
	}
case false:
	rep.Status = client.ReplicatorStatusInactive
	if rep.Status == client.ReplicatorStatusActive {
todo: this will never evaluate to true
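Both conditions are dead because the status is overwritten just before it is compared. A minimal sketch of the usual fix is to check the previous status first and assign only on a real transition. The types and `setStatus` below are hypothetical stand-ins for `client.Replicator` and the PR's function; what the transition branch should actually do is not shown in the diff.

```go
package main

import "fmt"

// replicatorStatus and replicator are hypothetical stand-ins for the
// client package types referenced in the diff.
type replicatorStatus int

const (
	statusActive replicatorStatus = iota
	statusInactive
)

type replicator struct {
	Status replicatorStatus
}

// setStatus compares against the previous status before assigning, and
// reports whether a transition actually happened.
func setStatus(rep *replicator, active bool) (changed bool) {
	target := statusInactive
	if active {
		target = statusActive
	}
	if rep.Status == target {
		return false // already in the requested state, nothing to do
	}
	rep.Status = target
	return true
}

func main() {
	rep := &replicator{Status: statusInactive}
	fmt.Println(setStatus(rep, true)) // inactive -> active
	fmt.Println(setStatus(rep, true)) // already active
}
```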
todo: You need to change the marked issue number in the description; it's linked to #3070, which is not accurate.
5d16912 to c603799
01d7ff5 to ccfbc27
praise: I found this much easier to read, the retry stuff seems more linear, and the documentation is very clear.
It also looks like it will make it fairly straightforward to adjust if we want to change the way new retries and retry processing are handled/queued in the future, if we choose to.
Thanks Fred :)
Relevant issue(s)
Resolves #3072
Description
This PR adds replication retry functionality to the database. It uses an exponential backoff until a 32-minute interval is reached, and then retries every 32 minutes until the inactive peer is removed from the list of replicators.
Tasks
How has this been tested?
make test
Specify the platform(s) on which this was tested: