diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 161726f3914..d8b60c303f4 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -784,15 +784,7 @@ func (r *requestedTable) associateSubscriptionID(event model.RegionFeedEvent) Mu } func (r *requestedTable) updateStaleLocks(s *SharedClient, maxVersion uint64) { - for { - old := r.staleLocksVersion.Load() - if old >= maxVersion { - return - } - if r.staleLocksVersion.CompareAndSwap(old, maxVersion) { - break - } - } + util.MustCompareAndMonotonicIncrease(&r.staleLocksVersion, maxVersion) res := r.rangeLock.CollectLockedRangeAttrs(r.postUpdateRegionResolvedTs) log.Warn("event feed finds slow locked ranges", diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 9d4938fc497..0978fd6dd70 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -161,12 +162,10 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err // This start ts maybe greater than the initial start ts of the table sink. // Because in two phase scheduling, the table sink may be advanced to a later ts. // And we can just continue to replicate the table sink from the new start ts. - for { - old := t.receivedSorterResolvedTs.Load() - if startTs <= old || t.receivedSorterResolvedTs.CompareAndSwap(old, startTs) { - break - } - } + util.MustCompareAndMonotonicIncrease(&t.receivedSorterResolvedTs, startTs) + // the barrierTs should always larger than or equal to the checkpointTs, so we need to update + // barrierTs before the checkpointTs is updated. + t.updateBarrierTs(startTs) if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) { t.tableSink.checkpointTs = model.NewResolvedTs(startTs) t.tableSink.resolvedTs = model.NewResolvedTs(startTs) @@ -188,26 +187,14 @@ func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEve } func (t *tableSinkWrapper) updateBarrierTs(ts model.Ts) { - for { - old := t.barrierTs.Load() - if ts <= old || t.barrierTs.CompareAndSwap(old, ts) { - break - } - } + util.MustCompareAndMonotonicIncrease(&t.barrierTs, ts) } func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) { - for { - old := t.receivedSorterResolvedTs.Load() - if ts <= old { - return - } - if t.receivedSorterResolvedTs.CompareAndSwap(old, ts) { - if t.state.Load() == tablepb.TableStatePreparing { - t.state.Store(tablepb.TableStatePrepared) - } - return - } + increased := util.CompareAndMonotonicIncrease(&t.receivedSorterResolvedTs, ts) + if increased && t.state.Load() == tablepb.TableStatePreparing { + // Update the state to `Prepared` when the receivedSorterResolvedTs is updated for the first time. + t.state.Store(tablepb.TableStatePrepared) } } diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 442b3c79634..981a8365624 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -156,42 +156,30 @@ type cacheEvents struct { } type statefulRts struct { - flushed model.Ts - unflushed model.Ts + flushed atomic.Uint64 + unflushed atomic.Uint64 +} + +func newStatefulRts(ts model.Ts) (ret statefulRts) { + ret.unflushed.Store(ts) + ret.flushed.Store(ts) + return } func (s *statefulRts) getFlushed() model.Ts { - return atomic.LoadUint64(&s.flushed) + return s.flushed.Load() } func (s *statefulRts) getUnflushed() model.Ts { - return atomic.LoadUint64(&s.unflushed) + return s.unflushed.Load() } -func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) { - for { - old := atomic.LoadUint64(&s.unflushed) - if old > unflushed { - return false - } - if atomic.CompareAndSwapUint64(&s.unflushed, old, unflushed) { - break - } - } - return true +func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (ok bool) { + return util.CompareAndIncrease(&s.unflushed, unflushed) } -func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (changed bool) { - for { - old := atomic.LoadUint64(&s.flushed) - if old > flushed { - return false - } - if atomic.CompareAndSwapUint64(&s.flushed, old, flushed) { - break - } - } - return true +func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (ok bool) { + return util.CompareAndIncrease(&s.flushed, flushed) } // logManager manages redo log writer, buffers un-persistent redo logs, calculates @@ -372,7 +360,8 @@ func (m *logManager) GetResolvedTs(span tablepb.Span) model.Ts { // AddTable adds a new table in redo log manager func (m *logManager) AddTable(span tablepb.Span, startTs uint64) { - _, loaded := m.rtsMap.LoadOrStore(span, &statefulRts{flushed: startTs, unflushed: startTs}) + rts := newStatefulRts(startTs) + _, loaded := m.rtsMap.LoadOrStore(span, &rts) if loaded { log.Warn("add duplicated table in redo log manager", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 3569c833681..40e110c93e2 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -271,8 +271,8 @@ func (m *metaManager) initMeta(ctx context.Context) error { zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", resolvedTs)) } - m.metaResolvedTs.unflushed = resolvedTs - m.metaCheckpointTs.unflushed = checkpointTs + m.metaResolvedTs.unflushed.Store(resolvedTs) + m.metaCheckpointTs.unflushed.Store(checkpointTs) if err := m.maybeFlushMeta(ctx); err != nil { return errors.WrapError(errors.ErrRedoMetaInitialize, err) } diff --git a/pkg/util/atomic.go b/pkg/util/atomic.go new file mode 100644 index 00000000000..a0047a83fce --- /dev/null +++ b/pkg/util/atomic.go @@ -0,0 +1,58 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +type numbers interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr | float32 | float64 +} + +type genericAtomic[T numbers] interface { + Load() T + Store(T) + CompareAndSwap(old, new T) bool +} + +// CompareAndIncrease updates the target if the new value is larger than or equal to the old value. +// It returns false if the new value is smaller than the old value. +func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool { + for { + old := target.Load() + if new < old { + return false + } + if new == old || target.CompareAndSwap(old, new) { + return true + } + } +} + +// CompareAndMonotonicIncrease updates the target if the new value is larger than the old value. +// It returns false if the new value is smaller than or equal to the old value. +func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool { + for { + old := target.Load() + if new <= old { + return false + } + if target.CompareAndSwap(old, new) { + return true + } + } +} + +// MustCompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It do nothing +// if the new value is smaller than or equal to the old value. +func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) { + _ = CompareAndMonotonicIncrease(target, new) +} diff --git a/pkg/util/atomic_test.go b/pkg/util/atomic_test.go new file mode 100644 index 00000000000..900720ebb5a --- /dev/null +++ b/pkg/util/atomic_test.go @@ -0,0 +1,106 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMustCompareAndIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + + doIncrease := func() { + for { + select { + case <-ctx.Done(): + return + default: + delta := rand.Int63n(100) + v := target.Load() + delta + MustCompareAndMonotonicIncrease(&target, v) + require.GreaterOrEqual(t, target.Load(), v) + } + } + } + + // Test target increase. + wg.Add(2) + go func() { + defer wg.Done() + doIncrease() + }() + go func() { + defer wg.Done() + doIncrease() + }() + + wg.Add(1) + // Test target never decrease. + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + v := target.Load() - 1 + MustCompareAndMonotonicIncrease(&target, v) + require.Greater(t, target.Load(), v) + } + } + }() + + cancel() + wg.Wait() +} + +func TestCompareAndIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + require.True(t, CompareAndIncrease(&target, 10)) + require.Equal(t, int64(10), target.Load()) + + require.True(t, CompareAndIncrease(&target, 20)) + require.Equal(t, int64(20), target.Load()) + require.False(t, CompareAndIncrease(&target, 19)) + require.Equal(t, int64(20), target.Load()) +} + +func TestCompareAndMonotonicIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + require.False(t, CompareAndMonotonicIncrease(&target, 10)) + require.Equal(t, int64(10), target.Load()) + + require.True(t, CompareAndMonotonicIncrease(&target, 11)) + require.Equal(t, int64(11), target.Load()) + require.False(t, CompareAndMonotonicIncrease(&target, 10)) + require.Equal(t, int64(11), target.Load()) +}