diff --git a/build-tools-internal/src/main/groovy/elasticsearch.build-scan.gradle b/build-tools-internal/src/main/groovy/elasticsearch.build-scan.gradle
index 797dc8bd0641b..8702f5a9bf0e9 100644
--- a/build-tools-internal/src/main/groovy/elasticsearch.build-scan.gradle
+++ b/build-tools-internal/src/main/groovy/elasticsearch.build-scan.gradle
@@ -32,7 +32,9 @@ develocity {
     // Automatically publish scans from Elasticsearch CI
     if (onCI) {
       publishing.onlyIf { true }
-      server = 'https://gradle-enterprise.elastic.co'
+      if(server.isPresent() == false) {
+        server = 'https://gradle-enterprise.elastic.co'
+      }
     } else if( server.isPresent() == false) {
       publishing.onlyIf { false }
     }
diff --git a/muted-tests.yml b/muted-tests.yml
index 3f4b11bf76020..2c4fc4d421cc6 100644
--- a/muted-tests.yml
+++ b/muted-tests.yml
@@ -389,8 +389,45 @@ tests:
   issue: https://github.com/elastic/elasticsearch/issues/122377
 - class: org.elasticsearch.repositories.blobstore.testkit.analyze.HdfsRepositoryAnalysisRestIT
   issue: https://github.com/elastic/elasticsearch/issues/122378
-- class: org.elasticsearch.xpack.esql.CsvTests
-  issue: https://github.com/elastic/elasticsearch/issues/122440
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testClosedIndexUpgrade {p0=[9.1.0, 8.19.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122481
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testRestoreIndex {p0=[9.1.0, 9.1.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122482
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testRestoreIndex {p0=[9.1.0, 8.19.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122483
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testIndexUpgrade {p0=[9.1.0, 8.19.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122484
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testIndexUpgrade {p0=[9.1.0, 9.1.0, 9.1.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122487
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testClosedIndexUpgrade {p0=[9.1.0, 9.1.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122488
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testIndexUpgrade {p0=[9.1.0, 9.1.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122489
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testRestoreIndex {p0=[9.1.0, 9.1.0, 9.1.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122490
+- class: org.elasticsearch.lucene.RollingUpgradeLuceneIndexCompatibilityTestCase
+  method: testClosedIndexUpgrade {p0=[9.1.0, 9.1.0, 9.1.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122495
+- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
+  method: testSearchableSnapshotUpgrade {p0=[9.1.0, 8.19.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122500
+- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
+  method: testMountSearchableSnapshot {p0=[9.1.0, 9.1.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122501
+- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
+  method: testMountSearchableSnapshot {p0=[9.1.0, 8.19.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122502
+- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
+  method: testSearchableSnapshotUpgrade {p0=[9.1.0, 9.1.0, 8.19.0]}
+  issue: https://github.com/elastic/elasticsearch/issues/122503
 
 # Examples:
 #
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
index d69acab30451a..d42eb8f748b51 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
@@ -12,19 +12,27 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -44,9 +52,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -153,6 +159,55 @@ synchronized void start() {
         executor.execute();
     }
 
+    private void removeDuplicateModelSnapshotDoc(Consumer<Exception> runAfter) {
+        String snapshotDocId = jobId + "_model_snapshot_" + snapshotId;
+        client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPattern())
+            .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.idsQuery().addIds(snapshotDocId)))
+            .setSize(2)
+            .addSort(ModelSnapshot.MIN_VERSION.getPreferredName(), org.elasticsearch.search.sort.SortOrder.ASC)
+            .execute(ActionListener.wrap(searchResponse -> {
+                if (searchResponse.getHits().getTotalHits().value() > 1) {
+                    deleteOlderSnapshotDoc(searchResponse, runAfter);
+                } else {
+                    onFinish.accept(null);
+                }
+            }, e -> {
+                logger.warn(() -> format("[%s] [%s] error during search for model snapshot documents", jobId, snapshotId), e);
+                onFinish.accept(null);
+            }));
+    }
+
+    private void deleteOlderSnapshotDoc(SearchResponse searchResponse, Consumer<Exception> runAfter) {
+        SearchHit firstHit = searchResponse.getHits().getAt(0);
+        logger.debug(() -> format("[%s] deleting duplicate model snapshot doc [%s]", jobId, firstHit.getId()));
+        client.prepareDelete()
+            .setIndex(firstHit.getIndex())
+            .setId(firstHit.getId())
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .execute(ActionListener.runAfter(ActionListener.wrap(deleteResponse -> {
+                if ((deleteResponse.getResult() == DocWriteResponse.Result.DELETED) == false) {
+                    logger.warn(
+                        () -> format(
+                            "[%s] [%s] failed to delete old snapshot [%s] result document, document not found",
+                            jobId,
+                            snapshotId,
+                            ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                        )
+                    );
+                }
+            }, e -> {
+                logger.warn(
+                    () -> format(
+                        "[%s] [%s] failed to delete old snapshot [%s] result document",
+                        jobId,
+                        snapshotId,
+                        ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                    ),
+                    e
+                );
+            }), () -> runAfter.accept(null)));
+    }
+
     void setTaskToFailed(String reason, ActionListener<PersistentTask<?>> listener) {
         SnapshotUpgradeTaskState taskState = new SnapshotUpgradeTaskState(SnapshotUpgradeState.FAILED, task.getAllocationId(), reason);
         task.updatePersistentTaskState(taskState, ActionListener.wrap(listener::onResponse, f -> {
@@ -259,7 +314,7 @@ void restoreState() {
             logger.error(() -> format("[%s] [%s] failed to write old state", jobId, snapshotId), e);
             setTaskToFailed(
                 "Failed to write old state due to: " + e.getMessage(),
-                ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                ActionListener.running(() -> shutdownWithFailure(e))
             );
             return;
         }
@@ -273,7 +328,7 @@ void restoreState() {
                 logger.error(() -> format("[%s] [%s] failed to flush after writing old state", jobId, snapshotId), e);
                 nextStep = () -> setTaskToFailed(
                     "Failed to flush after writing old state due to: " + e.getMessage(),
-                    ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                    ActionListener.running(() -> shutdownWithFailure(e))
                 );
             } else {
                 logger.debug(
@@ -295,7 +350,7 @@ private void requestStateWrite() {
             new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, task.getAllocationId(), ""),
             ActionListener.wrap(readingNewState -> {
                 if (continueRunning.get() == false) {
-                    shutdown(null);
+                    shutdownWithFailure(null);
                     return;
                 }
                 submitOperation(() -> {
@@ -310,12 +365,12 @@ private void requestStateWrite() {
                     // Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the
                    // autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that
                    // executor has a single processing queue.
-                    (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> shutdown(e))
+                    (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> handlePersistingState(e))
                 );
                 logger.debug("[{}] [{}] asked for state to be persisted", jobId, snapshotId);
             }, f -> {
                 logger.error(() -> format("[%s] [%s] failed to update snapshot upgrader task to started", jobId, snapshotId), f);
-                shutdown(
+                shutdownWithFailure(
                     new ElasticsearchStatusException(
                         "Failed to start snapshot upgrade [{}] for job [{}]",
                         RestStatus.INTERNAL_SERVER_ERROR,
@@ -378,17 +433,45 @@ private void checkResultsProcessorIsAlive() {
         }
     }
 
-    void shutdown(Exception e) {
+    private void handlePersistingState(@Nullable Exception exception) {
+        assert Thread.currentThread().getName().contains(UTILITY_THREAD_POOL_NAME);
+
+        if (exception != null) {
+            shutdownWithFailure(exception);
+        } else {
+            stopProcess((aVoid, e) -> {
+                threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                    autodetectWorkerExecutor.shutdownNow();
+                    // If there are two snapshot documents in the results indices with the same snapshot id,
+                    // remove the old one. This can happen when the result index has been rolled over and
+                    // the write alias is pointing to the new index.
+                    removeDuplicateModelSnapshotDoc(onFinish);
+                });
+
+            });
+        }
+    }
+
+    void shutdownWithFailure(Exception e) {
+        stopProcess((aVoid, ignored) -> {
+            threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                onFinish.accept(e);
+                autodetectWorkerExecutor.shutdownNow();
+            });
+        });
+    }
+
+    private void stopProcess(BiConsumer<Class<Void>, Exception> runNext) {
         logger.debug("[{}] [{}] shutdown initiated", jobId, snapshotId);
         // No point in sending an action to the executor if the process has died
         if (process.isProcessAlive() == false) {
             logger.debug("[{}] [{}] process is dead, no need to shutdown", jobId, snapshotId);
-            onFinish.accept(e);
-            autodetectWorkerExecutor.shutdownNow();
             stateStreamer.cancel();
+            runNext.accept(null, null);
             return;
         }
-        Future<?> future = autodetectWorkerExecutor.submit(() -> {
+
+        submitOperation(() -> {
             try {
                 logger.debug("[{}] [{}] shutdown is now occurring", jobId, snapshotId);
                 if (process.isReady()) {
@@ -401,24 +484,10 @@ void shutdown(Exception e) {
                 processor.awaitCompletion();
             } catch (IOException | TimeoutException exc) {
                 logger.warn(() -> format("[%s] [%s] failed to shutdown process", jobId, snapshotId), exc);
-            } finally {
-                onFinish.accept(e);
             }
             logger.debug("[{}] [{}] connection for upgrade has been closed, process is shutdown", jobId, snapshotId);
-        });
-        try {
-            future.get();
-            autodetectWorkerExecutor.shutdownNow();
-        } catch (InterruptedException interrupt) {
-            Thread.currentThread().interrupt();
-        } catch (ExecutionException executionException) {
-            if (processor.isProcessKilled()) {
-                // In this case the original exception is spurious and highly misleading
-                throw ExceptionsHelper.conflictStatusException("close snapshot upgrade interrupted by kill request");
-            } else {
-                throw FutureUtils.rethrowExecutionException(executionException);
-            }
-        }
+            return Void.TYPE;
+        }, runNext);
     }
 }
diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java
index 75613c7e3ebc3..78f6bcd8ac9ab 100644
--- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java
+++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java
@@ -65,7 +65,6 @@ protected static void waitForPendingUpgraderTasks() throws Exception {
      * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
      * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
      */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98560")
     public void testSnapshotUpgrader() throws Exception {
         Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
         adjustLoggingLevels.setJsonEntity("""
@@ -98,6 +97,13 @@ public void testSnapshotUpgrader() throws Exception {
 
     @SuppressWarnings("unchecked")
     private void testSnapshotUpgradeFailsOnMixedCluster() throws Exception {
+        // TODO the mixed cluster assertions sometimes fail because the code that
+        // detects the mixed cluster relies on the transport versions being different.
+        // This assumption does not hold immediately after a version bump and new
+        // branch being cut as the new branch will have the same transport version
+        // See https://github.com/elastic/elasticsearch/issues/98560
+
+        assumeTrue("The mixed cluster is not always detected correctly, see https://github.com/elastic/elasticsearch/issues/98560", false);
         Map<String, Object> jobs = entityAsMap(getJob(JOB_ID));
         String currentSnapshot = ((List<String>) XContentMapValues.extractValue("jobs.model_snapshot_id", jobs)).get(0);
 
@@ -154,7 +160,7 @@ private void testSnapshotUpgrade() throws Exception {
 
         List<Map<String, Object>> upgradedSnapshot = (List<Map<String, Object>>) entityAsMap(getModelSnapshots(JOB_ID, snapshotToUpgradeId))
             .get("model_snapshots");
-        assertThat(upgradedSnapshot, hasSize(1));
+        assertThat(upgradedSnapshot.toString(), upgradedSnapshot, hasSize(1));
         assertThat(upgradedSnapshot.get(0).get("latest_record_time_stamp"), equalTo(snapshotToUpgrade.get("latest_record_time_stamp")));
 
         // Does the snapshot still work?