Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make cursor properties support modify single value concurrently. #17164

Merged
merged 26 commits into from
Sep 19, 2022

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Aug 18, 2022

Motivation

Currently, the cursor properties only support overall update and it is not thread-safe, but sometimes we need to modify a single value concurrently, such as PIP-195

Modifications

  • Add putCursorProperty / removeCursorProperty method.
  • Remove asyncGetCursorInfo operation and cached ManagedCursorInfo.
  • Get and fixed stat before assembly data to ensure data consistency.
  • Add some tests for concurrent operation.
  • Add the util method for retry operation when BadVersionException has occurred.
  • Warp the properties to an unmodifiable view on the callback.
  • Package updateCursorLedgerStat method to update cursorLedgerStat and managedCursorInfo at the same time.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@codelipenghui codelipenghui added this to the 2.12.0 milestone Aug 19, 2022
@codelipenghui codelipenghui added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker doc-not-needed Your PR changes do not impact docs labels Aug 19, 2022
@coderzc coderzc force-pushed the improve_cursor_property branch from e55976f to 99d89e5 Compare August 19, 2022 07:50
@coderzc coderzc force-pushed the improve_cursor_property branch from 99d89e5 to 4cce776 Compare August 19, 2022 10:09
@coderzc coderzc requested a review from lhotari August 23, 2022 06:51
@coderzc
Copy link
Member Author

coderzc commented Sep 2, 2022

@lhotari Please review again.

@lhotari
Copy link
Member

lhotari commented Sep 5, 2022

In the description it says: "use a special prefix to prevent internal property loss."
Why is a special prefix needed? What are the consequences of this changes? Is it backwards compatible?
@coderzc Please explain this detail.

@lhotari lhotari requested a review from eolivelli September 5, 2022 11:08
@coderzc
Copy link
Member Author

coderzc commented Sep 5, 2022

In the description it says: "use a special prefix to prevent internal property loss." Why is a special prefix needed? What are the consequences of this changes? Is it backwards compatible? @coderzc Please explain this detail.

@lhotari
In the original implementation, all key-value is removed when update subscription properties, this will cause internal property loss, so use a special prefix to prevent internal properties removed by updateSubscriptionProperties.

Now, the internal property has not been used, it will first be used to PIP-195.

public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
Map<String, String> newSubscriptionProperties;
if (subscriptionProperties == null || subscriptionProperties.isEmpty()) {
newSubscriptionProperties = Collections.emptyMap();
} else {
newSubscriptionProperties = Collections.unmodifiableMap(subscriptionProperties);
}
return cursor.setCursorProperties(newSubscriptionProperties)
.thenRun(() -> {
this.subscriptionProperties = newSubscriptionProperties;
});
}

public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(info)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),

}
});

updateCursorPropertiesResultRef.setValue(asyncUpdateCursorProperties(newProperties));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new properties must take effect only after they are persisted to storage.
otherwise:

  • in case of failure the value is no consistent on storage
  • other readers of the value won't see the value consistently (who reads from this node may see something that will never been read)

Copy link
Member Author

@coderzc coderzc Sep 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, I use a non-reentrant lock instead of AtomicReferenceFieldUpdater to ensure the new properties take effect after they are persisted to storage. And keep using an asynchronous method.

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
final Function<Map<String, String>, Map<String, String>> updateFunction) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the get operation?
Instead, we can only update the local properties cache if we get a BadVersion exception

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, we can only update the local properties cache if we get a BadVersion exception

Will this introduce inconsistency between broker memory state and zookeeper storage?

CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here might introduce a problem, the ManagedCursorImpl.this.cursorProperties is staled, but we will update the zookeeper with a new stat.

Copy link
Member Author

@coderzc coderzc Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the zookeeper with local stat instead of new stat?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the properties from info.

Copy link
Member Author

@coderzc coderzc Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jason918 Maybe we can be removed asyncGetCursorInfo and cache ManagedCursorInfo, what do you think?

@coderzc
Copy link
Member Author

coderzc commented Sep 14, 2022

@Jason918 @codelipenghui

I removed asyncGetCursorInfo operation and cached ManagedCursorInfo, all update operations will with a local stat, and all local cache data may old than zookeeper storage but they won't be newer than zookeeper storage.

I'm not sure if I have missed considerations. Please take a look, thanks~

@coderzc coderzc force-pushed the improve_cursor_property branch from 4d4d513 to 3da8e12 Compare September 15, 2022 07:27
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks better !

Copy link
Contributor

@Jason918 Jason918 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work!

@coderzc coderzc force-pushed the improve_cursor_property branch from 6340d80 to ad83c9d Compare September 16, 2022 08:53
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another one is related to this PR.
But it's better to improve it.

The getProperties() returned a mutable structure, it should be a risk it the properties has been modified outside.

@coderzc coderzc force-pushed the improve_cursor_property branch from fff1987 to 292ed62 Compare September 16, 2022 11:12
@codelipenghui
Copy link
Contributor

@lhotari Please help review this PR again.

@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties);
ManagedCursorImpl.this.cursorProperties = cursorProperties;
ManagedCursorImpl.this.managedCursorInfo = copy;
ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.unmodifiableMap alone won't make a map immutable if the map gets modified outside of the wrapper. is newProperties already a copy?

Copy link
Member Author

@coderzc coderzc Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the newProperties already a copy, but properties are not copied at setCursorProperties, I will deal with it in another PR("support internal properties").

@lhotari lhotari dismissed their stale review September 16, 2022 14:07

Questions added, but no changes requested anymore.

Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic LGTM.

@mattisonchao
Copy link
Member

@lhotari Please take the final review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker doc-not-needed Your PR changes do not impact docs type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants