Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(schema-resolver): caching of latest artifacts and resilient schema updates #3839

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
schemaCache.configureLifetime(config.getCheckPeriod());
schemaCache.configureRetryBackoff(config.getRetryBackoff());
schemaCache.configureRetryCount(config.getRetryCount());
schemaCache.configureCacheLatest(config.getCacheLatest());
schemaCache.configureFaultTolerantRefresh(config.getFaultTolerantRefresh());

schemaCache.configureGlobalIdKeyExtractor(SchemaLookupResult::getGlobalId);
schemaCache.configureContentKeyExtractor(schema -> Optional.ofNullable(schema.getParsedSchema().getRawSchema()).map(IoUtil::toString).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.apicurio.registry.resolver.strategy.ArtifactCoordinates;
import io.apicurio.registry.rest.client.exception.RateLimitedClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -36,6 +38,7 @@
*/
public class ERCache<V> {

private final static Logger log = LoggerFactory.getLogger(ERCache.class);
/** Global ID index */
private final Map<Long, WrappedValue<V>> index1 = new ConcurrentHashMap<>();
/** Data content index */
Expand All @@ -56,6 +59,8 @@ public class ERCache<V> {
private Duration lifetime = Duration.ZERO;
private Duration backoff = Duration.ofMillis(200);
private long retries;
private boolean cacheLatest;
private boolean faultTolerantRefresh;

// === Configuration

Expand All @@ -71,6 +76,26 @@ public void configureRetryCount(long retries) {
this.retries = retries;
}

/**
 * Enables or disables caching of schema lookups that either request {@code latest} or specify no version at all.
 * Setting this to {@code false} will effectively disable caching for schema lookups that do not specify a version.
 *
 * @param cacheLatest Whether to enable cache of artifacts without a version specified.
 */
public void configureCacheLatest(boolean cacheLatest) {
this.cacheLatest = cacheLatest;
}

/**
 * If set to {@code true}, a load error that occurs while refreshing an existing cache entry is logged as a
 * warning and the previously cached (expired) value is returned instead of the error being thrown.
 * This will still honor retries before enacting this behavior.
 *
 * @param faultTolerantRefresh Whether to enable fault tolerant refresh behavior.
 */
public void configureFaultTolerantRefresh(boolean faultTolerantRefresh) {
this.faultTolerantRefresh = faultTolerantRefresh;
}

/**
 * Sets the function that derives the global ID key from a cached value.
 * The extracted key is used when a value is stored in the global ID index.
 *
 * @param keyExtractor function mapping a cached value to its global ID
 */
public void configureGlobalIdKeyExtractor(Function<V, Long> keyExtractor) {
this.keyExtractor1 = keyExtractor;
}
Expand All @@ -91,6 +116,26 @@ public void configureContentHashKeyExtractor(Function<V, String> keyExtractor) {
this.keyExtractor5 = keyExtractor;
}

/**
 * Reports whether artifact lookups carrying a {@code null} version are cached.
 *
 * @return {@code true} when caching of such lookups is switched on.
 * @see #configureCacheLatest(boolean)
 */
public boolean isCacheLatest() {
    return cacheLatest;
}

/**
 * Reports whether the fault tolerant refresh behavior is switched on.
 *
 * @return {@code true} when fault tolerant refresh is enabled.
 * @see #configureFaultTolerantRefresh(boolean)
 */
public boolean isFaultTolerantRefresh() {
    return faultTolerantRefresh;
}

public void checkInitialized() {
boolean initialized = keyExtractor1 != null && keyExtractor2 != null &&
keyExtractor3 != null && keyExtractor4 != null && keyExtractor5 != null;
Expand Down Expand Up @@ -157,22 +202,34 @@ private <T> V getValue(WrappedValue<V> value, T key, Function<T, V> loaderFuncti
});
if (newValue.isOk()) {
// Index
reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok));
reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok), key);
// Return
result = newValue.ok;
} else {
if (faultTolerantRefresh && value != null) {
log.warn("Error updating cache value. Fault tolerant load using expired value", newValue.error);
return value.value;
}
log.error("Failed to update cache value for key: " + key, newValue.error);
throw newValue.error;
}
}

return result;
}

