Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added LRU cache for mockDB connections #36480

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface PackageName {
String APPSMITH_AI_PLUGIN = "appsmithai-plugin";
String DATABRICKS_PLUGIN = "databricks-plugin";
String AWS_LAMBDA_PLUGIN = "aws-lambda-plugin";
String MONGO_PLUGIN = "mongo-plugin";
}

public static final String DEFAULT_REST_DATASOURCE = "DEFAULT_REST_DATASOURCE";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.appsmith.server.domains;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;

/**
 * Lightweight holder pairing a live datasource connection with the id of the plugin that created
 * it, plus the instant the holder was created. Used as the value type of the LRU cache of mock-DB
 * connections so that, on eviction, the owning plugin can be looked up to destroy the connection.
 *
 * @param <T> the plugin-specific connection type (opaque to this class)
 */
@Getter
@Setter
@ToString
public class DatasourcePluginContext<T> {
    // The open connection object handed out by the plugin's datasourceCreate.
    private T connection;
    // Id of the plugin that owns the connection; used to resolve the executor that can destroy it.
    private String pluginId;
    // Set once at construction; records when this context was cached.
    private Instant creationTime;

    public DatasourcePluginContext() {
        creationTime = Instant.now();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
import com.appsmith.server.datasourcestorages.base.DatasourceStorageService;
import com.appsmith.server.domains.DatasourceContext;
import com.appsmith.server.domains.DatasourceContextIdentifier;
import com.appsmith.server.domains.DatasourcePluginContext;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.PluginExecutorHelper;
import com.appsmith.server.plugins.base.PluginService;
import com.appsmith.server.services.ConfigService;
import com.appsmith.server.solutions.DatasourcePermission;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
Expand All @@ -29,15 +34,32 @@
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;

@Slf4j
public class DatasourceContextServiceCEImpl implements DatasourceContextServiceCE {

// DatasourceContextIdentifier contains datasourceId & environmentId which is mapped to DatasourceContext
protected final Map<DatasourceContextIdentifier, Mono<DatasourceContext<Object>>> datasourceContextMonoMap;
protected final Map<DatasourceContextIdentifier, Object> datasourceContextSynchronizationMonitorMap;
protected final Map<DatasourceContextIdentifier, DatasourceContext<?>> datasourceContextMap;

/**
 * LRU-style cache of mock-DB connection contexts. An entry expires 2 hours after it was last
 * accessed (Guava {@code expireAfterAccess}); Guava performs eviction maintenance piggybacked on
 * cache reads/writes rather than on a scheduled background thread. The purpose of this is to
 * prevent a large number of open dangling connections to the movies mockDB.
 * The removal listener is invoked when an entry is evicted or invalidated, and is responsible for
 * destroying the underlying connection.
 */
protected final Cache<DatasourceContextIdentifier, DatasourcePluginContext> datasourcePluginContextMapLRUCache =
        CacheBuilder.newBuilder()
                .removalListener(createRemovalListener())
                .expireAfterAccess(2, TimeUnit.HOURS)
                .build();

private final DatasourceService datasourceService;
private final DatasourceStorageService datasourceStorageService;
private final PluginService pluginService;
Expand Down Expand Up @@ -67,6 +89,50 @@ public DatasourceContextServiceCEImpl(
this.datasourcePermission = datasourcePermission;
}

/**
 * Builds the removal listener wired into the LRU cache; all eviction handling is delegated to
 * {@link #handleRemoval}.
 */
private RemovalListener<DatasourceContextIdentifier, DatasourcePluginContext> createRemovalListener() {
    // A method reference is equivalent to the explicit lambda: the listener simply forwards
    // each notification to handleRemoval.
    return this::handleRemoval;
}

/**
 * Invoked by the Guava cache when a mock-DB connection entry is evicted or invalidated.
 * Destroys the underlying connection (if one is still tracked in {@code datasourceContextMap})
 * via the owning plugin's executor, then drops the corresponding entries from both context maps.
 *
 * @param removalNotification the cache notification carrying the evicted key/value pair
 */
private void handleRemoval(
        RemovalNotification<DatasourceContextIdentifier, DatasourcePluginContext> removalNotification) {
    final DatasourceContextIdentifier datasourceContextIdentifier = removalNotification.getKey();
    final DatasourcePluginContext datasourcePluginContext = removalNotification.getValue();

    // RemovalNotification's key/value are @Nullable in Guava; bail out defensively rather than
    // NPE inside the listener (exceptions here are swallowed by the cache).
    if (datasourceContextIdentifier == null || datasourcePluginContext == null) {
        return;
    }

    log.debug(
            "Removing Datasource Context from cache and closing the open connection for DatasourceId: {} and environmentId: {}",
            datasourceContextIdentifier.getDatasourceId(),
            datasourceContextIdentifier.getEnvironmentId());

    // Single map lookup instead of containsKey + get (also avoids a check-then-act race between
    // the two calls on the shared map).
    final DatasourceContext<?> datasourceContext = datasourceContextMap.get(datasourceContextIdentifier);
    final Object connection = datasourceContext == null ? null : datasourceContext.getConnection();

    if (connection != null) {
        // Resolve the plugin lazily: only needed when there is actually a connection to destroy.
        final Mono<Plugin> pluginMono =
                pluginService.findById(datasourcePluginContext.getPluginId()).cache();
        try {
            pluginExecutorHelper
                    .getPluginExecutor(pluginMono)
                    .flatMap(pluginExecutor ->
                            Mono.fromRunnable(() -> pluginExecutor.datasourceDestroy(connection)))
                    .onErrorResume(e -> {
                        log.error("Error destroying stale datasource connection", e);
                        return Mono.empty();
                    })
                    .subscribe(); // Trigger the execution
        } catch (Exception e) {
            // Parameterized logging instead of string concatenation.
            log.info("{}: Error destroying stale datasource connection", Thread.currentThread().getName(), e);
        }
    }
    // Remove the entries from both maps regardless of whether a live connection was found.
    datasourceContextMonoMap.remove(datasourceContextIdentifier);
    datasourceContextMap.remove(datasourceContextIdentifier);
}

/**
* This method defines a critical section that can be executed only by one thread at a time per datasource id - i
* .e. if two threads want to create datasource for different datasource ids then they would not be synchronized.
Expand Down Expand Up @@ -115,6 +181,11 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
}
datasourceContextMonoMap.remove(datasourceContextIdentifier);
datasourceContextMap.remove(datasourceContextIdentifier);
log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as the connection is stale or in error state",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
}

/*
Expand All @@ -129,17 +200,13 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
+ ": Cached resource context mono exists for datasource id {}, environment id {}. Returning the same.",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
log.debug("Accessing the LRU cache to update the last accessed time");
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return datasourceContextMonoMap.get(datasourceContextIdentifier);
}

/* Create a fresh datasource context */
DatasourceContext<Object> datasourceContext = new DatasourceContext<>();
if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
/* For this datasource, either the context doesn't exist, or the context is stale. Replace (or add) with
the new connection in the context map. */
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
}

Mono<Object> connectionMonoCache = pluginExecutor
.datasourceCreate(datasourceStorage.getDatasourceConfiguration())
.cache();
Expand All @@ -159,15 +226,31 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
datasourceContext)
.cache(); /* Cache the value so that further evaluations don't result in new connections */

if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMonoMap.put(datasourceContextIdentifier, datasourceContextMonoCache);
}
log.debug(
Thread.currentThread().getName()
+ ": Cached new datasource context for datasource id {}, environment id {}",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
return datasourceContextMonoCache;
return connectionMonoCache
.flatMap(connection -> {
datasourceContext.setConnection(connection);
if (datasourceContextIdentifier.isKeyValid()
&& shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
datasourceContextMonoMap.put(
datasourceContextIdentifier, datasourceContextMonoCache);

if (TRUE.equals(datasourceStorage.getIsMock())
&& PluginConstants.PackageName.MONGO_PLUGIN.equals(
plugin.getPackageName())) {
log.info(
"Datasource is a mock mongo DB. Adding the connection to LRU cache!");
DatasourcePluginContext<Object> datasourcePluginContext =
new DatasourcePluginContext<>();
datasourcePluginContext.setConnection(datasourceContext.getConnection());
datasourcePluginContext.setPluginId(plugin.getId());
datasourcePluginContextMapLRUCache.put(
datasourceContextIdentifier, datasourcePluginContext);
}
}
return datasourceContextMonoCache;
})
.switchIfEmpty(datasourceContextMonoCache);
}
})
.flatMap(obj -> obj)
Expand Down Expand Up @@ -195,7 +278,7 @@ public Mono<Object> updateDatasourceAndSetAuthentication(Object connection, Data
.setAuthentication(updatableConnection.getAuthenticationDTO(
datasourceStorage.getDatasourceConfiguration().getAuthentication()));
datasourceStorageMono = datasourceStorageService.updateDatasourceStorage(
datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false);
datasourceStorage, datasourceStorage.getEnvironmentId(), FALSE, false);
}
return datasourceStorageMono.thenReturn(connection);
}
Expand Down Expand Up @@ -308,6 +391,8 @@ public Mono<DatasourceContext<?>> getDatasourceContext(DatasourceStorage datasou
} else {
if (isValidDatasourceContextAvailable(datasourceStorage, datasourceContextIdentifier)) {
log.debug("Resource context exists. Returning the same.");
log.debug("Accessing the LRU cache to update the last accessed time");
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return Mono.just(datasourceContextMap.get(datasourceContextIdentifier));
}
}
Expand Down Expand Up @@ -399,7 +484,11 @@ public Mono<DatasourceContext<?>> deleteDatasourceContext(DatasourceStorage data
log.info("Clearing datasource context for datasource storage ID {}.", datasourceStorage.getId());
pluginExecutor.datasourceDestroy(datasourceContext.getConnection());
datasourceContextMonoMap.remove(datasourceContextIdentifier);

log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as delete datasource context is invoked",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
if (!datasourceContextMap.containsKey(datasourceContextIdentifier)) {
log.info(
"datasourceContextMap does not contain any entry for datasource storage with id: {} ",
Expand Down
Loading