feat: added LRU cache for mockDB connections (#36480)
## Description
This PR implements an LRU (Least Recently Used) caching mechanism for
the Mongo mockDB connections, which are automatically cleaned up every
2 hours based on their last access time.
This prevents open dangling connections to the mockDB from piling up
and hitting the maximum connection limit.
The caching implementation used is [Google Guava
Cache](https://github.com/google/guava/wiki/CachesExplained).
Also refer: [Working of Guava cache](https://medium.com/@alxkm/introduction-to-caching-with-google-guava-a-simple-and-flexible-solution-2c721427e72e)
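
For reference, here is a minimal, self-contained sketch of how such a Guava LRU cache is configured (not the PR's actual code; the class and identifier names below are illustrative stand-ins):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

import java.util.concurrent.TimeUnit;

public class MockDbLruCacheSketch {

    /** Stand-in for a real datasource connection. */
    static class MockConnection {
        void close() {
            System.out.println("Connection closed");
        }
    }

    public static void main(String[] args) {
        // Called whenever an entry is evicted (size cap or 2h of inactivity):
        // close the underlying connection so it doesn't dangle.
        RemovalListener<String, MockConnection> onRemoval =
                notification -> notification.getValue().close();

        Cache<String, MockConnection> cache = CacheBuilder.newBuilder()
                .removalListener(onRemoval)
                .expireAfterAccess(2, TimeUnit.HOURS) // last-access time, not write time
                .maximumSize(300) // beyond this, the least recently used entry goes first
                .build();

        cache.put("datasourceId:environmentId", new MockConnection());

        // Guava runs cleanup lazily during reads/writes, so touching the cache
        // both refreshes an entry's access time and may trigger pending evictions.
        MockConnection connection = cache.getIfPresent("datasourceId:environmentId");
        System.out.println("Got connection from cache: " + connection);
    }
}
```

The `expireAfterAccess` choice (rather than `expireAfterWrite`) is what lets actively used mock connections survive while idle ones are reclaimed.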

<img width="811" alt="image"
src="https://github.com/user-attachments/assets/5abb3e05-13ea-421e-aaf0-22ac441f68e6">


Fixes #36474

## Automation

/ok-to-test tags="@tag.Sanity"

### 🔍 Cypress test results
<!-- This is an auto-generated comment: Cypress test results  -->
> [!TIP]
> 🟢 🟢 🟢 All cypress tests have passed! 🎉 🎉 🎉
> Workflow run:
<https://github.com/appsmithorg/appsmith/actions/runs/11066462645>
> Commit: 38fcf57
> <a
href="https://internal.appsmith.com/app/cypress-dashboard/rundetails-65890b3c81d7400d08fa9ee5?branch=master&workflowId=11066462645&attempt=1"
target="_blank">Cypress dashboard</a>.
> Tags: `@tag.Sanity`
> Spec:
> <hr>Fri, 27 Sep 2024 08:01:28 UTC
<!-- end of auto-generated comment: Cypress test results  -->


## Communication
Should the DevRel and Marketing teams inform users about this change?
- [ ] Yes
- [ ] No


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
  - Introduced a new constant for the MongoDB plugin to enhance plugin identification.
  - Added a `DatasourcePluginContext` class to encapsulate datasource plugin context, including connection details and creation time.
  - Implemented a caching mechanism for datasource contexts to optimize connection management and reduce excessive database connections.
  - Added functionality to identify mock MongoDB datasources.

- **Bug Fixes**
  - Enhanced cleanup processes for stale connections in the caching system.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
NilanshBansal authored Sep 27, 2024
1 parent 6368215 commit e6cd973
Showing 3 changed files with 132 additions and 17 deletions.
@@ -24,6 +24,7 @@ interface PackageName {
String APPSMITH_AI_PLUGIN = "appsmithai-plugin";
String DATABRICKS_PLUGIN = "databricks-plugin";
String AWS_LAMBDA_PLUGIN = "aws-lambda-plugin";
String MONGO_PLUGIN = "mongo-plugin";
}

public static final String DEFAULT_REST_DATASOURCE = "DEFAULT_REST_DATASOURCE";
@@ -0,0 +1,20 @@
package com.appsmith.server.domains;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;

/**
 * Holds a datasource connection along with its plugin id and creation time.
 * Used as the value type in the LRU cache of mock DB connections.
 */
@Getter
@Setter
@ToString
public class DatasourcePluginContext<T> {
private T connection;
private String pluginId;
private Instant creationTime;

public DatasourcePluginContext() {
creationTime = Instant.now();
}
}
@@ -13,13 +13,18 @@
import com.appsmith.server.datasourcestorages.base.DatasourceStorageService;
import com.appsmith.server.domains.DatasourceContext;
import com.appsmith.server.domains.DatasourceContextIdentifier;
import com.appsmith.server.domains.DatasourcePluginContext;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.PluginExecutorHelper;
import com.appsmith.server.plugins.base.PluginService;
import com.appsmith.server.services.ConfigService;
import com.appsmith.server.solutions.DatasourcePermission;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@@ -29,15 +34,34 @@
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;

@Slf4j
public class DatasourceContextServiceCEImpl implements DatasourceContextServiceCE {

// DatasourceContextIdentifier contains datasourceId & environmentId which is mapped to DatasourceContext
protected final Map<DatasourceContextIdentifier, Mono<DatasourceContext<Object>>> datasourceContextMonoMap;
protected final Map<DatasourceContextIdentifier, Object> datasourceContextSynchronizationMonitorMap;
protected final Map<DatasourceContextIdentifier, DatasourceContext<?>> datasourceContextMap;

/**
 * This cache stores datasource contexts for a limited time and up to a limited number of connections, and
 * then destroys the least recently used connection. The cleanup process is triggered when the cache is
 * accessed and either the time limit or the maximum number of connections has been reached.
 * Its purpose is to prevent a large number of open dangling connections to the movies mockDB.
 * The removalListener method is called when a connection is removed from the cache.
 */
protected final Cache<DatasourceContextIdentifier, DatasourcePluginContext> datasourcePluginContextMapLRUCache =
CacheBuilder.newBuilder()
.removalListener(createRemovalListener())
.expireAfterAccess(2, TimeUnit.HOURS)
.maximumSize(300) // caches most recently used 300 mock connections per pod
.build();

private final DatasourceService datasourceService;
private final DatasourceStorageService datasourceStorageService;
private final PluginService pluginService;
@@ -67,6 +91,50 @@ public DatasourceContextServiceCEImpl(
this.datasourcePermission = datasourcePermission;
}

private RemovalListener<DatasourceContextIdentifier, DatasourcePluginContext> createRemovalListener() {
return (RemovalNotification<DatasourceContextIdentifier, DatasourcePluginContext> removalNotification) -> {
handleRemoval(removalNotification);
};
}

private Object getConnectionFromDatasourceContextMap(DatasourceContextIdentifier datasourceContextIdentifier) {
return this.datasourceContextMap.containsKey(datasourceContextIdentifier)
&& this.datasourceContextMap.get(datasourceContextIdentifier) != null
? this.datasourceContextMap.get(datasourceContextIdentifier).getConnection()
: null;
}

private void handleRemoval(
RemovalNotification<DatasourceContextIdentifier, DatasourcePluginContext> removalNotification) {
final DatasourceContextIdentifier datasourceContextIdentifier = removalNotification.getKey();
final DatasourcePluginContext datasourcePluginContext = removalNotification.getValue();

log.debug(
"Removing Datasource Context from cache and closing the open connection for DatasourceId: {} and environmentId: {}",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
log.info("LRU Cache Size after eviction: {}", datasourcePluginContextMapLRUCache.size());

// Close connection and remove entry from both cache maps
final Object connection = getConnectionFromDatasourceContextMap(datasourceContextIdentifier);

Mono<Plugin> pluginMono =
pluginService.findById(datasourcePluginContext.getPluginId()).cache();
if (connection != null) {
pluginExecutorHelper
.getPluginExecutor(pluginMono)
.flatMap(pluginExecutor -> Mono.fromRunnable(() -> pluginExecutor.datasourceDestroy(connection)))
.onErrorResume(e -> {
log.error("Error destroying stale datasource connection", e);
return Mono.empty();
})
.subscribe(); // Trigger the execution
}
// Remove the entries from both maps
datasourceContextMonoMap.remove(datasourceContextIdentifier);
datasourceContextMap.remove(datasourceContextIdentifier);
}

/**
* This method defines a critical section that can be executed only by one thread at a time per datasource id,
* i.e. if two threads want to create a datasource for different datasource ids then they would not be synchronized.
@@ -115,6 +183,11 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
}
datasourceContextMonoMap.remove(datasourceContextIdentifier);
datasourceContextMap.remove(datasourceContextIdentifier);
log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as the connection is stale or in error state",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
}

/*
@@ -129,17 +202,13 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
+ ": Cached resource context mono exists for datasource id {}, environment id {}. Returning the same.",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
// Accessing the LRU cache to update the last accessed time
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return datasourceContextMonoMap.get(datasourceContextIdentifier);
}

/* Create a fresh datasource context */
DatasourceContext<Object> datasourceContext = new DatasourceContext<>();
if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
/* For this datasource, either the context doesn't exist, or the context is stale. Replace (or add) with
the new connection in the context map. */
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
}

Mono<Object> connectionMonoCache = pluginExecutor
.datasourceCreate(datasourceStorage.getDatasourceConfiguration())
.cache();
@@ -159,15 +228,34 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
datasourceContext)
.cache(); /* Cache the value so that further evaluations don't result in new connections */

if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMonoMap.put(datasourceContextIdentifier, datasourceContextMonoCache);
}
log.debug(
Thread.currentThread().getName()
+ ": Cached new datasource context for datasource id {}, environment id {}",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
return datasourceContextMonoCache;
return connectionMonoCache
.flatMap(connection -> {
datasourceContext.setConnection(connection);
if (datasourceContextIdentifier.isKeyValid()
&& shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
datasourceContextMonoMap.put(
datasourceContextIdentifier, datasourceContextMonoCache);

if (TRUE.equals(datasourceStorage.getIsMock())
&& PluginConstants.PackageName.MONGO_PLUGIN.equals(
plugin.getPackageName())) {
log.info(
"Datasource is a mock mongo DB. Adding the connection to LRU cache!");
DatasourcePluginContext<Object> datasourcePluginContext =
new DatasourcePluginContext<>();
datasourcePluginContext.setConnection(datasourceContext.getConnection());
datasourcePluginContext.setPluginId(plugin.getId());
datasourcePluginContextMapLRUCache.put(
datasourceContextIdentifier, datasourcePluginContext);
log.info(
"LRU Cache Size after adding: {}",
datasourcePluginContextMapLRUCache.size());
}
}
return datasourceContextMonoCache;
})
.switchIfEmpty(datasourceContextMonoCache);
}
})
.flatMap(obj -> obj)
@@ -195,7 +283,7 @@ public Mono<Object> updateDatasourceAndSetAuthentication(Object connection, Data
.setAuthentication(updatableConnection.getAuthenticationDTO(
datasourceStorage.getDatasourceConfiguration().getAuthentication()));
datasourceStorageMono = datasourceStorageService.updateDatasourceStorage(
datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false);
datasourceStorage, datasourceStorage.getEnvironmentId(), FALSE, false);
}
return datasourceStorageMono.thenReturn(connection);
}
@@ -308,6 +396,8 @@ public Mono<DatasourceContext<?>> getDatasourceContext(DatasourceStorage datasou
} else {
if (isValidDatasourceContextAvailable(datasourceStorage, datasourceContextIdentifier)) {
log.debug("Resource context exists. Returning the same.");
// Accessing the LRU cache to update the last accessed time
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return Mono.just(datasourceContextMap.get(datasourceContextIdentifier));
}
}
@@ -399,7 +489,11 @@ public Mono<DatasourceContext<?>> deleteDatasourceContext(DatasourceStorage data
log.info("Clearing datasource context for datasource storage ID {}.", datasourceStorage.getId());
pluginExecutor.datasourceDestroy(datasourceContext.getConnection());
datasourceContextMonoMap.remove(datasourceContextIdentifier);

log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as delete datasource context is invoked",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
if (!datasourceContextMap.containsKey(datasourceContextIdentifier)) {
log.info(
"datasourceContextMap does not contain any entry for datasource storage with id: {} ",