Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor IngestDocument reroute recursion detection #95350

Merged
merged 4 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ public final class IngestDocument {

// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

/**
* An ordered set of the values of the _index that have been used for this document.
* <p>
* IMPORTANT: This is only updated after a top-level pipeline has run (see {@code IngestService#executePipelines(...)}).
* <p>
* For example, if a processor changes the _index for a document from 'foo' to 'bar',
* and then another processor changes the value back to 'foo', then the overall effect
* of the pipeline was that the _index value did not change and so only 'foo' would appear
* in the index history.
*/
private Set<String> indexHistory = new LinkedHashSet<>();

private boolean doNoSelfReferencesCheck = false;
private boolean reroute = false;

Expand All @@ -70,21 +83,27 @@ public IngestDocument(String index, String id, long version, String routing, Ver
this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow());
this.templateModel = initializeTemplateModel();

// initialize the index history by putting the current index into it
this.indexHistory.add(index);
}

// note: the rest of these constructors deal with the data-centric view of the IngestDocument, not the execution-centric view.
// For example, the copy constructor doesn't populate the `executedPipelines` or `indexHistory` (as well as some other fields),
// because those fields are execution-centric.

/**
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided.
*/
public IngestDocument(IngestDocument other) {
// Delegates to the (IngestCtxMap, Map) constructor: the other document's source and ingest metadata are passed
// through deepCopyMap and its metadata through clone(), rather than handing over the original objects.
this(
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
// The reroute flag is the only field assigned directly here; execution-centric state such as
// executedPipelines and indexHistory is not copied from the other document.
this.reroute = other.reroute;
}

/**
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
*/
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
Map<String, Object> source;
Expand All @@ -107,7 +126,7 @@ public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object>
}

/**
* Constructor to create an IngestDocument from its constituent maps
* Constructor to create an IngestDocument from its constituent maps.
*/
IngestDocument(IngestCtxMap ctxMap, Map<String, Object> ingestMetadata) {
this.ctxMap = Objects.requireNonNull(ctxMap);
Expand Down Expand Up @@ -841,6 +860,24 @@ List<String> getPipelineStack() {
return pipelineStack;
}

/**
 * Records the given index in this document's index history.
 * <p>
 * The history is a set, so an index that is already present is not recorded a second time; the return
 * value reports whether the history actually changed.
 *
 * @param index the index to record in the index history
 * @return true if the index was newly added, false if the index history already contained it
 */
public boolean updateIndexHistory(String index) {
    final boolean newlyAdded = indexHistory.add(index);
    return newlyAdded;
}

/**
 * Exposes the index history of this document without allowing callers to modify it.
 *
 * @return a read-only view of the document's index history
 */
public Set<String> getIndexHistory() {
    final Set<String> readOnlyHistory = Collections.unmodifiableSet(indexHistory);
    return readOnlyHistory;
}

/**
* @return Whether a self referencing check should be performed
*/
Expand Down Expand Up @@ -990,7 +1027,7 @@ static ResolveResult error(String errorMessage) {
/**
* Provides a shallowly read-only, very limited, map-like view of two maps. The only methods that are implemented are
* {@link Map#get(Object)} and {@link Map#containsKey(Object)}, everything else throws UnsupportedOperationException.
*
* <p>
* The overrides map has higher priority than the primary map -- values in that map under some key will take priority over values
* in the primary map under the same key.
*
Expand Down
21 changes: 9 additions & 12 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -688,9 +687,7 @@ public void onFailure(Exception e) {
});

IngestDocument ingestDocument = newIngestDocument(indexRequest);
LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
indexRecursionDetection.add(indexRequest.index());
executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection);
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
i++;
}
}
Expand Down Expand Up @@ -774,8 +771,7 @@ private void executePipelines(
final PipelineIterator pipelines,
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
final ActionListener<Boolean> listener,
final Set<String> indexRecursionDetection
final ActionListener<Boolean> listener
) {
assert pipelines.hasNext();
PipelineSlot slot = pipelines.next();
Expand Down Expand Up @@ -859,17 +855,18 @@ private void executePipelines(
return; // document failed!
}

// check for cycles in the visited indices
if (indexRecursionDetection.add(newIndex) == false) {
List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
indexRoute.add(newIndex);
// add the index to the document's index history, and check for cycles in the visited indices
boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
if (cycle) {
List<String> indexCycle = new ArrayList<>(ingestDocument.getIndexHistory());
indexCycle.add(newIndex);
listener.onFailure(
new IllegalStateException(
format(
"index cycle detected while processing pipeline [%s] for document [%s]: %s",
pipelineId,
indexRequest.id(),
indexRoute
indexCycle
)
)
);
Expand All @@ -890,7 +887,7 @@ private void executePipelines(
}

if (newPipelines.hasNext()) {
executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection);
executePipelines(newPipelines, indexRequest, ingestDocument, listener);
} else {
// update the index request's source and (potentially) cache the timestamp for TSDB
updateIndexRequestSource(indexRequest, ingestDocument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.Before;

import java.time.Instant;
Expand Down Expand Up @@ -1142,4 +1143,19 @@ public void testIsMetadata() {
assertFalse(IngestDocument.Metadata.isMetadata("address"));
}

public void testIndexHistory() {
    // a fresh document starts out with exactly its original index in the history
    final String originalIndex = ingestDocument.getFieldValue("_index", String.class);
    assertThat(originalIndex, equalTo("index"));
    assertThat(ingestDocument.getIndexHistory(), Matchers.contains(originalIndex));

    // a previously unseen index is accepted and appended after the original one
    final String secondIndex = "another_index";
    final boolean addedSecond = ingestDocument.updateIndexHistory(secondIndex);
    assertTrue(addedSecond);
    assertThat(ingestDocument.getIndexHistory(), Matchers.contains(originalIndex, secondIndex));

    // re-adding the original index is rejected, and the history is left untouched
    final boolean addedOriginalAgain = ingestDocument.updateIndexHistory(originalIndex);
    assertFalse(addedOriginalAgain);
    assertThat(ingestDocument.getIndexHistory(), Matchers.contains(originalIndex, secondIndex));
}
}