Add configurable max retries, increase read timeout to 5 minutes #121
// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
canFinishSync := false
resultCount := 0

var fields []*query.Field

for {
res, err := c.Recv()
@mattlord I found that in the "no more VGTIDs" case, the process was blocking here. So I opted to keep the regular context.WithTimeout (instead of a timer like we discussed), but increased the wait time from 1 minute to 5 minutes.
After testing locally on a similarly high-traffic table and database, 5 minutes was more successful than the 1 minute timeout.
@@ -488,6 +506,17 @@ func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard
}},
},
}

if lastKnownPk != nil {
This fixes a bug: if req.Vgtid.ShardGtids[0].TablePKs is non-nil, the copy phase is always started. Make sure to leave it nil if we're NOT intending to start a copy phase (lastKnownPk is nil).
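The rule described here can be sketched as follows. The `TableLastPK` and `ShardGtid` structs are simplified, hypothetical stand-ins for the real protobuf messages, and `lastKnownPk` is modeled as a byte slice purely for illustration.

```go
package main

import "fmt"

// TableLastPK and ShardGtid are simplified, hypothetical stand-ins
// for the protobuf types used in the real request.
type TableLastPK struct {
	TableName string
	LastPK    []byte
}

type ShardGtid struct {
	Keyspace string
	Shard    string
	Gtid     string
	TablePKs []*TableLastPK
}

// buildShardGtid only populates TablePKs when a last known primary key exists.
// Leaving the field nil signals that no copy phase should be started.
func buildShardGtid(keyspace, shard, gtid, table string, lastKnownPk []byte) *ShardGtid {
	sg := &ShardGtid{Keyspace: keyspace, Shard: shard, Gtid: gtid}
	if lastKnownPk != nil {
		sg.TablePKs = []*TableLastPK{{TableName: table, LastPK: lastKnownPk}}
	}
	return sg
}

func main() {
	noCopy := buildShardGtid("ks", "-", "gtid-1", "users", nil)
	withCopy := buildShardGtid("ks", "-", "gtid-1", "users", []byte("42"))
	fmt.Println(noCopy.TablePKs == nil, len(withCopy.TablePKs))
}
```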
p.Logger.Log(LOGLEVEL_INFO, preamble+"Peeking to see if there's any new GTIDs")
stopPosition, lcErr := p.getStopCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
if lcErr != nil {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position: %+v", lcErr))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
if stopPosition == "" {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position, was empty string: %+v", stopPosition))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
I pulled the following out of the for loop:
- Fetching the desired stop position
- Validating the desired stop position
This way we aren't "advancing the desired stop position" on any kind of error, which helps the sync finish faster.
Keyspace: tc.Keyspace,
Position: vgtid.Gtid,
tc.Position = vgtid.Gtid
if vgtid.TablePKs != nil {
Better to check the len() here as it could be empty and then we get a panic on the next line as there's no element at position 0.
} else {
tc.LastKnownPk = nil
}
} else {
tc.LastKnownPk = nil
}
We could set it to nil before this outer block (after tc.Position = vgtid.Gtid) to simplify the code/branching.
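Taken together, the two review suggestions (check `len()` rather than nil, and default to nil before the block) might look like the sketch below. The struct definitions and the `updateCursor` name are hypothetical simplifications of the real types; only the field names `Position`, `LastKnownPk`, `Gtid`, and `TablePKs` come from the diff.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the real cursor and VGTID types.
type TableLastPK struct {
	LastPK []byte
}

type ShardGtid struct {
	Gtid     string
	TablePKs []*TableLastPK
}

type TableCursor struct {
	Position    string
	LastKnownPk []byte
}

// updateCursor resets LastKnownPk up front, then overwrites it only when
// TablePKs has at least one usable element. len() is 0 for both nil and
// empty slices, so indexing position 0 can never panic here.
func updateCursor(tc *TableCursor, vgtid *ShardGtid) {
	tc.Position = vgtid.Gtid
	tc.LastKnownPk = nil
	if len(vgtid.TablePKs) > 0 && vgtid.TablePKs[0] != nil {
		tc.LastKnownPk = vgtid.TablePKs[0].LastPK
	}
}

func main() {
	tc := &TableCursor{LastKnownPk: []byte("stale")}
	updateCursor(tc, &ShardGtid{Gtid: "gtid-2", TablePKs: []*TableLastPK{}})
	fmt.Println(tc.Position, tc.LastKnownPk == nil)
}
```

Defaulting to nil first removes both `else` branches, leaving a single guarded assignment.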
This PR makes a few improvements:
max retries
field.