Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Fixes Poll Interval for App Jobs, and fixed Search Indexing Stats #15453

Merged
merged 9 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.service.exception.UnhandledServerException;
Expand All @@ -12,12 +11,6 @@
@Slf4j
public class ApplicationHandler {

private static HashMap<String, Object> instances = new HashMap<>();

public static Object getAppInstance(String className) {
return instances.get(className);
}

private ApplicationHandler() {
/*Helper*/
}
Expand Down Expand Up @@ -53,8 +46,6 @@ public static Object runAppInit(
Method initMethod = resource.getClass().getMethod("init", App.class);
initMethod.invoke(resource, app);

instances.put(app.getClassName(), resource);

return resource;
}

Expand All @@ -63,11 +54,7 @@ public static void runMethodFromApplication(
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
// Native Application
try {
Object resource = getAppInstance(app.getClassName());
if (resource == null) {
resource = runAppInit(app, daoCollection, searchRepository);
}

Object resource = runAppInit(app, daoCollection, searchRepository);
// Call method on demand
Method scheduleMethod = resource.getClass().getMethod(methodName);
scheduleMethod.invoke(resource);
Expand All @@ -77,13 +64,9 @@ public static void runMethodFromApplication(
| IllegalAccessException
| InvocationTargetException e) {
LOG.error("Exception encountered", e);
throw new UnhandledServerException("Exception encountered", e);
throw new UnhandledServerException(e.getCause().getMessage());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mohityadav766 lets not throw the message here, we should provide clear api response

} catch (ClassNotFoundException e) {
throw new UnhandledServerException("Exception encountered", e);
throw new UnhandledServerException(e.getCause().getMessage());
}
}

public static void removeUninstalledApp(String className) {
instances.remove(className);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.openmetadata.service.apps.bundles.searchIndex;

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;

import java.util.ArrayList;
Expand Down Expand Up @@ -166,8 +166,8 @@ public void startApp(JobExecutionContext jobExecutionContext) {
}

// Run ReIndexing
entitiesReIndex();
dataInsightReindex();
entitiesReIndex(jobExecutionContext);
dataInsightReindex(jobExecutionContext);
// Mark Job as Completed
updateJobStatus();
} catch (Exception ex) {
Expand All @@ -182,12 +182,8 @@ public void startApp(JobExecutionContext jobExecutionContext) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
} finally {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
// Send update
sendUpdates();
sendUpdates(jobExecutionContext);
}
}

Expand All @@ -212,7 +208,7 @@ public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
}

private void entitiesReIndex() {
private void entitiesReIndex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
reCreateIndexes(paginatedEntitiesSource.getEntityType());
Expand All @@ -222,18 +218,32 @@ private void entitiesReIndex() {
try {
resultList = paginatedEntitiesSource.readNext(null);
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
if (!resultList.getErrors().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(paginatedEntitiesSource.getLastFailedCursor())
.withSubmittedCount(paginatedEntitiesSource.getBatchSize())
.withSuccessCount(resultList.getData().size())
.withFailedCount(resultList.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(resultList.getErrors()));
}
}
} catch (SearchIndexException rx) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(rx.getIndexingError());
} finally {
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
sendUpdates(jobExecutionContext);
}
}
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
sendUpdates();
}
}

private void dataInsightReindex() {
private void dataInsightReindex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
reCreateIndexes(paginatedDataInsightSource.getEntityType());
Expand All @@ -247,17 +257,23 @@ private void dataInsightReindex() {
dataInsightProcessor.process(resultList, contextData), contextData);
}
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
} finally {
updateStats(
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates(jobExecutionContext);
}
}
updateStats(
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates();
}
}

