Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] PIP-299-part-5: Add namespace-level policy: dispatcherPauseOnAckStatePersistent #21926

Merged
merged 5 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2675,4 +2675,27 @@ protected Policies getDefaultPolicesIfNull(Policies policies) {
}
return policies;
}

protected CompletableFuture<Void> internalSetDispatcherPauseOnAckStatePersistentAsync(
boolean dispatcherPauseOnAckStatePersistentEnabled) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.dispatcherPauseOnAckStatePersistentEnabled = dispatcherPauseOnAckStatePersistentEnabled;
return policies;
}));
}

protected CompletableFuture<Object> internalGetDispatcherPauseOnAckStatePersistentAsync() {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ)
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> {
if (!policiesOpt.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist");
}
return policiesOpt.map(p -> p.dispatcherPauseOnAckStatePersistentEnabled).orElse(false);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2777,5 +2777,66 @@ public void enableMigration(@PathParam("tenant") String tenant,
internalEnableMigration(migrated);
}

@POST
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetDispatcherPauseOnAckStatePersistentAsync(true)
.thenRun(() -> {
log.info("[{}] Successfully enabled dispatcherPauseOnAckStatePersistent: namespace={}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Remove dispatcher pause on ack state persistent configuration for specified namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetDispatcherPauseOnAckStatePersistentAsync(false)
.thenRun(() -> {
log.info("[{}] Successfully remove dispatcherPauseOnAckStatePersistent: namespace={}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Get dispatcher pause on ack state persistent config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetDispatcherPauseOnAckStatePersistentAsync()
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);

updateEntryFilters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2179,4 +2179,20 @@ private void createTestNamespaces(List<NamespaceName> nsnames, Policies policies
asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), policies));
}
}

@Test
public void testDispatcherPauseOnAckStatePersistent() throws Exception {
String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");

admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));

assertFalse(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
// should pass
admin.namespaces().setDispatcherPauseOnAckStatePersistent(namespace);
assertTrue(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
admin.namespaces().removeDispatcherPauseOnAckStatePersistent(namespace);
assertFalse(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));

admin.namespaces().deleteNamespace(namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,14 @@ public boolean hasAckedMessage(String v) {
public Object[][] typesOfSetDispatcherPauseOnAckStatePersistent() {
return new Object[][]{
{TypeOfUpdateTopicConfig.BROKER_CONF},
//{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
{TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY}
};
}

public enum TypeOfUpdateTopicConfig {
BROKER_CONF,
NAMESPACE_LEVEL_POLICY,
TOPIC_LEVEL_POLICY;
}

Expand All @@ -235,6 +236,9 @@ private void enableDispatcherPauseOnAckStatePersistentAndCreateTopic(String tpNa
} else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
admin.topics().createNonPartitionedTopic(tpName);
admin.topicPolicies().setDispatcherPauseOnAckStatePersistent(tpName).join();
} else if (type == TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY) {
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().setDispatcherPauseOnAckStatePersistent(TopicName.get(tpName).getNamespace());
}
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
Expand All @@ -256,6 +260,8 @@ private void disableDispatcherPauseOnAckStatePersistent(String tpName, TypeOfUpd
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled", "false");
} else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
admin.topicPolicies().removeDispatcherPauseOnAckStatePersistent(tpName).join();
} else if (type == TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY) {
admin.namespaces().removeDispatcherPauseOnAckStatePersistent(TopicName.get(tpName).getNamespace());
}
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4648,4 +4648,36 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException;

/**
* Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously.
*/
CompletableFuture<Void> setDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Remove entry filters of a namespace.
* @param namespace Namespace name
* @throws PulsarAdminException
*/
void setDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

/**
* Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace asynchronously.
*/
CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace.
*/
void removeDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

/**
* Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace asynchronously.
*/
CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace.
*/
boolean getDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class Policies {

public boolean migrated;

public Boolean dispatcherPauseOnAckStatePersistentEnabled;

public enum BundleType {
LARGEST, HOT;
}
Expand Down Expand Up @@ -158,7 +160,8 @@ public int hashCode() {
offload_policies,
subscription_types_enabled,
properties,
resource_group_name, entryFilters, migrated);
resource_group_name, entryFilters, migrated,
dispatcherPauseOnAckStatePersistentEnabled);
}

@Override
Expand Down Expand Up @@ -206,7 +209,9 @@ public boolean equals(Object obj) {
&& Objects.equals(properties, other.properties)
&& Objects.equals(migrated, other.migrated)
&& Objects.equals(resource_group_name, other.resource_group_name)
&& Objects.equals(entryFilters, other.entryFilters);
&& Objects.equals(entryFilters, other.entryFilters)
&& Objects.equals(dispatcherPauseOnAckStatePersistentEnabled,
other.dispatcherPauseOnAckStatePersistentEnabled);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,7 +1880,6 @@ public void removeNamespaceResourceGroup(String namespace) throws PulsarAdminExc
@Override
public CompletableFuture<Void> clearPropertiesAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<String> future = new CompletableFuture<>();
WebTarget path = namespacePath(ns, "properties");
return asyncDeleteRequest(path);
}
Expand Down Expand Up @@ -1958,4 +1957,40 @@ public CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace
WebTarget path = namespacePath(ns, "entryFilters");
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<Void> setDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public void setDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
sync(() -> setDispatcherPauseOnAckStatePersistentAsync(namespace));
}

