Skip to content

Commit

Permalink
make future synchronize
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Aug 19, 2022
1 parent 023424e commit 4cce776
Showing 1 changed file with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.AsyncOperationTimeoutSeconds;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
Expand Down Expand Up @@ -377,7 +377,12 @@ public void putCursorProperty(String key, String value) {
CURSOR_PROPERTIES_UPDATER.updateAndGet(this, map -> {
Map<String, String> newProperties = map == null ? new TreeMap<>() : new TreeMap<>(map);
newProperties.put(key, value);
FutureUtil.waitForAll(Collections.singleton(setCursorProperties(newProperties)));
try {
setCursorProperties(newProperties).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Failed to set cursor properties", e);
throw new RuntimeException(e);
}
return newProperties;
});
}
Expand All @@ -393,7 +398,12 @@ public void setAllCursorProperties(Map<String, String> properties) {
}
});
}
FutureUtil.waitForAll(Collections.singleton(setCursorProperties(newProperties)));
try {
setCursorProperties(newProperties).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Failed to set cursor properties", e);
throw new RuntimeException(e);
}
return newProperties;
});
}
Expand All @@ -403,7 +413,12 @@ public void removeCursorProperty(String key) {
CURSOR_PROPERTIES_UPDATER.updateAndGet(this, map -> {
Map<String, String> newProperties = map == null ? new TreeMap<>() : new TreeMap<>(map);
newProperties.remove(key);
FutureUtil.waitForAll(Collections.singleton(setCursorProperties(newProperties)));
try {
setCursorProperties(newProperties).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Failed to set cursor properties", e);
throw new RuntimeException(e);
}
return newProperties;
});
}
Expand Down

0 comments on commit 4cce776

Please sign in to comment.