diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java index 7c52a719d02c..960834f9064d 100644 --- a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java @@ -24,6 +24,7 @@ interface PackageName { String APPSMITH_AI_PLUGIN = "appsmithai-plugin"; String DATABRICKS_PLUGIN = "databricks-plugin"; String AWS_LAMBDA_PLUGIN = "aws-lambda-plugin"; + String MONGO_PLUGIN = "mongo-plugin"; } public static final String DEFAULT_REST_DATASOURCE = "DEFAULT_REST_DATASOURCE"; diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java new file mode 100644 index 000000000000..b3385964e2dc --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/DatasourcePluginContext.java @@ -0,0 +1,20 @@ +package com.appsmith.server.domains; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.time.Instant; + +@Getter +@Setter +@ToString +public class DatasourcePluginContext { + private T connection; + private String pluginId; + private Instant creationTime; + + public DatasourcePluginContext() { + creationTime = Instant.now(); + } +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java index 3693d1348c47..adc1d2ea82bb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/DatasourceContextServiceCEImpl.java @@ -13,6 +13,7 @@ import com.appsmith.server.datasourcestorages.base.DatasourceStorageService; import com.appsmith.server.domains.DatasourceContext; import com.appsmith.server.domains.DatasourceContextIdentifier; +import com.appsmith.server.domains.DatasourcePluginContext; import com.appsmith.server.domains.Plugin; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; @@ -20,6 +21,10 @@ import com.appsmith.server.plugins.base.PluginService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.solutions.DatasourcePermission; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -29,8 +34,12 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static java.lang.Boolean.FALSE; +import static java.lang.Boolean.TRUE; + @Slf4j public class DatasourceContextServiceCEImpl implements DatasourceContextServiceCE { @@ -38,6 +47,21 @@ public class DatasourceContextServiceCEImpl implements DatasourceContextServiceC protected final Map>> datasourceContextMonoMap; protected final Map datasourceContextSynchronizationMonitorMap; protected final Map> datasourceContextMap; + + /** + * This cache is used to store the datasource context for a limited time and a limited max number of connections and + * then destroy the least recently used connection. The cleanup process is triggered when the cache is accessed and + * either the time limit or the max connections are reached. + * The purpose of this is to prevent the large number of open dangling connections to the movies mockDB. + * The removalListener method is called when the connection is removed from the cache. + */ + protected final Cache datasourcePluginContextMapLRUCache = + CacheBuilder.newBuilder() + .removalListener(createRemovalListener()) + .expireAfterAccess(2, TimeUnit.HOURS) + .maximumSize(300) // caches most recently used 300 mock connections per pod + .build(); + private final DatasourceService datasourceService; private final DatasourceStorageService datasourceStorageService; private final PluginService pluginService; @@ -67,6 +91,50 @@ public DatasourceContextServiceCEImpl( this.datasourcePermission = datasourcePermission; } + private RemovalListener createRemovalListener() { + return (RemovalNotification removalNotification) -> { + handleRemoval(removalNotification); + }; + } + + private Object getConnectionFromDatasourceContextMap(DatasourceContextIdentifier datasourceContextIdentifier) { + return this.datasourceContextMap.containsKey(datasourceContextIdentifier) + && this.datasourceContextMap.get(datasourceContextIdentifier) != null + ? this.datasourceContextMap.get(datasourceContextIdentifier).getConnection() + : null; + } + + private void handleRemoval( + RemovalNotification removalNotification) { + final DatasourceContextIdentifier datasourceContextIdentifier = removalNotification.getKey(); + final DatasourcePluginContext datasourcePluginContext = removalNotification.getValue(); + + log.debug( + "Removing Datasource Context from cache and closing the open connection for DatasourceId: {} and environmentId: {}", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + log.info("LRU Cache Size after eviction: {}", datasourcePluginContextMapLRUCache.size()); + + // Close connection and remove entry from both cache maps + final Object connection = getConnectionFromDatasourceContextMap(datasourceContextIdentifier); + + Mono pluginMono = + pluginService.findById(datasourcePluginContext.getPluginId()).cache(); + if (connection != null) { + pluginExecutorHelper + .getPluginExecutor(pluginMono) + .flatMap(pluginExecutor -> Mono.fromRunnable(() -> pluginExecutor.datasourceDestroy(connection))) + .onErrorResume(e -> { + log.error("Error destroying stale datasource connection", e); + return Mono.empty(); + }) + .subscribe(); // Trigger the execution + } + // Remove the entries from both maps + datasourceContextMonoMap.remove(datasourceContextIdentifier); + datasourceContextMap.remove(datasourceContextIdentifier); + } + /** * This method defines a critical section that can be executed only by one thread at a time per datasource id - i * .e. if two threads want to create datasource for different datasource ids then they would not be synchronized. @@ -115,6 +183,11 @@ public Mono> getCachedDatasourceContextMono( } datasourceContextMonoMap.remove(datasourceContextIdentifier); datasourceContextMap.remove(datasourceContextIdentifier); + log.info( + "Invalidating the LRU cache entry for datasource id {}, environment id {} as the connection is stale or in error state", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier); } /* @@ -129,17 +202,13 @@ public Mono> getCachedDatasourceContextMono( + ": Cached resource context mono exists for datasource id {}, environment id {}. Returning the same.", datasourceContextIdentifier.getDatasourceId(), datasourceContextIdentifier.getEnvironmentId()); + // Accessing the LRU cache to update the last accessed time + datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier); return datasourceContextMonoMap.get(datasourceContextIdentifier); } /* Create a fresh datasource context */ DatasourceContext datasourceContext = new DatasourceContext<>(); - if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) { - /* For this datasource, either the context doesn't exist, or the context is stale. Replace (or add) with - the new connection in the context map. */ - datasourceContextMap.put(datasourceContextIdentifier, datasourceContext); - } - Mono connectionMonoCache = pluginExecutor .datasourceCreate(datasourceStorage.getDatasourceConfiguration()) .cache(); @@ -159,15 +228,34 @@ public Mono> getCachedDatasourceContextMono( datasourceContext) .cache(); /* Cache the value so that further evaluations don't result in new connections */ - if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) { - datasourceContextMonoMap.put(datasourceContextIdentifier, datasourceContextMonoCache); - } - log.debug( - Thread.currentThread().getName() - + ": Cached new datasource context for datasource id {}, environment id {}", - datasourceContextIdentifier.getDatasourceId(), - datasourceContextIdentifier.getEnvironmentId()); - return datasourceContextMonoCache; + return connectionMonoCache + .flatMap(connection -> { + datasourceContext.setConnection(connection); + if (datasourceContextIdentifier.isKeyValid() + && shouldCacheContextForThisPlugin(plugin)) { + datasourceContextMap.put(datasourceContextIdentifier, datasourceContext); + datasourceContextMonoMap.put( + datasourceContextIdentifier, datasourceContextMonoCache); + + if (TRUE.equals(datasourceStorage.getIsMock()) + && PluginConstants.PackageName.MONGO_PLUGIN.equals( + plugin.getPackageName())) { + log.info( + "Datasource is a mock mongo DB. Adding the connection to LRU cache!"); + DatasourcePluginContext datasourcePluginContext = + new DatasourcePluginContext<>(); + datasourcePluginContext.setConnection(datasourceContext.getConnection()); + datasourcePluginContext.setPluginId(plugin.getId()); + datasourcePluginContextMapLRUCache.put( + datasourceContextIdentifier, datasourcePluginContext); + log.info( + "LRU Cache Size after adding: {}", + datasourcePluginContextMapLRUCache.size()); + } + } + return datasourceContextMonoCache; + }) + .switchIfEmpty(datasourceContextMonoCache); } }) .flatMap(obj -> obj) @@ -195,7 +283,7 @@ public Mono updateDatasourceAndSetAuthentication(Object connection, Data .setAuthentication(updatableConnection.getAuthenticationDTO( datasourceStorage.getDatasourceConfiguration().getAuthentication())); datasourceStorageMono = datasourceStorageService.updateDatasourceStorage( - datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false); + datasourceStorage, datasourceStorage.getEnvironmentId(), FALSE, false); } return datasourceStorageMono.thenReturn(connection); } @@ -308,6 +396,8 @@ public Mono> getDatasourceContext(DatasourceStorage datasou } else { if (isValidDatasourceContextAvailable(datasourceStorage, datasourceContextIdentifier)) { log.debug("Resource context exists. Returning the same."); + // Accessing the LRU cache to update the last accessed time + datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier); return Mono.just(datasourceContextMap.get(datasourceContextIdentifier)); } } @@ -399,7 +489,11 @@ public Mono> deleteDatasourceContext(DatasourceStorage data log.info("Clearing datasource context for datasource storage ID {}.", datasourceStorage.getId()); pluginExecutor.datasourceDestroy(datasourceContext.getConnection()); datasourceContextMonoMap.remove(datasourceContextIdentifier); - + log.info( + "Invalidating the LRU cache entry for datasource id {}, environment id {} as delete datasource context is invoked", + datasourceContextIdentifier.getDatasourceId(), + datasourceContextIdentifier.getEnvironmentId()); + datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier); if (!datasourceContextMap.containsKey(datasourceContextIdentifier)) { log.info( "datasourceContextMap does not contain any entry for datasource storage with id: {} ",