Skip to content

Commit

Permalink
[fix][security] Add timeout of sync methods and avoid call sync metho…
Browse files Browse the repository at this point in the history
…d for AuthoriationService (apache#15694)
  • Loading branch information
codelipenghui authored Jun 9, 2022
1 parent 0052003 commit 6af365e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,11 @@ public boolean allowTenantOperation(String tenantName,
TenantOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTenantOperationAsync(
tenantName, operation, originalRole, role, authData).get();
tenantName, operation, originalRole, role, authData).get(
conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -455,10 +456,11 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
PolicyOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, originalRole, role, authData).get();
namespaceName, policy, operation, originalRole, role, authData).get(
conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -516,10 +518,11 @@ public Boolean allowTopicPolicyOperation(TopicName topicName,
PolicyOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTopicPolicyOperationAsync(
topicName, policy, operation, originalRole, role, authData).get();
topicName, policy, operation, originalRole, role, authData).get(
conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -596,9 +599,10 @@ public Boolean allowTopicOperation(TopicName topicName,
TopicOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -4014,46 +4013,55 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException authException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, authException.getMessage()));
}
} catch (Exception ex) {
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, ex);
throw ex;
}
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});

// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,24 +220,14 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId, false));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
authException.getMessage(), requestId));
return;
} catch (Exception e) {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
validationFuture.completeExceptionally(e);
return;
}
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
Expand All @@ -248,21 +238,36 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured", requestId));
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));

}).exceptionally(ex -> {
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
return null;
});
})
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
throwable.getMessage(), requestId));
} else {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
validationFuture.completeExceptionally(throwable);
}
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,18 +907,21 @@ private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsy
});
}

protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
AuthenticationDataSource authenticationData) throws Exception {
protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
String role, AuthenticationDataSource authenticationData) {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
return;
return CompletableFuture.completedFuture(null);
}
// get zk policy manager
if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
TopicOperation.LOOKUP, null, role, authenticationData)) {
log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
if (!allow) {
log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
throw new RestException(Status.UNAUTHORIZED,
"Don't have permission to connect to this namespace");
}
});
}

// Used for unit tests access
Expand Down

0 comments on commit 6af365e

Please sign in to comment.