Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry #21816

Merged
merged 11 commits into from
Jan 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,45 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import lombok.Value;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* <p>
* The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
* <p>
* This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>The heap is updated and kept sorted when a cursor is updated.
* <a href="https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation">here</a>
* <p>
* The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

/**
* This field is incremented everytime the cursor information is updated.
*/
private volatile long version;
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

@Value
public static class CursorInfo {
ManagedCursor cursor;
PositionImpl position;

/**
* Cursor info's version.
* <p>
* Use {@link DataVersion#compareVersions(long, long)} to compare between two versions,
* since it rolls over to 0 once reaching Long.MAX_VALUE
*/
long version;
}

private static class Item {
final ManagedCursor cursor;
PositionImpl position;
Expand All @@ -56,10 +76,66 @@ private static class Item {
}
}

public ManagedCursorContainer() {
/**
* Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
*/
public static final class DataVersion {
private DataVersion() {}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

/**
* Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
* <p>
asafm marked this conversation as resolved.
Show resolved Hide resolved
* Use {@link DataVersion#incrementVersion(long)} to increment the versions. The assumptions
* is that metric versios are compared with close time proximity one to another, hence,
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
* they are expected not close to each other in terms of distance, hence we don't
* expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
* to know which one is a later version in case the furthest rolls over to beyond 0. We
* assume the shortest distance between them dictates that.
* <p>
* @param v1 First version to compare
* @param v2 Second version to compare
* @return the value {@code 0} if {@code v1 == v2};
* a value less than {@code 0} if {@code v1 < v2}; and
* a value greater than {@code 0} if {@code v1 > v2}
*/
public static int compareVersions(long v1, long v2) {
if (v1 == v2) {
return 0;
}

// 0-------v1--------v2--------MAX_LONG
if (v2 > v1) {
long distance = v2 - v1;
long wrapAroundDistance = (Long.MAX_VALUE - v2) + v1;
if (distance < wrapAroundDistance) {
return -1;
} else {
return 1;
}

// 0-------v2--------v1--------MAX_LONG
} else {
long distance = v1 - v2;
long wrapAroundDistance = (Long.MAX_VALUE - v1) + v2;
if (distance < wrapAroundDistance) {
return 1; // v1 is bigger
} else {
return -1; // v2 is bigger
}
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
}

public static long incrementVersion(long existingVersion) {
if (existingVersion == Long.MAX_VALUE) {
return 0;
} else {
return existingVersion + 1;
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
}
}

public ManagedCursorContainer() {}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList<>();

Expand All @@ -80,6 +156,7 @@ public ManagedCursorContainer() {
* @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
* tracked for the slowest reader.
*/
@SuppressWarnings("NonAtomicOperationOnVolatileField") // We have rw lock for that
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
public void add(ManagedCursor cursor, Position position) {
long stamp = rwLock.writeLock();
try {
Expand All @@ -94,6 +171,7 @@ public void add(ManagedCursor cursor, Position position) {
if (cursor.isDurable()) {
durableCursorCount++;
}
version = DataVersion.incrementVersion(version);
} finally {
rwLock.unlockWrite(stamp);
}
Expand All @@ -109,6 +187,7 @@ public ManagedCursor get(String name) {
}
}

@SuppressWarnings("NonAtomicOperationOnVolatileField") // we have rw lock for that
public boolean removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Expand All @@ -129,6 +208,7 @@ public boolean removeCursor(String name) {
if (item.cursor.isDurable()) {
durableCursorCount--;
}
version = DataVersion.incrementVersion(version);
return true;
} else {
return false;
Expand All @@ -150,6 +230,7 @@ public boolean removeCursor(String name) {
* @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
* update).
*/
@SuppressWarnings("NonAtomicOperationOnVolatileField") // we have rw lock for that
public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
requireNonNull(cursor);

Expand All @@ -162,6 +243,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi

PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.incrementVersion(version);

if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
Expand Down Expand Up @@ -204,6 +286,24 @@ public ManagedCursor getSlowestReader() {
}
}

/**
* @return Returns the CursorInfo for the cursor with the oldest position,
* or null if there aren't any tracked cursors
*/
public CursorInfo getCursorWithOldestPosition() {
long stamp = rwLock.readLock();
try {
if (heap.isEmpty()) {
return null;
} else {
Item item = heap.get(0);
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
return new CursorInfo(item.cursor, item.position, version);
}
} finally {
rwLock.unlockRead(stamp);
}
}

/**
* Check whether there are any cursors.
* @return true is there are no cursors and false if there are
Expand Down
Loading
Loading