Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Restrict cron execution for single pod in clustered environment #39171

Open
wants to merge 20 commits into
base: release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6c5aca3
feat: Restrict cron execution for single pod in clustered environment
abhvsn Feb 11, 2025
2f4aa9f
chore: Run cron on all the tenants present in DB
abhvsn Feb 11, 2025
ab8cb0e
Merge branch 'release' of github.com:appsmithorg/appsmith into chore/…
abhvsn Feb 11, 2025
d96aebb
fix: Testcases and build failures
abhvsn Feb 12, 2025
267c9ed
chore: Refactor instance config
abhvsn Feb 12, 2025
f66a9c2
chore: Fix build failure
abhvsn Feb 12, 2025
c79441a
Merge branch 'release' of github.com:appsmithorg/appsmith into chore/…
abhvsn Feb 17, 2025
f74a2e7
chore: Merge release and resolve conflicts
abhvsn Feb 19, 2025
636fdc4
chore: Add spotless changes
abhvsn Feb 19, 2025
4b73e84
chore: Remove empty files
abhvsn Feb 19, 2025
f17e82a
chore: Cleanup
abhvsn Feb 19, 2025
3f409dc
chore: Migrate ping scheduled cron
abhvsn Feb 21, 2025
8b87e17
chore: Migrate tenant to org in comments
abhvsn Feb 21, 2025
cfd0408
chore: Add error logs for dev
abhvsn Feb 21, 2025
bb4e6cd
chore: Add fallback for default org in case of user object is not ava…
abhvsn Feb 21, 2025
5d9cd35
chore: Refactor to wait for fetching flags for all the orgs
abhvsn Feb 24, 2025
6190c27
Merge branch 'release' of github.com:appsmithorg/appsmith into chore/…
abhvsn Feb 24, 2025
925f44d
chore: Add fallback for anonymous user while fetching feature flags
abhvsn Feb 24, 2025
66af257
chore: Add pingStats analytics event for all the orgs
abhvsn Feb 24, 2025
f179756
chore: Add tests
abhvsn Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private Object invokeMethod(Boolean isFeatureSupported, ProceedingJoinPoint join
if (e instanceof AppsmithException) {
throw (AppsmithException) e;
}
log.error("Exception while invoking super class method", e);
String errorMessage = "Exception while invoking super class method";
AppsmithException exception = getInvalidAnnotationUsageException(method, errorMessage);
log.error(exception.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
Mono<Void> registrationAndRtsCheckMono = configService
.getByName(Appsmith.APPSMITH_REGISTERED)
.filter(config -> TRUE.equals(config.getConfig().get("value")))
.switchIfEmpty(Mono.defer(() -> instanceConfigHelper.registerInstance()))
.switchIfEmpty(Mono.defer(instanceConfigHelper::registerInstance))
.onErrorResume(errorSignal -> {
log.debug("Instance registration failed with error: \n{}", errorSignal.getMessage());
return Mono.empty();
Expand All @@ -64,16 +64,9 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
// TODO Update implementation to fetch license status for all the organizations once multi-tenancy is
// introduced
.then(Mono.defer(instanceConfigHelper::isLicenseValid)
// Ensure that the organization feature flags are refreshed with the latest values after
// completing
// the
// Ensure that the org feature flags are refreshed with the latest values after completing the
// license verification process.
.flatMap(isValid -> {
log.debug(
"License verification completed with status: {}",
TRUE.equals(isValid) ? "valid" : "invalid");
return instanceConfigHelper.updateCacheForOrganizationFeatureFlags();
}));
.flatMap(isValid -> instanceConfigHelper.updateCacheForOrganizationFeatureFlags()));

try {
startupProcess.block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import java.io.Serializable;

import static com.appsmith.external.helpers.StringUtils.dotted;

@Getter
@Setter
@ToString
Expand All @@ -35,4 +37,9 @@ public class Organization extends BaseDomain implements Serializable {
OrganizationConfiguration organizationConfiguration;

// TODO add SSO and other configurations here after migrating from environment variables to database configuration

public static class Fields {
public static final String organizationConfiguration_isRestartRequired =
dotted(organizationConfiguration, OrganizationConfiguration.Fields.isRestartRequired);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import com.appsmith.server.domains.ce.OrganizationConfigurationCE;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.FieldNameConstants;

@Data
@EqualsAndHashCode(callSuper = true)
public class OrganizationConfiguration extends OrganizationConfigurationCE {}
@FieldNameConstants
public class OrganizationConfiguration extends OrganizationConfigurationCE {
public static class Fields extends OrganizationConfigurationCE.Fields {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.appsmith.server.domains.OrganizationConfiguration;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import lombok.experimental.FieldNameConstants;
import org.apache.commons.lang3.ObjectUtils;

import java.io.Serializable;
Expand All @@ -16,6 +17,7 @@
import java.util.Map;

@Data
@FieldNameConstants
public class OrganizationConfigurationCE implements Serializable {

private String googleMapsKey;
Expand Down Expand Up @@ -88,4 +90,6 @@ public void copyNonSensitiveValues(OrganizationConfiguration organizationConfigu
public Boolean isEmailVerificationEnabled() {
return Boolean.TRUE.equals(this.emailVerificationEnabled);
}

public static class Fields {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.appsmith.server.services.AnalyticsService;
import com.appsmith.server.services.ConfigService;
import com.appsmith.server.services.FeatureFlagService;
import com.appsmith.server.services.OrganizationService;
import com.appsmith.server.solutions.ReleaseNotesService;
import org.springframework.context.ApplicationContext;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
Expand All @@ -24,7 +25,8 @@ public InstanceConfigHelperImpl(
AnalyticsService analyticsService,
NetworkUtils networkUtils,
ReleaseNotesService releaseNotesService,
RTSCaller rtsCaller) {
RTSCaller rtsCaller,
OrganizationService organizationService) {
super(
configService,
cloudServicesConfig,
Expand All @@ -35,6 +37,7 @@ public InstanceConfigHelperImpl(
analyticsService,
networkUtils,
releaseNotesService,
rtsCaller);
rtsCaller,
organizationService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.appsmith.server.services.AnalyticsService;
import com.appsmith.server.services.ConfigService;
import com.appsmith.server.services.FeatureFlagService;
import com.appsmith.server.services.OrganizationService;
import com.appsmith.server.solutions.ReleaseNotesService;
import com.appsmith.util.WebClientUtils;
import joptsimple.internal.Strings;
Expand Down Expand Up @@ -54,8 +55,8 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE {
private final AnalyticsService analyticsService;
private final NetworkUtils networkUtils;
private final ReleaseNotesService releaseNotesService;

private final RTSCaller rtsCaller;
private final OrganizationService organizationService;

private boolean isRtsAccessible = false;

Expand Down Expand Up @@ -225,8 +226,25 @@ public Mono<String> checkMongoDBVersion() {
});
}

/**
* Method to trigger update for the organization feature flags. This method is called during the startup of
* the application. It's required at the startup to ensure that the feature flags are up-to-date which will then be
* consumed by {@link com.appsmith.server.aspect.FeatureFlaggedMethodInvokerAspect} in a non-reactive manner.
* In case the user tries to fetch the feature flags before the cache is updated, the aspect will fallback to the
* earlier cached data i.e. disabled state.
* @return Empty Mono
*/
@Override
public Mono<Void> updateCacheForOrganizationFeatureFlags() {
return featureFlagService.getOrganizationFeatures().then();
organizationService
.retrieveAll()
.flatMap(org -> featureFlagService.getOrganizationFeatures(org.getId()))
.onErrorResume(error -> {
log.error("Error while updating cache for org feature flags", error);
return Mono.empty();
})
.subscribeOn(LoadShifter.elasticScheduler)
.subscribe();
return Mono.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class JsonSchemaVersions extends JsonSchemaVersionsFallback {

/**
* Only tenant level flags would work over here.
* Only org level flags would work over here.
* @return an Integer which is server version
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.repository.NoRepositoryBean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.Serializable;
Expand Down Expand Up @@ -34,4 +35,11 @@ public interface BaseRepository<T, ID extends Serializable> extends ReactiveMong
* @return
*/
Mono<Boolean> archiveAllById(Collection<ID> ids);

/**
* This function retrieves all the documents in the collection without the user context. This method will be used
* by internal server workflows like crons etc. to fetch all the documents from the collection.
* @return
*/
Flux<T> retrieveAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public Flux<T> findAll() {
});
}

@Override
public Flux<T> retrieveAll() {
Query query = new Query(notDeleted());
return mongoOperations.find(
query.cursorBatchSize(10000), entityInformation.getJavaType(), entityInformation.getCollectionName());
}

/**
* We don't use this today, it doesn't use our `notDeleted` criteria, and since we don't use it, we're not porting
* it to Postgres. Querying with `queryBuilder` or anything else is arguably more readable than this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@

import com.appsmith.server.domains.Organization;
import com.appsmith.server.repositories.AppsmithRepository;
import reactor.core.publisher.Mono;

public interface CustomOrganizationRepositoryCE extends AppsmithRepository<Organization> {}
public interface CustomOrganizationRepositoryCE extends AppsmithRepository<Organization> {
Mono<Integer> disableRestartForAllTenants();
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
package com.appsmith.server.repositories.ce;

import com.appsmith.server.domains.Organization;
import com.appsmith.server.helpers.ce.bridge.Bridge;
import com.appsmith.server.repositories.BaseAppsmithRepositoryImpl;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

import static com.appsmith.server.domains.Organization.Fields.organizationConfiguration_isRestartRequired;

@Slf4j
public class CustomOrganizationRepositoryCEImpl extends BaseAppsmithRepositoryImpl<Organization>
implements CustomOrganizationRepositoryCE {}
implements CustomOrganizationRepositoryCE {

@Override
public Mono<Integer> disableRestartForAllTenants() {
log.info("Disabling restart for all tenants");
return queryBuilder()
.criteria(Bridge.isTrue(organizationConfiguration_isRestartRequired))
.updateAll(Bridge.update().set(organizationConfiguration_isRestartRequired, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ public interface FeatureFlagServiceCE {
* To get all features of the organization from Cloud Services and store them locally
* @return Mono of Void
*/
Mono<Void> getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations();
Mono<Organization> getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(
Organization organization);

/**
* To get all features of the current organization.
* @return Mono of Map
*/
Mono<Map<String, Boolean>> getOrganizationFeatures();

Mono<Map<String, Boolean>> getOrganizationFeatures(String orgId);

Mono<Organization> checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization);

CachedFeatures getCachedOrganizationFeatureFlags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.appsmith.server.services.OrganizationService;
import com.appsmith.server.services.SessionUserService;
import com.appsmith.server.services.UserIdentifierService;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
Expand All @@ -38,6 +39,8 @@ public class FeatureFlagServiceCEImpl implements FeatureFlagServiceCE {
private final FeatureFlagMigrationHelper featureFlagMigrationHelper;
private static final long FEATURE_FLAG_CACHE_TIME_MIN = 120;

// TODO remove once all the helper methods consuming @FeatureFlagged are converted to reactive
@Getter
private CachedFeatures cachedOrganizationFeatureFlags;

/**
Expand All @@ -59,7 +62,7 @@ public Mono<Boolean> check(FeatureFlagEnum featureEnum) {

/**
* Retrieves a map of feature flags along with their corresponding boolean values for the current user.
* This takes into account for both user-level and organization-level feature flags
* This takes into account for both user-level and organization-level feature flags.
*
* @return A Mono emitting a Map where keys are feature names and values are corresponding boolean flags.
*/
Expand Down Expand Up @@ -116,52 +119,66 @@ private Mono<Map<String, Boolean>> getAllRemoteFeatureFlagsForUser() {

/**
* To get all features of the organization from Cloud Services and store them locally
* @return Mono of Void
* @return Mono updated org
*/
public Mono<Void> getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations() {
return organizationService
.getDefaultOrganization()
.flatMap(defaultOrganization ->
// 1. Fetch current/saved feature flags from cache
// 2. Force update the org flags keeping existing flags as fallback in case the API
// call to fetch the flags fails for some reason
// 3. Get the diff and update the flags with pending migrations to be used to run
// migrations selectively
featureFlagMigrationHelper
.getUpdatedFlagsWithPendingMigration(defaultOrganization)
.flatMap(featureFlagWithPendingMigrations -> {
OrganizationConfiguration organizationConfiguration =
defaultOrganization.getOrganizationConfiguration() == null
? new OrganizationConfiguration()
: defaultOrganization.getOrganizationConfiguration();
// We expect the featureFlagWithPendingMigrations to be empty hence
// verifying only for null
if (featureFlagWithPendingMigrations != null
&& !featureFlagWithPendingMigrations.equals(
organizationConfiguration.getFeaturesWithPendingMigration())) {
organizationConfiguration.setFeaturesWithPendingMigration(
featureFlagWithPendingMigrations);
if (!featureFlagWithPendingMigrations.isEmpty()) {
organizationConfiguration.setMigrationStatus(MigrationStatus.PENDING);
} else {
organizationConfiguration.setMigrationStatus(MigrationStatus.COMPLETED);
}
return organizationService.update(
defaultOrganization.getId(), defaultOrganization);
}
return Mono.just(defaultOrganization);
}))
.then();
@Override
public Mono<Organization> getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(
Organization organization) {
// 1. Fetch current/saved feature flags from cache
// 2. Force update the org flags keeping existing flags as fallback in case the API
// call to fetch the flags fails for some reason
// 3. Get the diff and update the flags with pending migrations to be used to run
// migrations selectively
return featureFlagMigrationHelper
.getUpdatedFlagsWithPendingMigration(organization)
.flatMap(featureFlagWithPendingMigrations -> {
OrganizationConfiguration organizationConfiguration =
organization.getOrganizationConfiguration() == null
? new OrganizationConfiguration()
: organization.getOrganizationConfiguration();
// We expect the featureFlagWithPendingMigrations to be empty hence
// verifying only for null
if (featureFlagWithPendingMigrations != null
&& !featureFlagWithPendingMigrations.equals(
organizationConfiguration.getFeaturesWithPendingMigration())) {
organizationConfiguration.setFeaturesWithPendingMigration(featureFlagWithPendingMigrations);
if (!featureFlagWithPendingMigrations.isEmpty()) {
organizationConfiguration.setMigrationStatus(MigrationStatus.PENDING);
} else {
organizationConfiguration.setMigrationStatus(MigrationStatus.COMPLETED);
}
return organizationService.update(organization.getId(), organization);
}
return Mono.just(organization);
});
}

/**
* To get all features of the current organization.
* @return Mono of Map
*/
@Override
public Mono<Map<String, Boolean>> getOrganizationFeatures() {
return organizationService
.getDefaultOrganizationId()
.flatMap(cacheableFeatureFlagHelper::fetchCachedOrganizationFeatures)
return sessionUserService
.getCurrentUser()
.map(User::getOrganizationId)
.switchIfEmpty(Mono.defer(() -> {
log.error(
"No user found while fetching organization features, if the method is called without user "
+ "context please use getOrganizationFeatures(String organizationId)");
// TODO @CloudBilling - This is a temporary fix to fallback to default organization until we
// introduce a signup flow based on organization. Currently userSignup will end up in data
// corruption if the fallback is not provided to create default workspace in EE as this is
// controlled via flags, please refer WorkspaceServiceHelperImpl.isCreateWorkspaceAllowed.
return organizationService.getDefaultOrganizationId();
}))
.flatMap(this::getOrganizationFeatures);
}

@Override
public Mono<Map<String, Boolean>> getOrganizationFeatures(String organizationId) {
return cacheableFeatureFlagHelper
.fetchCachedOrganizationFeatures(organizationId)
.map(cachedFeatures -> {
cachedOrganizationFeatureFlags = cachedFeatures;
return cachedFeatures.getFeatures();
Expand All @@ -178,8 +195,4 @@ public Mono<Map<String, Boolean>> getOrganizationFeatures() {
public Mono<Organization> checkAndExecuteMigrationsForOrganizationFeatureFlags(Organization organization) {
return organizationService.checkAndExecuteMigrationsForOrganizationFeatureFlags(organization);
}

public CachedFeatures getCachedOrganizationFeatureFlags() {
return this.cachedOrganizationFeatureFlags;
}
}
Loading