
Adding a simulate ingest API #99270
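This PR wires a new `SimulateBulkAction` / `TransportSimulateBulkAction` pair into `ActionModule` and registers a `RestSimulateIngestAction` handler. As a rough, node-internal illustration of how the new transport action can be driven, here is a hypothetical sketch; the example class, the index name, the document source, and the pipeline definition are invented for illustration, while the Elasticsearch classes and client calls used are either added in this diff or already exist:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.xcontent.XContentType;

import java.util.List;
import java.util.Map;

public class SimulateIngestExample {

    // Builds a simulate bulk request with one document and one substituted pipeline,
    // then executes it through the SimulateBulkAction registered in ActionModule.
    static void simulate(NodeClient client) {
        SimulateBulkRequest request = new SimulateBulkRequest();
        // SimulateBulkRequest extends BulkRequest, so documents are added the same way
        // as for an ordinary bulk request. Index name and source are made up here.
        request.add(new IndexRequest("my-index").source(Map.of("foo", "bar"), XContentType.JSON));
        // Substitutions are plain maps keyed by pipeline id; they are parsed lazily via
        // SimulateBulkRequest.getPipelineSubstitutions(...) when the request executes.
        request.setPipelineSubstitutions(
            Map.of("my-pipeline", Map.of("processors", List.of(Map.of("set", Map.of("field", "ran", "value", true)))))
        );
        client.execute(SimulateBulkAction.INSTANCE, request, ActionListener.wrap(
            (BulkResponse response) -> {
                // Each item is a SimulateIndexResponse; no document is actually indexed.
            },
            e -> { /* handle failure */ }
        ));
    }
}
```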

5 changes: 5 additions & 0 deletions docs/changelog/99270.yaml
@@ -0,0 +1,5 @@
pr: 99270
summary: Adding a simulate ingest API
area: Ingest Node
type: enhancement
issues: []
@@ -145,6 +145,7 @@ static TransportVersion def(int id) {
public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0);
public static final TransportVersion RECOVERY_COMMIT_TOO_NEW_EXCEPTION_ADDED = def(8_503_00_0);
public static final TransportVersion NODE_INFO_COMPONENT_VERSIONS_ADDED = def(8_504_00_0);
public static final TransportVersion PIPELINES_IN_BULK_RESPONSE_ADDED = def(8_505_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
@@ -208,8 +208,10 @@
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSimulateBulkAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.explain.ExplainAction;
@@ -441,6 +443,7 @@
import org.elasticsearch.rest.action.ingest.RestDeletePipelineAction;
import org.elasticsearch.rest.action.ingest.RestGetPipelineAction;
import org.elasticsearch.rest.action.ingest.RestPutPipelineAction;
import org.elasticsearch.rest.action.ingest.RestSimulateIngestAction;
import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction;
import org.elasticsearch.rest.action.search.RestClearScrollAction;
import org.elasticsearch.rest.action.search.RestCountAction;
@@ -761,6 +764,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class);
actions.register(TransportShardMultiGetAction.TYPE, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
actions.register(SimulateBulkAction.INSTANCE, TransportSimulateBulkAction.class);
actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
@@ -996,6 +1000,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestGetPipelineAction());
registerHandler.accept(new RestDeletePipelineAction());
registerHandler.accept(new RestSimulatePipelineAction());
registerHandler.accept(new RestSimulateIngestAction());

// Dangling indices API
registerHandler.accept(new RestListDanglingIndicesAction());
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.transport.TransportRequestOptions;

public class SimulateBulkAction extends ActionType<BulkResponse> {

public static final SimulateBulkAction INSTANCE = new SimulateBulkAction();
public static final String NAME = "indices:admin/simulate/bulk";

private static final TransportRequestOptions TRANSPORT_REQUEST_OPTIONS = TransportRequestOptions.of(
null,
TransportRequestOptions.Type.BULK
);

private SimulateBulkAction() {
super(NAME, BulkResponse::new);
}
}
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SimulateBulkRequest extends BulkRequest {
private Map<String, Object> pipelineSubstitutions = Map.of();

public SimulateBulkRequest() {
super();
}

public SimulateBulkRequest(StreamInput in) throws IOException {
super(in);
this.pipelineSubstitutions = in.readMap();
}

public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeGenericMap(pipelineSubstitutions);
}

public void setPipelineSubstitutions(Map<String, Object> pipelineSubstitutions) {
this.pipelineSubstitutions = pipelineSubstitutions;
}

@SuppressWarnings("unchecked")
public Map<String, Pipeline> getPipelineSubstitutions(IngestService ingestService) throws Exception {
Map<String, Pipeline> parsedPipelineSubstitutions = new HashMap<>();
if (pipelineSubstitutions != null) {
for (Map.Entry<String, Object> entry : pipelineSubstitutions.entrySet()) {
String pipelineId = entry.getKey();
Pipeline pipeline = Pipeline.create(
pipelineId,
(Map<String, Object>) entry.getValue(),
ingestService.getProcessorFactories(),
ingestService.getScriptService()
);
parsedPipelineSubstitutions.put(pipelineId, pipeline);
}
}
return parsedPipelineSubstitutions;
}
}
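Since each substitution value above is handed directly to `Pipeline.create(...)`, the map passed to `setPipelineSubstitutions` is expected to use the ordinary pipeline-definition shape, keyed by pipeline id. A minimal sketch of that shape; the pipeline id, description, and processor are illustrative only:

```java
import org.elasticsearch.action.bulk.SimulateBulkRequest;

import java.util.List;
import java.util.Map;

class PipelineSubstitutionShapeExample {
    // Illustrative only: substitutions are keyed by pipeline id and use the same map
    // shape as a stored pipeline definition, because getPipelineSubstitutions passes
    // each value straight to Pipeline.create(...).
    static SimulateBulkRequest withSubstitution() {
        SimulateBulkRequest request = new SimulateBulkRequest();
        request.setPipelineSubstitutions(
            Map.of(
                "my-pipeline", // hypothetical pipeline id
                Map.of(
                    "description", "replacement pipeline used only for this simulation",
                    "processors", List.of(Map.of("set", Map.of("field", "simulated", "value", true)))
                )
            )
        );
        return request;
    }
}
```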
@@ -142,7 +142,35 @@ public TransportBulkAction(
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this(
BulkAction.NAME,
threadPool,
transportService,
clusterService,
ingestService,
client,
actionFilters,
indexNameExpressionResolver,
indexingPressure,
systemIndices,
relativeTimeProvider
);
}

TransportBulkAction(
String actionName,
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
IngestService ingestService,
NodeClient client,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
super(actionName, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
this.clusterService = clusterService;
@@ -336,6 +364,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
}

// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
indexData(task, bulkRequest, executorName, listener, responses, autoCreateIndices, indicesThatCannotBeCreated, startTime);
}

protected void indexData(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses,
Set<String> autoCreateIndices,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
long startTime
) {
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
@@ -386,6 +427,10 @@ protected void doRun() {
}
}

protected IngestService getIngestService(BulkRequest request) {
return ingestService;
}

static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
DocWriteRequest.OpType opType = writeRequest.opType();
if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) {
@@ -491,7 +536,7 @@ private static boolean setResponseFailureIfIndexMatches(
return false;
}

private long buildTookInMillis(long startTimeNanos) {
protected long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}

@@ -809,7 +854,7 @@ private void processBulkIndexIngestRequest(
) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
getIngestService(original).executeBulkRequest(
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.SimulateIngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Map;
import java.util.Set;

public class TransportSimulateBulkAction extends TransportBulkAction {
@Inject
public TransportSimulateBulkAction(
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
IngestService ingestService,
NodeClient client,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices
) {
super(
SimulateBulkAction.NAME,
threadPool,
transportService,
clusterService,
ingestService,
client,
actionFilters,
indexNameExpressionResolver,
indexingPressure,
systemIndices,
System::nanoTime
);
}

@Override
protected void indexData(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses,
Set<String> autoCreateIndices,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
long startTime
) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
responses.set(
i,
BulkItemResponse.success(
0,
DocWriteRequest.OpType.CREATE,
new SimulateIndexResponse(
request.index(),
((IndexRequest) request).source(),
((IndexRequest) request).getContentType(),
((IndexRequest) request).getPipelines()
)
)
);
}
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime)));
}

@Override
protected IngestService getIngestService(BulkRequest request) {
IngestService rawIngestService = super.getIngestService(request);
return new SimulateIngestService(rawIngestService, request);
}
}
@@ -42,6 +42,9 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@@ -123,6 +126,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/
private Object rawTimestamp;
private boolean pipelinesHaveRun = false;
private List<String> pipelines = new ArrayList<>();

public IndexRequest(StreamInput in) throws IOException {
this(null, in);
@@ -167,6 +171,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
pipelinesHaveRun = in.readBoolean();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.PIPELINES_IN_BULK_RESPONSE_ADDED)) {
this.pipelines = new ArrayList<>(in.readCollectionAsList(StreamInput::readString));
}
}

public IndexRequest() {
@@ -727,6 +734,9 @@ private void writeBody(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
out.writeBoolean(pipelinesHaveRun);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.PIPELINES_IN_BULK_RESPONSE_ADDED)) {
out.writeCollection(pipelines, StreamOutput::writeString);
}
}

@Override
@@ -830,4 +840,12 @@ public void setPipelinesHaveRun() {
public boolean pipelinesHaveRun() {
return pipelinesHaveRun;
}

public void addPipeline(String pipeline) {
pipelines.add(pipeline);
}

public List<String> getPipelines() {
return Collections.unmodifiableList(pipelines);
}
}