From 7c1fbe88bba7257aa1341567f8ee49a3a8ee4d49 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 21 May 2019 16:42:31 +0100 Subject: [PATCH 01/11] Revert "invalid test" This reverts commit 9dd8b52c13c716918ff97e6527aaf43aefc4695d. --- .../test/data_frame/transforms_start_stop.yml | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 58af6e0899dda..617da7fb25189 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -209,3 +209,46 @@ teardown: - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-start-later" + +--- +"Test stop all": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stop-all" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-start-later" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stop-all" + - match: { started: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { started: true } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "_all" + wait_for_completion: true + - match: { stopped: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "*" + - match: { count: 2 } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.1.state.indexer_state: "stopped" } + - match: { transforms.1.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "airline-transform-stop-all" From 061687537ee53cdea4a9eb33db92dd01d0bd7f7c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 24 May 2019 15:40:24 +0100 Subject: [PATCH 02/11] Testing --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 14 +++++++++----- .../transforms/DataFrameTransformTask.java | 14 +------------- .../test/data_frame/transforms_start_stop.yml | 6 +++--- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 80b0378ae35ff..9153c8354fbab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -108,7 +108,7 @@ public synchronized IndexerState stop() { }); if (wasStartedAndSetStopped.get()) { - onStop(); + doSaveState(IndexerState.STOPPED, getPosition(), this::onStop); } return currentState; } @@ -287,7 +287,10 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - return state.updateAndGet(prev -> { + + checkState(state.get()); + + IndexerState updated = state.updateAndGet(prev -> { switch (prev) { case INDEXING: // ready for another job @@ -295,12 +298,10 @@ private IndexerState finishAndSetState() { case STOPPING: // must be started again - onStop(); return IndexerState.STOPPED; case ABORTING: // abort and exit - onAbort(); return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first case STOPPED: @@ -315,6 +316,9 @@ private IndexerState finishAndSetState() { throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); } }); + + + return updated; } private void onSearchResponse(SearchResponse searchResponse) { @@ -409,7 +413,7 @@ private boolean checkState(IndexerState currentState) { case STOPPING: logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), () -> {}); + doSaveState(IndexerState.STOPPED, getPosition(), this::onStop); return false; case STOPPED: diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 13deab6748c94..1937ba2c72eae 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -602,20 +602,8 @@ protected void onFinish(ActionListener listener) { protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); - transformTask.setTaskStateStopped(); - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - transformTask.shutdown(); - }, - statsExc -> { - transformTask.shutdown(); - logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc); - } - )); + transformTask.shutdown(); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 617da7fb25189..3d5aba1ce1cc1 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -118,7 +118,7 @@ teardown: - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - match: { transforms.0.state.indexer_state: "stopped" } - - match: { transforms.0.state.task_state: "stopped" } +# - match: { transforms.0.state.task_state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -245,9 +245,9 @@ teardown: transform_id: "*" - match: { count: 2 } - match: { transforms.0.state.indexer_state: "stopped" } - - match: { transforms.0.state.task_state: "stopped" } +# - match: { transforms.0.state.task_state: "stopped" } - match: { transforms.1.state.indexer_state: "stopped" } - - match: { transforms.1.state.task_state: "stopped" } +# - match: { transforms.1.state.task_state: "stopped" } - do: data_frame.delete_data_frame_transform: From 9ceeb76ec2715abc73b7726dafb4fd1bb2013317 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 24 May 2019 17:01:18 +0100 Subject: [PATCH 03/11] mend --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 9153c8354fbab..71b5b95bf033d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -290,7 +290,7 @@ private IndexerState finishAndSetState() { checkState(state.get()); - IndexerState updated = state.updateAndGet(prev -> { + return state.updateAndGet(prev -> { switch (prev) { case INDEXING: // ready for another job @@ -316,9 +316,6 @@ private IndexerState finishAndSetState() { throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); } }); - - - return updated; } private void onSearchResponse(SearchResponse searchResponse) { From 62f9b7c73b2c505dbf9bd63281c40fe20d4b1426 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 May 2019 12:09:43 +0100 Subject: [PATCH 04/11] Revert "[ML Data Frame] Mute Data Frame tests" This reverts commit 5d837fa312b0e41a77a65462667a2d92d1114567. --- .../xpack/dataframe/integration/DataFrameTransformIT.java | 1 - .../xpack/dataframe/integration/DataFrameAuditorIT.java | 2 -- .../dataframe/integration/DataFrameConfigurationIndexIT.java | 2 -- .../xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java | 2 -- .../xpack/dataframe/integration/DataFrameMetaDataIT.java | 2 -- .../xpack/dataframe/integration/DataFramePivotRestIT.java | 2 -- .../xpack/dataframe/integration/DataFrameTaskFailedStateIT.java | 2 -- .../xpack/dataframe/integration/DataFrameUsageIT.java | 2 -- 8 files changed, 15 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 486ea5e5d7403..b98367979bff9 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException { cleanUp(); } - @AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public void testDataFrameTransformCrud() throws Exception { createReviewsIndex(); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java index 7dc79c1ae8fbe..9884c9bb6793b 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.junit.Before; @@ -23,7 +22,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameAuditorIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java index d7e12cf2bee4d..681599331c8af 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java @@ -8,7 +8,6 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -23,7 +22,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase { /** diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 9bac6ca0b4049..d9927cd09ed8f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -22,7 +21,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_user"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java index 5b95d1daead53..26a957ea055c2 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -16,7 +15,6 @@ import java.io.IOException; import java.util.Map; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameMetaDataIT extends DataFrameRestTestCase { private boolean indicesCreated = false; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index a0bec6ec13c34..22586a7b37d27 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -22,7 +21,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFramePivotRestIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 7b63644dd34ad..96aeeda8755f4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; @@ -20,7 +19,6 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { public void testDummy() { diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index f98fa6a271365..4f209c5a9f3f4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -23,7 +22,6 @@ import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE; import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameUsageIT extends DataFrameRestTestCase { private boolean indicesCreated = false; From ea7f790ca3d212d3216a010fa8ec837375e49204 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 May 2019 15:28:04 +0100 Subject: [PATCH 05/11] Call onStop and onAbort outside atomic update --- .../core/indexing/AsyncTwoPhaseIndexer.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 71b5b95bf033d..53ad88021b0da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -89,28 +89,21 @@ public synchronized IndexerState start() { * Sets the internal state to {@link IndexerState#STOPPING} if an async job is * running in the background, {@link #onStop()} will be called when the background job * detects that the indexer is stopped. - * If there is no job running when this function is called - * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly. + * If there is no job running when this function is called the returned + * state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ public synchronized IndexerState stop() { - AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false); - IndexerState currentState = state.updateAndGet(previousState -> { + return state.updateAndGet(previousState -> { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { - wasStartedAndSetStopped.set(true); return IndexerState.STOPPED; } else { return previousState; } }); - - if (wasStartedAndSetStopped.get()) { - doSaveState(IndexerState.STOPPED, getPosition(), this::onStop); - } - return currentState; } /** @@ -287,20 +280,21 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - - checkState(state.get()); - - return state.updateAndGet(prev -> { + AtomicBoolean callOnStop = new AtomicBoolean(false); + AtomicBoolean callOnAbort = new AtomicBoolean(false); + IndexerState updatedState = state.updateAndGet(prev -> { switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: + callOnStop.set(true); // must be started again return IndexerState.STOPPED; case ABORTING: + callOnAbort.set(true); // abort and exit return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first @@ -316,6 +310,14 @@ private IndexerState finishAndSetState() { throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); } }); + + if (callOnStop.get()) { + onStop(); + } else if (callOnAbort.get()) { + onAbort(); + } + + return updatedState; } private void onSearchResponse(SearchResponse searchResponse) { @@ -410,7 +412,7 @@ private boolean checkState(IndexerState currentState) { case STOPPING: logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(IndexerState.STOPPED, getPosition(), this::onStop); + doSaveState(finishAndSetState(), getPosition(), () -> {}); return false; case STOPPED: From 47eb26a05154aea5f9ba5c614514e3b96415b0ca Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 May 2019 15:28:27 +0100 Subject: [PATCH 06/11] =?UTF-8?q?Don=E2=80=99t=20update=20CS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transforms/DataFrameTransformTask.java | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 1937ba2c72eae..4b6c3c2060eb6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final Map initialPosition; private final IndexerState initialIndexerState; - private final SetOnce indexer = new SetOnce<>(); + private final SetOnce indexer = new SetOnce<>(); private final AtomicReference taskState; private final AtomicReference stateReason; @@ -125,7 +125,7 @@ public Status getStatus() { return getState(); } - private DataFrameIndexer getIndexer() { + private ClientDataFrameIndexer getIndexer() { return indexer.get(); } @@ -236,7 +236,10 @@ public synchronized void stop() { return; } - getIndexer().stop(); + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().saveState(state, () -> getIndexer().onStop()); + } } @Override @@ -531,6 +534,11 @@ protected void doSaveState(IndexerState indexerState, Map positi return; } + saveState(indexerState, next); + } + + public void saveState(IndexerState indexerState, Runnable next){ + final DataFrameTransformState state = new DataFrameTransformState( transformTask.taskState.get(), indexerState, @@ -542,28 +550,18 @@ protected void doSaveState(IndexerState indexerState, Map positi // Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and // only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity - ActionListener> updateClusterStateListener = ActionListener.wrap( - task -> { - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, state, getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - next.run(); - }, - statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); - next.run(); - } - )); - }, - exc -> { - logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc); - next.run(); - } - ); - - transformTask.persistStateToClusterState(state, updateClusterStateListener); + transformsConfigManager.putOrUpdateTransformStats( + new DataFrameTransformStateAndStats(transformId, state, getStats(), + DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null + ActionListener.wrap( + r -> { + next.run(); + }, + statsExc -> { + logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + next.run(); + } + )); } @Override From 0b537c6341c317b1a2a916c37a546779b75fa09b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 May 2019 16:54:01 +0100 Subject: [PATCH 07/11] Tidying up --- .../dataframe/transforms/DataFrameTransformTask.java | 9 ++------- .../test/data_frame/transforms_start_stop.yml | 6 +++--- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 4b6c3c2060eb6..f3f67f4bfd2ab 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -238,7 +238,7 @@ public synchronized void stop() { IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().saveState(state, () -> getIndexer().onStop()); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); } } @@ -534,15 +534,10 @@ protected void doSaveState(IndexerState indexerState, Map positi return; } - saveState(indexerState, next); - } - - public void saveState(IndexerState indexerState, Runnable next){ - final DataFrameTransformState state = new DataFrameTransformState( transformTask.taskState.get(), indexerState, - getPosition(), + position, transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), getProgress()); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 3d5aba1ce1cc1..617da7fb25189 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -118,7 +118,7 @@ teardown: - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - match: { transforms.0.state.indexer_state: "stopped" } -# - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -245,9 +245,9 @@ teardown: transform_id: "*" - match: { count: 2 } - match: { transforms.0.state.indexer_state: "stopped" } -# - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } - match: { transforms.1.state.indexer_state: "stopped" } -# - match: { transforms.1.state.task_state: "stopped" } + - match: { transforms.1.state.task_state: "stopped" } - do: data_frame.delete_data_frame_transform: From daf6f106634cc724cdbebfdccd11444788654dd1 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 May 2019 22:32:29 +0100 Subject: [PATCH 08/11] Remove invalid test that asserted logic that has been removed --- .../indexing/AsyncTwoPhaseIndexerTests.java | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index fc86a9554880f..053e41d9b2a63 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException { } } - public void testStop_AfterIndexerIsFinished() throws InterruptedException { - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { - CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); - indexer.start(); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - countDownLatch.countDown(); - assertTrue(awaitBusy(() -> isFinished.get())); - - indexer.stop(); - assertTrue(isStopped.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); - } finally { - executor.shutdownNow(); - } - } - public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); From 02ba992f4818bebd838e1c7678bd2e1cc090bfab Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 29 May 2019 12:09:22 +0100 Subject: [PATCH 09/11] Add stopped event --- .../core/indexing/AsyncTwoPhaseIndexer.java | 44 ++++++++++++------- .../indexing/AsyncTwoPhaseIndexerTests.java | 36 ++++++++++++++- .../transforms/DataFrameTransformTask.java | 10 ++++- 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 53ad88021b0da..3f7a56f52e4a9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -23,11 +23,11 @@ * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). * Only one background job can run simultaneously and {@link #onFinish} is called when the job - * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call - * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} + * finishes before state is persisted. The indexer can be stopped early by a call to {@link #stop()} which will + * trigger the {@link #onStopping()} and {@link #onStopped()} methods. + * {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()} * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called. - * {@link #stop()} can be used to stop the background job without aborting the indexer. * * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query, * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input. @@ -87,10 +87,10 @@ public synchronized IndexerState start() { /** * Sets the internal state to {@link IndexerState#STOPPING} if an async job is - * running in the background, {@link #onStop()} will be called when the background job + * running in the background, {@link #onStopped()} will be called when the background job * detects that the indexer is stopped. * If there is no job running when this function is called the returned - * state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called. + * state is {@link IndexerState#STOPPED} and {@link #onStopped()} will not be called. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ @@ -249,20 +249,30 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to - * {@link IndexerState#STARTED}. + * {@link IndexerState#STARTED} and before {@link #doSaveState(IndexerState, Object, Runnable)} is called * * @param listener listener to call after done */ protected abstract void onFinish(ActionListener listener); + /** - * Called when the indexer is stopped. This is only called when the indexer is stopped - * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called - * when the indexer's work is done. + * Called when a background job stops after internal state has changed from {@link IndexerState#STOPPING} + * to {@link IndexerState#STOPPED} and before state is persisted via {@link #doSaveState(IndexerState, Object, Runnable)}. + * This is only called when the indexer is stopped due to a call to {@link #stop()} */ - protected void onStop() { + protected void onStopping() { } + /** + * Called when the indexer is stopped after {@link #onStopping()} and {@link #doSaveState(IndexerState, Object, Runnable)} + * have been called. + */ + protected void onStopped() { + } + + + /** * Called when a background job detects that the indexer is aborted causing the * async execution to stop. @@ -280,16 +290,18 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - AtomicBoolean callOnStop = new AtomicBoolean(false); - AtomicBoolean callOnAbort = new AtomicBoolean(false); + AtomicBoolean callOnStopping = new AtomicBoolean(); + AtomicBoolean callOnAbort = new AtomicBoolean(); IndexerState updatedState = state.updateAndGet(prev -> { + callOnAbort.set(false); + callOnStopping.set(false); switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: - callOnStop.set(true); + callOnStopping.set(true); // must be started again return IndexerState.STOPPED; @@ -311,8 +323,8 @@ private IndexerState finishAndSetState() { } }); - if (callOnStop.get()) { - onStop(); + if (callOnStopping.get()) { + onStopping(); } else if (callOnAbort.get()) { onAbort(); } @@ -412,7 +424,7 @@ private boolean checkState(IndexerState currentState) { case STOPPING: logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), () -> {}); + doSaveState(finishAndSetState(), getPosition(), this::onStopped); return false; case STOPPED: diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 053e41d9b2a63..f083863814a55 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -36,11 +36,13 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { AtomicBoolean isFinished = new AtomicBoolean(false); AtomicBoolean isStopped = new AtomicBoolean(false); + AtomicBoolean isStopping = new AtomicBoolean(false); @Before public void reset() { isFinished.set(false); isStopped.set(false); + isStopping.set(false); } private class MockIndexer extends AsyncTwoPhaseIndexer { @@ -112,7 +114,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - int expectedStep = stoppedBeforeFinished ? 3 : 5; + int expectedStep = stoppedBeforeFinished ? 4 : 5; assertThat(step, equalTo(expectedStep)); ++step; next.run(); @@ -132,7 +134,16 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStop() { + protected void onStopping() { + assertThat(step, equalTo(3)); + ++step; + assertTrue(isStopping.compareAndSet(false, true)); + } + + @Override + protected void onStopped() { + assertThat(step, equalTo(5)); + ++step; assertTrue(isStopped.compareAndSet(false, true)); } @@ -268,6 +279,26 @@ public void testStateMachineBrokenSearch() throws InterruptedException { } } + public void testStop_AfterIndexerIsFinished() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); + indexer.start(); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + countDownLatch.countDown(); + assertTrue(awaitBusy(() -> isFinished.get())); + + indexer.stop(); + assertFalse(isStopping.get()); + assertFalse(isStopped.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); + } finally { + executor.shutdownNow(); + } + } + public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); @@ -282,6 +313,7 @@ public void testStop_WhileIndexing() throws InterruptedException { countDownLatch.countDown(); assertThat(indexer.getPosition(), equalTo(2)); + assertTrue(awaitBusy(() -> isStopping.get())); assertTrue(awaitBusy(() -> isStopped.get())); assertFalse(isFinished.get()); } finally { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f3f67f4bfd2ab..b4f79d9302e30 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -238,7 +238,8 @@ public synchronized void stop() { IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); + getIndexer().onStopping(); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStopped()); } } @@ -592,7 +593,12 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStop() { + protected void onStopping() { + transformTask.setTaskStateStopped(); + } + + @Override + protected void onStopped() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); transformTask.setTaskStateStopped(); From d4457ff4b1b013be70ecea218f50cce7567cd00d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 29 May 2019 11:38:53 -0500 Subject: [PATCH 10/11] Revert "Add stopped event" This reverts commit 02ba992f4818bebd838e1c7678bd2e1cc090bfab. --- .../core/indexing/AsyncTwoPhaseIndexer.java | 44 +++++++------------ .../indexing/AsyncTwoPhaseIndexerTests.java | 36 +-------------- .../transforms/DataFrameTransformTask.java | 10 +---- 3 files changed, 20 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 3f7a56f52e4a9..53ad88021b0da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -23,11 +23,11 @@ * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). * Only one background job can run simultaneously and {@link #onFinish} is called when the job - * finishes before state is persisted. The indexer can be stopped early by a call to {@link #stop()} which will - * trigger the {@link #onStopping()} and {@link #onStopped()} methods. - * {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} + * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call + * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()} * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called. + * {@link #stop()} can be used to stop the background job without aborting the indexer. * * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query, * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input. @@ -87,10 +87,10 @@ public synchronized IndexerState start() { /** * Sets the internal state to {@link IndexerState#STOPPING} if an async job is - * running in the background, {@link #onStopped()} will be called when the background job + * running in the background, {@link #onStop()} will be called when the background job * detects that the indexer is stopped. * If there is no job running when this function is called the returned - * state is {@link IndexerState#STOPPED} and {@link #onStopped()} will not be called. + * state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ @@ -249,30 +249,20 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to - * {@link IndexerState#STARTED} and before {@link #doSaveState(IndexerState, Object, Runnable)} is called + * {@link IndexerState#STARTED}. * * @param listener listener to call after done */ protected abstract void onFinish(ActionListener listener); - /** - * Called when a background job stops after internal state has changed from {@link IndexerState#STOPPING} - * to {@link IndexerState#STOPPED} and before state is persisted via {@link #doSaveState(IndexerState, Object, Runnable)}. - * This is only called when the indexer is stopped due to a call to {@link #stop()} + * Called when the indexer is stopped. This is only called when the indexer is stopped + * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called + * when the indexer's work is done. */ - protected void onStopping() { + protected void onStop() { } - /** - * Called when the indexer is stopped after {@link #onStopping()} and {@link #doSaveState(IndexerState, Object, Runnable)} - * have been called. - */ - protected void onStopped() { - } - - - /** * Called when a background job detects that the indexer is aborted causing the * async execution to stop. @@ -290,18 +280,16 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - AtomicBoolean callOnStopping = new AtomicBoolean(); - AtomicBoolean callOnAbort = new AtomicBoolean(); + AtomicBoolean callOnStop = new AtomicBoolean(false); + AtomicBoolean callOnAbort = new AtomicBoolean(false); IndexerState updatedState = state.updateAndGet(prev -> { - callOnAbort.set(false); - callOnStopping.set(false); switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: - callOnStopping.set(true); + callOnStop.set(true); // must be started again return IndexerState.STOPPED; @@ -323,8 +311,8 @@ private IndexerState finishAndSetState() { } }); - if (callOnStopping.get()) { - onStopping(); + if (callOnStop.get()) { + onStop(); } else if (callOnAbort.get()) { onAbort(); } @@ -424,7 +412,7 @@ private boolean checkState(IndexerState currentState) { case STOPPING: logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), this::onStopped); + doSaveState(finishAndSetState(), getPosition(), () -> {}); return false; case STOPPED: diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index f083863814a55..053e41d9b2a63 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -36,13 +36,11 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { AtomicBoolean isFinished = new AtomicBoolean(false); AtomicBoolean isStopped = new AtomicBoolean(false); - AtomicBoolean isStopping = new AtomicBoolean(false); @Before public void reset() { isFinished.set(false); isStopped.set(false); - isStopping.set(false); } private class MockIndexer extends AsyncTwoPhaseIndexer { @@ -114,7 +112,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - int expectedStep = stoppedBeforeFinished ? 4 : 5; + int expectedStep = stoppedBeforeFinished ? 3 : 5; assertThat(step, equalTo(expectedStep)); ++step; next.run(); @@ -134,16 +132,7 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStopping() { - assertThat(step, equalTo(3)); - ++step; - assertTrue(isStopping.compareAndSet(false, true)); - } - - @Override - protected void onStopped() { - assertThat(step, equalTo(5)); - ++step; + protected void onStop() { assertTrue(isStopped.compareAndSet(false, true)); } @@ -279,26 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException { } } - public void testStop_AfterIndexerIsFinished() throws InterruptedException { - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { - CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); - indexer.start(); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - countDownLatch.countDown(); - assertTrue(awaitBusy(() -> isFinished.get())); - - indexer.stop(); - assertFalse(isStopping.get()); - assertFalse(isStopped.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); - } finally { - executor.shutdownNow(); - } - } - public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); @@ -313,7 +282,6 @@ public void testStop_WhileIndexing() throws InterruptedException { countDownLatch.countDown(); assertThat(indexer.getPosition(), equalTo(2)); - assertTrue(awaitBusy(() -> isStopping.get())); assertTrue(awaitBusy(() -> isStopped.get())); assertFalse(isFinished.get()); } finally { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b4f79d9302e30..f3f67f4bfd2ab 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -238,8 +238,7 @@ public synchronized void stop() { IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().onStopping(); - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStopped()); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); } } @@ -593,12 +592,7 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStopping() { - transformTask.setTaskStateStopped(); - } - - @Override - protected void onStopped() { + protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); transformTask.setTaskStateStopped(); From c25c2c7551886236be2a17d316294266005d33db Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 29 May 2019 13:34:59 -0500 Subject: [PATCH 11/11] Adding check for STOPPED in saveState --- .../xpack/dataframe/transforms/DataFrameTransformTask.java | 7 ++++++- .../test/data_frame/transforms_start_stop.yml | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f3f67f4bfd2ab..575cd4c15bd67 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -533,6 +533,12 @@ protected void doSaveState(IndexerState indexerState, Map positi next.run(); return; } + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` + // OR we called `doSaveState` manually as the indexer was not actively running. + // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state + if (indexerState.equals(IndexerState.STOPPED)) { + transformTask.setTaskStateStopped(); + } final DataFrameTransformState state = new DataFrameTransformState( transformTask.taskState.get(), @@ -595,7 +601,6 @@ protected void onFinish(ActionListener listener) { protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); - transformTask.setTaskStateStopped(); transformTask.shutdown(); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 617da7fb25189..e8fd274a79318 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -190,6 +190,7 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-stop" + wait_for_completion: true - match: { stopped: true } - do: