diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java index 785f0b83758..53bb137e3d0 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java @@ -1287,6 +1287,20 @@ public List<String> getAllSubjects(Map<String, String> requestProperties, return response; } + public List<String> getAllSubjectsWithPagination(int offset, int limit) + throws IOException, RestClientException { + return getAllSubjectsWithPagination(DEFAULT_REQUEST_PROPERTIES, offset, limit); + } + + public List<String> getAllSubjectsWithPagination(Map<String, String> requestProperties, + int offset, int limit) + throws IOException, RestClientException { + String url = "/subjects?limit=" + limit + "&offset=" + offset; + List<String> response = httpRequest(url, "GET", null, requestProperties, + ALL_TOPICS_RESPONSE_TYPE); + return response; + } + public List<String> getDeletedOnlySubjects(String subjectPrefix) throws IOException, RestClientException { return getAllSubjects(DEFAULT_REQUEST_PROPERTIES, subjectPrefix, false, true); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java index 2510ba391bc..365521ac8dd 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java @@ -221,6 +221,17 @@ public class SchemaRegistryConfig extends RestConfig { public static final String SCHEMA_SEARCH_MAX_LIMIT_CONFIG = "schema.search.max.limit"; public static final int SCHEMA_SEARCH_MAX_LIMIT_DEFAULT = 1000; + /** + * subject.search.default.limit + */ + public static final String SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG = "subject.search.default.limit"; + public static final int 
SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT = 20000; + /** + * subject.search.max.limit + */ + public static final String SUBJECT_SEARCH_MAX_LIMIT_CONFIG = "subject.search.max.limit"; + public static final int SUBJECT_SEARCH_MAX_LIMIT_DEFAULT = 20000; + public static final String METADATA_ENCODER_SECRET_CONFIG = "metadata.encoder.secret"; public static final String METADATA_ENCODER_OLD_SECRET_CONFIG = "metadata.encoder.old.secret"; @@ -380,6 +391,10 @@ public class SchemaRegistryConfig extends RestConfig { "The default limit for schema searches."; protected static final String SCHEMA_SEARCH_MAX_LIMIT_DOC = "The max limit for schema searches."; + protected static final String SUBJECT_SEARCH_DEFAULT_LIMIT_DOC = + "The default limit for subject searches."; + protected static final String SUBJECT_SEARCH_MAX_LIMIT_DOC = + "The max limit for subject searches."; protected static final String METADATA_ENCODER_SECRET_DOC = "The secret used to encrypt and decrypt encoder keysets. " + "Use a random string with high entropy."; @@ -601,6 +616,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0), SCHEMA_SEARCH_MAX_LIMIT_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_SEARCH_MAX_LIMIT_DOC ) + .define(SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG, ConfigDef.Type.INT, + SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT, + ConfigDef.Importance.LOW, SUBJECT_SEARCH_DEFAULT_LIMIT_DOC + ) + .define(SUBJECT_SEARCH_MAX_LIMIT_CONFIG, ConfigDef.Type.INT, + SUBJECT_SEARCH_MAX_LIMIT_DEFAULT, + ConfigDef.Importance.LOW, SUBJECT_SEARCH_MAX_LIMIT_DOC + ) .define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC ) diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java index 26f173f0543..c763b41bf57 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java +++ 
b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java @@ -15,6 +15,7 @@ package io.confluent.kafka.schemaregistry.rest.resources; +import com.google.common.collect.Streams; import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; @@ -35,6 +36,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tags; +import java.util.Iterator; import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +48,10 @@ import javax.ws.rs.DefaultValue; import javax.ws.rs.QueryParam; import javax.ws.rs.PathParam; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + @Path("/schemas") @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, @@ -101,7 +103,6 @@ public List<ExtendedSchema> getSchemas( @Parameter(description = "Pagination size for results. Ignored if negative") @DefaultValue("-1") @QueryParam("limit") int limit) { Iterator<ExtendedSchema> schemas; - List<ExtendedSchema> filteredSchemas = new ArrayList<>(); String errorMessage = "Error while getting schemas for prefix " + subjectPrefix; LookupFilter filter = lookupDeletedSchema ? 
LookupFilter.INCLUDE_DELETED : LookupFilter.DEFAULT; try { @@ -117,17 +118,11 @@ public List<ExtendedSchema> getSchemas( } catch (SchemaRegistryException e) { throw Errors.schemaRegistryException(errorMessage, e); } - limit = schemaRegistry.normalizeLimit(limit); - int toIndex = offset + limit; - int index = 0; - while (schemas.hasNext() && index < toIndex) { - ExtendedSchema schema = schemas.next(); - if (index >= offset) { - filteredSchemas.add(schema); - } - index++; - } - return filteredSchemas; + limit = schemaRegistry.normalizeSchemaLimit(limit); + return Streams.stream(schemas) + .skip(offset) + .limit(limit) + .collect(Collectors.toList()); } @GET diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java index 4018fd9d44e..d2cc24a54a1 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java @@ -40,6 +40,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tags; import java.util.HashMap; +import java.util.LinkedHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; @Path("/subjects") @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, @@ -229,6 +232,10 @@ public Set<String> list( @DefaultValue(QualifiedSubject.CONTEXT_WILDCARD) @Parameter(description = "Subject name prefix") @QueryParam("subjectPrefix") String subjectPrefix, + @Parameter(description = "Pagination offset for results") + @DefaultValue("0") @QueryParam("offset") int offset, + @Parameter(description = "Pagination size for results. 
Ignored if negative") + @DefaultValue("-1") @QueryParam("limit") int limit, @Parameter(description = "Whether to look up deleted subjects") @QueryParam("deleted") boolean lookupDeletedSubjects, @Parameter(description = "Whether to return deleted subjects only") @QueryParam("deletedOnly") boolean... @@ -242,8 +249,15 @@ public Set<String> list( filter = LookupFilter.INCLUDE_DELETED; } try { - return schemaRegistry.listSubjectsWithPrefix( - subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter); + Set<String> subjects = schemaRegistry.listSubjectsWithPrefix( + subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter); + Stream<String> stream = subjects.stream(); + + limit = schemaRegistry.normalizeSubjectLimit(limit); + return stream + .skip(offset) + .limit(limit) + .collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order } catch (SchemaRegistryStoreException e) { throw Errors.storeException("Error while listing subjects", e); } catch (SchemaRegistryException e) { diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index b7a2b64b5d6..c5a4f51dba6 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -153,8 +153,10 @@ public class KafkaSchemaRegistry implements SchemaRegistry, private final int initTimeout; private final boolean initWaitForReader; private final int kafkaStoreMaxRetries; - private final int searchDefaultLimit; - private final int searchMaxLimit; + private final int schemaSearchDefaultLimit; + private final int schemaSearchMaxLimit; + private final int subjectSearchDefaultLimit; + private final int subjectSearchMaxLimit; private final boolean delayLeaderElection; private final boolean allowModeChanges; private final boolean enableStoreHealthCheck; @@ -228,9 +230,13 @@ public 
KafkaSchemaRegistry(SchemaRegistryConfig config, .expireAfterAccess(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG), TimeUnit.SECONDS) .build(s -> loadSchema(s.getSchema(), s.isNew(), s.isNormalize())); - this.searchDefaultLimit = - config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG); - this.searchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG); + this.schemaSearchDefaultLimit = + config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG); + this.schemaSearchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG); + this.subjectSearchDefaultLimit = + config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG); + this.subjectSearchMaxLimit = + config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_MAX_LIMIT_CONFIG); this.lookupCache = lookupCache(); this.idGenerator = identityGenerator(config); this.kafkaStore = kafkaStore(config); @@ -602,14 +608,22 @@ public SchemaProvider schemaProvider(String schemaType) { return providers.get(schemaType); } - public int normalizeLimit(int suppliedLimit) { - int limit = searchDefaultLimit; - if (suppliedLimit > 0 && suppliedLimit <= searchMaxLimit) { + public int normalizeLimit(int suppliedLimit, int defaultLimit, int maxLimit) { + int limit = defaultLimit; + if (suppliedLimit > 0 && suppliedLimit <= maxLimit) { limit = suppliedLimit; } return limit; } + public int normalizeSchemaLimit(int suppliedLimit) { + return normalizeLimit(suppliedLimit, schemaSearchDefaultLimit, schemaSearchMaxLimit); + } + + public int normalizeSubjectLimit(int suppliedLimit) { + return normalizeLimit(suppliedLimit, subjectSearchDefaultLimit, subjectSearchMaxLimit); + } + public Schema register(String subject, RegisterSchemaRequest request, boolean normalize) throws SchemaRegistryException { try { diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java 
b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java index 4f6ba04128c..a2a8a5e81a6 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java @@ -155,6 +155,14 @@ public void testBasic() throws Exception { allSubjects, restApp.restClient.getAllSubjects()); + // test pagination with limit of 1 + assertEquals("Getting all subjects with pagination offset=0, limit=1 should return first registered subject", + ImmutableList.of(allSubjects.get(0)), + restApp.restClient.getAllSubjectsWithPagination(0, 1)); + assertEquals("Getting all subjects with pagination offset=1, limit=1 should return second registered subject", + ImmutableList.of(allSubjects.get(1)), + restApp.restClient.getAllSubjectsWithPagination(1, 1)); + List<Schema> latestSchemas = restApp.restClient.getSchemas(null, false, true); assertEquals("Getting latest schemas should return two schemas", 2,