@Override
public CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncDeleteRequest(path);
}

@Override
public void removeDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
sync(() -> removeDispatcherPauseOnAckStatePersistentAsync(namespace));
}

@Override
public CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncGetRequest(path, new FutureCallback<Boolean>(){});
}

@Override
public boolean getDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
return sync(() -> getDispatcherPauseOnAckStatePersistentAsync(namespace));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ public void namespaces() throws Exception {
namespaces.run(split("remove-deduplication-snapshot-interval myprop/clust/ns1"));
verify(mockNamespaces).removeDeduplicationSnapshotInterval("myprop/clust/ns1");

namespaces.run(split("set-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).setDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

namespaces.run(split("get-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).getDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

namespaces.run(split("remove-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).removeDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,42 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Enable dispatcherPauseOnAckStatePersistent for a namespace")
private class SetDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().setDispatcherPauseOnAckStatePersistent(namespace);
}
}

@Parameters(commandDescription = "Get the dispatcherPauseOnAckStatePersistent for a namespace")
private class GetDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getAdmin().namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
}
}

@Parameters(commandDescription = "Remove dispatcherPauseOnAckStatePersistent for a namespace")
private class RemoveDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeDispatcherPauseOnAckStatePersistent(namespace);
}
}

public CmdNamespaces(Supplier<PulsarAdmin> admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
Expand Down Expand Up @@ -2778,5 +2814,12 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("remove-entry-filters", new RemoveEntryFiltersPerTopic());

jcommander.addCommand("update-migration-state", new UpdateMigrationState());

jcommander.addCommand("set-dispatcher-pause-on-ack-state-persistent",
new SetDispatcherPauseOnAckStatePersistent());
jcommander.addCommand("get-dispatcher-pause-on-ack-state-persistent",
new GetDispatcherPauseOnAckStatePersistent());
jcommander.addCommand("remove-dispatcher-pause-on-ack-state-persistent",
new RemoveDispatcherPauseOnAckStatePersistent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ public enum PolicyName {
MAX_TOPICS,
RESOURCEGROUP,
ENTRY_FILTERS,
SHADOW_TOPIC
SHADOW_TOPIC,
DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TopicPolicies {
private Integer maxUnackedMessagesOnSubscription;
private Long delayedDeliveryTickTimeMillis;
private Boolean delayedDeliveryEnabled;
private Boolean dispatcherPauseOnAckStatePersistentEnabled;;
private Boolean dispatcherPauseOnAckStatePersistentEnabled;
private OffloadPoliciesImpl offloadPolicies;
private InactiveTopicPolicies inactiveTopicPolicies;
private DispatchRateImpl dispatchRate;
Expand Down
Loading