diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 32ef7a95374..64075cfbf93 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -905,6 +905,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*", workflow, lagDuration.String()), out)
 		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		cleanupTestData()
 		waitForTargetToCatchup()
@@ -923,6 +925,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		require.Error(t, err)
 		require.Contains(t, out, "failed to sync up replication between the source and target")
 		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		unlockTargetTable()
 		deleteTestRows()
 		waitForTargetToCatchup()
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 7e800bb28c7..2ee66b8756b 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -448,7 +448,7 @@ func (wf *workflowFetcher) scanWorkflow(
 	// MaxVReplicationTransactionLag estimates the max statement processing lag
 	// between the source and the target across all of the workflow streams.
-	transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
+	transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
 	if transactionReplicationLag > meta.maxVReplicationTransactionLag {
 		meta.maxVReplicationTransactionLag = transactionReplicationLag
 	}
@@ -655,7 +655,7 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
 // heartbeat, but which has not yet been processed on the target. We don't allow
 // switching during the copy phase, so in that case we just return a large lag.
 // All timestamps are in seconds since epoch.
-func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
 	if trxTs == nil {
 		trxTs = &vttimepb.Time{}
 	}
@@ -663,14 +663,23 @@ func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.V
 	if updatedTs == nil {
 		updatedTs = &vttimepb.Time{}
 	}
-	lastUpdateTime := updatedTs.Seconds
+	lastUpdatedTime := updatedTs.Seconds
+	if heartbeatTs == nil {
+		heartbeatTs = &vttimepb.Time{}
+	}
+	lastHeartbeatTime := heartbeatTs.Seconds
 	if state == binlogdatapb.VReplicationWorkflowState_Copying {
 		return math.MaxInt64
 	}
-	if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
-		(lastTransactionTime == 0 /* No new events after copy */ ||
-			lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
-		lastTransactionTime = lastUpdateTime
+	// We do NOT update the heartbeat timestamp when we are regularly updating the
+	// position as we replicate transactions (GTIDs).
+	// When we DO record a heartbeat, we set the updated time to the same value.
+	// When recording that we are throttled, we update the updated time but NOT
+	// the heartbeat time.
+	if lastTransactionTime == 0 /* No replicated events after copy */ ||
+		(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
+			lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
+		lastTransactionTime = lastUpdatedTime
 	}
 	now := time.Now().Unix() // Seconds since epoch
 	return float64(now - lastTransactionTime)
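
For reviewers, here is a minimal standalone sketch of the heartbeat-aware rule that the patched getVReplicationTrxLag condition applies. It is not part of the patch: timestamps are simplified to int64 seconds instead of *vttimepb.Time, and the trxLagSeconds name is illustrative. The idea it demonstrates is that the stream only counts as caught up when the last position update came from a heartbeat (updated == heartbeat) that is newer than the last replicated transaction, so a throttled stream, whose updated time advances without a matching heartbeat, keeps accruing lag.

package main

import (
	"fmt"
	"math"
	"time"
)

// trxLagSeconds mirrors the patched rule: during the copy phase report a huge
// lag; otherwise treat the stream as caught up only when the last update was a
// heartbeat (updated == heartbeat) newer than the last replicated transaction.
func trxLagSeconds(lastTrx, lastUpdated, lastHeartbeat int64, copying bool) float64 {
	if copying {
		return math.MaxInt64
	}
	if lastTrx == 0 || // no replicated events after the copy phase
		(lastUpdated == lastHeartbeat && // last update came from a heartbeat
			lastUpdated > lastTrx) { // only heartbeats since, so caught up
		lastTrx = lastUpdated
	}
	return float64(time.Now().Unix() - lastTrx) // seconds since epoch
}

func main() {
	now := time.Now().Unix()
	// Idle but healthy: the last update is a heartbeat newer than the last
	// transaction, so the reported lag is ~0.
	fmt.Println(trxLagSeconds(now-120, now, now, false))
	// Throttled: the updated time advances but the heartbeat does not, so the
	// lag keeps growing from the last real transaction (~120s here).
	fmt.Println(trxLagSeconds(now-120, now, now-120, false))
}

The second call is the case the old code got wrong: because it only compared the updated time to the transaction time, a throttled stream looked caught up even though no transactions had been applied recently.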