From c93727ae3f9d7a11f322bffddddc1ec98d02764d Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 Dec 2023 19:05:08 +0530 Subject: [PATCH] Format Error on Indexing Application, and in schedule turn off recreate --- .../bundles/searchIndex/SearchIndexApp.java | 89 ++++++++----------- .../searchIndex/PaginatedEntitiesSource.java | 21 ++--- 2 files changed, 46 insertions(+), 64 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index bd78b640ef85..b7693514ab47 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -22,6 +22,7 @@ import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; +import org.openmetadata.schema.entity.app.AppRunType; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; @@ -111,6 +112,16 @@ public void startApp(JobExecutionContext jobExecutionContext) { LOG.info("Executing Reindexing Job with JobData : {}", jobData); // Update Job Status jobData.setStatus(EventPublisherJob.Status.RUNNING); + + // Make recreate as false for onDemand + AppRunType runType = + AppRunType.fromValue((String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType")); + + // Schedule Run has recreate as false always + if (runType.equals(AppRunType.Scheduled)) { + jobData.setRecreateIndex(false); + } + // Run ReIndexing entitiesReIndex(); dataInsightReindex(); @@ -123,7 +134,7 @@ public void startApp(JobExecutionContext jobExecutionContext) { jobData.toString(), ExceptionUtils.getStackTrace(ex)); LOG.error(error); jobData.setStatus(EventPublisherJob.Status.FAILED); - handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis()); + handleJobError(error, System.currentTimeMillis()); } finally { // store job details in Database jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); @@ -185,30 +196,15 @@ private void entitiesReIndex() { } } catch (SourceException rx) { handleSourceError( - rx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - rx.getCause(), - ExceptionUtils.getStackTrace(rx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ""), currentTime); } catch (ProcessorException px) { handleProcessorError( - px.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - px.getCause(), - ExceptionUtils.getStackTrace(px)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ""), currentTime); } catch (SinkException wx) { handleEsSinkError( - wx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - wx.getCause(), - ExceptionUtils.getStackTrace(wx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ""), currentTime); } } @@ -246,30 +242,15 @@ private void dataInsightReindex() { } } catch (SourceException rx) { handleSourceError( - rx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - rx.getCause(), - ExceptionUtils.getStackTrace(rx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ""), currentTime); } catch (ProcessorException px) { handleProcessorError( - px.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - px.getCause(), - ExceptionUtils.getStackTrace(px)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ""), currentTime); } catch (SinkException wx) { handleEsSinkError( - wx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - wx.getCause(), - ExceptionUtils.getStackTrace(wx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ""), currentTime); } } @@ -335,29 +316,28 @@ private void handleErrorsEs(ResultList data, String lastCursor, BulkResponse handleEsSinkErrors(response, time); } - private void handleSourceError(String context, String reason, long time) { - handleError("source", context, reason, time); + private void handleSourceError(String reason, long time) { + handleError("source", reason, time); } - private void handleProcessorError(String context, String reason, long time) { - handleError("processor", context, reason, time); + private void handleProcessorError(String reason, long time) { + handleError("processor", reason, time); } - private void handleError(String errType, String context, String reason, long time) { + private void handleError(String errType, String reason, long time) { Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure(); failures.withAdditionalProperty("errorFrom", errType); - failures.withAdditionalProperty("context", context); failures.withAdditionalProperty("lastFailedReason", reason); failures.withAdditionalProperty("lastFailedAt", time); jobData.setFailure(failures); } - private void handleEsSinkError(String context, String reason, long time) { - handleError("sink", context, reason, time); + private void handleEsSinkError(String reason, long time) { + handleError("sink", reason, time); } - private void handleJobError(String context, String reason, long time) { - handleError("job", context, reason, time); + private void handleJobError(String reason, long time) { + handleError("job", reason, time); } @SneakyThrows @@ -369,8 +349,9 @@ private void handleSourceError(ResultList data, String lastCursor, long time) builder.append("%n"); } handleSourceError( - String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), - String.format("Following Entities were not fetched Successfully : %s", builder), + String.format( + "SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s", + lastCursor, builder), time); } } @@ -397,15 +378,15 @@ private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse respo } if (!details.isEmpty()) { handleEsSinkError( - "[EsWriter] BulkResponseItems", - String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)), + String.format( + "[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ", + JsonUtils.pojoToJson(details, true)), time); } } @SneakyThrows private void handleEsSinkErrors(BulkResponse response, long time) { - List> details = new ArrayList<>(); for (BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { @@ -426,8 +407,8 @@ private void handleEsSinkErrors(BulkResponse response, long time) { } if (!details.isEmpty()) { handleEsSinkError( - "[EsWriter] BulkResponseItems", - String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)), + String.format( + "[EsWriter][BulkItemResponse] Got Following Error Responses: %s ", JsonUtils.pojoToJson(details, true)), time); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index 5df6b13124bb..5e6a8de5014b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -27,7 +27,6 @@ import org.openmetadata.service.exception.SourceException; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; -import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @@ -76,16 +75,18 @@ private ResultList read(String cursor) throws SourceE null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor); if (!result.getErrors().isEmpty()) { lastFailedCursor = this.cursor; - String errMsg = + StringBuilder errMsg = new StringBuilder(); + errMsg.append( String.format( - "[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %s", - this.lastFailedCursor, - batchSize, - result.getData().size(), - result.getErrors().size(), - JsonUtils.pojoToJson(result.getErrors())); - LOG.error(errMsg); - throw new SourceException(errMsg); + "[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %n", + this.lastFailedCursor, batchSize, result.getData().size(), result.getErrors().size())); + for (int i = 0; i < result.getErrors().size(); i++) { + errMsg.append(String.format("%s. EntityError :- %s", i, result.getErrors().get(i))); + errMsg.append("%n"); + } + String error = errMsg.toString(); + LOG.error(error); + throw new SourceException(error); } LOG.debug(