private void reindex(WrappedValue<V> newValue) {
private <T> void reindex(WrappedValue<V> newValue, T lookupKey) {
Optional.ofNullable(keyExtractor1.apply(newValue.value)).ifPresent(k -> index1.put(k, newValue));
Optional.ofNullable(keyExtractor2.apply(newValue.value)).ifPresent(k -> index2.put(k, newValue));
Optional.ofNullable(keyExtractor3.apply(newValue.value)).ifPresent(k -> index3.put(k, newValue));
Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> index4.put(k, newValue));
Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> {
index4.put(k, newValue);
// By storing the lookup key, we ensure that a null/latest lookup gets cached, as the key extractor will
// automatically add the version to the new key
if (this.cacheLatest && k.getClass().equals(lookupKey.getClass())) {
index4.put((ArtifactCoordinates) lookupKey, newValue);
}
});
Optional.ofNullable(keyExtractor5.apply(newValue.value)).ifPresent(k -> index5.put(k, newValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ public class SchemaResolverConfig {
public static final String FIND_LATEST_ARTIFACT = "apicurio.registry.find-latest";
public static final boolean FIND_LATEST_ARTIFACT_DEFAULT = false;

/**
* If {@code true}, will cache schema lookups that either have {@code latest} or no version specified. Setting this to
* {@code false} will effectively disable caching for schema lookups that do not specify a version.
*/
public static final String CACHE_LATEST = "apicurio.registry.cache-latest";
public static final boolean CACHE_LATEST_DEFAULT = true;

/**
* If {@code true}, will log exceptions instead of throwing them when an error occurs trying to refresh a schema
* in the cache. This is useful for production situations where a stale schema is better than completely failing
* schema resolution. Note that this does not affect retries: all configured retries are attempted before this flag
* is considered.
*/
public static final String FAULT_TOLERANT_REFRESH = "apicurio.registry.fault-tolerant-refresh";
public static final boolean FAULT_TOLERANT_REFRESH_DEFAULT = false;

/**
* Only applicable for serializers
* Optional, set explicitly the groupId used for querying/creating an artifact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class DefaultSchemaResolverConfig {
entry(ARTIFACT_RESOLVER_STRATEGY, ARTIFACT_RESOLVER_STRATEGY_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT, AUTO_REGISTER_ARTIFACT_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT_IF_EXISTS, AUTO_REGISTER_ARTIFACT_IF_EXISTS_DEFAULT),
entry(CACHE_LATEST, CACHE_LATEST_DEFAULT),
entry(FAULT_TOLERANT_REFRESH, FAULT_TOLERANT_REFRESH_DEFAULT),
entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT),
entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT),
entry(RETRY_COUNT, RETRY_COUNT_DEFAULT),
Expand Down Expand Up @@ -97,6 +99,14 @@ public String autoRegisterArtifactIfExists() {
return getStringOneOf(AUTO_REGISTER_ARTIFACT_IF_EXISTS, "FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE");
}

/**
 * Reads the boolean setting stored under {@code CACHE_LATEST}.
 * A default for this key is registered in the defaults map of this class.
 *
 * @return the configured value of the {@code CACHE_LATEST} property
 */
public boolean getCacheLatest() {
return getBoolean(CACHE_LATEST);
}

/**
 * Reads the boolean setting stored under {@code FAULT_TOLERANT_REFRESH}.
 * A default for this key is registered in the defaults map of this class.
 *
 * @return the configured value of the {@code FAULT_TOLERANT_REFRESH} property
 */
public boolean getFaultTolerantRefresh() {
return getBoolean(FAULT_TOLERANT_REFRESH);
}

public boolean findLatest() {
// Should be non-null, a default value is defined
return getBoolean(FIND_LATEST_ARTIFACT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/
package io.apicurio.registry.resolver;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import io.apicurio.registry.resolver.data.Record;
import io.apicurio.registry.resolver.strategy.ArtifactReference;

import static org.junit.jupiter.api.Assertions.*;

public class AbstractSchemaResolverTest {
@Test
void testConfigureInitializesSchemaCache() throws Exception {
Expand All @@ -37,6 +38,43 @@ void testConfigureInitializesSchemaCache() throws Exception {
}
}

@Test
void testSupportsFailureTolerantSchemaCache() throws Exception {
    // Explicitly opt in to fault tolerant refresh on top of the mandatory registry URL.
    Map<String, Object> settings = new HashMap<>();
    settings.put(SchemaResolverConfig.FAULT_TOLERANT_REFRESH, true);
    settings.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");

    try (TestAbstractSchemaResolver<Object, Object> schemaResolver = new TestAbstractSchemaResolver<>()) {
        schemaResolver.configure(settings, null);

        // The flag must have been propagated to the schema cache.
        boolean faultTolerant = schemaResolver.schemaCache.isFaultTolerantRefresh();
        assertTrue(faultTolerant);
    }
}

@Test
void testDefaultsToFailureTolerantSchemaCacheDisabled() throws Exception {
    // Only the registry URL is supplied, so the fault tolerant flag falls back to its default.
    Map<String, Object> settings = new HashMap<>();
    settings.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");

    try (TestAbstractSchemaResolver<Object, Object> schemaResolver = new TestAbstractSchemaResolver<>()) {
        schemaResolver.configure(settings, null);

        boolean faultTolerant = schemaResolver.schemaCache.isFaultTolerantRefresh();
        assertFalse(faultTolerant);
    }
}

@Test
void testDefaultsToCacheLatestEnabled() throws Exception {
    // Only the registry URL is supplied, so the cache-latest flag falls back to its default.
    Map<String, Object> settings = new HashMap<>();
    settings.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");

    try (TestAbstractSchemaResolver<Object, Object> schemaResolver = new TestAbstractSchemaResolver<>()) {
        schemaResolver.configure(settings, null);

        boolean cachesLatest = schemaResolver.schemaCache.isCacheLatest();
        assertTrue(cachesLatest);
    }
}

class TestAbstractSchemaResolver<SCHEMA, DATA> extends AbstractSchemaResolver<SCHEMA, DATA> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -106,6 +107,84 @@ void testClearEmptiesContentHashIndex() {
assertFalse(cache.containsByContentHash(contentHashKey));
}

@Test
void testThrowsLoadExceptionsByDefault() {
    final String hashKey = "another key";
    final ERCache<String> cache = newCache(hashKey);

    // Loader that always fails; with default settings the failure must reach the caller.
    Function<String, String> failingLoader = key -> {
        throw new IllegalStateException("load failure");
    };

    assertThrows(IllegalStateException.class, () -> cache.getByContentHash(hashKey, failingLoader));
}

@Test
void testHoldsLoadExceptionsWhenFaultTolerantRefreshEnabled() {
    final String hashKey = "another key";
    final ERCache<String> cache = newCache(hashKey);
    cache.configureLifetime(Duration.ZERO);
    cache.configureFaultTolerantRefresh(true);

    // First lookup succeeds and seeds the cache.
    Function<String, String> succeedingLoader = key -> "some value";
    String seededValue = cache.getByContentHash(hashKey, succeedingLoader);

    // Second lookup uses a loader that always fails.
    Function<String, String> brokenLoader = key -> {
        throw new IllegalStateException("load failure");
    };
    String valueAfterFailure = cache.getByContentHash(hashKey, brokenLoader);

    assertEquals("some value", seededValue);
    assertEquals("some value", valueAfterFailure);
}

@Test
void testCanCacheLatestWhenEnabled() {
    final ERCache<String> cache = newCache("some key");
    cache.configureLifetime(Duration.ofMinutes(10));
    cache.configureCacheLatest(true);

    // Coordinates without a version, i.e. a "latest" lookup.
    ArtifactCoordinates.ArtifactCoordinatesBuilder coordinatesBuilder =
            new ArtifactCoordinates.ArtifactCoordinatesBuilder();
    final ArtifactCoordinates versionlessKey = coordinatesBuilder
            .artifactId("someArtifactId")
            .groupId("someGroupId")
            .build();

    // Loader that records how many times it was invoked.
    final AtomicInteger invocations = new AtomicInteger();
    Function<ArtifactCoordinates, String> trackingLoader = key -> {
        invocations.incrementAndGet();
        return "some value";
    };

    // Perform the same lookup twice.
    String first = cache.getByArtifactCoordinates(versionlessKey, trackingLoader);
    String second = cache.getByArtifactCoordinates(versionlessKey, trackingLoader);

    assertEquals(first, second);
    assertEquals(1, invocations.get());
}

@Test
void doesNotCacheLatestWhenDisabled() {
    final ERCache<String> cache = newCache("some key");
    cache.configureLifetime(Duration.ofMinutes(10));
    cache.configureCacheLatest(false);

    // Coordinates without a version, i.e. a "latest" lookup.
    ArtifactCoordinates.ArtifactCoordinatesBuilder coordinatesBuilder =
            new ArtifactCoordinates.ArtifactCoordinatesBuilder();
    final ArtifactCoordinates versionlessKey = coordinatesBuilder
            .artifactId("someArtifactId")
            .groupId("someGroupId")
            .build();

    // Loader that records how many times it was invoked.
    final AtomicInteger invocations = new AtomicInteger();
    Function<ArtifactCoordinates, String> trackingLoader = key -> {
        invocations.incrementAndGet();
        return "some value";
    };

    // Perform the same lookup twice.
    String first = cache.getByArtifactCoordinates(versionlessKey, trackingLoader);
    String second = cache.getByArtifactCoordinates(versionlessKey, trackingLoader);

    assertEquals(first, second);
    assertEquals(2, invocations.get());
}

private ERCache<String> newCache(String contentHashKey) {
ERCache<String> cache = new ERCache<>();
cache.configureLifetime(Duration.ofDays(30));
Expand Down