Skip to content

Commit

Permalink
Correct vreplication trx lag calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 23, 2025
1 parent a239dc6 commit 45adff7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
4 changes: 4 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
cleanupTestData()
waitForTargetToCatchup()

Expand All @@ -923,6 +925,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
unlockTargetTable()
deleteTestRows()
waitForTargetToCatchup()
Expand Down
23 changes: 16 additions & 7 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (wf *workflowFetcher) scanWorkflow(

// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
Expand Down Expand Up @@ -655,22 +655,31 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
// heartbeat, but which has not yet been processed on the target. We don't allow
// switching during the copy phase, so in that case we just return a large lag.
// All timestamps are in seconds since epoch.
func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
if trxTs == nil {
trxTs = &vttimepb.Time{}
}
lastTransactionTime := trxTs.Seconds
if updatedTs == nil {
updatedTs = &vttimepb.Time{}
}
lastUpdateTime := updatedTs.Seconds
lastUpdatedTime := updatedTs.Seconds
if heartbeatTs == nil {
heartbeatTs = &vttimepb.Time{}
}
lastHeartbeatTime := heartbeatTs.Seconds
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}
if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
(lastTransactionTime == 0 /* No new events after copy */ ||
lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
lastTransactionTime = lastUpdateTime
// We do NOT update the heartbeat timestamp when we are regularly updating the
// position as we replication transactions (GTIDs).
// When we DO record a heartbeat, we set the updated time to the same value.
// When recording that we are throttled, we update the updated time but NOT
// the heartbeat time.
if lastTransactionTime == 0 /* No replicated events after copy */ ||
(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
lastTransactionTime = lastUpdatedTime
}
now := time.Now().Unix() // Seconds since epoch
return float64(now - lastTransactionTime)
Expand Down

0 comments on commit 45adff7

Please sign in to comment.