Skip to content

Commit

Permalink
fix conflict.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed May 8, 2024
1 parent 24ba12b commit 0637308
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
*/
private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();

// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
// when change max read position, the count will +1. Take snapshot will reset the count.
private final AtomicLong changeMaxReadPositionCount = new AtomicLong();

private final LongAdder txnCommittedCounter = new LongAdder();

Expand Down Expand Up @@ -432,15 +432,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
}

private void takeSnapshotByChangeTimes() {
if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
this.changeMaxReadPositionAndAddAbortTimes.set(0);
if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) {
this.changeMaxReadPositionCount.set(0);
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
}

private void takeSnapshotByTimeout() {
if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
this.changeMaxReadPositionAndAddAbortTimes.set(0);
if (changeMaxReadPositionCount.get() > 0) {
this.changeMaxReadPositionCount.set(0);
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
this.timer.newTimeout(TopicTransactionBuffer.this,
Expand Down Expand Up @@ -474,7 +474,7 @@ void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
this.maxReadPosition = newPosition;
if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
this.changeMaxReadPositionCount.getAndIncrement();
if (!disableCallback) {
maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, this.maxReadPosition);
}
Expand Down

0 comments on commit 0637308

Please sign in to comment.