From 6c5aca34de7a52e9ef72aab46d6b20ace59206fe Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Tue, 11 Feb 2025 15:37:32 +0530 Subject: [PATCH 01/16] feat: Restrict cron execution for single pod in clustered environment --- .../caching/annotations/DistributedLock.java | 16 +++++ .../aspects/DistributedLockAspect.java | 63 +++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java create mode 100644 app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java new file mode 100644 index 000000000000..0df1e7c1add5 --- /dev/null +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java @@ -0,0 +1,16 @@ +package com.appsmith.caching.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface DistributedLock { + String key(); + + long ttl() default 5 * 60; // Default TTL: 5 minutes + + boolean shouldReleaseLock() default true; +} diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java new file mode 100644 index 000000000000..9b95805d411a --- /dev/null +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -0,0 +1,63 @@ +package com.appsmith.caching.aspects; + +import com.appsmith.caching.annotations.DistributedLock; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.lang.reflect.Method; +import java.time.Duration; + +@Aspect +@Component +@Slf4j +public class DistributedLockAspect { + private final ReactiveRedisOperations redisOperations; + + public DistributedLockAspect(ReactiveRedisOperations redisOperations) { + this.redisOperations = redisOperations; + } + + // Method to acquire a distributed lock before executing the annotated method. + @Around("@annotation(lockAnnotation)") + public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation) throws Throwable { + // Check method return type + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Class returnType = method.getReturnType(); + boolean isReactive = returnType.isAssignableFrom(Mono.class) || returnType.isAssignableFrom(Flux.class); + if (isReactive) { + // If method does returns Mono or Flux raise exception + throw new IllegalAccessException( + "Invalid usage of @DistributedLock annotation. Only non-reactive objects are supported for locking."); + } + return handleBlocking(joinPoint, lockAnnotation); + } + + private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { + String lockKey = "lock:" + lock.key(); + long ttl = lock.ttl(); // Time-to-live for the lock + Boolean acquired = redisOperations + .opsForValue() + .setIfAbsent(lockKey, "locked", Duration.ofSeconds(ttl)) + .block(); // Blocking call + + if (Boolean.TRUE.equals(acquired)) { + try { + return joinPoint.proceed(); // Execute method + } finally { + if (lock.shouldReleaseLock()) { + redisOperations.delete(lock.key()).block(); // Release lock + } + } + } else { + return null; // Skip execution if another pod holds the lock + } + } +} From 2f4aa9fd65cdd29150053e30d8bbe5cbb27f3e39 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Tue, 11 Feb 2025 16:01:59 +0530 Subject: [PATCH 02/16] chore: Run cron on all the tenants present in DB --- .../com/appsmith/server/domains/Tenant.java | 7 ++ .../server/domains/TenantConfiguration.java | 6 +- .../domains/ce/TenantConfigurationCE.java | 4 + .../ce/CustomTenantRepositoryCE.java | 5 +- .../ce/CustomTenantRepositoryCEImpl.java | 15 +++- .../services/ce/FeatureFlagServiceCE.java | 2 +- .../services/ce/FeatureFlagServiceCEImpl.java | 77 ++++++++++--------- .../server/services/ce/TenantServiceCE.java | 3 + .../services/ce/TenantServiceCEImpl.java | 29 ++++--- .../solutions/ce/PingScheduledTaskCEImpl.java | 8 +- .../solutions/ce/ScheduledTaskCEImpl.java | 18 +++-- .../services/ce/FeatureFlagServiceCETest.java | 6 +- .../caching/annotations/DistributedLock.java | 4 + .../aspects/DistributedLockAspect.java | 24 +++--- 14 files changed, 135 insertions(+), 73 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java index f4a005bbff76..2b82bff8d3bd 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java @@ -12,6 +12,8 @@ import java.io.Serializable; +import static com.appsmith.external.helpers.StringUtils.dotted; + @Getter @Setter @ToString @@ -35,4 +37,9 @@ public class Tenant extends BaseDomain implements Serializable { TenantConfiguration tenantConfiguration; // TODO add SSO and other configurations here after migrating from environment variables to database configuration + + public static class Fields { + public static final String tenantConfiguration_isRestartRequired = + dotted(tenantConfiguration, TenantConfiguration.Fields.isRestartRequired); + } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java index e4b293b814a7..2fc766b5b1b5 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java @@ -3,7 +3,11 @@ import com.appsmith.server.domains.ce.TenantConfigurationCE; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.experimental.FieldNameConstants; @Data @EqualsAndHashCode(callSuper = true) -public class TenantConfiguration extends TenantConfigurationCE {} +@FieldNameConstants +public class TenantConfiguration extends TenantConfigurationCE { + public static class Fields extends TenantConfigurationCE.Fields {} +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java index 74a62527e47c..0b29f1439aee 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java @@ -8,6 +8,7 @@ import com.appsmith.server.domains.TenantConfiguration; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; +import lombok.experimental.FieldNameConstants; import org.apache.commons.lang3.ObjectUtils; import java.io.Serializable; @@ -16,6 +17,7 @@ import java.util.Map; @Data +@FieldNameConstants public class TenantConfigurationCE implements Serializable { private String googleMapsKey; @@ -86,4 +88,6 @@ public void copyNonSensitiveValues(TenantConfiguration tenantConfiguration) { public Boolean isEmailVerificationEnabled() { return Boolean.TRUE.equals(this.emailVerificationEnabled); } + + public static class Fields {} } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java index e9cc3b92c097..510f53162c1b 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java @@ -2,5 +2,8 @@ import com.appsmith.server.domains.Tenant; import com.appsmith.server.repositories.AppsmithRepository; +import reactor.core.publisher.Mono; -public interface CustomTenantRepositoryCE extends AppsmithRepository {} +public interface CustomTenantRepositoryCE extends AppsmithRepository { + Mono disableRestartForAllTenants(); +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java index 2b56879bdef8..6d59ec8b6405 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java @@ -1,9 +1,22 @@ package com.appsmith.server.repositories.ce; import com.appsmith.server.domains.Tenant; +import com.appsmith.server.helpers.ce.bridge.Bridge; import com.appsmith.server.repositories.BaseAppsmithRepositoryImpl; import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import static com.appsmith.server.domains.Tenant.Fields.tenantConfiguration_isRestartRequired; @Slf4j public class CustomTenantRepositoryCEImpl extends BaseAppsmithRepositoryImpl - implements CustomTenantRepositoryCE {} + implements CustomTenantRepositoryCE { + + @Override + public Mono disableRestartForAllTenants() { + log.info("Disabling restart for all tenants"); + return queryBuilder() + .criteria(Bridge.isTrue(tenantConfiguration_isRestartRequired)) + .updateAll(Bridge.update().set(tenantConfiguration_isRestartRequired, false)); + } +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java index c10ba8c38352..ee15169fa65a 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java @@ -29,7 +29,7 @@ public interface FeatureFlagServiceCE { * To get all features of the tenant from Cloud Services and store them locally * @return Mono of Void */ - Mono getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations(); + Mono getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations(Tenant tenant); /** * To get all features of the current tenant. diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index fda8d2da3764..05b47c87ae87 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -38,7 +38,7 @@ public class FeatureFlagServiceCEImpl implements FeatureFlagServiceCE { private final FeatureFlagMigrationHelper featureFlagMigrationHelper; private static final long FEATURE_FLAG_CACHE_TIME_MIN = 120; - private CachedFeatures cachedTenantFeatureFlags; + private Map cachedTenantFeatureFlags = new HashMap<>(); /** * This function checks if the feature is enabled for the current user. In case the user object is not present, @@ -116,39 +116,36 @@ private Mono> getAllRemoteFeatureFlagsForUser() { /** * To get all features of the tenant from Cloud Services and store them locally - * @return Mono of Void + * @return Mono updated tenant */ - public Mono getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations() { - return tenantService - .getDefaultTenant() - .flatMap(defaultTenant -> - // 1. Fetch current/saved feature flags from cache - // 2. Force update the tenant flags keeping existing flags as fallback in case the API - // call to fetch the flags fails for some reason - // 3. Get the diff and update the flags with pending migrations to be used to run - // migrations selectively - featureFlagMigrationHelper - .getUpdatedFlagsWithPendingMigration(defaultTenant) - .flatMap(featureFlagWithPendingMigrations -> { - TenantConfiguration tenantConfig = defaultTenant.getTenantConfiguration() == null - ? new TenantConfiguration() - : defaultTenant.getTenantConfiguration(); - // We expect the featureFlagWithPendingMigrations to be empty hence - // verifying only for null - if (featureFlagWithPendingMigrations != null - && !featureFlagWithPendingMigrations.equals( - tenantConfig.getFeaturesWithPendingMigration())) { - tenantConfig.setFeaturesWithPendingMigration(featureFlagWithPendingMigrations); - if (!featureFlagWithPendingMigrations.isEmpty()) { - tenantConfig.setMigrationStatus(MigrationStatus.PENDING); - } else { - tenantConfig.setMigrationStatus(MigrationStatus.COMPLETED); - } - return tenantService.update(defaultTenant.getId(), defaultTenant); - } - return Mono.just(defaultTenant); - })) - .then(); + @Override + public Mono getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations(Tenant tenant) { + // 1. Fetch current/saved feature flags from cache + // 2. Force update the tenant flags keeping existing flags as fallback in case the API + // call to fetch the flags fails for some reason + // 3. Get the diff and update the flags with pending migrations to be used to run + // migrations selectively + return featureFlagMigrationHelper + .getUpdatedFlagsWithPendingMigration(tenant) + .flatMap(featureFlagWithPendingMigrations -> { + TenantConfiguration tenantConfig = tenant.getTenantConfiguration() == null + ? new TenantConfiguration() + : tenant.getTenantConfiguration(); + // We expect the featureFlagWithPendingMigrations to be empty hence + // verifying only for null + if (featureFlagWithPendingMigrations != null + && !featureFlagWithPendingMigrations.equals( + tenantConfig.getFeaturesWithPendingMigration())) { + tenantConfig.setFeaturesWithPendingMigration(featureFlagWithPendingMigrations); + if (!featureFlagWithPendingMigrations.isEmpty()) { + tenantConfig.setMigrationStatus(MigrationStatus.PENDING); + } else { + tenantConfig.setMigrationStatus(MigrationStatus.COMPLETED); + } + return tenantService.update(tenant.getId(), tenant); + } + return Mono.just(tenant); + }); } /** @@ -158,9 +155,11 @@ public Mono getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingM public Mono> getTenantFeatures() { return tenantService .getDefaultTenantId() - .flatMap(cacheableFeatureFlagHelper::fetchCachedTenantFeatures) - .map(cachedFeatures -> { - cachedTenantFeatureFlags = cachedFeatures; + .zipWhen(cacheableFeatureFlagHelper::fetchCachedTenantFeatures) + .map(tuple2 -> { + String tenantId = tuple2.getT1(); + CachedFeatures cachedFeatures = tuple2.getT2(); + cachedTenantFeatureFlags.put(tenantId, cachedFeatures); return cachedFeatures.getFeatures(); }) .switchIfEmpty(Mono.just(new HashMap<>())); @@ -178,6 +177,10 @@ public Mono checkAndExecuteMigrationsForTenantFeatureFlags(Tenant tenant @Override public CachedFeatures getCachedTenantFeatureFlags() { - return this.cachedTenantFeatureFlags; + // TODO Avoid blocking call + return tenantService + .getDefaultTenantId() + .map(id -> this.cachedTenantFeatureFlags.get(id)) + .block(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java index b8fb796bc630..6dcfcd218333 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java @@ -4,6 +4,7 @@ import com.appsmith.server.domains.Tenant; import com.appsmith.server.domains.TenantConfiguration; import com.appsmith.server.services.CrudService; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface TenantServiceCE extends CrudService { @@ -29,4 +30,6 @@ public interface TenantServiceCE extends CrudService { Mono retrieveById(String id); Mono restartTenant(); + + Flux findAll(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java index 2592b08a00fe..226eea298fe4 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java @@ -25,6 +25,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.util.StringUtils; import reactor.core.observability.micrometer.Micrometer; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Map; @@ -313,17 +314,18 @@ public Mono update(String tenantId, Tenant tenant) { */ @Override public Mono restartTenant() { - // Avoid dependency on user context as this method will be called internally by the server - Mono defaultTenantMono = this.getDefaultTenantId().flatMap(this::retrieveById); - return defaultTenantMono.flatMap(updatedTenant -> { - if (TRUE.equals(updatedTenant.getTenantConfiguration().getIsRestartRequired())) { - log.debug("Triggering tenant restart after the feature flag migrations are executed"); - TenantConfiguration tenantConfiguration = updatedTenant.getTenantConfiguration(); - tenantConfiguration.setIsRestartRequired(false); - return this.update(updatedTenant.getId(), updatedTenant).then(envManager.restartWithoutAclCheck()); - } - return Mono.empty(); - }); + // TODO remove this method once we move the form login env to DB variable which is currently required as a part + // of downgrade migration for SSO + return this.findAll() + .filter(tenant -> TRUE.equals(tenant.getTenantConfiguration().getIsRestartRequired())) + .take(1) + .hasElements() + .flatMap(hasElement -> { + if (hasElement) { + return repository.disableRestartForAllTenants().then(envManager.restartWithoutAclCheck()); + } + return Mono.empty(); + }); } private boolean isMigrationRequired(Tenant tenant) { @@ -335,4 +337,9 @@ private boolean isMigrationRequired(Tenant tenant) { && !MigrationStatus.COMPLETED.equals( tenant.getTenantConfiguration().getMigrationStatus()))); } + + @Override + public Flux findAll() { + return repository.findAll(); + } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java index f33982a7d160..3ca4e6a55605 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java @@ -1,5 +1,6 @@ package com.appsmith.server.solutions.ce; +import com.appsmith.caching.annotations.DistributedLock; import com.appsmith.server.acl.AclPermission; import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.configurations.DeploymentProperties; @@ -19,7 +20,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.http.MediaType; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.reactive.function.BodyInserters; @@ -43,7 +43,6 @@ */ @Slf4j @RequiredArgsConstructor -@ConditionalOnExpression("!${is.cloud-hosting:false}") public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { private final ConfigService configService; @@ -74,6 +73,10 @@ enum UserTrackingType { */ // Number of milliseconds between the start of each scheduled calls to this method. @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 6 * 60 * 60 * 1000 /* six hours */) + @DistributedLock( + key = "pingSchedule", + ttl = 5 * 60 * 60, // 5 hours + shouldReleaseLock = false) @Observed(name = "pingSchedule") public void pingSchedule() { if (commonConfig.isTelemetryDisabled()) { @@ -124,6 +127,7 @@ private Mono doPing(String instanceId, String ipAddress) { // Number of milliseconds between the start of each scheduled calls to this method. @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 24 * 60 * 60 * 1000 /* a day */) + @DistributedLock(key = "pingStats", ttl = 12 * 60 * 60, shouldReleaseLock = false) @Observed(name = "pingStats") public void pingStats() { if (commonConfig.isTelemetryDisabled()) { diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index 9d385f2655fb..8b5709f440b0 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -1,5 +1,7 @@ package com.appsmith.server.solutions.ce; +import com.appsmith.caching.annotations.DistributedLock; +import com.appsmith.server.domains.Tenant; import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.TenantService; @@ -8,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; @RequiredArgsConstructor @Slf4j @@ -19,16 +22,19 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { private final TenantService tenantService; @Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */) + @DistributedLock( + key = "fetchFeatures", + ttl = 20 * 60, // 20 minutes + shouldReleaseLock = false) // Ensure only one pod executes this @Observed(name = "fetchFeatures") public void fetchFeatures() { log.info("Fetching features for default tenant"); - featureFlagService - .getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations() - .then(tenantService - .getDefaultTenant() - .flatMap(featureFlagService::checkAndExecuteMigrationsForTenantFeatureFlags) - .then(tenantService.restartTenant())) + Flux tenantFlux = tenantService.findAll(); + tenantFlux + .flatMap(featureFlagService::getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap(featureFlagService::checkAndExecuteMigrationsForTenantFeatureFlags) .doOnError(error -> log.error("Error while fetching tenant feature flags", error)) + .then(tenantService.restartTenant()) .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); } diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java index e4b875e2672f..1d7e5809f6c6 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java @@ -206,7 +206,7 @@ public void evictFeatures_withTenantIdentifier_redisKeyDoesNotExist() { .thenReturn(Mono.just(new HashMap<>())); featureFlagService - .getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations() + .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() .block(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { @@ -226,7 +226,7 @@ public void evictFeatures_withTenantIdentifier_redisKeyDoesNotExist() { .thenReturn(Mono.just(Map.of(TENANT_TEST_FEATURE, DISABLE))); featureFlagService - .getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations() + .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() .block(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { @@ -245,7 +245,7 @@ public void getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrati .thenReturn(Mono.just(Map.of(TENANT_TEST_FEATURE, ENABLE))); featureFlagService - .getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrations() + .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() .block(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java index 0df1e7c1add5..8276175ad009 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java @@ -10,6 +10,10 @@ public @interface DistributedLock { String key(); + // Time-to-live for the lock in seconds. + // - If the method execution takes longer than this TTL, the lock will be released automatically. + // - If the locking is used for cron jobs, make sure the TTL is less than the delay between 2 runs to refresh the + // status for every run. long ttl() default 5 * 60; // Default TTL: 5 minutes boolean shouldReleaseLock() default true; diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 9b95805d411a..069eb0ffe8fc 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -6,21 +6,26 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; -import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.data.redis.core.RedisOperations; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.lang.reflect.Method; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; @Aspect @Component @Slf4j public class DistributedLockAspect { - private final ReactiveRedisOperations redisOperations; - public DistributedLockAspect(ReactiveRedisOperations redisOperations) { + private final RedisOperations redisOperations; + + private static final String LOCK_PREFIX = "lock:"; + + public DistributedLockAspect(RedisOperations redisOperations) { this.redisOperations = redisOperations; } @@ -35,25 +40,24 @@ public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotati if (isReactive) { // If method does returns Mono or Flux raise exception throw new IllegalAccessException( - "Invalid usage of @DistributedLock annotation. Only non-reactive objects are supported for locking."); + "Invalid usage of @DistributedLock annotation. Only non-reactive methods are supported for locking."); } return handleBlocking(joinPoint, lockAnnotation); } private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { - String lockKey = "lock:" + lock.key(); + String lockKey = LOCK_PREFIX + lock.key(); long ttl = lock.ttl(); // Time-to-live for the lock - Boolean acquired = redisOperations - .opsForValue() - .setIfAbsent(lockKey, "locked", Duration.ofSeconds(ttl)) - .block(); // Blocking call + String value = "locked until " + + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); // Value to set in the lock key + Boolean acquired = redisOperations.opsForValue().setIfAbsent(lockKey, value, Duration.ofSeconds(ttl)); if (Boolean.TRUE.equals(acquired)) { try { return joinPoint.proceed(); // Execute method } finally { if (lock.shouldReleaseLock()) { - redisOperations.delete(lock.key()).block(); // Release lock + redisOperations.delete(lock.key()); // Release lock } } } else { From d96aebb39a96b3b2138b25eefd08706244009d4a Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 12 Feb 2025 11:22:52 +0530 Subject: [PATCH 03/16] fix: Testcases and build failures --- .../server/repositories/BaseRepository.java | 8 +++++++ .../repositories/BaseRepositoryImpl.java | 7 +++++++ .../server/services/ce/TenantServiceCE.java | 2 +- .../services/ce/TenantServiceCEImpl.java | 6 +++--- .../solutions/ce/ScheduledTaskCEImpl.java | 2 +- .../services/ce/FeatureFlagServiceCETest.java | 21 +++++++++++-------- .../aspects/DistributedLockAspect.java | 2 +- 7 files changed, 33 insertions(+), 15 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java index 44af581b6008..b3b15fb95106 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java @@ -2,6 +2,7 @@ import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.data.repository.NoRepositoryBean; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.Serializable; @@ -34,4 +35,11 @@ public interface BaseRepository extends ReactiveMong * @return */ Mono archiveAllById(Collection ids); + + /** + * This function retrieves all the documents in the collection without the user context. This method will be used + * by internal server workflows like crons etc. to fetch all the documents from the collection. + * @return + */ + Flux retrieveAll(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java index 365e58d19446..5088ac6526ca 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java @@ -101,6 +101,13 @@ public Flux findAll() { }); } + @Override + public Flux retrieveAll() { + Query query = new Query(notDeleted()); + return mongoOperations.find( + query.cursorBatchSize(10000), entityInformation.getJavaType(), entityInformation.getCollectionName()); + } + /** * We don't use this today, it doesn't use our `notDeleted` criteria, and since we don't use it, we're not porting * it to Postgres. Querying with `queryBuilder` or anything else is arguably more readable than this. diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java index 6dcfcd218333..ad7c6ddcf79c 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java @@ -31,5 +31,5 @@ public interface TenantServiceCE extends CrudService { Mono restartTenant(); - Flux findAll(); + Flux retrieveAll(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java index 226eea298fe4..ced44f4fcb9b 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java @@ -316,7 +316,7 @@ public Mono update(String tenantId, Tenant tenant) { public Mono restartTenant() { // TODO remove this method once we move the form login env to DB variable which is currently required as a part // of downgrade migration for SSO - return this.findAll() + return this.retrieveAll() .filter(tenant -> TRUE.equals(tenant.getTenantConfiguration().getIsRestartRequired())) .take(1) .hasElements() @@ -339,7 +339,7 @@ private boolean isMigrationRequired(Tenant tenant) { } @Override - public Flux findAll() { - return repository.findAll(); + public Flux retrieveAll() { + return repository.retrieveAll(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index 8b5709f440b0..ab78c396d039 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -29,7 +29,7 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { @Observed(name = "fetchFeatures") public void fetchFeatures() { log.info("Fetching features for default tenant"); - Flux tenantFlux = tenantService.findAll(); + Flux tenantFlux = tenantService.retrieveAll(); tenantFlux .flatMap(featureFlagService::getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations) .flatMap(featureFlagService::checkAndExecuteMigrationsForTenantFeatureFlags) diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java index 1d7e5809f6c6..fde50e72a285 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java @@ -205,9 +205,10 @@ public void evictFeatures_withTenantIdentifier_redisKeyDoesNotExist() { Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(new HashMap<>())); - featureFlagService - .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + tenantService + .retrieveAll() + .flatMap(featureFlagService::getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { assertThat(tenant.getTenantConfiguration().getFeaturesWithPendingMigration()) @@ -225,9 +226,10 @@ public void evictFeatures_withTenantIdentifier_redisKeyDoesNotExist() { Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(Map.of(TENANT_TEST_FEATURE, DISABLE))); - featureFlagService - .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + tenantService + .retrieveAll() + .flatMap(featureFlagService::getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { assertThat(tenant.getTenantConfiguration().getFeaturesWithPendingMigration()) @@ -244,9 +246,10 @@ public void getAllRemoteFeaturesForTenantAndUpdateFeatureFlagsWithPendingMigrati Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(Map.of(TENANT_TEST_FEATURE, ENABLE))); - featureFlagService - .getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + tenantService + .retrieveAll() + .flatMap(featureFlagService::getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(tenantService.getDefaultTenant()) .assertNext(tenant -> { assertThat(tenant.getTenantConfiguration().getFeaturesWithPendingMigration()) diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 069eb0ffe8fc..201298eff72c 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -38,7 +38,7 @@ public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotati Class returnType = method.getReturnType(); boolean isReactive = returnType.isAssignableFrom(Mono.class) || returnType.isAssignableFrom(Flux.class); if (isReactive) { - // If method does returns Mono or Flux raise exception + // Locking for reactive methods can be added with reactive Redis operations if needed. throw new IllegalAccessException( "Invalid usage of @DistributedLock annotation. Only non-reactive methods are supported for locking."); } From 267c9ed96d72b68528e094d8b216b463919be0cc Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 12 Feb 2025 16:56:00 +0530 Subject: [PATCH 04/16] chore: Refactor instance config --- .../server/configurations/InstanceConfig.java | 10 ++------- .../ce/InstanceConfigHelperCEImpl.java | 21 ++++++++++++++++++- .../services/ce/FeatureFlagServiceCE.java | 2 ++ .../services/ce/FeatureFlagServiceCEImpl.java | 16 ++++++++------ 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java index 879780c0bb71..41ec1ca2c041 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java @@ -65,14 +65,8 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { // introduced .then(Mono.defer(instanceConfigHelper::isLicenseValid) // Ensure that the tenant feature flags are refreshed with the latest values after completing - // the - // license verification process. - .flatMap(isValid -> { - log.debug( - "License verification completed with status: {}", - TRUE.equals(isValid) ? "valid" : "invalid"); - return instanceConfigHelper.updateCacheForTenantFeatureFlags(); - })); + // the license verification process. + .flatMap(isValid -> instanceConfigHelper.updateCacheForTenantFeatureFlags())); try { startupProcess.block(); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index 68126fa0c7da..49bba549de33 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -15,6 +15,7 @@ import com.appsmith.server.services.AnalyticsService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; +import com.appsmith.server.services.TenantService; import com.appsmith.server.solutions.ReleaseNotesService; import com.appsmith.util.WebClientUtils; import joptsimple.internal.Strings; @@ -54,6 +55,7 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { private final AnalyticsService analyticsService; private final NetworkUtils networkUtils; private final ReleaseNotesService releaseNotesService; + private final TenantService tenantService; private final RTSCaller rtsCaller; @@ -225,8 +227,25 @@ public Mono checkMongoDBVersion() { }); } + /** + * Method to trigger update for the cache of all tenant feature flags. This method is called during the startup of + * the application. It's required at the startup to ensure that the feature flags are up-to-date which will then be + * consumed by {@link com.appsmith.server.aspect.FeatureFlaggedMethodInvokerAspect} in a non-reactive manner. + * In case the user tries to fetch the feature flags before the cache is updated, the aspect will fallback to the + * earlier cached data. + * @return Mono that completes when the cache is updated. + */ @Override public Mono updateCacheForTenantFeatureFlags() { - return featureFlagService.getTenantFeatures().then(); + tenantService + .retrieveAll() + .flatMap(tenant -> featureFlagService.getTenantFeatures(tenant.getId())) + .onErrorResume(error -> { + log.error("Error while updating cache for tenant feature flags", error); + return Mono.empty(); + }) + .subscribeOn(LoadShifter.elasticScheduler) + .subscribe(); + return Mono.empty(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java index ee15169fa65a..c631a2eec085 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java @@ -37,6 +37,8 @@ public interface FeatureFlagServiceCE { */ Mono> getTenantFeatures(); + Mono> getTenantFeatures(String tenantId); + Mono checkAndExecuteMigrationsForTenantFeatureFlags(Tenant tenant); CachedFeatures getCachedTenantFeatureFlags(); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index 05b47c87ae87..b1c945d95d1e 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -152,13 +152,17 @@ public Mono getAllRemoteFeaturesForAllTenantAndUpdateFeatureFlagsWithPen * To get all features of the current tenant. * @return Mono of Map */ + @Override public Mono> getTenantFeatures() { - return tenantService - .getDefaultTenantId() - .zipWhen(cacheableFeatureFlagHelper::fetchCachedTenantFeatures) - .map(tuple2 -> { - String tenantId = tuple2.getT1(); - CachedFeatures cachedFeatures = tuple2.getT2(); + // TODO change this to use the tenant from the user session for multi-tenancy + return tenantService.getDefaultTenantId().flatMap(this::getTenantFeatures); + } + + @Override + public Mono> getTenantFeatures(String tenantId) { + return cacheableFeatureFlagHelper + .fetchCachedTenantFeatures(tenantId) + .map(cachedFeatures -> { cachedTenantFeatureFlags.put(tenantId, cachedFeatures); return cachedFeatures.getFeatures(); }) From f66a9c26387821021bbc8b80db3cc86ca6272118 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 12 Feb 2025 17:08:50 +0530 Subject: [PATCH 05/16] chore: Fix build failure --- .../appsmith/server/helpers/InstanceConfigHelperImpl.java | 7 +++++-- .../server/helpers/ce/InstanceConfigHelperCEImpl.java | 5 ++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java index c8a9b10fb182..ee1c40cf2fe2 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java @@ -7,6 +7,7 @@ import com.appsmith.server.services.AnalyticsService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; +import com.appsmith.server.services.TenantService; import com.appsmith.server.solutions.ReleaseNotesService; import org.springframework.context.ApplicationContext; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; @@ -24,7 +25,8 @@ public InstanceConfigHelperImpl( AnalyticsService analyticsService, NetworkUtils networkUtils, ReleaseNotesService releaseNotesService, - RTSCaller rtsCaller) { + RTSCaller rtsCaller, + TenantService tenantService) { super( configService, cloudServicesConfig, @@ -35,6 +37,7 @@ public InstanceConfigHelperImpl( analyticsService, networkUtils, releaseNotesService, - rtsCaller); + rtsCaller, + tenantService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index 49bba549de33..9995866c1992 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -55,9 +55,8 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { private final AnalyticsService analyticsService; private final NetworkUtils networkUtils; private final ReleaseNotesService releaseNotesService; - private final TenantService tenantService; - private final RTSCaller rtsCaller; + private final TenantService tenantService; private boolean isRtsAccessible = false; @@ -233,7 +232,7 @@ public Mono checkMongoDBVersion() { * consumed by {@link com.appsmith.server.aspect.FeatureFlaggedMethodInvokerAspect} in a non-reactive manner. * In case the user tries to fetch the feature flags before the cache is updated, the aspect will fallback to the * earlier cached data. - * @return Mono that completes when the cache is updated. + * @return Empty Mono */ @Override public Mono updateCacheForTenantFeatureFlags() { From 636fdc4482d403becefd8542f3567bdb2a8a4134 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 19 Feb 2025 16:02:44 +0530 Subject: [PATCH 06/16] chore: Add spotless changes --- .../appsmith/server/domains/Organization.java | 2 +- .../com/appsmith/server/domains/Tenant.java | 1 + .../CustomOrganizationRepositoryCEImpl.java | 5 ++--- .../services/ce/FeatureFlagServiceCE.java | 4 ++-- .../services/ce/FeatureFlagServiceCEImpl.java | 16 +++++++--------- .../ce/OrganizationServiceCEImpl.java | 19 ++++++++++--------- .../solutions/ce/ScheduledTaskCEImpl.java | 6 ++++-- .../services/ce/FeatureFlagServiceCETest.java | 16 +++++++++++----- 8 files changed, 38 insertions(+), 31 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java index 4f668746639d..28d7a6448064 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java @@ -40,6 +40,6 @@ public class Organization extends BaseDomain implements Serializable { public static class Fields { public static final String organizationConfiguration_isRestartRequired = - dotted(organizationConfiguration, OrganizationConfiguration.Fields.isRestartRequired); + dotted(organizationConfiguration, OrganizationConfiguration.Fields.isRestartRequired); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java index a70a89553591..8ea7a65a9987 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java @@ -11,6 +11,7 @@ import org.springframework.data.mongodb.core.mapping.Document; import java.io.Serializable; + import static com.appsmith.external.helpers.StringUtils.dotted; @Deprecated diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java index 7283f18ffb3e..8d57c4a0b884 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java @@ -16,8 +16,7 @@ public class CustomOrganizationRepositoryCEImpl extends BaseAppsmithRepositoryIm public Mono disableRestartForAllTenants() { log.info("Disabling restart for all tenants"); return queryBuilder() - .criteria(Bridge.isTrue(organizationConfiguration_isRestartRequired)) - .updateAll(Bridge.update().set(organizationConfiguration_isRestartRequired, false)); + .criteria(Bridge.isTrue(organizationConfiguration_isRestartRequired)) + .updateAll(Bridge.update().set(organizationConfiguration_isRestartRequired, false)); } - } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java index e807e8244cad..cce3d3402deb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java @@ -29,7 +29,8 @@ public interface FeatureFlagServiceCE { * To get all features of the organization from Cloud Services and store them locally * @return Mono of Void */ - Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(Organization organization); + Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations( + Organization organization); /** * To get all features of the current organization. @@ -41,6 +42,5 @@ public interface FeatureFlagServiceCE { Mono checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization); - CachedFeatures getCachedOrganizationFeatureFlags(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index f9dbdfd06f49..e1e133f4095b 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -122,7 +122,8 @@ private Mono> getAllRemoteFeatureFlagsForUser() { * @return Mono updated tenant */ @Override - public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(Organization organization) { + public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations( + Organization organization) { // 1. Fetch current/saved feature flags from cache // 2. Force update the org flags keeping existing flags as fallback in case the API // call to fetch the flags fails for some reason @@ -132,14 +133,14 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla .getUpdatedFlagsWithPendingMigration(organization) .flatMap(featureFlagWithPendingMigrations -> { OrganizationConfiguration organizationConfiguration = - organization.getOrganizationConfiguration() == null - ? new OrganizationConfiguration() - : organization.getOrganizationConfiguration(); + organization.getOrganizationConfiguration() == null + ? new OrganizationConfiguration() + : organization.getOrganizationConfiguration(); // We expect the featureFlagWithPendingMigrations to be empty hence // verifying only for null if (featureFlagWithPendingMigrations != null && !featureFlagWithPendingMigrations.equals( - organizationConfiguration.getFeaturesWithPendingMigration())) { + organizationConfiguration.getFeaturesWithPendingMigration())) { organizationConfiguration.setFeaturesWithPendingMigration(featureFlagWithPendingMigrations); if (!featureFlagWithPendingMigrations.isEmpty()) { organizationConfiguration.setMigrationStatus(MigrationStatus.PENDING); @@ -159,9 +160,7 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla @Override public Mono> getOrganizationFeatures() { // TODO change this to use the tenant from the user session for multi-tenancy - return sessionUserService.getCurrentUser() - .map(User::getOrganizationId) - .flatMap(this::getOrganizationFeatures); + return sessionUserService.getCurrentUser().map(User::getOrganizationId).flatMap(this::getOrganizationFeatures); } @Override @@ -184,5 +183,4 @@ public Mono> getOrganizationFeatures(String organizationId) public Mono checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization) { return organizationService.checkAndExecuteMigrationsForOrganizationFeatureFlags(organization); } - } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java index acbe15d0437a..c3aac3305e5a 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java @@ -333,15 +333,16 @@ public Mono restartOrganization() { // TODO remove this method once we move the form login env to DB variable which is currently required as a part // of downgrade migration for SSO return this.retrieveAll() - .filter(organization -> TRUE.equals(organization.getOrganizationConfiguration().getIsRestartRequired())) - .take(1) - .hasElements() - .flatMap(hasElement -> { - if (hasElement) { - return repository.disableRestartForAllTenants().then(envManager.restartWithoutAclCheck()); - } - return Mono.empty(); - }); + .filter(organization -> + TRUE.equals(organization.getOrganizationConfiguration().getIsRestartRequired())) + .take(1) + .hasElements() + .flatMap(hasElement -> { + if (hasElement) { + return repository.disableRestartForAllTenants().then(envManager.restartWithoutAclCheck()); + } + return Mono.empty(); + }); } private boolean isMigrationRequired(Organization organization) { diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index 96afb95e4c7d..e56c29d0fe97 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -31,8 +31,10 @@ public void fetchFeatures() { log.info("Fetching features for default organization"); Flux organizationFlux = organizationService.retrieveAll(); organizationFlux - .flatMap(featureFlagService::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) - .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) .doOnError(error -> log.error("Error while fetching tenant feature flags", error)) .then(organizationService.restartOrganization()) .subscribeOn(LoadShifter.elasticScheduler) diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java index 453077739d9f..d615089f7ad4 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java @@ -207,7 +207,9 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { organizationService .retrieveAll() - .flatMap(featureFlagService::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { @@ -228,7 +230,9 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { organizationService .retrieveAll() - .flatMap(featureFlagService::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { @@ -249,14 +253,16 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { organizationService .retrieveAll() - .flatMap(featureFlagService::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { assertThat(organization.getOrganizationConfiguration().getFeaturesWithPendingMigration()) - .isEqualTo(Map.of(ORGANIZATION_TEST_FEATURE, ENABLE)); + .isEqualTo(Map.of(ORGANIZATION_TEST_FEATURE, ENABLE)); assertThat(organization.getOrganizationConfiguration().getMigrationStatus()) - .isEqualTo(PENDING); + .isEqualTo(PENDING); }) .verifyComplete(); } From 4b73e84c6fbfce80a610f4788b975bb74c4b033d Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 19 Feb 2025 18:13:25 +0530 Subject: [PATCH 07/16] chore: Remove empty files --- .../ce/CustomTenantRepositoryCE.java | 0 .../ce/CustomTenantRepositoryCEImpl.java | 0 .../services/ce/FeatureFlagServiceCEImpl.java | 1 - .../server/services/ce/TenantServiceCE.java | 0 .../services/ce/TenantServiceCEImpl.java | 0 .../aspects/DistributedLockAspect.java | 129 +++++++++++++++--- 6 files changed, 112 insertions(+), 18 deletions(-) delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCE.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomTenantRepositoryCEImpl.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index e1e133f4095b..3c61af44543e 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -159,7 +159,6 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla */ @Override public Mono> getOrganizationFeatures() { - // TODO change this to use the tenant from the user session for multi-tenancy return sessionUserService.getCurrentUser().map(User::getOrganizationId).flatMap(this::getOrganizationFeatures); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCE.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/TenantServiceCEImpl.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 201298eff72c..039fdd30cb5e 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -6,10 +6,11 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; -import org.springframework.data.redis.core.RedisOperations; +import org.springframework.data.redis.core.ReactiveRedisOperations; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.lang.reflect.Method; import java.time.Duration; @@ -21,11 +22,11 @@ @Slf4j public class DistributedLockAspect { - private final RedisOperations redisOperations; + private final ReactiveRedisOperations redisOperations; private static final String LOCK_PREFIX = "lock:"; - public DistributedLockAspect(RedisOperations redisOperations) { + public DistributedLockAspect(ReactiveRedisOperations redisOperations) { this.redisOperations = redisOperations; } @@ -36,32 +37,126 @@ public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotati MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); Class returnType = method.getReturnType(); - boolean isReactive = returnType.isAssignableFrom(Mono.class) || returnType.isAssignableFrom(Flux.class); - if (isReactive) { - // Locking for reactive methods can be added with reactive Redis operations if needed. - throw new IllegalAccessException( - "Invalid usage of @DistributedLock annotation. Only non-reactive methods are supported for locking."); + if (returnType.isAssignableFrom(Mono.class)) { + return handleMono(joinPoint, lockAnnotation); + } else if (returnType.isAssignableFrom(Flux.class)) { + return handleFlux(joinPoint, lockAnnotation); } return handleBlocking(joinPoint, lockAnnotation); } - private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { + private static class LockDetails { + final String key; + final String value; + final Duration duration; + + LockDetails(String key, String value, Duration duration) { + this.key = key; + this.value = value; + this.duration = duration; + } + } + + private LockDetails createLockDetails(DistributedLock lock) { String lockKey = LOCK_PREFIX + lock.key(); - long ttl = lock.ttl(); // Time-to-live for the lock - String value = "locked until " - + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); // Value to set in the lock key - Boolean acquired = redisOperations.opsForValue().setIfAbsent(lockKey, value, Duration.ofSeconds(ttl)); + long ttl = lock.ttl(); + String value = "locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); + return new LockDetails(lockKey, value, Duration.ofSeconds(ttl)); + } + + private void releaseLock(String lockKey) { + redisOperations.delete(lockKey) + .doOnSuccess(deleted -> { + log.info("Released lock for: {}", lockKey); + }) + .onErrorResume(error -> { + log.error("Error while releasing lock: {}", lockKey, error); + return Mono.empty(); + }) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } + + private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) { + LockDetails lockDetails = createLockDetails(lock); + + return redisOperations.opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .flatMap(acquired -> { + if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); + try { + return ((Mono) joinPoint.proceed()) + .doOnError(error -> releaseLock(lockDetails.key)) + .doFinally(signalType -> { + if (lock.shouldReleaseLock()) { + releaseLock(lockDetails.key); + } + }); + } catch (Throwable e) { + releaseLock(lockDetails.key); + return Mono.error(e); + } + } + log.info("Lock already acquired for: {}", lockDetails.key); + return Mono.empty(); + }); + } + + private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) { + LockDetails lockDetails = createLockDetails(lock); + + return redisOperations.opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .flatMapMany(acquired -> { + if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); + try { + return ((Flux) joinPoint.proceed()) + .doOnError(error -> releaseLock(lockDetails.key)) + .doFinally(signalType -> { + if (lock.shouldReleaseLock()) { + releaseLock(lockDetails.key); + } + }); + } catch (Throwable e) { + releaseLock(lockDetails.key); + return Flux.error(e); + } + } + log.info("Lock already acquired for: {}", lockDetails.key); + return Flux.empty(); + }); + } + + private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { + LockDetails lockDetails = createLockDetails(lock); + + Boolean acquired = null; + try { + acquired = redisOperations.opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .block(); + } catch (Exception e) { + log.error("Error while acquiring lock: {}", lockDetails.key, e); + throw e; + } if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); try { - return joinPoint.proceed(); // Execute method + return joinPoint.proceed(); + } catch (Throwable e) { + // Always release lock on error + releaseLock(lockDetails.key); + throw e; } finally { if (lock.shouldReleaseLock()) { - redisOperations.delete(lock.key()); // Release lock + releaseLock(lockDetails.key); } } - } else { - return null; // Skip execution if another pod holds the lock } + log.info("Lock already acquired for: {}", lockDetails.key); + return null; } } From f17e82afc24a9ab41b8b120f9db21720f6562037 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 19 Feb 2025 18:17:09 +0530 Subject: [PATCH 08/16] chore: Cleanup --- .../java/com/appsmith/server/domains/Tenant.java | 7 ------- .../server/domains/TenantConfiguration.java | 6 +----- .../server/domains/ce/TenantConfigurationCE.java | 4 ---- .../caching/aspects/DistributedLockAspect.java | 15 ++++++++++----- 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java index 8ea7a65a9987..53f97fae68b6 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Tenant.java @@ -12,8 +12,6 @@ import java.io.Serializable; -import static com.appsmith.external.helpers.StringUtils.dotted; - @Deprecated // This class has been deprecated. Please use Organization instead. @Getter @@ -39,9 +37,4 @@ public class Tenant extends BaseDomain implements Serializable { TenantConfiguration tenantConfiguration; // TODO add SSO and other configurations here after migrating from environment variables to database configuration - - public static class Fields { - public static final String tenantConfiguration_isRestartRequired = - dotted(tenantConfiguration, TenantConfiguration.Fields.isRestartRequired); - } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java index a081b51428d2..ccae88485bd5 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/TenantConfiguration.java @@ -3,12 +3,8 @@ import com.appsmith.server.domains.ce.TenantConfigurationCE; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.experimental.FieldNameConstants; @Deprecated @Data @EqualsAndHashCode(callSuper = true) -@FieldNameConstants -public class TenantConfiguration extends TenantConfigurationCE { - public static class Fields extends TenantConfigurationCE.Fields {} -} +public class TenantConfiguration extends TenantConfigurationCE {} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java index d97a807dff51..29f5e4e84bb3 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/TenantConfigurationCE.java @@ -8,7 +8,6 @@ import com.appsmith.server.domains.TenantConfiguration; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; -import lombok.experimental.FieldNameConstants; import org.apache.commons.lang3.ObjectUtils; import java.io.Serializable; @@ -18,7 +17,6 @@ @Deprecated @Data -@FieldNameConstants public class TenantConfigurationCE implements Serializable { private String googleMapsKey; @@ -89,6 +87,4 @@ public void copyNonSensitiveValues(TenantConfiguration tenantConfiguration) { public Boolean isEmailVerificationEnabled() { return Boolean.TRUE.equals(this.emailVerificationEnabled); } - - public static class Fields {} } diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 039fdd30cb5e..829a8ad8153c 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -60,12 +60,14 @@ private static class LockDetails { private LockDetails createLockDetails(DistributedLock lock) { String lockKey = LOCK_PREFIX + lock.key(); long ttl = lock.ttl(); - String value = "locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); + String value = + "locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); return new LockDetails(lockKey, value, Duration.ofSeconds(ttl)); } private void releaseLock(String lockKey) { - redisOperations.delete(lockKey) + redisOperations + .delete(lockKey) .doOnSuccess(deleted -> { log.info("Released lock for: {}", lockKey); }) @@ -80,7 +82,8 @@ private void releaseLock(String lockKey) { private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) { LockDetails lockDetails = createLockDetails(lock); - return redisOperations.opsForValue() + return redisOperations + .opsForValue() .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) .flatMap(acquired -> { if (Boolean.TRUE.equals(acquired)) { @@ -106,7 +109,8 @@ private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) { private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) { LockDetails lockDetails = createLockDetails(lock); - return redisOperations.opsForValue() + return redisOperations + .opsForValue() .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) .flatMapMany(acquired -> { if (Boolean.TRUE.equals(acquired)) { @@ -134,7 +138,8 @@ private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock loc Boolean acquired = null; try { - acquired = redisOperations.opsForValue() + acquired = redisOperations + .opsForValue() .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) .block(); } catch (Exception e) { From 3f409dc2ab09042d8fbf92994f7c598b71c6ddc2 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Fri, 21 Feb 2025 12:05:03 +0530 Subject: [PATCH 09/16] chore: Migrate ping scheduled cron --- .../server/solutions/PingScheduledTaskImpl.java | 7 +++++-- .../solutions/ce/PingScheduledTaskCEImpl.java | 17 ++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java index f371993bcd6b..2af74d10aa9b 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java @@ -12,6 +12,7 @@ import com.appsmith.server.repositories.UserRepository; import com.appsmith.server.repositories.WorkspaceRepository; import com.appsmith.server.services.ConfigService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.services.PermissionGroupService; import com.appsmith.server.solutions.ce.PingScheduledTaskCEImpl; import lombok.extern.slf4j.Slf4j; @@ -41,7 +42,8 @@ public PingScheduledTaskImpl( ProjectProperties projectProperties, DeploymentProperties deploymentProperties, NetworkUtils networkUtils, - PermissionGroupService permissionGroupService) { + PermissionGroupService permissionGroupService, + OrganizationService organizationService) { super( configService, segmentConfig, @@ -55,6 +57,7 @@ public PingScheduledTaskImpl( projectProperties, deploymentProperties, networkUtils, - permissionGroupService); + permissionGroupService, + organizationService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java index 3ca4e6a55605..71ff756c84ab 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java @@ -14,6 +14,7 @@ import com.appsmith.server.repositories.UserRepository; import com.appsmith.server.repositories.WorkspaceRepository; import com.appsmith.server.services.ConfigService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.services.PermissionGroupService; import com.appsmith.util.WebClientUtils; import io.micrometer.observation.annotation.Observed; @@ -59,6 +60,7 @@ public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { private final DeploymentProperties deploymentProperties; private final NetworkUtils networkUtils; private final PermissionGroupService permissionGroupService; + private final OrganizationService organizationService; enum UserTrackingType { DAU, @@ -83,8 +85,12 @@ public void pingSchedule() { return; } - Mono.zip(configService.getInstanceId(), networkUtils.getExternalAddress()) - .flatMap(tuple -> doPing(tuple.getT1(), tuple.getT2())) + Mono instanceMono = configService.getInstanceId().cache(); + Mono ipMono = networkUtils.getExternalAddress().cache(); + organizationService + .retrieveAll() + .flatMap(organization -> Mono.zip(Mono.just(organization.getId()), instanceMono, ipMono)) + .flatMap(objects -> doPing(objects.getT1(), objects.getT2(), objects.getT3())) .subscribeOn(Schedulers.single()) .subscribe(); } @@ -97,7 +103,7 @@ public void pingSchedule() { * @param ipAddress The external IP address of this instance's machine. * @return A publisher that yields the string response of recording the data point. */ - private Mono doPing(String instanceId, String ipAddress) { + private Mono doPing(String organizationId, String instanceId, String ipAddress) { // Note: Hard-coding Segment auth header and the event name intentionally. These are not intended to be // environment specific values, instead, they are common values for all self-hosted environments. As such, they // are not intended to be configurable. @@ -118,7 +124,7 @@ private Mono doPing(String instanceId, String ipAddress) { "context", Map.of("ip", ipAddress), "properties", - Map.of("instanceId", instanceId), + Map.of("instanceId", instanceId, "organizationId", organizationId), "event", "Instance Active"))) .retrieve() @@ -130,7 +136,8 @@ private Mono doPing(String instanceId, String ipAddress) { @DistributedLock(key = "pingStats", ttl = 12 * 60 * 60, shouldReleaseLock = false) @Observed(name = "pingStats") public void pingStats() { - if (commonConfig.isTelemetryDisabled()) { + // TODO @CloudBilling remove cloud hosting check and migrate the cron to report organization level stats + if (commonConfig.isTelemetryDisabled() || commonConfig.isCloudHosting()) { return; } From 8b87e17d86dacb8fc9c808ee0b02f7129319ab15 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Fri, 21 Feb 2025 14:52:32 +0530 Subject: [PATCH 10/16] chore: Migrate tenant to org in comments --- .../com/appsmith/server/configurations/InstanceConfig.java | 4 ++-- .../server/helpers/ce/InstanceConfigHelperCEImpl.java | 4 ++-- .../com/appsmith/server/migrations/JsonSchemaVersions.java | 2 +- .../appsmith/server/services/ce/FeatureFlagServiceCEImpl.java | 2 +- .../com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java index 267fdd25240d..554468e75b95 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java @@ -64,8 +64,8 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { // TODO Update implementation to fetch license status for all the organizations once multi-tenancy is // introduced .then(Mono.defer(instanceConfigHelper::isLicenseValid) - // Ensure that the tenant feature flags are refreshed with the latest values after completing - // the license verification process. + // Ensure that the org feature flags are refreshed with the latest values after completing the + // license verification process. .flatMap(isValid -> instanceConfigHelper.updateCacheForOrganizationFeatureFlags())); try { diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index f69159f62e62..5322765d3b65 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -227,11 +227,11 @@ public Mono checkMongoDBVersion() { } /** - * Method to trigger update for the cache of all tenant feature flags. This method is called during the startup of + * Method to trigger update for the organization feature flags. This method is called during the startup of * the application. It's required at the startup to ensure that the feature flags are up-to-date which will then be * consumed by {@link com.appsmith.server.aspect.FeatureFlaggedMethodInvokerAspect} in a non-reactive manner. * In case the user tries to fetch the feature flags before the cache is updated, the aspect will fallback to the - * earlier cached data. + * earlier cached data i.e. disabled state. * @return Empty Mono */ @Override diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java index 90ad2a2a3b87..98b273a9aa05 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java @@ -14,7 +14,7 @@ public class JsonSchemaVersions extends JsonSchemaVersionsFallback { /** - * Only tenant level flags would work over here. + * Only org level flags would work over here. * @return an Integer which is server version */ @Override diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index 3c61af44543e..be0a7f6558b7 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -119,7 +119,7 @@ private Mono> getAllRemoteFeatureFlagsForUser() { /** * To get all features of the organization from Cloud Services and store them locally - * @return Mono updated tenant + * @return Mono updated org */ @Override public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations( diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index e56c29d0fe97..9ab2c61c84d3 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -35,7 +35,7 @@ public void fetchFeatures() { featureFlagService ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) - .doOnError(error -> log.error("Error while fetching tenant feature flags", error)) + .doOnError(error -> log.error("Error while fetching organization feature flags", error)) .then(organizationService.restartOrganization()) .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); From cfd04085ca3a40a9201416f8e75a0c2ceb4b0ce0 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Fri, 21 Feb 2025 15:31:11 +0530 Subject: [PATCH 11/16] chore: Add error logs for dev --- .../server/services/ce/FeatureFlagServiceCEImpl.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index be0a7f6558b7..f6f412261ec0 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -159,7 +159,16 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla */ @Override public Mono> getOrganizationFeatures() { - return sessionUserService.getCurrentUser().map(User::getOrganizationId).flatMap(this::getOrganizationFeatures); + return sessionUserService + .getCurrentUser() + .switchIfEmpty(Mono.defer(() -> { + log.error( + "No user found while fetching organization features, if the method is called without user " + + "context please use getOrganizationFeatures(String organizationId)"); + return Mono.empty(); + })) + .map(User::getOrganizationId) + .flatMap(this::getOrganizationFeatures); } @Override From bb4e6cde7ad98f2fefb67585c624fb5421eab146 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Fri, 21 Feb 2025 18:54:07 +0530 Subject: [PATCH 12/16] chore: Add fallback for default org in case of user object is not available in the feature flag check --- .../server/services/ce/FeatureFlagServiceCEImpl.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index f6f412261ec0..14a66b182895 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -161,13 +161,17 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla public Mono> getOrganizationFeatures() { return sessionUserService .getCurrentUser() + .map(User::getOrganizationId) .switchIfEmpty(Mono.defer(() -> { log.error( "No user found while fetching organization features, if the method is called without user " + "context please use getOrganizationFeatures(String organizationId)"); - return Mono.empty(); + // TODO @CloudBilling - This is a temporary fix to fallback to default organization until we + // introduce a signup flow based on organization. Currently userSignup will end up in data + // corruption if the fallback is not provided to create default workspace in EE as this is + // controlled via flags, please refer WorkspaceServiceHelperImpl.isCreateWorkspaceAllowed. + return organizationService.getDefaultOrganizationId(); })) - .map(User::getOrganizationId) .flatMap(this::getOrganizationFeatures); } From 5d9cd35ad63a646c8527ca49ca2137cde959b642 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Mon, 24 Feb 2025 14:25:42 +0530 Subject: [PATCH 13/16] chore: Refactor to wait for fetching flags for all the orgs --- .../server/helpers/ce/InstanceConfigHelperCEImpl.java | 8 ++++---- .../appsmith/server/solutions/ce/ScheduledTaskCEImpl.java | 7 +++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index 5322765d3b65..c208cfbcecf9 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -236,15 +236,15 @@ public Mono checkMongoDBVersion() { */ @Override public Mono updateCacheForOrganizationFeatureFlags() { - organizationService + // TODO @CloudBilling: Fix this to update feature flags for all organizations and also should not affect the + // startup + return organizationService .retrieveAll() .flatMap(org -> featureFlagService.getOrganizationFeatures(org.getId())) .onErrorResume(error -> { log.error("Error while updating cache for org feature flags", error); return Mono.empty(); }) - .subscribeOn(LoadShifter.elasticScheduler) - .subscribe(); - return Mono.empty(); + .then(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index 9ab2c61c84d3..dbe29623fcd8 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -28,14 +28,17 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { shouldReleaseLock = false) // Ensure only one pod executes this @Observed(name = "fetchFeatures") public void fetchFeatures() { - log.info("Fetching features for default organization"); + log.info("Fetching features for organizations"); Flux organizationFlux = organizationService.retrieveAll(); organizationFlux .flatMap( featureFlagService ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) - .doOnError(error -> log.error("Error while fetching organization feature flags", error)) + .onErrorResume(error -> { + log.error("Error while fetching organization feature flags", error); + return Flux.empty(); + }) .then(organizationService.restartOrganization()) .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); From 925f44d5db946a4c0967d5ed928e929b492db58e Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Mon, 24 Feb 2025 19:25:24 +0530 Subject: [PATCH 14/16] chore: Add fallback for anonymous user while fetching feature flags --- .../server/services/ce/FeatureFlagServiceCEImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index 14a66b182895..a67b2ad84bcb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -16,6 +16,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; import java.time.Instant; @@ -161,7 +162,11 @@ public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFla public Mono> getOrganizationFeatures() { return sessionUserService .getCurrentUser() - .map(User::getOrganizationId) + // TODO @CloudBilling: In case of anonymousUser the organizationId will be empty, fallback to default + // organization. Update this to get the orgId based on the request origin. + .flatMap(user -> StringUtils.hasText(user.getOrganizationId()) + ? Mono.just(user.getOrganizationId()) + : Mono.empty()) .switchIfEmpty(Mono.defer(() -> { log.error( "No user found while fetching organization features, if the method is called without user " From 66af2575121c6b79dd9672e18ab198da0399bbd0 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Mon, 24 Feb 2025 22:59:48 +0530 Subject: [PATCH 15/16] chore: Add pingStats analytics event for all the orgs --- .../solutions/ce/PingScheduledTaskCEImpl.java | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java index 71ff756c84ab..111ebefb7d78 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java @@ -6,6 +6,7 @@ import com.appsmith.server.configurations.DeploymentProperties; import com.appsmith.server.configurations.ProjectProperties; import com.appsmith.server.configurations.SegmentConfig; +import com.appsmith.server.domains.Organization; import com.appsmith.server.helpers.NetworkUtils; import com.appsmith.server.repositories.ApplicationRepository; import com.appsmith.server.repositories.DatasourceRepository; @@ -28,6 +29,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple7; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Map; @@ -62,6 +64,9 @@ public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { private final PermissionGroupService permissionGroupService; private final OrganizationService organizationService; + // Delay to avoid 429 between the analytics call. + private static final Duration DELAY_BETWEEN_PINGS = Duration.ofMillis(200); + enum UserTrackingType { DAU, WAU, @@ -89,6 +94,7 @@ public void pingSchedule() { Mono ipMono = networkUtils.getExternalAddress().cache(); organizationService .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) .flatMap(organization -> Mono.zip(Mono.just(organization.getId()), instanceMono, ipMono)) .flatMap(objects -> doPing(objects.getT1(), objects.getT2(), objects.getT3())) .subscribeOn(Schedulers.single()) @@ -163,23 +169,33 @@ public void pingStats() { userCountMono, getUserTrackingDetails()); - publicPermissionGroupIdMono - .flatMap(publicPermissionGroupId -> Mono.zip( - configService.getInstanceId().defaultIfEmpty("null"), - networkUtils.getExternalAddress(), - nonDeletedObjectsCountMono, - applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( - AclPermission.READ_APPLICATIONS, publicPermissionGroupId))) + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .map(Organization::getId) + .zipWith(publicPermissionGroupIdMono) + .flatMap(tuple2 -> { + final String organizationId = tuple2.getT1(); + final String publicPermissionGroupId = tuple2.getT2(); + return Mono.zip( + configService.getInstanceId().defaultIfEmpty("null"), + Mono.just(organizationId), + networkUtils.getExternalAddress(), + nonDeletedObjectsCountMono, + applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( + AclPermission.READ_APPLICATIONS, publicPermissionGroupId)); + }) .flatMap(statsData -> { Map propertiesMap = new java.util.HashMap<>(Map.ofEntries( entry("instanceId", statsData.getT1()), - entry("numOrgs", statsData.getT3().getT1()), - entry("numApps", statsData.getT3().getT2()), - entry("numPages", statsData.getT3().getT3()), - entry("numActions", statsData.getT3().getT4()), - entry("numDatasources", statsData.getT3().getT5()), - entry("numUsers", statsData.getT3().getT6()), - entry("numPublicApps", statsData.getT4()), + entry("organizationId", statsData.getT2()), + entry("numOrgs", statsData.getT4().getT1()), + entry("numApps", statsData.getT4().getT2()), + entry("numPages", statsData.getT4().getT3()), + entry("numActions", statsData.getT4().getT4()), + entry("numDatasources", statsData.getT4().getT5()), + entry("numUsers", statsData.getT4().getT6()), + entry("numPublicApps", statsData.getT5()), entry("version", projectProperties.getVersion()), entry("edition", deploymentProperties.getEdition()), entry("cloudProvider", defaultIfEmpty(deploymentProperties.getCloudProvider(), "")), @@ -190,9 +206,9 @@ public void pingStats() { entry(ADMIN_EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()), entry(EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()))); - propertiesMap.putAll(statsData.getT3().getT7()); + propertiesMap.putAll(statsData.getT4().getT7()); - final String ipAddress = statsData.getT2(); + final String ipAddress = statsData.getT3(); return WebClientUtils.create("https://api.segment.io") .post() .uri("/v1/track") From f179756ac8729ed7d0ecee8a1b9b7095511e4bd0 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Tue, 25 Feb 2025 00:03:56 +0530 Subject: [PATCH 16/16] chore: Add tests --- .../server/configurations/InstanceConfig.java | 5 +- .../services/ce/FeatureFlagServiceCEImpl.java | 2 +- .../ce/OrganizationServiceCEImpl.java | 4 +- .../aspect/DistributedLockAspectTest.java | 195 ++++++++++++++++++ .../server/aspect/TestLockService.java | 58 ++++++ .../aspects/DistributedLockAspect.java | 2 +- 6 files changed, 259 insertions(+), 7 deletions(-) create mode 100644 app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java create mode 100644 app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java index 554468e75b95..88e18173a8f4 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java @@ -60,9 +60,8 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { .flatMap(signal -> registrationAndRtsCheckMono) // Prefill the server cache with anonymous user permission group ids. .then(cacheableRepositoryHelper.preFillAnonymousUserPermissionGroupIdsCache()) - // Add cold publisher as we have dependency on the instance registration - // TODO Update implementation to fetch license status for all the organizations once multi-tenancy is - // introduced + // Cold publisher to wait for upstream execution to complete as we have dependency on the instance + // registration .then(Mono.defer(instanceConfigHelper::isLicenseValid) // Ensure that the org feature flags are refreshed with the latest values after completing the // license verification process. diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index a67b2ad84bcb..b8362f034293 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -40,7 +40,7 @@ public class FeatureFlagServiceCEImpl implements FeatureFlagServiceCE { private final FeatureFlagMigrationHelper featureFlagMigrationHelper; private static final long FEATURE_FLAG_CACHE_TIME_MIN = 120; - // TODO remove once all the helper methods consuming @FeatureFlagged are converted to reactive + // TODO @CloudBilling: Remove once all the helper methods consuming @FeatureFlagged are converted to reactive @Getter private CachedFeatures cachedOrganizationFeatureFlags; diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java index c3aac3305e5a..d78774670d4c 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java @@ -330,8 +330,8 @@ public Mono update(String organizationId, Organization organizatio */ @Override public Mono restartOrganization() { - // TODO remove this method once we move the form login env to DB variable which is currently required as a part - // of downgrade migration for SSO + // TODO @CloudBilling: remove this method once we move the form login env to DB variable which is currently + // required as a part of downgrade migration for SSO return this.retrieveAll() .filter(organization -> TRUE.equals(organization.getOrganizationConfiguration().getIsRestartRequired())) diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java new file mode 100644 index 000000000000..a03b1eaada50 --- /dev/null +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java @@ -0,0 +1,195 @@ +package com.appsmith.server.aspect; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SpringBootTest +class DistributedLockAspectTest { + + @Autowired + private TestLockService testLockService; + + @Autowired + private ReactiveRedisOperations redisOperations; + + private static final String LOCK_PREFIX = "lock:"; + + @Test + void testMonoOperation() { + StepVerifier.create(testLockService.monoOperation()) + .expectNext("mono-success") + .verifyComplete(); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "mono-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testFluxOperation() { + StepVerifier.create(testLockService.fluxOperation().collectList()) + .expectNext(List.of("flux-success-1", "flux-success-2")) + .verifyComplete(); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "flux-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testBlockingOperation() { + String result = testLockService.blockingOperation(); + assertEquals("blocking-success", result); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "blocking-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testConcurrentAccess() throws InterruptedException { + AtomicReference thread1Result = new AtomicReference<>(); + AtomicReference thread2Result = new AtomicReference<>(); + CountDownLatch thread1Started = new CountDownLatch(1); + + // Thread 1: Long running operation + Thread thread1 = new Thread( + () -> { + thread1Started.countDown(); + thread1Result.set(testLockService.longRunningMonoOperation().block()); + }, + "Thread-1"); + + // Thread 2: Tries to execute while Thread 1 is running + Thread thread2 = new Thread( + () -> { + try { + thread1Started.await(); // Wait for Thread 1 to start + Thread.sleep(100); // Small delay to ensure Thread 1 has acquired lock + thread2Result.set( + testLockService.longRunningMonoOperation().block()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + "Thread-2"); + + // Start both threads + thread1.start(); + thread2.start(); + + // Wait for both threads to complete + thread1.join(5000); + thread2.join(5000); + + // Verify results + assertEquals("long-running-success", thread1Result.get()); + assertNull(thread2Result.get()); // Thread 2 should not get the lock + } + + @Test + void testPersistentLock() { + // First operation acquires lock and doesn't release it + StepVerifier.create(testLockService.operationWithPersistentLock()) + .expectNext("success") + .verifyComplete(); + + // Verify lock still exists after operation completes + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "persistent-lock")) + .expectNext(true) + .verifyComplete(); + + // Second operation should fail to acquire the same lock + StepVerifier.create(testLockService.operationWithPersistentLock()) + .verifyComplete(); // Completes empty because lock is still held + + // Cleanup: Release lock for other tests + StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations)) + .expectNext(1L) + .verifyComplete(); + } + + @Test + void testPersistentLockExpiration() { + // Execute operation with short-lived lock + StepVerifier.create(Mono.just(testLockService.operationWithShortLivedLock())) + .expectNext("success") + .verifyComplete(); + + // Verify lock exists immediately after + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + .expectNext(true) + .verifyComplete(); + + // Wait for lock to expire + try { + Thread.sleep(1100); // Wait just over 1 second + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // Verify lock has expired + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnBlockingError() { + // Execute operation that throws error + assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); + + // Verify lock is released despite shouldReleaseLock = false + StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnReactiveError() { + // Execute operation that returns Mono.error + StepVerifier.create(testLockService.reactiveMethodWithError()) + .expectError(RuntimeException.class) + .verify(); + + // Verify lock is released despite shouldReleaseLock = false + StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnErrorAllowsSubsequentExecution() { + // First call throws error + assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); + + // Verify we can acquire the same lock immediately after error + AtomicBoolean lockAcquired = new AtomicBoolean(false); + StepVerifier.create(redisOperations.opsForValue().setIfAbsent("lock:error-lock", "test-value")) + .consumeNextWith(result -> lockAcquired.set(result)) + .verifyComplete(); + + // Should be able to acquire lock after error + assertTrue(lockAcquired.get()); + + // Cleanup + redisOperations.delete("lock:error-lock").block(); + } +} diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java new file mode 100644 index 000000000000..0ff5a8d1933b --- /dev/null +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java @@ -0,0 +1,58 @@ +package com.appsmith.server.aspect; + +import com.appsmith.caching.annotations.DistributedLock; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +@Service +public class TestLockService { + @DistributedLock(key = "mono-test") + public Mono monoOperation() { + return Mono.just("mono-success"); + } + + @DistributedLock(key = "flux-test") + public Flux fluxOperation() { + return Flux.just("flux-success-1", "flux-success-2"); + } + + @DistributedLock(key = "blocking-test") + public String blockingOperation() { + return "blocking-success"; + } + + @DistributedLock(key = "long-running-mono", ttl = 5) + public Mono longRunningMonoOperation() { + return Mono.just("long-running-success").delayElement(Duration.ofSeconds(2)); + } + + // Test method to check when the lock is persisted after the execution + @DistributedLock(key = "persistent-lock", shouldReleaseLock = false) + public Mono operationWithPersistentLock() { + return Mono.just("success").delayElement(Duration.ofMillis(100)); + } + + @DistributedLock(key = "short-lived-lock", shouldReleaseLock = false, ttl = 1) + public String operationWithShortLivedLock() { + return "success"; + } + + // Method to manually release the lock (for testing cleanup) + public Mono releaseLock(String lockKey, ReactiveRedisOperations redisOperations) { + return redisOperations.delete("lock:" + lockKey); + } + + @DistributedLock(key = "error-lock", shouldReleaseLock = false) + public void blockingMethodWithError() { + throw new RuntimeException("Simulated error"); + } + + @DistributedLock(key = "error-lock", shouldReleaseLock = false) + public Mono reactiveMethodWithError() { + return Mono.error(new RuntimeException("Simulated error")); + } +} diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 829a8ad8153c..3bf198eccaa5 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -75,7 +75,7 @@ private void releaseLock(String lockKey) { log.error("Error while releasing lock: {}", lockKey, error); return Mono.empty(); }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.immediate()) .subscribe(); }