diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
index 3fda07e672805..8a04c229de261 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
@@ -33,6 +33,7 @@
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.client.ml.FlushJobRequest;
import java.io.IOException;
@@ -127,6 +128,19 @@ static Request getBuckets(GetBucketsRequest getBucketsRequest) throws IOExceptio
return request;
}
+ static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
+ String endpoint = new EndpointBuilder()
+ .addPathPartAsIs("_xpack")
+ .addPathPartAsIs("ml")
+ .addPathPartAsIs("anomaly_detectors")
+ .addPathPart(flushJobRequest.getJobId())
+ .addPathPartAsIs("_flush")
+ .build();
+ Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+ request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
+ return request;
+ }
+
static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
index 59e222c5e4cd3..ac44f16b80b16 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
@@ -19,6 +19,8 @@
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetJobStatsResponse;
import org.elasticsearch.client.ml.job.stats.JobStats;
@@ -292,6 +294,60 @@ public void getBucketsAsync(GetBucketsRequest request, RequestOptions options, A
}
/**
+ * Flushes internally buffered data for the given Machine Learning Job ensuring all data sent to the has been processed.
+ * This may cause new results to be calculated depending on the contents of the buffer
+ *
+ * Both flush and close operations are similar,
+ * however the flush is more efficient if you are expecting to send more data for analysis.
+ *
+ * When flushing, the job remains open and is available to continue analyzing data.
+ * A close operation additionally prunes and persists the model state to disk and the
+ * job must be opened again before analyzing further data.
+ *
+ *
+ * For additional info
+ * see Flush ML job documentation
+ *
+ * @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
+ * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ */
+ public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity(request,
+ MLRequestConverters::flushJob,
+ options,
+ FlushJobResponse::fromXContent,
+ Collections.emptySet());
+ }
+
+ /**
+ * Flushes internally buffered data for the given Machine Learning Job asynchronously ensuring all data sent to the has been processed.
+ * This may cause new results to be calculated depending on the contents of the buffer
+ *
+ * Both flush and close operations are similar,
+ * however the flush is more efficient if you are expecting to send more data for analysis.
+ *
+ * When flushing, the job remains open and is available to continue analyzing data.
+ * A close operation additionally prunes and persists the model state to disk and the
+ * job must be opened again before analyzing further data.
+ *
+ *
+ * For additional info
+ * see Flush ML job documentation
+ *
+ * @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
+ * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @param listener Listener to be notified upon request completion
+ */
+ public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener listener) {
+ restHighLevelClient.performRequestAsyncAndParseEntity(request,
+ MLRequestConverters::flushJob,
+ options,
+ FlushJobResponse::fromXContent,
+ listener,
+ Collections.emptySet());
+ }
+
+ /**
* Gets usage statistics for one or more Machine Learning jobs
*
*
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobRequest.java
new file mode 100644
index 0000000000000..067851d452666
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobRequest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.client.ml.job.config.Job;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Request object to flush a given Machine Learning job.
+ */
+public class FlushJobRequest extends ActionRequest implements ToXContentObject {
+
+ public static final ParseField CALC_INTERIM = new ParseField("calc_interim");
+ public static final ParseField START = new ParseField("start");
+ public static final ParseField END = new ParseField("end");
+ public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
+ public static final ParseField SKIP_TIME = new ParseField("skip_time");
+
+ public static final ConstructingObjectParser PARSER =
+ new ConstructingObjectParser<>("flush_job_request", (a) -> new FlushJobRequest((String) a[0]));
+
+ static {
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
+ PARSER.declareBoolean(FlushJobRequest::setCalcInterim, CALC_INTERIM);
+ PARSER.declareString(FlushJobRequest::setStart, START);
+ PARSER.declareString(FlushJobRequest::setEnd, END);
+ PARSER.declareString(FlushJobRequest::setAdvanceTime, ADVANCE_TIME);
+ PARSER.declareString(FlushJobRequest::setSkipTime, SKIP_TIME);
+ }
+
+ private final String jobId;
+ private Boolean calcInterim;
+ private String start;
+ private String end;
+ private String advanceTime;
+ private String skipTime;
+
+ /**
+ * Create new Flush job request
+ *
+ * @param jobId The job ID of the job to flush
+ */
+ public FlushJobRequest(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public boolean getCalcInterim() {
+ return calcInterim;
+ }
+
+ /**
+ * When {@code true} calculates the interim results for the most recent bucket or all buckets within the latency period.
+ *
+ * @param calcInterim defaults to {@code false}.
+ */
+ public void setCalcInterim(boolean calcInterim) {
+ this.calcInterim = calcInterim;
+ }
+
+ public String getStart() {
+ return start;
+ }
+
+ /**
+ * When used in conjunction with {@link FlushJobRequest#calcInterim},
+ * specifies the start of the range of buckets on which to calculate interim results.
+ *
+ * @param start the beginning of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
+ */
+ public void setStart(String start) {
+ this.start = start;
+ }
+
+ public String getEnd() {
+ return end;
+ }
+
+ /**
+ * When used in conjunction with {@link FlushJobRequest#calcInterim}, specifies the end of the range
+ * of buckets on which to calculate interim results
+ *
+ * @param end the end of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
+ */
+ public void setEnd(String end) {
+ this.end = end;
+ }
+
+ public String getAdvanceTime() {
+ return advanceTime;
+ }
+
+ /**
+ * Specifies to advance to a particular time value.
+ * Results are generated and the model is updated for data from the specified time interval.
+ *
+ * @param advanceTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
+ */
+ public void setAdvanceTime(String advanceTime) {
+ this.advanceTime = advanceTime;
+ }
+
+ public String getSkipTime() {
+ return skipTime;
+ }
+
+ /**
+ * Specifies to skip to a particular time value.
+ * Results are not generated and the model is not updated for data from the specified time interval.
+ *
+ * @param skipTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
+ */
+ public void setSkipTime(String skipTime) {
+ this.skipTime = skipTime;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ FlushJobRequest other = (FlushJobRequest) obj;
+ return Objects.equals(jobId, other.jobId) &&
+ calcInterim == other.calcInterim &&
+ Objects.equals(start, other.start) &&
+ Objects.equals(end, other.end) &&
+ Objects.equals(advanceTime, other.advanceTime) &&
+ Objects.equals(skipTime, other.skipTime);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(Job.ID.getPreferredName(), jobId);
+ if (calcInterim != null) {
+ builder.field(CALC_INTERIM.getPreferredName(), calcInterim);
+ }
+ if (start != null) {
+ builder.field(START.getPreferredName(), start);
+ }
+ if (end != null) {
+ builder.field(END.getPreferredName(), end);
+ }
+ if (advanceTime != null) {
+ builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
+ }
+ if (skipTime != null) {
+ builder.field(SKIP_TIME.getPreferredName(), skipTime);
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobResponse.java
new file mode 100644
index 0000000000000..048b07b504ae0
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobResponse.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * Response object containing flush acknowledgement and additional data
+ */
+public class FlushJobResponse extends ActionResponse implements ToXContentObject {
+
+ public static final ParseField FLUSHED = new ParseField("flushed");
+ public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
+
+ public static final ConstructingObjectParser PARSER =
+ new ConstructingObjectParser<>("flush_job_response",
+ true,
+ (a) -> {
+ boolean flushed = (boolean) a[0];
+ Date date = a[1] == null ? null : new Date((long) a[1]);
+ return new FlushJobResponse(flushed, date);
+ });
+
+ static {
+ PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FLUSHED);
+ PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
+ }
+
+ public static FlushJobResponse fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+
+ private final boolean flushed;
+ private final Date lastFinalizedBucketEnd;
+
+ public FlushJobResponse(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
+ this.flushed = flushed;
+ this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
+ }
+
+ /**
+ * Was the job successfully flushed or not
+ */
+ public boolean isFlushed() {
+ return flushed;
+ }
+
+ /**
+ * Provides the timestamp (in milliseconds-since-the-epoch) of the end of the last bucket that was processed.
+ */
+ @Nullable
+ public Date getLastFinalizedBucketEnd() {
+ return lastFinalizedBucketEnd;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(flushed, lastFinalizedBucketEnd);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ FlushJobResponse that = (FlushJobResponse) other;
+ return that.flushed == flushed && Objects.equals(lastFinalizedBucketEnd, that.lastFinalizedBucketEnd);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(FLUSHED.getPreferredName(), flushed);
+ if (lastFinalizedBucketEnd != null) {
+ builder.timeField(LAST_FINALIZED_BUCKET_END.getPreferredName(),
+ LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime());
+ }
+ builder.endObject();
+ return builder;
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
index 4950a1c8139f6..d84099d9a3c40 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
@@ -36,6 +36,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.test.ESTestCase;
@@ -140,6 +141,27 @@ public void testGetBuckets() throws IOException {
}
}
+ public void testFlushJob() throws Exception {
+ String jobId = randomAlphaOfLength(10);
+ FlushJobRequest flushJobRequest = new FlushJobRequest(jobId);
+
+ Request request = MLRequestConverters.flushJob(flushJobRequest);
+ assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+ assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_flush", request.getEndpoint());
+ assertEquals("{\"job_id\":\"" + jobId + "\"}", requestEntityToString(request));
+
+ flushJobRequest.setSkipTime("1000");
+ flushJobRequest.setStart("105");
+ flushJobRequest.setEnd("200");
+ flushJobRequest.setAdvanceTime("100");
+ flushJobRequest.setCalcInterim(true);
+ request = MLRequestConverters.flushJob(flushJobRequest);
+ assertEquals(
+ "{\"job_id\":\"" + jobId + "\",\"calc_interim\":true,\"start\":\"105\"," +
+ "\"end\":\"200\",\"advance_time\":\"100\",\"skip_time\":\"1000\"}",
+ requestEntityToString(request));
+ }
+
public void testGetJobStats() {
GetJobStatsRequest getJobStatsRequestRequest = new GetJobStatsRequest();
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
index 7b8e4b3e4c4c7..cd4b6ffc7691f 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
@@ -39,6 +39,9 @@
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
import org.junit.After;
import java.io.IOException;
@@ -144,6 +147,19 @@ public void testCloseJob() throws Exception {
assertTrue(response.isClosed());
}
+ public void testFlushJob() throws Exception {
+ String jobId = randomValidJobId();
+ Job job = buildJob(jobId);
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
+ machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+ machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT);
+
+ FlushJobResponse response = execute(new FlushJobRequest(jobId),
+ machineLearningClient::flushJob,
+ machineLearningClient::flushJobAsync);
+ assertTrue(response.isFlushed());
+ }
+
public void testGetJobStats() throws Exception {
String jobId1 = "ml-get-job-stats-test-id-1";
String jobId2 = "ml-get-job-stats-test-id-2";
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
index d97db0a311f75..f92f01f6bad19 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
@@ -52,6 +52,8 @@
import org.elasticsearch.client.ml.job.util.PageParams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.job.stats.JobStats;
import org.junit.After;
@@ -461,6 +463,69 @@ public void onFailure(Exception e) {
}
}
+ public void testFlushJob() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+
+ Job job = MachineLearningIT.buildJob("flushing-my-first-machine-learning-job");
+ client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+ client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);
+
+ Job secondJob = MachineLearningIT.buildJob("flushing-my-second-machine-learning-job");
+ client.machineLearning().putJob(new PutJobRequest(secondJob), RequestOptions.DEFAULT);
+ client.machineLearning().openJob(new OpenJobRequest(secondJob.getId()), RequestOptions.DEFAULT);
+
+ {
+ //tag::x-pack-ml-flush-job-request
+ FlushJobRequest flushJobRequest = new FlushJobRequest("flushing-my-first-machine-learning-job"); //<1>
+ //end::x-pack-ml-flush-job-request
+
+ //tag::x-pack-ml-flush-job-request-options
+ flushJobRequest.setCalcInterim(true); //<1>
+ flushJobRequest.setAdvanceTime("2018-08-31T16:35:07+00:00"); //<2>
+ flushJobRequest.setStart("2018-08-31T16:35:17+00:00"); //<3>
+ flushJobRequest.setEnd("2018-08-31T16:35:27+00:00"); //<4>
+ flushJobRequest.setSkipTime("2018-08-31T16:35:00+00:00"); //<5>
+ //end::x-pack-ml-flush-job-request-options
+
+ //tag::x-pack-ml-flush-job-execute
+ FlushJobResponse flushJobResponse = client.machineLearning().flushJob(flushJobRequest, RequestOptions.DEFAULT);
+ //end::x-pack-ml-flush-job-execute
+
+ //tag::x-pack-ml-flush-job-response
+ boolean isFlushed = flushJobResponse.isFlushed(); //<1>
+ Date lastFinalizedBucketEnd = flushJobResponse.getLastFinalizedBucketEnd(); //<2>
+ //end::x-pack-ml-flush-job-response
+
+ }
+ {
+ //tag::x-pack-ml-flush-job-listener
+ ActionListener listener = new ActionListener() {
+ @Override
+ public void onResponse(FlushJobResponse FlushJobResponse) {
+ //<1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ //end::x-pack-ml-flush-job-listener
+ FlushJobRequest flushJobRequest = new FlushJobRequest("flushing-my-second-machine-learning-job");
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::x-pack-ml-flush-job-execute-async
+ client.machineLearning().flushJobAsync(flushJobRequest, RequestOptions.DEFAULT, listener); //<1>
+ // end::x-pack-ml-flush-job-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+ }
+
+
public void testGetJobStats() throws Exception {
RestHighLevelClient client = highLevelClient();
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobRequestTests.java
new file mode 100644
index 0000000000000..c2bddd436ccd5
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobRequestTests.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class FlushJobRequestTests extends AbstractXContentTestCase {
+
+ @Override
+ protected FlushJobRequest createTestInstance() {
+ FlushJobRequest request = new FlushJobRequest(randomAlphaOfLengthBetween(1, 20));
+
+ if (randomBoolean()) {
+ request.setCalcInterim(randomBoolean());
+ }
+ if (randomBoolean()) {
+ request.setAdvanceTime(String.valueOf(randomLong()));
+ }
+ if (randomBoolean()) {
+ request.setStart(String.valueOf(randomLong()));
+ }
+ if (randomBoolean()) {
+ request.setEnd(String.valueOf(randomLong()));
+ }
+ if (randomBoolean()) {
+ request.setSkipTime(String.valueOf(randomLong()));
+ }
+ return request;
+ }
+
+ @Override
+ protected FlushJobRequest doParseInstance(XContentParser parser) throws IOException {
+ return FlushJobRequest.PARSER.apply(parser, null);
+ }
+
+ @Override
+ protected boolean supportsUnknownFields() {
+ return false;
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobResponseTests.java
new file mode 100644
index 0000000000000..bc968ff4564ab
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobResponseTests.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class FlushJobResponseTests extends AbstractXContentTestCase {
+
+ @Override
+ protected FlushJobResponse createTestInstance() {
+ return new FlushJobResponse(randomBoolean(),
+ randomBoolean() ? null : new Date(randomNonNegativeLong()));
+ }
+
+ @Override
+ protected FlushJobResponse doParseInstance(XContentParser parser) throws IOException {
+ return FlushJobResponse.PARSER.apply(parser, null);
+ }
+
+ @Override
+ protected boolean supportsUnknownFields() {
+ return true;
+ }
+}
diff --git a/docs/java-rest/high-level/ml/flush-job.asciidoc b/docs/java-rest/high-level/ml/flush-job.asciidoc
new file mode 100644
index 0000000000000..1f815bba0d564
--- /dev/null
+++ b/docs/java-rest/high-level/ml/flush-job.asciidoc
@@ -0,0 +1,83 @@
+[[java-rest-high-x-pack-ml-flush-job]]
+=== Flush Job API
+
+The Flush Job API provides the ability to flush a {ml} job's
+datafeed in the cluster.
+It accepts a `FlushJobRequest` object and responds
+with a `FlushJobResponse` object.
+
+[[java-rest-high-x-pack-ml-flush-job-request]]
+==== Flush Job Request
+
+A `FlushJobRequest` object gets created with an existing non-null `jobId`.
+All other fields are optional for the request.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-request]
+--------------------------------------------------
+<1> Constructing a new request referencing an existing `jobId`
+
+==== Optional Arguments
+
+The following arguments are optional.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-request-options]
+--------------------------------------------------
+<1> Set request to calculate the interim results
+<2> Set the advanced time to flush to the particular time value
+<3> Set the start time for the range of buckets on which
+to calculate the interim results (requires `calc_interim` to be `true`)
+<4> Set the end time for the range of buckets on which
+to calculate interim results (requires `calc_interim` to be `true`)
+<5> Set the skip time to skip a particular time value
+
+[[java-rest-high-x-pack-ml-flush-job-execution]]
+==== Execution
+
+The request can be executed through the `MachineLearningClient` contained
+in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-execute]
+--------------------------------------------------
+
+[[java-rest-high-x-pack-ml-flush-job-execution-async]]
+==== Asynchronous Execution
+
+The request can also be executed asynchronously:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-execute-async]
+--------------------------------------------------
+<1> The `FlushJobRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The method does not block and returns immediately. The passed `ActionListener` is used
+to notify the caller of completion. A typical `ActionListener` for `FlushJobResponse` may
+look like
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-listener]
+--------------------------------------------------
+<1> `onResponse` is called back when the action is completed successfully
+<2> `onFailure` is called back when some unexpected error occurs
+
+[[java-rest-high-x-pack-ml-flush-job-response]]
+==== Flush Job Response
+
+A `FlushJobResponse` contains an acknowledgement and an optional end date for the
+last finalized bucket
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-response]
+--------------------------------------------------
+<1> `isFlushed()` indicates if the job was successfully flushed or not.
+<2> `getLastFinalizedBucketEnd()` provides the timestamp
+(in milliseconds-since-the-epoch) of the end of the last bucket that was processed.
\ No newline at end of file
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index 76fbc223a1b23..68320fbfe9ff3 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -211,6 +211,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
* <>
* <>
* <>
+* <>
* <>
* <>
* <>
@@ -220,6 +221,7 @@ include::ml/get-job.asciidoc[]
include::ml/delete-job.asciidoc[]
include::ml/open-job.asciidoc[]
include::ml/close-job.asciidoc[]
+include::ml/flush-job.asciidoc[]
include::ml/get-job-stats.asciidoc[]
include::ml/get-buckets.asciidoc[]
include::ml/get-records.asciidoc[]