diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/aspect/FeatureFlaggedMethodInvokerAspect.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/aspect/FeatureFlaggedMethodInvokerAspect.java index e0377b652b23..1dd29721d59a 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/aspect/FeatureFlaggedMethodInvokerAspect.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/aspect/FeatureFlaggedMethodInvokerAspect.java @@ -84,6 +84,7 @@ private Object invokeMethod(Boolean isFeatureSupported, ProceedingJoinPoint join if (e instanceof AppsmithException) { throw (AppsmithException) e; } + log.error("Exception while invoking super class method", e); String errorMessage = "Exception while invoking super class method"; AppsmithException exception = getInvalidAnnotationUsageException(method, errorMessage); log.error(exception.getMessage(), e); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java index 261286df2b33..88e18173a8f4 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceConfig.java @@ -47,7 +47,7 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { Mono registrationAndRtsCheckMono = configService .getByName(Appsmith.APPSMITH_REGISTERED) .filter(config -> TRUE.equals(config.getConfig().get("value"))) - .switchIfEmpty(Mono.defer(() -> instanceConfigHelper.registerInstance())) + .switchIfEmpty(Mono.defer(instanceConfigHelper::registerInstance)) .onErrorResume(errorSignal -> { log.debug("Instance registration failed with error: \n{}", errorSignal.getMessage()); return Mono.empty(); @@ -60,20 +60,12 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { .flatMap(signal -> registrationAndRtsCheckMono) // Prefill the server cache with anonymous user permission group ids. .then(cacheableRepositoryHelper.preFillAnonymousUserPermissionGroupIdsCache()) - // Add cold publisher as we have dependency on the instance registration - // TODO Update implementation to fetch license status for all the organizations once multi-tenancy is - // introduced + // Cold publisher to wait for upstream execution to complete as we have dependency on the instance + // registration .then(Mono.defer(instanceConfigHelper::isLicenseValid) - // Ensure that the organization feature flags are refreshed with the latest values after - // completing - // the + // Ensure that the org feature flags are refreshed with the latest values after completing the // license verification process. - .flatMap(isValid -> { - log.debug( - "License verification completed with status: {}", - TRUE.equals(isValid) ? "valid" : "invalid"); - return instanceConfigHelper.updateCacheForOrganizationFeatureFlags(); - })); + .flatMap(isValid -> instanceConfigHelper.updateCacheForOrganizationFeatureFlags())); try { startupProcess.block(); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java index 6740f279b216..28d7a6448064 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/Organization.java @@ -12,6 +12,8 @@ import java.io.Serializable; +import static com.appsmith.external.helpers.StringUtils.dotted; + @Getter @Setter @ToString @@ -35,4 +37,9 @@ public class Organization extends BaseDomain implements Serializable { OrganizationConfiguration organizationConfiguration; // TODO add SSO and other configurations here after migrating from environment variables to database configuration + + public static class Fields { + public static final String organizationConfiguration_isRestartRequired = + dotted(organizationConfiguration, OrganizationConfiguration.Fields.isRestartRequired); + } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/OrganizationConfiguration.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/OrganizationConfiguration.java index 5be0df072461..0f64640325fa 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/OrganizationConfiguration.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/OrganizationConfiguration.java @@ -3,7 +3,11 @@ import com.appsmith.server.domains.ce.OrganizationConfigurationCE; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.experimental.FieldNameConstants; @Data @EqualsAndHashCode(callSuper = true) -public class OrganizationConfiguration extends OrganizationConfigurationCE {} +@FieldNameConstants +public class OrganizationConfiguration extends OrganizationConfigurationCE { + public static class Fields extends OrganizationConfigurationCE.Fields {} +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/OrganizationConfigurationCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/OrganizationConfigurationCE.java index a8cd697d5594..748d152183ef 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/OrganizationConfigurationCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/domains/ce/OrganizationConfigurationCE.java @@ -8,6 +8,7 @@ import com.appsmith.server.domains.OrganizationConfiguration; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; +import lombok.experimental.FieldNameConstants; import org.apache.commons.lang3.ObjectUtils; import java.io.Serializable; @@ -16,6 +17,7 @@ import java.util.Map; @Data +@FieldNameConstants public class OrganizationConfigurationCE implements Serializable { private String googleMapsKey; @@ -88,4 +90,6 @@ public void copyNonSensitiveValues(OrganizationConfiguration organizationConfigu public Boolean isEmailVerificationEnabled() { return Boolean.TRUE.equals(this.emailVerificationEnabled); } + + public static class Fields {} } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java index c8a9b10fb182..b81dd8343771 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/InstanceConfigHelperImpl.java @@ -7,6 +7,7 @@ import com.appsmith.server.services.AnalyticsService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.solutions.ReleaseNotesService; import org.springframework.context.ApplicationContext; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; @@ -24,7 +25,8 @@ public InstanceConfigHelperImpl( AnalyticsService analyticsService, NetworkUtils networkUtils, ReleaseNotesService releaseNotesService, - RTSCaller rtsCaller) { + RTSCaller rtsCaller, + OrganizationService organizationService) { super( configService, cloudServicesConfig, @@ -35,6 +37,7 @@ public InstanceConfigHelperImpl( analyticsService, networkUtils, releaseNotesService, - rtsCaller); + rtsCaller, + organizationService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index 1d26b253898b..c208cfbcecf9 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -15,6 +15,7 @@ import com.appsmith.server.services.AnalyticsService; import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.solutions.ReleaseNotesService; import com.appsmith.util.WebClientUtils; import joptsimple.internal.Strings; @@ -54,8 +55,8 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { private final AnalyticsService analyticsService; private final NetworkUtils networkUtils; private final ReleaseNotesService releaseNotesService; - private final RTSCaller rtsCaller; + private final OrganizationService organizationService; private boolean isRtsAccessible = false; @@ -225,8 +226,25 @@ public Mono checkMongoDBVersion() { }); } + /** + * Method to trigger update for the organization feature flags. This method is called during the startup of + * the application. It's required at the startup to ensure that the feature flags are up-to-date which will then be + * consumed by {@link com.appsmith.server.aspect.FeatureFlaggedMethodInvokerAspect} in a non-reactive manner. + * In case the user tries to fetch the feature flags before the cache is updated, the aspect will fallback to the + * earlier cached data i.e. disabled state. + * @return Empty Mono + */ @Override public Mono updateCacheForOrganizationFeatureFlags() { - return featureFlagService.getOrganizationFeatures().then(); + // TODO @CloudBilling: Fix this to update feature flags for all organizations and also should not affect the + // startup + return organizationService + .retrieveAll() + .flatMap(org -> featureFlagService.getOrganizationFeatures(org.getId())) + .onErrorResume(error -> { + log.error("Error while updating cache for org feature flags", error); + return Mono.empty(); + }) + .then(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java index 90ad2a2a3b87..98b273a9aa05 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/JsonSchemaVersions.java @@ -14,7 +14,7 @@ public class JsonSchemaVersions extends JsonSchemaVersionsFallback { /** - * Only tenant level flags would work over here. + * Only org level flags would work over here. * @return an Integer which is server version */ @Override diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java index 44af581b6008..b3b15fb95106 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepository.java @@ -2,6 +2,7 @@ import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.data.repository.NoRepositoryBean; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.Serializable; @@ -34,4 +35,11 @@ public interface BaseRepository extends ReactiveMong * @return */ Mono archiveAllById(Collection ids); + + /** + * This function retrieves all the documents in the collection without the user context. This method will be used + * by internal server workflows like crons etc. to fetch all the documents from the collection. + * @return + */ + Flux retrieveAll(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java index 365e58d19446..5088ac6526ca 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/BaseRepositoryImpl.java @@ -101,6 +101,13 @@ public Flux findAll() { }); } + @Override + public Flux retrieveAll() { + Query query = new Query(notDeleted()); + return mongoOperations.find( + query.cursorBatchSize(10000), entityInformation.getJavaType(), entityInformation.getCollectionName()); + } + /** * We don't use this today, it doesn't use our `notDeleted` criteria, and since we don't use it, we're not porting * it to Postgres. Querying with `queryBuilder` or anything else is arguably more readable than this. diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCE.java index 1f3b9139fb47..a6d4c7563080 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCE.java @@ -2,5 +2,8 @@ import com.appsmith.server.domains.Organization; import com.appsmith.server.repositories.AppsmithRepository; +import reactor.core.publisher.Mono; -public interface CustomOrganizationRepositoryCE extends AppsmithRepository {} +public interface CustomOrganizationRepositoryCE extends AppsmithRepository { + Mono disableRestartForAllTenants(); +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java index 7b21660b8509..8d57c4a0b884 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/repositories/ce/CustomOrganizationRepositoryCEImpl.java @@ -1,9 +1,22 @@ package com.appsmith.server.repositories.ce; import com.appsmith.server.domains.Organization; +import com.appsmith.server.helpers.ce.bridge.Bridge; import com.appsmith.server.repositories.BaseAppsmithRepositoryImpl; import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import static com.appsmith.server.domains.Organization.Fields.organizationConfiguration_isRestartRequired; @Slf4j public class CustomOrganizationRepositoryCEImpl extends BaseAppsmithRepositoryImpl - implements CustomOrganizationRepositoryCE {} + implements CustomOrganizationRepositoryCE { + + @Override + public Mono disableRestartForAllTenants() { + log.info("Disabling restart for all tenants"); + return queryBuilder() + .criteria(Bridge.isTrue(organizationConfiguration_isRestartRequired)) + .updateAll(Bridge.update().set(organizationConfiguration_isRestartRequired, false)); + } +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java index 3613e452b664..cce3d3402deb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCE.java @@ -29,7 +29,8 @@ public interface FeatureFlagServiceCE { * To get all features of the organization from Cloud Services and store them locally * @return Mono of Void */ - Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(); + Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations( + Organization organization); /** * To get all features of the current organization. @@ -37,6 +38,8 @@ public interface FeatureFlagServiceCE { */ Mono> getOrganizationFeatures(); + Mono> getOrganizationFeatures(String orgId); + Mono checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization); CachedFeatures getCachedOrganizationFeatureFlags(); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java index 7f3a8f4a52d0..b8362f034293 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/FeatureFlagServiceCEImpl.java @@ -13,8 +13,10 @@ import com.appsmith.server.services.OrganizationService; import com.appsmith.server.services.SessionUserService; import com.appsmith.server.services.UserIdentifierService; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; import java.time.Instant; @@ -38,6 +40,8 @@ public class FeatureFlagServiceCEImpl implements FeatureFlagServiceCE { private final FeatureFlagMigrationHelper featureFlagMigrationHelper; private static final long FEATURE_FLAG_CACHE_TIME_MIN = 120; + // TODO @CloudBilling: Remove once all the helper methods consuming @FeatureFlagged are converted to reactive + @Getter private CachedFeatures cachedOrganizationFeatureFlags; /** @@ -59,7 +63,7 @@ public Mono check(FeatureFlagEnum featureEnum) { /** * Retrieves a map of feature flags along with their corresponding boolean values for the current user. - * This takes into account for both user-level and organization-level feature flags + * This takes into account for both user-level and organization-level feature flags. * * @return A Mono emitting a Map where keys are feature names and values are corresponding boolean flags. */ @@ -116,52 +120,70 @@ private Mono> getAllRemoteFeatureFlagsForUser() { /** * To get all features of the organization from Cloud Services and store them locally - * @return Mono of Void + * @return Mono updated org */ - public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() { - return organizationService - .getDefaultOrganization() - .flatMap(defaultOrganization -> - // 1. Fetch current/saved feature flags from cache - // 2. Force update the org flags keeping existing flags as fallback in case the API - // call to fetch the flags fails for some reason - // 3. Get the diff and update the flags with pending migrations to be used to run - // migrations selectively - featureFlagMigrationHelper - .getUpdatedFlagsWithPendingMigration(defaultOrganization) - .flatMap(featureFlagWithPendingMigrations -> { - OrganizationConfiguration organizationConfiguration = - defaultOrganization.getOrganizationConfiguration() == null - ? new OrganizationConfiguration() - : defaultOrganization.getOrganizationConfiguration(); - // We expect the featureFlagWithPendingMigrations to be empty hence - // verifying only for null - if (featureFlagWithPendingMigrations != null - && !featureFlagWithPendingMigrations.equals( - organizationConfiguration.getFeaturesWithPendingMigration())) { - organizationConfiguration.setFeaturesWithPendingMigration( - featureFlagWithPendingMigrations); - if (!featureFlagWithPendingMigrations.isEmpty()) { - organizationConfiguration.setMigrationStatus(MigrationStatus.PENDING); - } else { - organizationConfiguration.setMigrationStatus(MigrationStatus.COMPLETED); - } - return organizationService.update( - defaultOrganization.getId(), defaultOrganization); - } - return Mono.just(defaultOrganization); - })) - .then(); + @Override + public Mono getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations( + Organization organization) { + // 1. Fetch current/saved feature flags from cache + // 2. Force update the org flags keeping existing flags as fallback in case the API + // call to fetch the flags fails for some reason + // 3. Get the diff and update the flags with pending migrations to be used to run + // migrations selectively + return featureFlagMigrationHelper + .getUpdatedFlagsWithPendingMigration(organization) + .flatMap(featureFlagWithPendingMigrations -> { + OrganizationConfiguration organizationConfiguration = + organization.getOrganizationConfiguration() == null + ? new OrganizationConfiguration() + : organization.getOrganizationConfiguration(); + // We expect the featureFlagWithPendingMigrations to be empty hence + // verifying only for null + if (featureFlagWithPendingMigrations != null + && !featureFlagWithPendingMigrations.equals( + organizationConfiguration.getFeaturesWithPendingMigration())) { + organizationConfiguration.setFeaturesWithPendingMigration(featureFlagWithPendingMigrations); + if (!featureFlagWithPendingMigrations.isEmpty()) { + organizationConfiguration.setMigrationStatus(MigrationStatus.PENDING); + } else { + organizationConfiguration.setMigrationStatus(MigrationStatus.COMPLETED); + } + return organizationService.update(organization.getId(), organization); + } + return Mono.just(organization); + }); } /** * To get all features of the current organization. * @return Mono of Map */ + @Override public Mono> getOrganizationFeatures() { - return organizationService - .getDefaultOrganizationId() - .flatMap(cacheableFeatureFlagHelper::fetchCachedOrganizationFeatures) + return sessionUserService + .getCurrentUser() + // TODO @CloudBilling: In case of anonymousUser the organizationId will be empty, fallback to default + // organization. Update this to get the orgId based on the request origin. + .flatMap(user -> StringUtils.hasText(user.getOrganizationId()) + ? Mono.just(user.getOrganizationId()) + : Mono.empty()) + .switchIfEmpty(Mono.defer(() -> { + log.error( + "No user found while fetching organization features, if the method is called without user " + + "context please use getOrganizationFeatures(String organizationId)"); + // TODO @CloudBilling - This is a temporary fix to fallback to default organization until we + // introduce a signup flow based on organization. Currently userSignup will end up in data + // corruption if the fallback is not provided to create default workspace in EE as this is + // controlled via flags, please refer WorkspaceServiceHelperImpl.isCreateWorkspaceAllowed. + return organizationService.getDefaultOrganizationId(); + })) + .flatMap(this::getOrganizationFeatures); + } + + @Override + public Mono> getOrganizationFeatures(String organizationId) { + return cacheableFeatureFlagHelper + .fetchCachedOrganizationFeatures(organizationId) .map(cachedFeatures -> { cachedOrganizationFeatureFlags = cachedFeatures; return cachedFeatures.getFeatures(); @@ -178,8 +200,4 @@ public Mono> getOrganizationFeatures() { public Mono checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization) { return organizationService.checkAndExecuteMigrationsForOrganizationFeatureFlags(organization); } - - public CachedFeatures getCachedOrganizationFeatureFlags() { - return this.cachedOrganizationFeatureFlags; - } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCE.java index d2ecea72254a..2ddb540ca09f 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCE.java @@ -4,6 +4,7 @@ import com.appsmith.server.domains.Organization; import com.appsmith.server.domains.OrganizationConfiguration; import com.appsmith.server.services.CrudService; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface OrganizationServiceCE extends CrudService { @@ -30,4 +31,6 @@ Mono updateOrganizationConfiguration( Mono retrieveById(String id); Mono restartOrganization(); + + Flux retrieveAll(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java index 997be981e3e1..d78774670d4c 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/OrganizationServiceCEImpl.java @@ -26,6 +26,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.util.StringUtils; import reactor.core.observability.micrometer.Micrometer; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Map; @@ -322,27 +323,26 @@ public Mono update(String organizationId, Organization organizatio } /** - * This function checks if the organization needs to be restarted and restarts after the feature flag migrations are - * executed. + * This function checks if the organization needs to be restarted, and executes the restart after the feature flag + * migrations are completed. * - * @return + * @return Mono */ @Override public Mono restartOrganization() { - // Avoid dependency on user context as this method will be called internally by the server - Mono defaultOrganizationMono = - this.getDefaultOrganizationId().flatMap(this::retrieveById); - return defaultOrganizationMono.flatMap(updatedOrganization -> { - if (TRUE.equals(updatedOrganization.getOrganizationConfiguration().getIsRestartRequired())) { - log.debug("Triggering organization restart after the feature flag migrations are executed"); - OrganizationConfiguration organizationConfiguration = - updatedOrganization.getOrganizationConfiguration(); - organizationConfiguration.setIsRestartRequired(false); - return this.update(updatedOrganization.getId(), updatedOrganization) - .then(envManager.restartWithoutAclCheck()); - } - return Mono.empty(); - }); + // TODO @CloudBilling: remove this method once we move the form login env to DB variable which is currently + // required as a part of downgrade migration for SSO + return this.retrieveAll() + .filter(organization -> + TRUE.equals(organization.getOrganizationConfiguration().getIsRestartRequired())) + .take(1) + .hasElements() + .flatMap(hasElement -> { + if (hasElement) { + return repository.disableRestartForAllTenants().then(envManager.restartWithoutAclCheck()); + } + return Mono.empty(); + }); } private boolean isMigrationRequired(Organization organization) { @@ -356,4 +356,9 @@ private boolean isMigrationRequired(Organization organization) { .getOrganizationConfiguration() .getMigrationStatus()))); } + + @Override + public Flux retrieveAll() { + return repository.retrieveAll(); + } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java index f371993bcd6b..2af74d10aa9b 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java @@ -12,6 +12,7 @@ import com.appsmith.server.repositories.UserRepository; import com.appsmith.server.repositories.WorkspaceRepository; import com.appsmith.server.services.ConfigService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.services.PermissionGroupService; import com.appsmith.server.solutions.ce.PingScheduledTaskCEImpl; import lombok.extern.slf4j.Slf4j; @@ -41,7 +42,8 @@ public PingScheduledTaskImpl( ProjectProperties projectProperties, DeploymentProperties deploymentProperties, NetworkUtils networkUtils, - PermissionGroupService permissionGroupService) { + PermissionGroupService permissionGroupService, + OrganizationService organizationService) { super( configService, segmentConfig, @@ -55,6 +57,7 @@ public PingScheduledTaskImpl( projectProperties, deploymentProperties, networkUtils, - permissionGroupService); + permissionGroupService, + organizationService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java index f33982a7d160..111ebefb7d78 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java @@ -1,10 +1,12 @@ package com.appsmith.server.solutions.ce; +import com.appsmith.caching.annotations.DistributedLock; import com.appsmith.server.acl.AclPermission; import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.configurations.DeploymentProperties; import com.appsmith.server.configurations.ProjectProperties; import com.appsmith.server.configurations.SegmentConfig; +import com.appsmith.server.domains.Organization; import com.appsmith.server.helpers.NetworkUtils; import com.appsmith.server.repositories.ApplicationRepository; import com.appsmith.server.repositories.DatasourceRepository; @@ -13,13 +15,13 @@ import com.appsmith.server.repositories.UserRepository; import com.appsmith.server.repositories.WorkspaceRepository; import com.appsmith.server.services.ConfigService; +import com.appsmith.server.services.OrganizationService; import com.appsmith.server.services.PermissionGroupService; import com.appsmith.util.WebClientUtils; import io.micrometer.observation.annotation.Observed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.http.MediaType; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.reactive.function.BodyInserters; @@ -27,6 +29,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple7; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Map; @@ -43,7 +46,6 @@ */ @Slf4j @RequiredArgsConstructor -@ConditionalOnExpression("!${is.cloud-hosting:false}") public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { private final ConfigService configService; @@ -60,6 +62,10 @@ public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { private final DeploymentProperties deploymentProperties; private final NetworkUtils networkUtils; private final PermissionGroupService permissionGroupService; + private final OrganizationService organizationService; + + // Delay to avoid 429 between the analytics call. + private static final Duration DELAY_BETWEEN_PINGS = Duration.ofMillis(200); enum UserTrackingType { DAU, @@ -74,14 +80,23 @@ enum UserTrackingType { */ // Number of milliseconds between the start of each scheduled calls to this method. @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 6 * 60 * 60 * 1000 /* six hours */) + @DistributedLock( + key = "pingSchedule", + ttl = 5 * 60 * 60, // 5 hours + shouldReleaseLock = false) @Observed(name = "pingSchedule") public void pingSchedule() { if (commonConfig.isTelemetryDisabled()) { return; } - Mono.zip(configService.getInstanceId(), networkUtils.getExternalAddress()) - .flatMap(tuple -> doPing(tuple.getT1(), tuple.getT2())) + Mono instanceMono = configService.getInstanceId().cache(); + Mono ipMono = networkUtils.getExternalAddress().cache(); + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .flatMap(organization -> Mono.zip(Mono.just(organization.getId()), instanceMono, ipMono)) + .flatMap(objects -> doPing(objects.getT1(), objects.getT2(), objects.getT3())) .subscribeOn(Schedulers.single()) .subscribe(); } @@ -94,7 +109,7 @@ public void pingSchedule() { * @param ipAddress The external IP address of this instance's machine. * @return A publisher that yields the string response of recording the data point. */ - private Mono doPing(String instanceId, String ipAddress) { + private Mono doPing(String organizationId, String instanceId, String ipAddress) { // Note: Hard-coding Segment auth header and the event name intentionally. These are not intended to be // environment specific values, instead, they are common values for all self-hosted environments. As such, they // are not intended to be configurable. @@ -115,7 +130,7 @@ private Mono doPing(String instanceId, String ipAddress) { "context", Map.of("ip", ipAddress), "properties", - Map.of("instanceId", instanceId), + Map.of("instanceId", instanceId, "organizationId", organizationId), "event", "Instance Active"))) .retrieve() @@ -124,9 +139,11 @@ private Mono doPing(String instanceId, String ipAddress) { // Number of milliseconds between the start of each scheduled calls to this method. @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 24 * 60 * 60 * 1000 /* a day */) + @DistributedLock(key = "pingStats", ttl = 12 * 60 * 60, shouldReleaseLock = false) @Observed(name = "pingStats") public void pingStats() { - if (commonConfig.isTelemetryDisabled()) { + // TODO @CloudBilling remove cloud hosting check and migrate the cron to report organization level stats + if (commonConfig.isTelemetryDisabled() || commonConfig.isCloudHosting()) { return; } @@ -152,23 +169,33 @@ public void pingStats() { userCountMono, getUserTrackingDetails()); - publicPermissionGroupIdMono - .flatMap(publicPermissionGroupId -> Mono.zip( - configService.getInstanceId().defaultIfEmpty("null"), - networkUtils.getExternalAddress(), - nonDeletedObjectsCountMono, - applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( - AclPermission.READ_APPLICATIONS, publicPermissionGroupId))) + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .map(Organization::getId) + .zipWith(publicPermissionGroupIdMono) + .flatMap(tuple2 -> { + final String organizationId = tuple2.getT1(); + final String publicPermissionGroupId = tuple2.getT2(); + return Mono.zip( + configService.getInstanceId().defaultIfEmpty("null"), + Mono.just(organizationId), + networkUtils.getExternalAddress(), + nonDeletedObjectsCountMono, + applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( + AclPermission.READ_APPLICATIONS, publicPermissionGroupId)); + }) .flatMap(statsData -> { Map propertiesMap = new java.util.HashMap<>(Map.ofEntries( entry("instanceId", statsData.getT1()), - entry("numOrgs", statsData.getT3().getT1()), - entry("numApps", statsData.getT3().getT2()), - entry("numPages", statsData.getT3().getT3()), - entry("numActions", statsData.getT3().getT4()), - entry("numDatasources", statsData.getT3().getT5()), - entry("numUsers", statsData.getT3().getT6()), - entry("numPublicApps", statsData.getT4()), + entry("organizationId", statsData.getT2()), + entry("numOrgs", statsData.getT4().getT1()), + entry("numApps", statsData.getT4().getT2()), + entry("numPages", statsData.getT4().getT3()), + entry("numActions", statsData.getT4().getT4()), + entry("numDatasources", statsData.getT4().getT5()), + entry("numUsers", statsData.getT4().getT6()), + entry("numPublicApps", statsData.getT5()), entry("version", projectProperties.getVersion()), entry("edition", deploymentProperties.getEdition()), entry("cloudProvider", defaultIfEmpty(deploymentProperties.getCloudProvider(), "")), @@ -179,9 +206,9 @@ public void pingStats() { entry(ADMIN_EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()), entry(EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()))); - propertiesMap.putAll(statsData.getT3().getT7()); + propertiesMap.putAll(statsData.getT4().getT7()); - final String ipAddress = statsData.getT2(); + final String ipAddress = statsData.getT3(); return WebClientUtils.create("https://api.segment.io") .post() .uri("/v1/track") diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index d34cd294fb4e..dbe29623fcd8 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -1,5 +1,7 @@ package com.appsmith.server.solutions.ce; +import com.appsmith.caching.annotations.DistributedLock; +import com.appsmith.server.domains.Organization; import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.OrganizationService; @@ -8,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; @RequiredArgsConstructor @Slf4j @@ -19,16 +22,24 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { private final OrganizationService organizationService; @Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */) + @DistributedLock( + key = "fetchFeatures", + ttl = 20 * 60, // 20 minutes + shouldReleaseLock = false) // Ensure only one pod executes this @Observed(name = "fetchFeatures") public void fetchFeatures() { - log.info("Fetching features for default organization"); - featureFlagService - .getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() - .then(organizationService - .getDefaultOrganization() - .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) - .then(organizationService.restartOrganization())) - .doOnError(error -> log.error("Error while fetching organization feature flags", error)) + log.info("Fetching features for organizations"); + Flux organizationFlux = organizationService.retrieveAll(); + organizationFlux + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) + .onErrorResume(error -> { + log.error("Error while fetching organization feature flags", error); + return Flux.empty(); + }) + .then(organizationService.restartOrganization()) .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); } diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java new file mode 100644 index 000000000000..a03b1eaada50 --- /dev/null +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java @@ -0,0 +1,195 @@ +package com.appsmith.server.aspect; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SpringBootTest +class DistributedLockAspectTest { + + @Autowired + private TestLockService testLockService; + + @Autowired + private ReactiveRedisOperations redisOperations; + + private static final String LOCK_PREFIX = "lock:"; + + @Test + void testMonoOperation() { + StepVerifier.create(testLockService.monoOperation()) + .expectNext("mono-success") + .verifyComplete(); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "mono-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testFluxOperation() { + StepVerifier.create(testLockService.fluxOperation().collectList()) + .expectNext(List.of("flux-success-1", "flux-success-2")) + .verifyComplete(); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "flux-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testBlockingOperation() { + String result = testLockService.blockingOperation(); + assertEquals("blocking-success", result); + + // Verify lock is released + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "blocking-test")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testConcurrentAccess() throws InterruptedException { + AtomicReference thread1Result = new AtomicReference<>(); + AtomicReference thread2Result = new AtomicReference<>(); + CountDownLatch thread1Started = new CountDownLatch(1); + + // Thread 1: Long running operation + Thread thread1 = new Thread( + () -> { + thread1Started.countDown(); + thread1Result.set(testLockService.longRunningMonoOperation().block()); + }, + "Thread-1"); + + // Thread 2: Tries to execute while Thread 1 is running + Thread thread2 = new Thread( + () -> { + try { + thread1Started.await(); // Wait for Thread 1 to start + Thread.sleep(100); // Small delay to ensure Thread 1 has acquired lock + thread2Result.set( + testLockService.longRunningMonoOperation().block()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + "Thread-2"); + + // Start both threads + thread1.start(); + thread2.start(); + + // Wait for both threads to complete + thread1.join(5000); + thread2.join(5000); + + // Verify results + assertEquals("long-running-success", thread1Result.get()); + assertNull(thread2Result.get()); // Thread 2 should not get the lock + } + + @Test + void testPersistentLock() { + // First operation acquires lock and doesn't release it + StepVerifier.create(testLockService.operationWithPersistentLock()) + .expectNext("success") + .verifyComplete(); + + // Verify lock still exists after operation completes + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "persistent-lock")) + .expectNext(true) + .verifyComplete(); + + // Second operation should fail to acquire the same lock + StepVerifier.create(testLockService.operationWithPersistentLock()) + .verifyComplete(); // Completes empty because lock is still held + + // Cleanup: Release lock for other tests + StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations)) + .expectNext(1L) + .verifyComplete(); + } + + @Test + void testPersistentLockExpiration() { + // Execute operation with short-lived lock + StepVerifier.create(Mono.just(testLockService.operationWithShortLivedLock())) + .expectNext("success") + .verifyComplete(); + + // Verify lock exists immediately after + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + .expectNext(true) + .verifyComplete(); + + // Wait for lock to expire + try { + Thread.sleep(1100); // Wait just over 1 second + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // Verify lock has expired + StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnBlockingError() { + // Execute operation that throws error + assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); + + // Verify lock is released despite shouldReleaseLock = false + StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnReactiveError() { + // Execute operation that returns Mono.error + StepVerifier.create(testLockService.reactiveMethodWithError()) + .expectError(RuntimeException.class) + .verify(); + + // Verify lock is released despite shouldReleaseLock = false + StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testLockReleasedOnErrorAllowsSubsequentExecution() { + // First call throws error + assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); + + // Verify we can acquire the same lock immediately after error + AtomicBoolean lockAcquired = new AtomicBoolean(false); + StepVerifier.create(redisOperations.opsForValue().setIfAbsent("lock:error-lock", "test-value")) + .consumeNextWith(result -> lockAcquired.set(result)) + .verifyComplete(); + + // Should be able to acquire lock after error + assertTrue(lockAcquired.get()); + + // Cleanup + redisOperations.delete("lock:error-lock").block(); + } +} diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java new file mode 100644 index 000000000000..0ff5a8d1933b --- /dev/null +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java @@ -0,0 +1,58 @@ +package com.appsmith.server.aspect; + +import com.appsmith.caching.annotations.DistributedLock; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +@Service +public class TestLockService { + @DistributedLock(key = "mono-test") + public Mono monoOperation() { + return Mono.just("mono-success"); + } + + @DistributedLock(key = "flux-test") + public Flux fluxOperation() { + return Flux.just("flux-success-1", "flux-success-2"); + } + + @DistributedLock(key = "blocking-test") + public String blockingOperation() { + return "blocking-success"; + } + + @DistributedLock(key = "long-running-mono", ttl = 5) + public Mono longRunningMonoOperation() { + return Mono.just("long-running-success").delayElement(Duration.ofSeconds(2)); + } + + // Test method to check when the lock is persisted after the execution + @DistributedLock(key = "persistent-lock", shouldReleaseLock = false) + public Mono operationWithPersistentLock() { + return Mono.just("success").delayElement(Duration.ofMillis(100)); + } + + @DistributedLock(key = "short-lived-lock", shouldReleaseLock = false, ttl = 1) + public String operationWithShortLivedLock() { + return "success"; + } + + // Method to manually release the lock (for testing cleanup) + public Mono releaseLock(String lockKey, ReactiveRedisOperations redisOperations) { + return redisOperations.delete("lock:" + lockKey); + } + + @DistributedLock(key = "error-lock", shouldReleaseLock = false) + public void blockingMethodWithError() { + throw new RuntimeException("Simulated error"); + } + + @DistributedLock(key = "error-lock", shouldReleaseLock = false) + public Mono reactiveMethodWithError() { + return Mono.error(new RuntimeException("Simulated error")); + } +} diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java index 89129e40c09d..d615089f7ad4 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/FeatureFlagServiceCETest.java @@ -205,9 +205,12 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(new HashMap<>())); - featureFlagService - .getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + organizationService + .retrieveAll() + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { assertThat(organization.getOrganizationConfiguration().getFeaturesWithPendingMigration()) @@ -225,9 +228,12 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(Map.of(ORGANIZATION_TEST_FEATURE, DISABLE))); - featureFlagService - .getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + organizationService + .retrieveAll() + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { assertThat(organization.getOrganizationConfiguration().getFeaturesWithPendingMigration()) @@ -245,9 +251,12 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { Mockito.when(featureFlagMigrationHelper.getUpdatedFlagsWithPendingMigration(any())) .thenReturn(Mono.just(Map.of(ORGANIZATION_TEST_FEATURE, ENABLE))); - featureFlagService - .getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() - .block(); + organizationService + .retrieveAll() + .flatMap( + featureFlagService + ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) + .blockLast(); StepVerifier.create(organizationService.getDefaultOrganization()) .assertNext(organization -> { assertThat(organization.getOrganizationConfiguration().getFeaturesWithPendingMigration()) @@ -259,6 +268,7 @@ public void evictFeatures_withOrganizationIdentifier_redisKeyDoesNotExist() { } @Test + @WithUserDetails(value = "api_user") public void getOrganizationFeatureFlags_withDefaultOrganization_fetchLatestFlags() { Map organizationFeatures = new HashMap<>(); @@ -275,6 +285,7 @@ public void getOrganizationFeatureFlags_withDefaultOrganization_fetchLatestFlags } @Test + @WithUserDetails(value = "api_user") public void getCachedOrganizationFeatureFlags_withDefaultOrganization_organizationFeatureFlagsAreCached() { // Assert that the cached feature flags are empty before the remote fetch diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java new file mode 100644 index 000000000000..8276175ad009 --- /dev/null +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/annotations/DistributedLock.java @@ -0,0 +1,20 @@ +package com.appsmith.caching.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface DistributedLock { + String key(); + + // Time-to-live for the lock in seconds. + // - If the method execution takes longer than this TTL, the lock will be released automatically. + // - If the locking is used for cron jobs, make sure the TTL is less than the delay between 2 runs to refresh the + // status for every run. + long ttl() default 5 * 60; // Default TTL: 5 minutes + + boolean shouldReleaseLock() default true; +} diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java new file mode 100644 index 000000000000..3bf198eccaa5 --- /dev/null +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -0,0 +1,167 @@ +package com.appsmith.caching.aspects; + +import com.appsmith.caching.annotations.DistributedLock; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +@Aspect +@Component +@Slf4j +public class DistributedLockAspect { + + private final ReactiveRedisOperations redisOperations; + + private static final String LOCK_PREFIX = "lock:"; + + public DistributedLockAspect(ReactiveRedisOperations redisOperations) { + this.redisOperations = redisOperations; + } + + // Method to acquire a distributed lock before executing the annotated method. + @Around("@annotation(lockAnnotation)") + public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation) throws Throwable { + // Check method return type + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Class returnType = method.getReturnType(); + if (returnType.isAssignableFrom(Mono.class)) { + return handleMono(joinPoint, lockAnnotation); + } else if (returnType.isAssignableFrom(Flux.class)) { + return handleFlux(joinPoint, lockAnnotation); + } + return handleBlocking(joinPoint, lockAnnotation); + } + + private static class LockDetails { + final String key; + final String value; + final Duration duration; + + LockDetails(String key, String value, Duration duration) { + this.key = key; + this.value = value; + this.duration = duration; + } + } + + private LockDetails createLockDetails(DistributedLock lock) { + String lockKey = LOCK_PREFIX + lock.key(); + long ttl = lock.ttl(); + String value = + "locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); + return new LockDetails(lockKey, value, Duration.ofSeconds(ttl)); + } + + private void releaseLock(String lockKey) { + redisOperations + .delete(lockKey) + .doOnSuccess(deleted -> { + log.info("Released lock for: {}", lockKey); + }) + .onErrorResume(error -> { + log.error("Error while releasing lock: {}", lockKey, error); + return Mono.empty(); + }) + .subscribeOn(Schedulers.immediate()) + .subscribe(); + } + + private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) { + LockDetails lockDetails = createLockDetails(lock); + + return redisOperations + .opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .flatMap(acquired -> { + if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); + try { + return ((Mono) joinPoint.proceed()) + .doOnError(error -> releaseLock(lockDetails.key)) + .doFinally(signalType -> { + if (lock.shouldReleaseLock()) { + releaseLock(lockDetails.key); + } + }); + } catch (Throwable e) { + releaseLock(lockDetails.key); + return Mono.error(e); + } + } + log.info("Lock already acquired for: {}", lockDetails.key); + return Mono.empty(); + }); + } + + private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) { + LockDetails lockDetails = createLockDetails(lock); + + return redisOperations + .opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .flatMapMany(acquired -> { + if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); + try { + return ((Flux) joinPoint.proceed()) + .doOnError(error -> releaseLock(lockDetails.key)) + .doFinally(signalType -> { + if (lock.shouldReleaseLock()) { + releaseLock(lockDetails.key); + } + }); + } catch (Throwable e) { + releaseLock(lockDetails.key); + return Flux.error(e); + } + } + log.info("Lock already acquired for: {}", lockDetails.key); + return Flux.empty(); + }); + } + + private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { + LockDetails lockDetails = createLockDetails(lock); + + Boolean acquired = null; + try { + acquired = redisOperations + .opsForValue() + .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) + .block(); + } catch (Exception e) { + log.error("Error while acquiring lock: {}", lockDetails.key, e); + throw e; + } + + if (Boolean.TRUE.equals(acquired)) { + log.info("Acquired lock for: {}", lockDetails.key); + try { + return joinPoint.proceed(); + } catch (Throwable e) { + // Always release lock on error + releaseLock(lockDetails.key); + throw e; + } finally { + if (lock.shouldReleaseLock()) { + releaseLock(lockDetails.key); + } + } + } + log.info("Lock already acquired for: {}", lockDetails.key); + return null; + } +}