Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format Error on Indexing Application, and in schedule turn off recreate #14225

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
Expand Down Expand Up @@ -111,6 +112,16 @@ public void startApp(JobExecutionContext jobExecutionContext) {
LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING);

// Make recreate as false for onDemand
AppRunType runType =
AppRunType.fromValue((String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"));

// Schedule Run has recreate as false always
if (runType.equals(AppRunType.Scheduled)) {
jobData.setRecreateIndex(false);
}

// Run ReIndexing
entitiesReIndex();
dataInsightReindex();
Expand All @@ -123,7 +134,7 @@ public void startApp(JobExecutionContext jobExecutionContext) {
jobData.toString(), ExceptionUtils.getStackTrace(ex));
LOG.error(error);
jobData.setStatus(EventPublisherJob.Status.FAILED);
handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
handleJobError(error, System.currentTimeMillis());
} finally {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
Expand Down Expand Up @@ -185,30 +196,15 @@ private void entitiesReIndex() {
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ""),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ""),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ""),
currentTime);
}
}
Expand Down Expand Up @@ -246,30 +242,15 @@ private void dataInsightReindex() {
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ""),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ""),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ""),
currentTime);
}
}
Expand Down Expand Up @@ -335,29 +316,28 @@ private void handleErrorsEs(ResultList<?> data, String lastCursor, BulkResponse
handleEsSinkErrors(response, time);
}

private void handleSourceError(String context, String reason, long time) {
handleError("source", context, reason, time);
private void handleSourceError(String reason, long time) {
handleError("source", reason, time);
}

private void handleProcessorError(String context, String reason, long time) {
handleError("processor", context, reason, time);
private void handleProcessorError(String reason, long time) {
handleError("processor", reason, time);
}

private void handleError(String errType, String context, String reason, long time) {
private void handleError(String errType, String reason, long time) {
Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure();
failures.withAdditionalProperty("errorFrom", errType);
failures.withAdditionalProperty("context", context);
failures.withAdditionalProperty("lastFailedReason", reason);
failures.withAdditionalProperty("lastFailedAt", time);
jobData.setFailure(failures);
}

private void handleEsSinkError(String context, String reason, long time) {
handleError("sink", context, reason, time);
private void handleEsSinkError(String reason, long time) {
handleError("sink", reason, time);
}

private void handleJobError(String context, String reason, long time) {
handleError("job", context, reason, time);
private void handleJobError(String reason, long time) {
handleError("job", reason, time);
}

@SneakyThrows
Expand All @@ -369,8 +349,9 @@ private void handleSourceError(ResultList<?> data, String lastCursor, long time)
builder.append("%n");
}
handleSourceError(
String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor),
String.format("Following Entities were not fetched Successfully : %s", builder),
String.format(
"SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s",
lastCursor, builder),
time);
}
}
Expand All @@ -397,15 +378,15 @@ private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse respo
}
if (!details.isEmpty()) {
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
String.format(
"[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ",
JsonUtils.pojoToJson(details, true)),
time);
}
}

@SneakyThrows
private void handleEsSinkErrors(BulkResponse response, long time) {

List<Map<String, Object>> details = new ArrayList<>();
for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) {
Expand All @@ -426,8 +407,8 @@ private void handleEsSinkErrors(BulkResponse response, long time) {
}
if (!details.isEmpty()) {
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
String.format(
"[EsWriter][BulkItemResponse] Got Following Error Responses: %s ", JsonUtils.pojoToJson(details, true)),
time);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
Expand Down Expand Up @@ -76,16 +75,18 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SourceE
null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
String errMsg =
StringBuilder errMsg = new StringBuilder();
errMsg.append(
String.format(
"[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %s",
this.lastFailedCursor,
batchSize,
result.getData().size(),
result.getErrors().size(),
JsonUtils.pojoToJson(result.getErrors()));
LOG.error(errMsg);
throw new SourceException(errMsg);
"[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %n",
this.lastFailedCursor, batchSize, result.getData().size(), result.getErrors().size()));
for (int i = 0; i < result.getErrors().size(); i++) {
errMsg.append(String.format("%s. EntityError :- %s", i, result.getErrors().get(i)));
errMsg.append("%n");
}
String error = errMsg.toString();
LOG.error(error);
throw new SourceException(error);
}

LOG.debug(
Expand Down