- Fix Poll Interval (#15453)
* Fix Poll Interval

* Make the poll interval 1

* Add random tests
- Disable tests for subscription for Query

* Checkstyle fix

* Remove on-demand jobs as well
- On update, remove the already-initialized instance

* Use an alias

* Fix a typo

* Search index fix
mohityadav766 authored Mar 5, 2024
1 parent dd05b67 commit 9491e04
Showing 12 changed files with 102 additions and 90 deletions.
@@ -2,7 +2,6 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.service.exception.UnhandledServerException;
@@ -12,12 +11,6 @@
@Slf4j
public class ApplicationHandler {

private static HashMap<String, Object> instances = new HashMap<>();

public static Object getAppInstance(String className) {
return instances.get(className);
}

private ApplicationHandler() {
/*Helper*/
}
@@ -53,8 +46,6 @@ public static Object runAppInit(
Method initMethod = resource.getClass().getMethod("init", App.class);
initMethod.invoke(resource, app);

instances.put(app.getClassName(), resource);

return resource;
}

@@ -63,11 +54,7 @@ public static void runMethodFromApplication(
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
// Native Application
try {
Object resource = getAppInstance(app.getClassName());
if (resource == null) {
resource = runAppInit(app, daoCollection, searchRepository);
}

Object resource = runAppInit(app, daoCollection, searchRepository);
// Call method on demand
Method scheduleMethod = resource.getClass().getMethod(methodName);
scheduleMethod.invoke(resource);
@@ -77,13 +64,9 @@
| IllegalAccessException
| InvocationTargetException e) {
LOG.error("Exception encountered", e);
throw new UnhandledServerException("Exception encountered", e);
throw new UnhandledServerException(e.getCause().getMessage());
} catch (ClassNotFoundException e) {
throw new UnhandledServerException("Exception encountered", e);
throw new UnhandledServerException(e.getCause().getMessage());
}
}

public static void removeUninstalledApp(String className) {
instances.remove(className);
}
}
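Note: the net effect of the ApplicationHandler changes is that the static instance cache is gone and a native application is re-initialized on every invocation. A minimal, self-contained sketch of the resulting pattern, assuming only what the diff shows (ReflectiveAppRunner and AppDemo are hypothetical stand-ins, not OpenMetadata classes):

// Sketch of the pattern ApplicationHandler now follows: no static instance
// cache; build a fresh instance and re-run init before every method call.
public class ReflectiveAppRunner {
  public static class AppDemo {
    public void init() { System.out.println("init"); }
    public void startApp() { System.out.println("startApp"); }
  }

  static void runMethod(String className, String methodName) {
    try {
      Object app = Class.forName(className).getDeclaredConstructor().newInstance();
      app.getClass().getMethod("init").invoke(app); // always re-init; nothing is cached
      app.getClass().getMethod(methodName).invoke(app);
    } catch (ReflectiveOperationException e) {
      // Mirrors the diff's intent of surfacing the underlying cause's message.
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      throw new RuntimeException(cause.getMessage(), cause);
    }
  }

  public static void main(String[] args) {
    runMethod("ReflectiveAppRunner$AppDemo", "startApp");
  }
}

The trade-off: every scheduled or on-demand run now pays the init cost, in exchange for never acting on a stale instance after an app update (the "On update, remove the already-initialized instance" bullet).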
@@ -1,9 +1,9 @@
package org.openmetadata.service.apps.bundles.searchIndex;

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;

import java.util.ArrayList;
@@ -166,8 +166,8 @@ public void startApp(JobExecutionContext jobExecutionContext) {
}

// Run ReIndexing
entitiesReIndex();
dataInsightReindex();
entitiesReIndex(jobExecutionContext);
dataInsightReindex(jobExecutionContext);
// Mark Job as Completed
updateJobStatus();
} catch (Exception ex) {
@@ -182,12 +182,8 @@ public void startApp(JobExecutionContext jobExecutionContext) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
} finally {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
// Send update
sendUpdates();
sendUpdates(jobExecutionContext);
}
}

@@ -212,7 +208,7 @@ public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
}

private void entitiesReIndex() {
private void entitiesReIndex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
reCreateIndexes(paginatedEntitiesSource.getEntityType());
@@ -222,18 +218,32 @@ private void entitiesReIndex() {
try {
resultList = paginatedEntitiesSource.readNext(null);
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
if (!resultList.getErrors().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(paginatedEntitiesSource.getLastFailedCursor())
.withSubmittedCount(paginatedEntitiesSource.getBatchSize())
.withSuccessCount(resultList.getData().size())
.withFailedCount(resultList.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(resultList.getErrors()));
}
}
} catch (SearchIndexException rx) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(rx.getIndexingError());
} finally {
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
sendUpdates(jobExecutionContext);
}
}
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
sendUpdates();
}
}
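The reworked loop above turns a batch that contains reader errors into a SearchIndexException, marks the job FAILED, yet still pushes stats and progress updates from the finally block. A simplified sketch of that control flow, with stand-in types rather than the real OpenMetadata classes:

// Simplified per-batch error handling: successes are written first, errors
// are raised, and the finally block keeps stats/updates flowing regardless.
public class BatchLoopSketch {
  static class SearchIndexException extends Exception {
    SearchIndexException(String message) { super(message); }
  }

  public static void main(String[] args) {
    String jobStatus = "RUNNING";
    for (int batch = 0; batch < 3; batch++) {
      boolean batchHasErrors = (batch == 1); // pretend batch 1 partially fails
      try {
        // searchIndexSink.write(processedBatch, contextData) would happen here
        if (batchHasErrors) {
          throw new SearchIndexException("Issues in Reading A Batch For Entities.");
        }
      } catch (SearchIndexException rx) {
        jobStatus = "FAILED"; // the job-level status records the failure
      } finally {
        // updateStats(...) and sendUpdates(jobExecutionContext) run either way
        System.out.println("batch=" + batch + " status=" + jobStatus);
      }
    }
  }
}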

private void dataInsightReindex() {
private void dataInsightReindex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
reCreateIndexes(paginatedDataInsightSource.getEntityType());
@@ -247,17 +257,23 @@ private void dataInsightReindex() {
dataInsightProcessor.process(resultList, contextData), contextData);
}
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
} finally {
updateStats(
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates(jobExecutionContext);
}
}
updateStats(
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates();
}
}

private void sendUpdates() {
private void sendUpdates(JobExecutionContext jobExecutionContext) {
try {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
if (WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(
@@ -286,8 +302,17 @@ public void updateStats(String entityType, StepStats currentEntityStats) {
new StepStats()
.withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
}
getUpdatedStats(
stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords());

stats.setSuccessRecords(
entityLevelStats.getAdditionalProperties().values().stream()
.map(s -> (StepStats) s)
.mapToInt(StepStats::getSuccessRecords)
.sum());
stats.setFailedRecords(
entityLevelStats.getAdditionalProperties().values().stream()
.map(s -> (StepStats) s)
.mapToInt(StepStats::getFailedRecords)
.sum());

// Update for the Job
jobDataStats.setJobStats(stats);
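The updateStats change above replaces the incremental getUpdatedStats helper with a full recomputation: job-level success/failed totals are summed from the per-entity StepStats map on every update, so re-reporting the same entity cannot double-count. A sketch under that reading, with StepStats reduced to a stand-in:

import java.util.HashMap;
import java.util.Map;

// Sketch: recompute job-level totals from per-entity stats instead of
// incrementing them batch by batch.
public class StatsRollupSketch {
  static class StepStats {
    final int successRecords;
    final int failedRecords;
    StepStats(int successRecords, int failedRecords) {
      this.successRecords = successRecords;
      this.failedRecords = failedRecords;
    }
  }

  public static void main(String[] args) {
    Map<String, StepStats> entityLevelStats = new HashMap<>();
    entityLevelStats.put("table", new StepStats(100, 2));
    entityLevelStats.put("topic", new StepStats(40, 1));

    int success = entityLevelStats.values().stream().mapToInt(s -> s.successRecords).sum();
    int failed = entityLevelStats.values().stream().mapToInt(s -> s.failedRecords).sum();
    System.out.println("success=" + success + " failed=" + failed); // success=140 failed=3
  }
}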
@@ -125,6 +125,10 @@ public void pushApplicationStatusUpdates(
JobExecutionContext context, AppRunRecord runRecord, boolean update) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
// Update the Run Record in Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));

// Push Updates to the Database
App jobApp =
JsonUtils.readOrConvertValue(
context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
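This listener change writes the serialized run record back into the Quartz JobDataMap before persisting it, so later reads of the execution context within the same run see the state that was pushed to the database. A minimal sketch; the key string stands in for SCHEDULED_APP_RUN_EXTENSION and the JSON for JsonUtils.pojoToJson(runRecord):

import org.quartz.JobDataMap;

// Sketch: update the in-memory Quartz context first, then persist the same
// payload, keeping the two views consistent.
public class RunRecordSyncSketch {
  public static void main(String[] args) {
    JobDataMap dataMap = new JobDataMap();
    String runRecordJson = "{\"status\":\"running\"}"; // stand-in payload
    dataMap.put("scheduledAppRunExtension", runRecordJson);
    // ... push the same record to the database here
    System.out.println(dataMap.getString("scheduledAppRunExtension"));
  }
}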
@@ -135,14 +135,13 @@ public static AppScheduler getInstance() {

public void addApplicationSchedule(App application) {
try {
if (scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP))
!= null) {
if (scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)) != null) {
LOG.info("Job already exists for the application, skipping the scheduling");
return;
}
AppRuntime context = getAppRuntime(application);
if (Boolean.TRUE.equals(context.getEnabled())) {
JobDetail jobDetail = jobBuilder(application, application.getId().toString());
JobDetail jobDetail = jobBuilder(application, application.getName());
Trigger trigger = trigger(application);
scheduler.scheduleJob(jobDetail, trigger);
} else {
@@ -155,8 +154,18 @@ public void addApplicationSchedule(App application) {
}

public void deleteScheduledApplication(App app) throws SchedulerException {
scheduler.deleteJob(new JobKey(app.getId().toString(), APPS_JOB_GROUP));
scheduler.unscheduleJob(new TriggerKey(app.getId().toString(), APPS_TRIGGER_GROUP));
// Scheduled Jobs
scheduler.deleteJob(new JobKey(app.getName(), APPS_JOB_GROUP));
scheduler.unscheduleJob(new TriggerKey(app.getName(), APPS_TRIGGER_GROUP));

// OnDemand Jobs
scheduler.deleteJob(
new JobKey(
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP));
scheduler.unscheduleJob(
new TriggerKey(
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP));
}

private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
@@ -175,7 +184,7 @@ private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {

private Trigger trigger(App app) {
return TriggerBuilder.newTrigger()
.withIdentity(app.getId().toString(), APPS_TRIGGER_GROUP)
.withIdentity(app.getName(), APPS_TRIGGER_GROUP)
.withSchedule(getCronSchedule(app.getAppSchedule()))
.build();
}
@@ -210,11 +219,11 @@ public static CronScheduleBuilder getCronSchedule(AppSchedule scheduleInfo) {
public void triggerOnDemandApplication(App application) {
try {
JobDetail jobDetailScheduled =
scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP));
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_JOB_GROUP));
// Check if the job is already running
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
@@ -233,12 +242,12 @@ public void triggerOnDemandApplication(App application) {
JobDetail newJobDetail =
jobBuilder(
application,
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()));
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()));
newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value());
Trigger trigger =
TriggerBuilder.newTrigger()
.withIdentity(
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP)
.startNow()
.build();
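The scheduler changes above move every Quartz JobKey/TriggerKey from the application's UUID to its name, and deleteScheduledApplication now also removes the "<name>-OnDemand" job and trigger (the "Remove on-demand jobs as well" bullet). A sketch of the key scheme; the group-name constants are assumptions, as the diff does not show the real APPS_JOB_GROUP / APPS_TRIGGER_GROUP values:

import org.quartz.JobKey;
import org.quartz.TriggerKey;

// Sketch: the scheduled and on-demand identities derived from the app name,
// i.e. everything deleteScheduledApplication must now clean up.
public class AppJobKeysSketch {
  static final String APPS_JOB_GROUP = "OMAppsJobGroup";         // assumed value
  static final String APPS_TRIGGER_GROUP = "OMAppsTriggerGroup"; // assumed value

  public static void main(String[] args) {
    String appName = "SearchIndexingApplication";
    JobKey scheduledJob = new JobKey(appName, APPS_JOB_GROUP);
    JobKey onDemandJob = new JobKey(String.format("%s-%s", appName, "OnDemand"), APPS_JOB_GROUP);
    TriggerKey scheduledTrigger = new TriggerKey(appName, APPS_TRIGGER_GROUP);
    TriggerKey onDemandTrigger =
        new TriggerKey(String.format("%s-%s", appName, "OnDemand"), APPS_TRIGGER_GROUP);
    System.out.println(scheduledJob + ", " + onDemandJob);
    System.out.println(scheduledTrigger + ", " + onDemandTrigger);
  }
}

Keying by name also makes addApplicationSchedule's existence check stable across reinstalls, since a reinstalled app typically gets a new UUID but keeps its name.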
@@ -132,7 +132,8 @@ private JobDetail jobBuilder(
private Trigger trigger(EventSubscription eventSubscription) {
return TriggerBuilder.newTrigger()
.withIdentity(eventSubscription.getId().toString(), ALERT_TRIGGER_GROUP)
.withSchedule(SimpleScheduleBuilder.repeatMinutelyForever(1))
.withSchedule(
SimpleScheduleBuilder.repeatSecondlyForever(eventSubscription.getPollInterval()))
.startNow()
.build();
}
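This is the change the commit title refers to: the alert trigger previously repeated once per minute regardless of configuration and now repeats every eventSubscription.getPollInterval() seconds. A standalone sketch of the new trigger shape; the identity and group names here are illustrative:

import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

// Sketch: fire every `pollInterval` seconds instead of the hard-coded
// repeatMinutelyForever(1).
public class PollIntervalTriggerSketch {
  public static void main(String[] args) {
    int pollInterval = 10; // seconds; would come from eventSubscription.getPollInterval()
    Trigger trigger =
        TriggerBuilder.newTrigger()
            .withIdentity("subscription-id", "alertTriggerGroup")
            .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(pollInterval))
            .startNow()
            .build();
    System.out.println(trigger.getKey() + " fires every " + pollInterval + "s");
  }
}

The separate "Make the poll interval 1" bullet presumably sets the default pollInterval to one second, which this trigger then honors.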
@@ -3223,7 +3223,7 @@ List<String> listWithoutEntityFilter(
@Bind("eventType") String eventType, @Bind("timestamp") long timestamp);

@SqlQuery(
"SELECT json FROM change_event where offset > :offset ORDER BY eventTime ASC LIMIT :limit")
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit")
List<String> list(@Bind("limit") long limit, @Bind("offset") long offset);

@SqlQuery("SELECT count(*) FROM change_event")
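The "Use an alias" bullet lands here: offset doubles as a SQL keyword (reserved in PostgreSQL, a keyword in MySQL), so the bare column reference can trip the parser, while a reference qualified through the ce alias parses cleanly on both databases. A plain-JDBC sketch of the equivalent query; the connection details are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch: the aliased form of the change_event query from the diff.
public class ChangeEventQuerySketch {
  public static void main(String[] args) throws Exception {
    String sql =
        "SELECT json FROM change_event ce WHERE ce.offset > ? ORDER BY ce.eventTime ASC LIMIT ?";
    try (Connection conn =
            DriverManager.getConnection("jdbc:postgresql://localhost/openmetadata_db", "user", "pass");
        PreparedStatement ps = conn.prepareStatement(sql)) {
      ps.setLong(1, 0L);   // :offset
      ps.setLong(2, 100L); // :limit
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          System.out.println(rs.getString("json"));
        }
      }
    }
  }
}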
@@ -5,7 +5,6 @@
import static org.openmetadata.service.Entity.APPLICATION;
import static org.openmetadata.service.Entity.BOT;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.apps.ApplicationHandler.removeUninstalledApp;
import static org.openmetadata.service.jdbi3.EntityRepository.getEntitiesFromSeedData;

import io.swagger.v3.oas.annotations.ExternalDocumentation;
@@ -1031,8 +1030,5 @@ private void deleteApp(SecurityContext securityContext, App installedApp, boolean
}
}
}

// Remove App from instances Map Lookup
removeUninstalledApp(installedApp.getClassName());
}
}
@@ -72,7 +72,7 @@ public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData)
private ResultList<? extends EntityInterface> read(String cursor) throws SearchIndexException {
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result = null;
ResultList<? extends EntityInterface> result;
try {
result =
entityRepository.listAfterWithSkipFailure(
@@ -83,31 +83,19 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SearchIndexException {
cursor);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(lastFailedCursor)
.withSubmittedCount(batchSize)
.withSuccessCount(result.getData().size())
.withFailedCount(result.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(result.getErrors()));
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
updateStats(result.getData().size(), result.getErrors().size());
return result;
}

LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
updateStats(result.getData().size(), result.getErrors().size());
} catch (SearchIndexException ex) {
lastFailedCursor = this.cursor;
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
updateStats(result.getData().size(), result.getErrors().size());
throw ex;
} catch (Exception e) {
lastFailedCursor = this.cursor;
IndexingError indexingError =
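The reader above no longer throws the moment a batch contains errors: it records the cursor that produced the bad batch, advances (or finishes) the pagination, updates its stats, and returns the successes, leaving it to entitiesReIndex to raise the SearchIndexException. A simplified sketch of that cursor discipline, with stand-in types:

import java.util.List;

// Sketch: on a partially failed batch, remember where it started, still
// advance the cursor, and hand the successes back to the caller.
public class PartialFailureReaderSketch {
  record Page(List<String> data, List<String> errors, String after) {}

  private String cursor = "cursor-0";
  private String lastFailedCursor = null;
  private boolean done = false;

  Page consume(Page batch) {
    if (!batch.errors().isEmpty()) {
      lastFailedCursor = cursor;  // cursor that produced the bad batch
    }
    if (batch.after() == null) {
      done = true;                // pagination exhausted
    } else {
      cursor = batch.after();     // move past the batch either way
    }
    return batch;                 // successes still reach the sink
  }

  public static void main(String[] args) {
    PartialFailureReaderSketch reader = new PartialFailureReaderSketch();
    Page page = reader.consume(new Page(List.of("table-1"), List.of("table-2: read error"), null));
    System.out.println(page.data() + " lastFailedCursor=" + reader.lastFailedCursor
        + " done=" + reader.done);
  }
}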