Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use more granular permissions for topics #6504

Merged
merged 1 commit into from
Mar 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,44 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
}

public void validateReadOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
}
validateAdminAccessForSubscriber("");
}
}

public void validateWriteOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
}
try {
if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(),
clientAuthData())) {
log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId());
throw new RestException(Status.UNAUTHORIZED,
String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
}
} catch (RestException re) {
throw re;
} catch (Exception ex) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), ex);
throw new RestException(ex);
}
}
}

protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
Expand Down Expand Up @@ -317,7 +355,7 @@ protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> acti
}

protected void internalDeleteTopicForcefully(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
topic.deleteForcefully().get();
Expand Down Expand Up @@ -391,7 +429,7 @@ protected void internalRevokePermissionsOnTopic(String role) {
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(authoritative);
validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down Expand Up @@ -426,7 +464,7 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
* @param numPartitions
*/
protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(false);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
Expand Down Expand Up @@ -540,7 +578,7 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
try {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down Expand Up @@ -738,7 +776,7 @@ protected void internalDeleteTopic(boolean authoritative, boolean force) {
}

protected void internalDeleteTopic(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);

// v2 topics have a global name so check if the topic is replicated.
Expand Down Expand Up @@ -825,7 +863,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
Expand Down Expand Up @@ -1279,7 +1317,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
// validate ownership and redirect if current broker is not owner
PersistentTopic topic;
try {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);

topic = (PersistentTopic) getTopicReference(topicName);
} catch (Exception e) {
Expand Down Expand Up @@ -1744,7 +1782,7 @@ protected MessageId internalTerminate(boolean authoritative) {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
}
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
return ((PersistentTopic) topic).terminate().get();
Expand Down Expand Up @@ -1867,7 +1905,7 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
}

protected void internalTriggerCompaction(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
Expand All @@ -1880,13 +1918,13 @@ protected void internalTriggerCompaction(boolean authoritative) {
}

protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus();
}

protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
Expand All @@ -1899,7 +1937,7 @@ protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messa
}

protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}
Expand Down Expand Up @@ -2237,7 +2275,7 @@ private void validateNonPartitionTopicName(String topicName) {
}

protected MessageId internalGetLastMessageId(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);

if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);
Expand Down