Introduce reroute method on IngestDocument #94000

Merged
merged 28 commits into from
Mar 22, 2023

Conversation

felixbarny
Member

@felixbarny felixbarny commented Feb 22, 2023

  • Overrides _index
  • Skips current pipeline
  • Invokes default pipeline of new index

Requirement for

Combines #85932 and #85931

Fixes #83653

@elasticsearchmachine elasticsearchmachine added v8.8.0 external-contributor Pull request authored by a developer outside the Elasticsearch team needs:triage Requires assignment of a team area label labels Feb 22, 2023
@felixbarny felixbarny added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement labels Feb 22, 2023
@elasticsearchmachine elasticsearchmachine added Team:Data Management Meta label for data/management team and removed needs:triage Requires assignment of a team area label labels Feb 22, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

return new Pipelines(pipelineId, finalPipelineId);
}

private static class Pipelines implements Iterable<String> {
Member

I think this is a reasonable abstraction, but I don't like that we make it mutable, and that we don't encapsulate enough here.

The withoutDefaultPipeline method makes me uncomfortable as its name makes it sound like it would return a new object rather than making a mutable change.

I also think we don't need to have executePipelines(...) take a boolean. We're treating this internally as though we'll always have a list of pipelines, but we could probably get away with passing a proper object instead of an Iterator<String>. I think it'd be much clearer that way.

Member Author

I abstracted the Iterator<String> and the boolean flag into a PipelineIterator; however, it's not immutable. As that's the nature of iterators, I think that's fine.

executePipelines needs to know about three properties of the current pipeline: the name (to include it in the exception in case the pipeline itself can't be resolved), the pipeline itself, and whether the current pipeline is the final pipeline (if true, overriding _index is disallowed).

The PipelineIterator encapsulates that state and I think that makes it cleaner than before. Thanks for the suggestion 👍
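
For readers following the thread, here is a minimal sketch of the idea discussed above: an iterator that carries, for each pipeline to run, its name and whether it is the final pipeline. It is not the code from this PR. `PipelineSlot` and `PipelineIteratorSketch` are hypothetical names, and the resolved pipeline object itself is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in: one entry per pipeline to run, carrying its name and
// whether it is the final pipeline. The resolved pipeline object is omitted.
record PipelineSlot(String id, boolean isFinal) {}

// Sketch: yields the default pipeline (if any) first, then the final pipeline (if any).
// Like any iterator it is stateful, so it is not immutable.
final class PipelineIteratorSketch implements Iterator<PipelineSlot> {
    private final Iterator<PipelineSlot> delegate;

    PipelineIteratorSketch(String defaultPipelineId, String finalPipelineId) {
        List<PipelineSlot> slots = new ArrayList<>();
        if (defaultPipelineId != null) {
            slots.add(new PipelineSlot(defaultPipelineId, false));
        }
        if (finalPipelineId != null) {
            slots.add(new PipelineSlot(finalPipelineId, true));
        }
        this.delegate = slots.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public PipelineSlot next() {
        return delegate.next();
    }

    public static void main(String[] args) {
        Iterator<PipelineSlot> it = new PipelineIteratorSketch("logs-default", "logs-final");
        while (it.hasNext()) {
            PipelineSlot slot = it.next();
            System.out.println(slot.id() + " final=" + slot.isFinal());
        }
    }
}
```

With this shape, the caller no longer needs a separate boolean parameter: the "is this the final pipeline" question travels with each element.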

Comment on lines 923 to 925
void resetPipelineSkipping() {
skipCurrentPipeline = false;
}
Member

Why doesn't this also reset invokeDefaultPipelineOfDestination? Also, it's unclear from the code why we need this method to be invoked.

Member Author

Good point, it needs to reset both. I've changed it to just be a single boolean.

Comment on lines 915 to 921
boolean isInvokeDefaultPipelineOfDestination() {
return invokeDefaultPipelineOfDestination;
}

boolean isSkipCurrentPipeline() {
return skipCurrentPipeline;
}
Member

I don't think we need multiple boolean flags for this, right? If we do, then it's not clear enough why they have to be separated (needs documentation).

Member Author

I've changed it to just be a single boolean and a single getter for it (isRedirect()). The downside is that isRedirect() is less expressive than isSkipCurrentPipeline() in the context of determining whether to skip the pipeline.
We could keep isSkipCurrentPipeline() and just return the redirect flag. But that feels a bit off as well.
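
A rough sketch of the single-flag design described above, under stated assumptions: `RerouteDocSketch`, `isReroute()`, `resetReroute()` and `runUntilReroute(...)` are hypothetical names chosen for illustration (the thread itself mentions `isRedirect()` at this point), and the document is reduced to just its target index. One flag is set when the index is overridden, the processor loop stops the current pipeline when it sees the flag, and the caller resets it before running the next pipeline.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, trimmed-down document: only the target index and one reroute flag.
final class RerouteDocSketch {
    private String index;
    private boolean reroute;

    RerouteDocSketch(String index) {
        this.index = index;
    }

    // Overrides the target index and marks the document as rerouted.
    void reroute(String destIndex) {
        this.index = destIndex;
        this.reroute = true;
    }

    String getIndex() {
        return index;
    }

    boolean isReroute() {
        return reroute;
    }

    // Called by the pipeline runner before it moves on to the next pipeline.
    void resetReroute() {
        reroute = false;
    }

    // Runs processors in order and stops as soon as one of them reroutes the document.
    // Returns how many processors were executed.
    static int runUntilReroute(RerouteDocSketch doc, List<Consumer<RerouteDocSketch>> processors) {
        int executed = 0;
        for (Consumer<RerouteDocSketch> processor : processors) {
            processor.accept(doc);
            executed++;
            if (doc.isReroute()) {
                break; // skip the rest of the current pipeline
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        RerouteDocSketch doc = new RerouteDocSketch("logs-generic");
        List<Consumer<RerouteDocSketch>> processors = List.of(
            d -> {},
            d -> d.reroute("logs-nginx"),
            d -> System.out.println("not reached")
        );
        int executed = runUntilReroute(doc, processors);
        System.out.println(executed + " processors ran, index is now " + doc.getIndex());
        doc.resetReroute();
    }
}
```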

@felixbarny felixbarny changed the title Introduce redirect method on IngestDocument Introduce reroute method on IngestDocument Mar 1, 2023
@joegallo
Contributor

joegallo commented Mar 7, 2023

I'm pretty sure this is going to fail the test introduced by #94281, which means that this PR is a bugfix for #83653. /cc @HiDAl

edit: added a commit and updated the description

double edit: there might be a BWC issue with the test -- mixed mode clusters wouldn't always have the bugfix and all that. If that's the case, I'll take on the task of adding the right limitations to the test to make it version-specific. (edit: there are no BWC YAML tests in modules/ingest-common, at least as far as I can see.)

Since this PR fixes the linked bug
joegallo added 3 commits March 7, 2023 16:27
If a final pipeline changes the indices such that a cycle is created,
it's more important to error that a final pipeline changed the indices
than that a cycle was created.
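
The ordering described in this commit message can be sketched as follows. This is an illustration only, not the code from the commit: `IndexChangeCheckSketch`, its `check(...)` method, and the error messages are hypothetical. The point is simply that the "final pipeline changed the index" check runs before the cycle check, so it wins when both conditions hold.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validation helper; names and messages are illustrative only.
final class IndexChangeCheckSketch {

    // Validates an index change made by a pipeline.
    // indexHistory holds every index the document has targeted so far.
    static void check(boolean isFinalPipeline, String previousIndex, String newIndex, Set<String> indexHistory) {
        if (previousIndex.equals(newIndex)) {
            return; // nothing changed
        }
        // Checked first: a final pipeline must never change the index,
        // even if the change would also close a cycle.
        if (isFinalPipeline) {
            throw new IllegalStateException(
                "final pipeline changed index from [" + previousIndex + "] to [" + newIndex + "]");
        }
        // Checked second: add() returns false if the index was already visited.
        if (!indexHistory.add(newIndex)) {
            throw new IllegalStateException("index cycle detected: " + indexHistory + " -> " + newIndex);
        }
    }

    public static void main(String[] args) {
        Set<String> history = new LinkedHashSet<>(List.of("a", "b"));
        try {
            check(true, "b", "a", history); // both a final-pipeline change and a cycle
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```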
@HiDAl

HiDAl commented Mar 8, 2023

Yesterday I had to revert the test because it was failing due to warning headers. Today I pushed and merged PR #94388 which addresses the issue. Please rebase/merge on this PR @joegallo @felixbarny

@felixbarny
Member Author

I've updated to main and re-adjusted the test. I ran that test locally and I can confirm that it prevents the final pipeline from running twice.

@@ -903,6 +904,29 @@ public String toString() {
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
}

public void reroute(String destIndex) {
getMetadata().setIndex(destIndex);
Member Author

@felixbarny felixbarny Mar 9, 2023

@joegallo what do you think about adding the history of the _index field in an ingest metadata field? This wouldn't be indexed by default, but for debugging, users can use a set processor to add it to their documents:

{
  "set": {
    "field": "reroute_history",
    "copy_from": "_ingest.reroute_history"
  }
}
Suggested change
- getMetadata().setIndex(destIndex);
+ getMetadata().setIndex(destIndex);
+ appendFieldValue("_ingest.reroute_history", getMetadata().getIndex());

Contributor

I'm not opposed to adding a mechanism like that in a future PR, but I would like to keep the scope of this PR fixed.

When we do add that mechanism, though, I'd prefer that the list be an immutable reference to the collection we're tracking for index recursion purposes, rather than a new collection. Similarly, appendFieldValue is more for processors to use when the first argument is customer-provided -- we can just traverse the data structures ourselves in IngestService, there's no need for string parsing and evaluation there.
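
One way to read the "immutable reference to the collection we're tracking" preference is an unmodifiable view over the same set that is used for recursion detection, rather than a copy. The sketch below assumes that reading. `IndexHistorySketch`, `record(...)` and `view()` are hypothetical names, not part of Elasticsearch.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical tracker: a single collection serves both recursion detection
// and read-only exposure of the reroute history.
final class IndexHistorySketch {
    private final Set<String> visited = new LinkedHashSet<>();
    // A view, not a copy: it reflects later additions but rejects modification.
    private final Set<String> readOnlyView = Collections.unmodifiableSet(visited);

    // Returns false if the index was already visited, i.e. a cycle.
    boolean record(String index) {
        return visited.add(index);
    }

    Set<String> view() {
        return readOnlyView;
    }

    public static void main(String[] args) {
        IndexHistorySketch history = new IndexHistorySketch();
        history.record("logs-generic");
        history.record("logs-nginx");
        System.out.println(history.view());
        try {
            history.view().add("tampered");
        } catch (UnsupportedOperationException e) {
            System.out.println("history is read-only for callers");
        }
    }
}
```

Because callers only ever see the view, pipelines and scripts could read the history without being able to rewrite it.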

@joegallo
Contributor

@elasticmachine update branch

@joegallo
Contributor

joegallo commented Mar 21, 2023

I added some new commits this morning that address my biggest issues here -- but the failure we see from elasticsearch-ci/part-1 seems to be real (and so would have its root in my commits). I'll add another commit here and get that passing again. ☹️ (edit: fixed 2db2a19)


I was waiting for green CI before I added the following comment, but since CI is going to be red for a little while I'll just drop my message in now:

I'm not especially enamored with the way this tracks indexRecursionDetection -- there's a somewhat similar executedPipelines that's being tracked by IngestDocument itself. Having IngestDocument track the index changes and reroutes like that would simplify the parameters to executePipelines, and I think it would make later changes like adding the index history a bit simpler.

That said -- this is good enough as it is. I'll take on moving indexRecursionDetection over to IngestDocument as a refactoring in a separate PR. Let's merge this and start iterating on #76511.

@joegallo joegallo requested a review from dakrone March 21, 2023 14:50
But make the null handling explicit rather than implicit
@felixbarny
Member Author

> I'm not especially enamored with the way this tracks indexRecursionDetection -- there's a somewhat similar executedPipelines that's being tracked by IngestDocument itself. Having IngestDocument track the index changes and reroutes like that would simplify the parameters to executePipelines, and I think it would make later changes like adding the index history a bit simpler.

I completely agree with that. Ideally, ingest pipelines and scripts should not be able to modify the history so that we can be certain whether a document is sent directly to a target or whether it has been rerouted.

> I'll take on moving indexRecursionDetection over to IngestDocument as a refactoring in a separate PR.

Thanks!

> Let's merge this and start iterating on #76511.

Thanks for the review and the approval! I'll wait for @dakrone's approval before merging.

Member

@dakrone dakrone left a comment

LGTM also, thanks for all the work on this Felix & Joe

@felixbarny felixbarny merged commit cdf2522 into elastic:main Mar 22, 2023
yaauie added a commit to yaauie/elasticsearch that referenced this pull request Jun 19, 2023
Logstash's Integration filter works directly with the processors, but cannot
use the IngestService that is tightly-coupled with cluster state and must
therefore emulate the behavior introduced in elastic#94000. To do so, the additional
methods for inquiring about and resetting the reroute state need to be
externally-accessible.
yaauie added a commit to yaauie/elasticsearch that referenced this pull request Jun 20, 2023
Logstash's Integration filter works directly with the processors, but cannot
use the IngestService that is tightly-coupled with cluster state and must
therefore emulate the behavior introduced in elastic#94000.

To do so, the additional methods for inquiring about and resetting the reroute
state need to be externally-accessible. Exposing them through a clearly-named
bridge allows us to avoid making these Elastic-internal bits a part of the
public APIs that are subject to years-long stability and deprecation notice
policies.
yaauie added a commit that referenced this pull request Jun 20, 2023
…96958)

* ingest: expose reroute inquiry/reset via Elastic-internal API bridge

Logstash's Integration filter works directly with the processors, but cannot
use the IngestService that is tightly-coupled with cluster state and must
therefore emulate the behavior introduced in #94000.

To do so, the additional methods for inquiring about and resetting the reroute
state need to be externally-accessible. Exposing them through a clearly-named
bridge allows us to avoid making these Elastic-internal bits a part of the
public APIs that are subject to years-long stability and deprecation notice
policies.

* Update docs/changelog/96958.yaml

* javadoc: rephrase to avoid use of @APinote
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement external-contributor Pull request authored by a developer outside the Elasticsearch team Team:Data Management Meta label for data/management team v8.8.0
Development

Successfully merging this pull request may close these issues.

final_pipeline appears to process twice when using the date match index processor
6 participants