private void sendUpdates() {
private void sendUpdates(JobExecutionContext jobExecutionContext) {
try {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
if (WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(
Expand Down Expand Up @@ -286,8 +302,17 @@ public void updateStats(String entityType, StepStats currentEntityStats) {
new StepStats()
.withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
}
getUpdatedStats(
stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords());

stats.setSuccessRecords(
entityLevelStats.getAdditionalProperties().values().stream()
.map(s -> (StepStats) s)
.mapToInt(StepStats::getSuccessRecords)
.sum());
stats.setFailedRecords(
entityLevelStats.getAdditionalProperties().values().stream()
.map(s -> (StepStats) s)
.mapToInt(StepStats::getFailedRecords)
.sum());

// Update for the Job
jobDataStats.setJobStats(stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public void pushApplicationStatusUpdates(
JobExecutionContext context, AppRunRecord runRecord, boolean update) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
// Update the Run Record in Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));

// Push Updates to the Database
App jobApp =
JsonUtils.readOrConvertValue(
context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,13 @@ public static AppScheduler getInstance() {

public void addApplicationSchedule(App application) {
try {
if (scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP))
!= null) {
if (scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)) != null) {
LOG.info("Job already exists for the application, skipping the scheduling");
return;
}
AppRuntime context = getAppRuntime(application);
if (Boolean.TRUE.equals(context.getEnabled())) {
JobDetail jobDetail = jobBuilder(application, application.getId().toString());
JobDetail jobDetail = jobBuilder(application, application.getName());
Trigger trigger = trigger(application);
scheduler.scheduleJob(jobDetail, trigger);
} else {
Expand All @@ -155,8 +154,18 @@ public void addApplicationSchedule(App application) {
}

public void deleteScheduledApplication(App app) throws SchedulerException {
scheduler.deleteJob(new JobKey(app.getId().toString(), APPS_JOB_GROUP));
scheduler.unscheduleJob(new TriggerKey(app.getId().toString(), APPS_TRIGGER_GROUP));
// Scheduled Jobs
scheduler.deleteJob(new JobKey(app.getName(), APPS_JOB_GROUP));
scheduler.unscheduleJob(new TriggerKey(app.getName(), APPS_TRIGGER_GROUP));

// OnDemand Jobs
scheduler.deleteJob(
new JobKey(
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP));
scheduler.unscheduleJob(
new TriggerKey(
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP));
}

private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
Expand All @@ -175,7 +184,7 @@ private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundEx

private Trigger trigger(App app) {
return TriggerBuilder.newTrigger()
.withIdentity(app.getId().toString(), APPS_TRIGGER_GROUP)
.withIdentity(app.getName(), APPS_TRIGGER_GROUP)
.withSchedule(getCronSchedule(app.getAppSchedule()))
.build();
}
Expand Down Expand Up @@ -210,11 +219,11 @@ public static CronScheduleBuilder getCronSchedule(AppSchedule scheduleInfo) {
public void triggerOnDemandApplication(App application) {
try {
JobDetail jobDetailScheduled =
scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP));
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_JOB_GROUP));
// Check if the job is already running
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
Expand All @@ -233,12 +242,12 @@ public void triggerOnDemandApplication(App application) {
JobDetail newJobDetail =
jobBuilder(
application,
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()));
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()));
newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value());
Trigger trigger =
TriggerBuilder.newTrigger()
.withIdentity(
String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP)
.startNow()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ private JobDetail jobBuilder(
private Trigger trigger(EventSubscription eventSubscription) {
return TriggerBuilder.newTrigger()
.withIdentity(eventSubscription.getId().toString(), ALERT_TRIGGER_GROUP)
.withSchedule(SimpleScheduleBuilder.repeatMinutelyForever(1))
.withSchedule(
SimpleScheduleBuilder.repeatSecondlyForever(eventSubscription.getPollInterval()))
.startNow()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3223,7 +3223,7 @@ List<String> listWithoutEntityFilter(
@Bind("eventType") String eventType, @Bind("timestamp") long timestamp);

@SqlQuery(
"SELECT json FROM change_event where offset > :offset ORDER BY eventTime ASC LIMIT :limit")
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit")
List<String> list(@Bind("limit") long limit, @Bind("offset") long offset);

@SqlQuery("SELECT count(*) FROM change_event")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.openmetadata.service.Entity.APPLICATION;
import static org.openmetadata.service.Entity.BOT;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.apps.ApplicationHandler.removeUninstalledApp;
import static org.openmetadata.service.jdbi3.EntityRepository.getEntitiesFromSeedData;

import io.swagger.v3.oas.annotations.ExternalDocumentation;
Expand Down Expand Up @@ -1031,8 +1030,5 @@ private void deleteApp(SecurityContext securityContext, App installedApp, boolea
}
}
}

// Remove App from instances Map Lookup
removeUninstalledApp(installedApp.getClassName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ResultList<? extends EntityInterface> readNext(Map<String, Object> contex
private ResultList<? extends EntityInterface> read(String cursor) throws SearchIndexException {
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result = null;
ResultList<? extends EntityInterface> result;
try {
result =
entityRepository.listAfterWithSkipFailure(
Expand All @@ -83,31 +83,19 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SearchI
cursor);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(lastFailedCursor)
.withSubmittedCount(batchSize)
.withSuccessCount(result.getData().size())
.withFailedCount(result.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(result.getErrors()));
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
updateStats(result.getData().size(), result.getErrors().size());
return result;
}

LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
updateStats(result.getData().size(), result.getErrors().size());
} catch (SearchIndexException ex) {
lastFailedCursor = this.cursor;
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
updateStats(result.getData().size(), result.getErrors().size());
throw ex;
} catch (Exception e) {
lastFailedCursor = this.cursor;
IndexingError indexingError =
Expand Down
Loading
Loading