From 96103a92e21442a62491f4e55d58d45e6c59e1bd Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Mon, 25 Jul 2022 22:20:07 +0800 Subject: [PATCH 1/2] [improve][broker] make some methods async in Namespaces --- .../broker/admin/impl/NamespacesBase.java | 20 ------- .../pulsar/broker/admin/v1/Namespaces.java | 31 ++++++++-- .../pulsar/broker/admin/v2/Namespaces.java | 60 +++++++++++++++---- 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5505afc4cf8df..838304117bc1b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1584,13 +1584,6 @@ private void doUpdatePersistence(PersistencePolicies persistence) { } } - protected PersistencePolicies internalGetPersistence() { - validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.persistence; - } - protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); @@ -1808,12 +1801,6 @@ protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscription } } - protected SubscriptionAuthMode internalGetSubscriptionAuthMode() { - validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ); - Policies policies = getNamespacePolicies(namespaceName); - return policies.subscription_auth_mode; - } - protected void internalModifyEncryptionRequired(boolean encryptionRequired) { validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); @@ -1843,13 +1830,6 @@ protected DelayedDeliveryPolicies internalGetDelayedDelivery() { return getNamespacePolicies(namespaceName).delayed_delivery_policies; } - protected InactiveTopicPolicies internalGetInactiveTopic() { - validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.inactive_topic_policies; - } - protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) { validateSuperUserAccess(); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 1c2fb282e1098..5084e09fd3127 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1257,10 +1257,20 @@ public void deleteBookieAffinityGroup(@PathParam("property") String property, @P @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public PersistencePolicies getPersistence(@PathParam("property") String property, + public void getPersistence( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetPersistence(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.persistence)) + .exceptionally(ex -> { + log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1384,11 +1394,20 @@ public void setSubscriptionAuthMode(@PathParam("property") String property, @Pat @ApiOperation(value = "Get subscription auth mode in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) - public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getSubscriptionAuthMode(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetSubscriptionAuthMode(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode)) + .exceptionally(ex -> { + log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 5417f2e19c1e9..8158bcc362a4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1298,10 +1298,20 @@ public void deleteBookieAffinityGroup(@PathParam("property") String property, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public PersistencePolicies getPersistence(@PathParam("tenant") String tenant, + public void getPersistence( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetPersistence(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.persistence)) + .exceptionally(ex -> { + log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1425,10 +1435,20 @@ public void setSubscriptionAuthMode(@PathParam("tenant") String tenant, @ApiOperation(value = "Get subscription auth mode in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")}) - public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getSubscriptionAuthMode( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetSubscriptionAuthMode(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode)) + .exceptionally(ex -> { + log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1451,10 +1471,19 @@ public void modifyEncryptionRequired( @ApiOperation(value = "Get message encryption required status in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")}) - public Boolean getEncryptionRequired(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getEncryptionRequired(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetEncryptionRequired(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.encryption_required)) + .exceptionally(ex -> { + log.error("[{}] Failed to get message encryption required status in a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -1499,10 +1528,19 @@ public void removeDelayedDeliveryPolicies(@PathParam("tenant") String tenant, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification"), }) - public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetInactiveTopic(); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.inactive_topic_policies)) + .exceptionally(ex -> { + log.error("[{}] Failed to get inactive topic policies config on a namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE From 8a620aa4f3fd5caecef6d3e9e87128131dba43a4 Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Tue, 26 Jul 2022 10:01:21 +0800 Subject: [PATCH 2/2] fix unit test --- .../org/apache/pulsar/broker/admin/NamespacesTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 2369a0af4bb96..1a4372985ea90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1062,8 +1062,11 @@ public void testPersistence() throws Exception { NamespaceName testNs = this.testLocalNamespaces.get(0); PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0); namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1); - PersistencePolicies persistence2 = namespaces.getPersistence(testNs.getTenant(), testNs.getCluster(), - testNs.getLocalName()); + AsyncResponse response = mock(AsyncResponse.class); + namespaces.getPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); + ArgumentCaptor captor = ArgumentCaptor.forClass(PersistencePolicies.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + PersistencePolicies persistence2 = captor.getValue(); assertEquals(persistence2, persistence1); }