Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make cursor properties support modify single value concurrently. #17164

Merged
merged 26 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c78f290
Make cursor properties support concurrent modify single value.
coderzc Aug 18, 2022
fb1106f
Use special prefix prevent internal property cover.
coderzc Aug 18, 2022
023424e
check key
coderzc Aug 18, 2022
4cce776
make future synchronize
coderzc Aug 19, 2022
dfc8649
Merge branch 'master' into improve_cursor_property
coderzc Aug 23, 2022
367ed59
Keep method async
coderzc Aug 23, 2022
fe3f997
Merge branch 'master' into improve_cursor_property
coderzc Aug 23, 2022
bd1f9ab
Merge branch 'master' into improve_cursor_property
coderzc Aug 29, 2022
0d871ef
Merge branch 'master' into improve_cursor_property
coderzc Aug 30, 2022
fe1f89f
Merge branch 'master' into improve_cursor_property
coderzc Aug 31, 2022
eca7aa8
fix code style
coderzc Sep 2, 2022
8ba255d
use non-reentrant lock
coderzc Sep 5, 2022
db562bd
Merge branch 'master' into improve_cursor_property
coderzc Sep 6, 2022
d57b43d
Merge branch 'master' into improve_cursor_property
coderzc Sep 9, 2022
a7a21ac
Remove internal properties support.
coderzc Sep 9, 2022
53fd48c
Use `HashMap` instead of `TreeMap`.
coderzc Sep 10, 2022
7d42521
Ensure can unlock when an exception occurs during executing `asyncUpd…
coderzc Sep 12, 2022
0d27be0
Add timeout Handle
coderzc Sep 12, 2022
2224515
Serial execution operation instead of use lock.
coderzc Sep 13, 2022
3cec436
improve code
coderzc Sep 14, 2022
3da8e12
Remove `asyncGetCursorInfo` and cache `ManagedCursorInfo`.
coderzc Jul 25, 2022
ad83c9d
Throw `BadVersionException` to caller.
coderzc Sep 16, 2022
292ed62
improve some comment
coderzc Sep 16, 2022
9da0728
Add some notes.
coderzc Sep 16, 2022
4e64c4b
Package `updateCursorLedgerStat` method to update cursorLedgerStat an…
coderzc Sep 17, 2022
86a2c85
Merge branch 'master' into improve_cursor_property
coderzc Sep 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,35 @@ enum IndividualDeletedEntries {
*/
Map<String, String> getCursorProperties();

/**
* Updates the properties.
* @param cursorProperties
* @return a handle to the result of the operation
*/
default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return CompletableFuture.completedFuture(null);
}
/**
* Add a property associated with the cursor.
*
* Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
* if there are concurrent modification and store data has changed.
*
* @return a handle to the result of the operation
*/
CompletableFuture<Void> putCursorProperty(String key, String value);
coderzc marked this conversation as resolved.
Show resolved Hide resolved

/**
* Set all properties associated with the cursor.
*
* Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
* if there are concurrent modification and store data has changed.
*
* @return a handle to the result of the operation
*/
CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties);

/**
* Remove a property associated with the cursor.
*
* Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
* if there are concurrent modification and store data has changed.
*
* @return a handle to the result of the operation
*/
CompletableFuture<Void> removeCursorProperty(String key);

/**
* Add a property associated with the last stored position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;

private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;

Expand Down Expand Up @@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
// Stat of the cursor z-node
private volatile Stat cursorLedgerStat;

private volatile ManagedCursorInfo managedCursorInfo;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved

private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
PositionImplRecyclable position = PositionImplRecyclable.create();
Expand Down Expand Up @@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
return cursorProperties;
}

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
private CompletableFuture<Void> computeCursorProperties(
final Function<Map<String, String>, Map<String, String>> updateFunction) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(info)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, stat, new MetaStoreCallback<>() {

final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved

Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(ManagedCursorImpl.this.managedCursorInfo)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(newProperties))
.build();

ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties);
ManagedCursorImpl.this.cursorProperties = cursorProperties;
ManagedCursorImpl.this.managedCursorInfo = copy;
ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.unmodifiableMap alone won't make a map immutable if the map gets modified outside of the wrapper. is newProperties already a copy?

Copy link
Member Author

@coderzc coderzc Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the newProperties already a copy, but properties are not copied at setCursorProperties, I will deal with it in another PR("support internal properties").

cursorLedgerStat = stat;
updateCursorPropertiesResult.complete(result);
}
Expand All @@ -353,18 +360,33 @@ public void operationFailed(MetaStoreException e) {
updateCursorPropertiesResult.completeExceptionally(e);
}
});
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties, e);
updateCursorPropertiesResult.completeExceptionally(e);
}
});
return updateCursorPropertiesResult;
}

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return computeCursorProperties(lastRead -> cursorProperties);
}

@Override
public CompletableFuture<Void> putCursorProperty(String key, String value) {
return computeCursorProperties(lastRead -> {
Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
newProperties.put(key, value);
return newProperties;
});
}

@Override
public CompletableFuture<Void> removeCursorProperty(String key) {
return computeCursorProperties(lastRead -> {
Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
newProperties.remove(key);
return newProperties;
});
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
Expand Down Expand Up @@ -410,6 +432,7 @@ void recover(final VoidCallback callback) {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {

ManagedCursorImpl.this.managedCursorInfo = info;
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;

Expand Down Expand Up @@ -2499,6 +2522,8 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
return;
}

final Stat lastCursorLedgerStat = cursorLedgerStat;

// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
Expand All @@ -2520,10 +2545,12 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, position);
}

ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerStat,
ManagedCursorInfo cursorInfo = info.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, cursorInfo, lastCursorLedgerStat,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
managedCursorInfo = cursorInfo;
cursorLedgerStat = stat;
callback.operationComplete(result, stat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;

Expand Down Expand Up @@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f

return compositeFuture;
}

public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
Class<? extends Exception> needRetryExceptionClass) {
CompletableFuture<T> resultFuture = new CompletableFuture<>();
op.get().whenComplete((res, ex) -> {
if (ex == null) {
resultFuture.complete(res);
} else {
if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
if (ex2 == null) {
resultFuture.complete(res2);
} else {
resultFuture.completeExceptionally(ex2);
}
});
return;
}
resultFuture.completeExceptionally(ex);
}
});

return resultFuture;
}
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -74,6 +74,21 @@ public Map<String, String> getCursorProperties() {
return Collections.emptyMap();
}

@Override
public CompletableFuture<Void> putCursorProperty(String key, String value) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> removeCursorProperty(String key) {
return CompletableFuture.completedFuture(null);
}

@Override
public boolean putProperty(String key, Long value) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import static org.testng.Assert.assertEquals;

import static org.testng.Assert.assertNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
Expand Down Expand Up @@ -209,6 +214,12 @@ void testUpdateCursorProperties() throws Exception {
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

c1.putCursorProperty("custom3", "Five").get();
cursorPropertiesUpdated.put("custom3", "Five");
c1.removeCursorProperty("custom1").get();
cursorPropertiesUpdated.remove("custom1");
assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

// Create a new factory to force a managed ledger close and recovery
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
// Reopen the managed ledger
Expand All @@ -218,6 +229,38 @@ void testUpdateCursorProperties() throws Exception {
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

ledger.close();

factory2.shutdown();
}

@Test
public void testUpdateCursorPropertiesConcurrent() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
ManagedCursor c1 = ledger.openCursor("c1");

List<CompletableFuture<Void>> futures = new ArrayList<>();

Map<String, String> map = new HashMap<>();
map.put("a", "1");
map.put("b", "2");
map.put("c", "3");

futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
ManagedLedgerException.BadVersionException.class));

futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
ManagedLedgerException.BadVersionException.class));

futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
ManagedLedgerException.BadVersionException.class));

for (CompletableFuture<Void> future : futures) {
future.get();
}

assertEquals(c1.getCursorProperties().get("a"), "2");
assertEquals(c1.getCursorProperties().get("b"), "2");
assertNull(c1.getCursorProperties().get("c"));
}
}