Skip to content

Commit

Permalink
[Broker] make grantPermissionsOnTopic method async (#14152)
Browse files Browse the repository at this point in the history
  • Loading branch information
liudezhi2098 authored Apr 24, 2022
1 parent 57aff0a commit 1344b33
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,53 +256,62 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
}

private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
} else {
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
}
log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
topicUri);
} catch (InterruptedException e) {
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
} catch (ExecutionException e) {
// The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (e.getCause() instanceof MetadataStoreException.NotFoundException
|| e.getCause() instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
topicUri, e);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (e.getCause() instanceof MetadataStoreException.BadVersionException
|| e.getCause() instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for topic {}: {}",
clientAppId(), topicUri, e.getCause().getMessage(), e);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String role, Set<AuthAction> actions) {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
.thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
clientAppId(), role, actions, topicUri))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
clientAppId(), topicUri, realCause);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
realCause.getMessage(), realCause);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
realCause);
throw new RestException(realCause);
}
});
} else {
String msg = "Authorization is not enabled";
return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
}
}

protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
int numPartitions = meta.partitions;
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
grantPermissions(topicNamePartition, role, actions);
}
}
grantPermissions(topicName, role, actions);
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
actions));
}
}
return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}

protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ public Map<String, Set<AuthAction>> getPermissionsOnTopic(@PathParam("property")
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void grantPermissionsOnTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role,
public void grantPermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("role") String role,
Set<AuthAction> actions) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGrantPermissionsOnTopic(role, actions);

try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGrantPermissionsOnTopic(asyncResponse, role, actions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public Map<String, Set<AuthAction>> getPermissionsOnTopic(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void grantPermissionsOnTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -174,8 +175,14 @@ public void grantPermissionsOnTopic(
@ApiParam(value = "Actions to be granted (produce,functions,consume)",
allowableValues = "produce,functions,consume")
Set<AuthAction> actions) {
validateTopicName(tenant, namespace, encodedTopic);
internalGrantPermissionsOnTopic(role, actions);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGrantPermissionsOnTopic(asyncResponse, role, actions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,11 @@ public void persistentTopics() throws Exception {
// grant permission
final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
final String role = "test-role";
persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, topic, role, actions);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, property, cluster, namespace, topic, role, actions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// verify permission
Map<String, Set<AuthAction>> permission = persistentTopics.getPermissionsOnTopic(property, cluster,
namespace, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,11 @@ public void testGrantNonPartitionedTopic() {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
Assert.assertEquals(permissions.get(role), expectActions);
}
Expand Down Expand Up @@ -659,7 +663,12 @@ public void testGrantPartitionedTopic() {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName);
Assert.assertEquals(permissions.get(role), expectActions);
Expand All @@ -680,9 +689,13 @@ public void testRevokeNonPartitionedTopic() {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand All @@ -703,7 +716,12 @@ public void testRevokePartitionedTopic() {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
responseCaptor = ArgumentCaptor.forClass(Response.class);
Expand Down

0 comments on commit 1344b33

Please sign in to comment.