From e6cd97318d294d85512e65a79c196e9e513001b7 Mon Sep 17 00:00:00 2001 From: Nilansh Bansal Date: Fri, 27 Sep 2024 14:00:01 +0530 Subject: [PATCH] feat: added LRU cache for mockDB connections (#36480) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description This PR implements an LRU (Least Recently Used) caching mechanism for the Mongo mockDB connections which get auto-cleaned up every 2 hours based on their access time. This is done to stop the overpopulation of the open dangling connections to the mockDB resulting in the max connection limit. The Caching Implementation used is [Google Guava Cache](https://github.com/google/guava/wiki/CachesExplained). Also refer - [Working of Guava cache](https://medium.com/@alxkm/introduction-to-caching-with-google-guava-a-simple-and-flexible-solution-2c721427e72e) image Fixes #36474 ## Automation /ok-to-test tags="@tag.Sanity" ### :mag: Cypress test results > [!TIP] > 🟢 🟢 🟢 All cypress tests have passed! 🎉 🎉 🎉 > Workflow run: > Commit: 38fcf572b32f1b5d7544828ccf00f2b6fbaa180e > Cypress dashboard. > Tags: `@tag.Sanity` > Spec: >
Fri, 27 Sep 2024 08:01:28 UTC ## Communication Should the DevRel and Marketing teams inform users about this change? - [ ] Yes - [ ] No ## Summary by CodeRabbit - **New Features** - Introduced a new constant for the MongoDB plugin to enhance plugin identification. - Added a `DatasourcePluginContext` class to encapsulate datasource plugin context, including connection details and creation time. - Implemented a caching mechanism for datasource contexts to optimize connection management and reduce excessive database connections. - Added functionality to identify mock MongoDB datasources. - **Bug Fixes** - Enhanced cleanup processes for stale connections in the caching system. --- .../external/constants/PluginConstants.java | 1 + .../domains/DatasourcePluginContext.java | 20 +++ .../ce/DatasourceContextServiceCEImpl.java | 128 +++++++++++++++--- 3 files changed, 132 insertions(+), 17 deletions(-) create mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java index 7c52a719d02c..960834f9064d 100644 --- a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java @@ -24,6 +24,7 @@ interface PackageName { String APPSMITH_AI_PLUGIN = "appsmithai-plugin"; String DATABRICKS_PLUGIN = "databricks-plugin"; String AWS_LAMBDA_PLUGIN = "aws-lambda-plugin"; + String MONGO_PLUGIN = "mongo-plugin"; } public static final String DEFAULT_REST_DATASOURCE = "DEFAULT_REST_DATASOURCE"; diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java 
b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java new file mode 100644 index 000000000000..b3385964e2dc --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java @@ -0,0 +1,20 @@ +package com.appsmith.server.domains; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.time.Instant; + +@Getter +@Setter +@ToString +public class DatasourcePluginContext { + private T connection; + private String pluginId; + private Instant creationTime; + + public DatasourcePluginContext() { + creationTime = Instant.now(); + } +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java index 3693d1348c47..adc1d2ea82bb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java @@ -13,6 +13,7 @@ import com.appsmith.server.datasourcestorages.base.DatasourceStorageService; import com.appsmith.server.domains.DatasourceContext; import com.appsmith.server.domains.DatasourceContextIdentifier; +import com.appsmith.server.domains.DatasourcePluginContext; import com.appsmith.server.domains.Plugin; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; @@ -20,6 +21,10 @@ import com.appsmith.server.plugins.base.PluginService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.solutions.DatasourcePermission; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import lombok.extern.slf4j.Slf4j; 
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -29,8 +34,12 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static java.lang.Boolean.FALSE; +import static java.lang.Boolean.TRUE; + @Slf4j public class DatasourceContextServiceCEImpl implements DatasourceContextServiceCE { @@ -38,6 +47,21 @@ public class DatasourceContextServiceCEImpl implements DatasourceContextServiceC protected final Map>> datasourceContextMonoMap; protected final Map datasourceContextSynchronizationMonitorMap; protected final Map> datasourceContextMap; + + /** + * This cache is used to store the datasource context for a limited time and a limited max number of connections and + * then destroy the least recently used connection. The cleanup process is triggered when the cache is accessed and + * either the time limit or the max connections are reached. + * The purpose of this is to prevent the large number of open dangling connections to the movies mockDB. + * The removalListener method is called when the connection is removed from the cache. 
+ */ + protected final Cache datasourcePluginContextMapLRUCache = + CacheBuilder.newBuilder() + .removalListener(createRemovalListener()) + .expireAfterAccess(2, TimeUnit.HOURS) + .maximumSize(300) // caches most recently used 300 mock connections per pod + .build(); + private final DatasourceService datasourceService; private final DatasourceStorageService datasourceStorageService; private final PluginService pluginService; @@ -67,6 +91,50 @@ public DatasourceContextServiceCEImpl( this.datasourcePermission = datasourcePermission; } + private RemovalListener createRemovalListener() { + return (RemovalNotification removalNotification) -> { + handleRemoval(removalNotification); + }; + } + + private Object getConnectionFromDatasourceContextMap(DatasourceContextIdentifier datasourceContextIdentifier) { + return this.datasourceContextMap.containsKey(datasourceContextIdentifier) + && this.datasourceContextMap.get(datasourceContextIdentifier) != null + ? this.datasourceContextMap.get(datasourceContextIdentifier).getConnection() + : null; + } + + private void handleRemoval( + RemovalNotification removalNotification) { + final DatasourceContextIdentifier datasourceContextIdentifier = removalNotification.getKey(); + final DatasourcePluginContext datasourcePluginContext = removalNotification.getValue(); + + log.debug( + "Removing Datasource Context from cache and closing the open connection for DatasourceId: {} and environmentId: {}", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + log.info("LRU Cache Size after eviction: {}", datasourcePluginContextMapLRUCache.size()); + + // Close connection and remove entry from both cache maps + final Object connection = getConnectionFromDatasourceContextMap(datasourceContextIdentifier); + + Mono pluginMono = + pluginService.findById(datasourcePluginContext.getPluginId()).cache(); + if (connection != null) { + pluginExecutorHelper + .getPluginExecutor(pluginMono) + 
.flatMap(pluginExecutor -> Mono.fromRunnable(() -> pluginExecutor.datasourceDestroy(connection))) + .onErrorResume(e -> { + log.error("Error destroying stale datasource connection", e); + return Mono.empty(); + }) + .subscribe(); // Trigger the execution + } + // Remove the entries from both maps + datasourceContextMonoMap.remove(datasourceContextIdentifier); + datasourceContextMap.remove(datasourceContextIdentifier); + } + /** * This method defines a critical section that can be executed only by one thread at a time per datasource id - i * .e. if two threads want to create datasource for different datasource ids then they would not be synchronized. @@ -115,6 +183,11 @@ public Mono> getCachedDatasourceContextMono( } datasourceContextMonoMap.remove(datasourceContextIdentifier); datasourceContextMap.remove(datasourceContextIdentifier); + log.info( + "Invalidating the LRU cache entry for datasource id {}, environment id {} as the connection is stale or in error state", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier); } /* @@ -129,17 +202,13 @@ public Mono> getCachedDatasourceContextMono( + ": Cached resource context mono exists for datasource id {}, environment id {}. Returning the same.", datasourceContextIdentifier.getDatasourceId(), datasourceContextIdentifier.getEnvironmentId()); + // Accessing the LRU cache to update the last accessed time + datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier); return datasourceContextMonoMap.get(datasourceContextIdentifier); } /* Create a fresh datasource context */ DatasourceContext datasourceContext = new DatasourceContext<>(); - if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) { - /* For this datasource, either the context doesn't exist, or the context is stale. Replace (or add) with - the new connection in the context map. 
*/ - datasourceContextMap.put(datasourceContextIdentifier, datasourceContext); - } - Mono connectionMonoCache = pluginExecutor .datasourceCreate(datasourceStorage.getDatasourceConfiguration()) .cache(); @@ -159,15 +228,34 @@ public Mono> getCachedDatasourceContextMono( datasourceContext) .cache(); /* Cache the value so that further evaluations don't result in new connections */ - if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) { - datasourceContextMonoMap.put(datasourceContextIdentifier, datasourceContextMonoCache); - } - log.debug( - Thread.currentThread().getName() - + ": Cached new datasource context for datasource id {}, environment id {}", - datasourceContextIdentifier.getDatasourceId(), - datasourceContextIdentifier.getEnvironmentId()); - return datasourceContextMonoCache; + return connectionMonoCache + .flatMap(connection -> { + datasourceContext.setConnection(connection); + if (datasourceContextIdentifier.isKeyValid() + && shouldCacheContextForThisPlugin(plugin)) { + datasourceContextMap.put(datasourceContextIdentifier, datasourceContext); + datasourceContextMonoMap.put( + datasourceContextIdentifier, datasourceContextMonoCache); + + if (TRUE.equals(datasourceStorage.getIsMock()) + && PluginConstants.PackageName.MONGO_PLUGIN.equals( + plugin.getPackageName())) { + log.info( + "Datasource is a mock mongo DB. 
Adding the connection to LRU cache!"); + DatasourcePluginContext datasourcePluginContext = + new DatasourcePluginContext<>(); + datasourcePluginContext.setConnection(datasourceContext.getConnection()); + datasourcePluginContext.setPluginId(plugin.getId()); + datasourcePluginContextMapLRUCache.put( + datasourceContextIdentifier, datasourcePluginContext); + log.info( + "LRU Cache Size after adding: {}", + datasourcePluginContextMapLRUCache.size()); + } + } + return datasourceContextMonoCache; + }) + .switchIfEmpty(datasourceContextMonoCache); } }) .flatMap(obj -> obj) @@ -195,7 +283,7 @@ public Mono updateDatasourceAndSetAuthentication(Object connection, Data .setAuthentication(updatableConnection.getAuthenticationDTO( datasourceStorage.getDatasourceConfiguration().getAuthentication())); datasourceStorageMono = datasourceStorageService.updateDatasourceStorage( - datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false); + datasourceStorage, datasourceStorage.getEnvironmentId(), FALSE, false); } return datasourceStorageMono.thenReturn(connection); } @@ -308,6 +396,8 @@ public Mono> getDatasourceContext(DatasourceStorage datasou } else { if (isValidDatasourceContextAvailable(datasourceStorage, datasourceContextIdentifier)) { log.debug("Resource context exists. 
Returning the same."); + // Accessing the LRU cache to update the last accessed time + datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier); return Mono.just(datasourceContextMap.get(datasourceContextIdentifier)); } } @@ -399,7 +489,11 @@ public Mono> deleteDatasourceContext(DatasourceStorage data log.info("Clearing datasource context for datasource storage ID {}.", datasourceStorage.getId()); pluginExecutor.datasourceDestroy(datasourceContext.getConnection()); datasourceContextMonoMap.remove(datasourceContextIdentifier); - + log.info( + "Invalidating the LRU cache entry for datasource id {}, environment id {} as delete datasource context is invoked", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier); if (!datasourceContextMap.containsKey(datasourceContextIdentifier)) { log.info( "datasourceContextMap does not contain any entry for datasource storage with id: {} ",