Feature/simulate ingest with pipeline defs and mapping validation #99920

masseyke
Member

This is a draft PR that introduces a new _ingest/_simulate API. It runs, on the given data, every pipeline that would be executed for a given index, but instead of indexing the data into the index, it returns the transformed documents. The difference from the simulate pipeline API is that the simulate pipeline API only runs the single pipeline it is given. This new API can run an arbitrarily long chain of pipelines: the given pipeline, the default pipeline for the given index, the default pipelines of any indices that the reroute processor forwards the data to, and the final pipeline of the last index in the chain.
For example, if we have the following pipelines:

curl -X PUT "localhost:9200/_ingest/pipeline/my-pipeline?pretty" -H 'Content-Type: application/json' -d'
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    },
    {
      "reroute": {
        "destination": "my-index-2"
      }
    }
  ]
}
'

curl -X PUT "localhost:9200/_ingest/pipeline/my-final-pipeline?pretty" -H 'Content-Type: application/json' -d'
{
  "processors": [
    {
      "set": {
        "field": "my-boolean-field",
        "value": false
      }
    }
  ]
}
'

curl -X PUT "localhost:9200/_ingest/pipeline/my-pipeline-2?pretty" -H 'Content-Type: application/json' -d'
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 20
      }
    },
    {
      "uppercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
'

curl -X PUT "localhost:9200/_ingest/pipeline/my-final-pipeline-2?pretty" -H 'Content-Type: application/json' -d'
{
  "processors": [
    {
      "set": {
        "field": "my-new-boolean-field",
        "value": false
      }
    }
  ]
}
'

And then the following indexes:

curl -X PUT "localhost:9200/my-index?pretty" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "index": {
      "default_pipeline": "my-pipeline",
      "final_pipeline": "my-final-pipeline"
    }
  }
}
'
curl -u elastic:password -X PUT "localhost:9200/my-index-2?pretty" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "index": {
      "default_pipeline": "my-pipeline-2",
      "final_pipeline": "my-final-pipeline-2"
    }
  },
  "mappings": {
    "dynamic": "strict",
    "properties": {
      "my-keyword-field":    { "type": "text" },  
      "my-boolean-field":  { "type": "boolean"  }, 
      "my-new-boolean-field":  { "type": "boolean"  }, 
      "my-long-field":   { "type": "long"  }     
    }
  }
}
'

Then calling _ingest/_simulate with this data:

curl -X POST "localhost:9200/_ingest/_simulate?pretty&index=my-index" -H 'Content-Type: application/json' -d'
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
'

might return

{
  "errors" : false,
  "took" : 0,
  "ingest_took" : 1,
  "items" : [
    {
      "create" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-long-field" : 20,
          "my-keyword-field" : "FOO",
          "my-new-boolean-field" : false
        },
        "executed_pipelines" : [
          "my-pipeline",
          "my-pipeline-2",
          "my-final-pipeline-2"
        ],
        "status" : 201
      }
    },
    {
      "index" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-long-field" : 20,
          "my-keyword-field" : "BAR",
          "my-boolean-field" : false
        },
        "executed_pipelines" : [
          "my-pipeline",
          "my-pipeline-2",
          "my-final-pipeline-2"
        ],
        "status" : 201
      }
    }
  ]
}
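
As a rough illustration of the chain described above (default pipeline, then the default pipeline of any rerouted-to index, then the final pipeline of the last index), here is a toy Python model of the four example pipelines and two example indices. It is only a sketch and not how Elasticsearch implements this: the tuple-based processor format, `run_pipeline`, and `simulate` are hypothetical names, and it assumes a reroute ends the current pipeline.

```python
# Toy model of the pipeline chain; NOT Elasticsearch's implementation.
# Processors are written as simple tuples purely for illustration.
PIPELINES = {
    "my-pipeline": [
        ("set", "my-long-field", 10),
        ("set", "my-boolean-field", True),
        ("lowercase", "my-keyword-field"),
        ("reroute", "my-index-2"),
    ],
    "my-final-pipeline": [("set", "my-boolean-field", False)],
    "my-pipeline-2": [
        ("set", "my-long-field", 20),
        ("uppercase", "my-keyword-field"),
    ],
    "my-final-pipeline-2": [("set", "my-new-boolean-field", False)],
}

INDEX_SETTINGS = {
    "my-index": {"default_pipeline": "my-pipeline",
                 "final_pipeline": "my-final-pipeline"},
    "my-index-2": {"default_pipeline": "my-pipeline-2",
                   "final_pipeline": "my-final-pipeline-2"},
}


def run_pipeline(name, index, source):
    """Apply one pipeline to source in place; return the target index."""
    for op, *args in PIPELINES[name]:
        if op == "set":
            source[args[0]] = args[1]
        elif op == "lowercase":
            source[args[0]] = source[args[0]].lower()
        elif op == "uppercase":
            source[args[0]] = source[args[0]].upper()
        elif op == "reroute":
            # Assumption: a reroute ends the current pipeline.
            return args[0]
    return index


def simulate(index, source):
    """Follow default pipelines across reroutes, then run one final pipeline."""
    source = dict(source)  # never touch the caller's document
    executed = []
    seen = set()
    # Keep running default pipelines until the target index stops changing.
    while index not in seen:
        seen.add(index)
        default = INDEX_SETTINGS.get(index, {}).get("default_pipeline")
        if default is None:
            break
        executed.append(default)
        index = run_pipeline(default, index, source)
    # Only the final pipeline of the last index in the chain runs.
    final = INDEX_SETTINGS.get(index, {}).get("final_pipeline")
    if final is not None:
        executed.append(final)
        run_pipeline(final, index, source)
    return {"_index": index, "_source": source, "executed_pipelines": executed}


result = simulate("my-index", {"my-keyword-field": "FOO"})
print(result["_index"], result["executed_pipelines"])
```

Note that my-final-pipeline never runs in this model, because the document leaves my-index before its final pipeline would apply.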

You can also specify substitute pipeline definitions so that you can try pipeline changes without actually having to change pipelines. For example, to substitute a new my-pipeline-2, you could do the following:

curl -X POST "localhost:9200/_ingest/_simulate?pretty&index=my-index" -H 'Content-Type: application/json' -d'
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ],
  "pipeline_substitutions": {
    "my-pipeline-2": {
      "processors": [
        {
          "set": {
            "field": "my-new-boolean-field",
            "value": true
          }
        }
      ]
    }
  }
}
'

This substitutes the pipeline body given in the request for the my-pipeline-2 stored in the cluster. The pipeline definition is only changed for this request, and does not impact anything else running on the cluster now or in the future.
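
One way to picture the request-scoped substitution is as an overlay on top of the stored definitions. The Python sketch below is an assumption about the behaviour described here, not the actual implementation; `STORED_PIPELINES` and `effective_pipelines` are hypothetical names.

```python
from types import MappingProxyType

# Read-only stand-in for the pipelines stored in the cluster (hypothetical).
STORED_PIPELINES = MappingProxyType({
    "my-pipeline": {"processors": [{"set": {"field": "my-long-field", "value": 10}}]},
    "my-pipeline-2": {"processors": [{"set": {"field": "my-long-field", "value": 20}}]},
})


def effective_pipelines(stored, substitutions):
    """Build the per-request view: substitutions win, stored is left untouched."""
    merged = dict(stored)          # shallow copy, so the stored mapping is not modified
    merged.update(substitutions)   # request-supplied bodies shadow stored ones
    return merged


substitutions = {
    "my-pipeline-2": {
        "processors": [{"set": {"field": "my-new-boolean-field", "value": True}}]
    }
}
view = effective_pipelines(STORED_PIPELINES, substitutions)
print(view["my-pipeline-2"]["processors"][0]["set"]["field"])
```

The merged view is discarded when the request finishes, which matches the statement that nothing else running on the cluster is affected.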

If the index that the data would land in (my-index-2 in the example above) exists, then the API will validate that the output of the pipelines is compatible with the index. For example, if we intentionally set my-boolean-field to the wrong type:

curl -X POST "localhost:9200/_ingest/_simulate?pretty&index=my-index" -H 'Content-Type: application/json' -d'
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ],
  "pipeline_substitutions": {
    "my-pipeline-2": {
      "processors": [
        {
          "set": {
            "field": "my-boolean-field",
            "value": "not-a-boolean"
          }
        }
      ]
    }
  }
}
'

Then you would still get the output of the pipelines, but you would also get the validation error:

{
  "errors" : false,
  "took" : 2,
  "ingest_took" : 0,
  "items" : [
    {
      "index" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-new-boolean-field" : false,
          "my-long-field" : 10,
          "my-keyword-field" : "foo",
          "my-boolean-field" : "not-a-boolean"
        },
        "executed_pipelines" : [
          "my-pipeline",
          "my-pipeline-2",
          "my-final-pipeline-2"
        ],
        "error" : {
          "type" : "document_parsing_exception",
          "reason" : "[1:94] failed to parse field [my-boolean-field] of type [boolean] in document with id '_id'. Preview of field's value: 'not-a-boolean'",
          "caused_by" : {
            "type" : "illegal_argument_exception",
            "reason" : "Failed to parse value [not-a-boolean] as only [true] or [false] are allowed."
          }
        },
        "status" : 201
      }
    },
    {
      "index" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-new-boolean-field" : false,
          "my-long-field" : 10,
          "my-keyword-field" : "bar",
          "my-boolean-field" : "not-a-boolean"
        },
        "executed_pipelines" : [
          "my-pipeline",
          "my-pipeline-2",
          "my-final-pipeline-2"
        ],
        "error" : {
          "type" : "document_parsing_exception",
          "reason" : "[1:94] failed to parse field [my-boolean-field] of type [boolean] in document with id '_id'. Preview of field's value: 'not-a-boolean'",
          "caused_by" : {
            "type" : "illegal_argument_exception",
            "reason" : "Failed to parse value [not-a-boolean] as only [true] or [false] are allowed."
          }
        },
        "status" : 201
      }
    }
  ]
}
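
In the draft output above, the top-level "errors" flag is false and each item's status is 201 even though the items carry an "error" object, so a client would probably need to inspect each item. Below is a minimal Python sketch of that check. It assumes the response shape shown in this PR description; `item_errors` is a hypothetical helper, and the second item in the sample is a made-up clean item added for contrast.

```python
def item_errors(response):
    """Return (position, index, error type, reason) for every item with an error."""
    problems = []
    for position, item in enumerate(response.get("items", [])):
        # Each item has a single key naming the operation, e.g. "index" or "create".
        for result in item.values():
            error = result.get("error")
            if error is not None:
                problems.append(
                    (position, result.get("_index"), error.get("type"), error.get("reason"))
                )
    return problems


# Abbreviated from the output above; the second item is hypothetical.
response = {
    "errors": False,
    "items": [
        {"index": {
            "_index": "my-index-2",
            "_source": {"my-boolean-field": "not-a-boolean"},
            "error": {
                "type": "document_parsing_exception",
                "reason": "failed to parse field [my-boolean-field] of type [boolean]",
            },
            "status": 201,
        }},
        {"index": {
            "_index": "my-index-2",
            "_source": {"my-boolean-field": False},
            "status": 201,
        }},
    ],
}

for position, index, error_type, reason in item_errors(response):
    print(f"doc {position} -> {index}: {error_type}: {reason}")
```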

If the index where the data would land does not exist, then the result of the pipelines is displayed, along with an error message that the index does not exist (so the mappings could not be validated). For example, after calling:

curl -u elastic:password -X DELETE "localhost:9200/my-index-2"

Then:

curl -X POST "localhost:9200/_ingest/_simulate?pretty&index=my-index" -H 'Content-Type: application/json' -d'
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ],
  "pipeline_substitutions": {
    "my-pipeline-2": {
      "processors": [
        {
          "set": {
            "field": "my-boolean-field",
            "value": "not-a-boolean"
          }
        }
      ]
    }
  }
}
'

might return

{
  "errors" : false,
  "took" : 1,
  "ingest_took" : 0,
  "items" : [
    {
      "index" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-long-field" : 10,
          "my-keyword-field" : "foo",
          "my-boolean-field" : true
        },
        "executed_pipelines" : [
          "my-pipeline"
        ],
        "error" : {
          "type" : "index_not_found_exception",
          "reason" : "no such index [my-index-2]",
          "resource.type" : "index_or_alias",
          "resource.id" : "my-index-2",
          "index_uuid" : "_na_",
          "index" : "my-index-2"
        },
        "status" : 201
      }
    },
    {
      "index" : {
        "_index" : "my-index-2",
        "_source" : {
          "my-long-field" : 10,
          "my-keyword-field" : "bar",
          "my-boolean-field" : true
        },
        "executed_pipelines" : [
          "my-pipeline"
        ],
        "error" : {
          "type" : "index_not_found_exception",
          "reason" : "no such index [my-index-2]",
          "resource.type" : "index_or_alias",
          "resource.id" : "my-index-2",
          "index_uuid" : "_na_",
          "index" : "my-index-2"
        },
        "status" : 201
      }
    }
  ]
}

Regardless of the result of the call to the API, no data is actually indexed, and no mappings are actually updated.

As a side note, here are some of the guidelines I used (and why the code is a little odd):

Make the API easy to use, and familiar to users of the simulate pipeline API.
Use as much of the existing bulk API logic as possible so that simulate does not diverge from real ingest behavior.
Do not impact bulk API performance.
Modify the bulk API code as little as possible. This is very critical code, and any change is an opportunity to introduce bugs.

@masseyke masseyke added >enhancement :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v8.11.0 labels Sep 26, 2023
@elasticsearchmachine
Collaborator

Hi @masseyke, I've created a changelog YAML for you.

…validation' of github.com:masseyke/elasticsearch into feature/simulate-ingest-with-pipeline-defs-and-mapping-validation
@mattc58 mattc58 added v8.12.0 and removed v8.11.0 labels Oct 4, 2023
@philippkahr
Contributor

The _simulate currently also cannot deal with the reroute processor and resolve the configured pipelines in the default pipeline configuration on the index. Do you think this fixes this as well?

Because this:

POST _ingest/pipeline/_simulate
{
  "docs": [
    {
      "_index": "logs-generic-default",
      "_source": {
        "message": "abc"
      }
    }
  ],
  "pipeline": {
    "processors": [
      {
        "reroute": {
          "dataset": [
            "system.security"
          ]
        }
      }
    ]
  }
}

results in this:

{
  "docs": [
    {
      "doc": {
        "_index": "logs-system.security-default",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "message": "abc",
          "data_stream": {
            "namespace": "default",
            "type": "logs",
            "dataset": "system.security"
          }
        },
        "_ingest": {
          "timestamp": "2023-12-14T08:45:29.309995305Z"
        }
      }
    }
  ]
}

And the system.security has a very long pipeline that should be run.
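
For context, the _index in that output follows the data stream naming scheme type-dataset-namespace, with only the dataset part replaced. The Python sketch below shows that renaming under simplifying assumptions: `reroute_target` is a hypothetical helper, it takes a single string rather than the list of fallbacks the processor accepts, and it does no sanitizing of the values.

```python
def reroute_target(index_name, dataset=None, namespace=None):
    """Rebuild a <type>-<dataset>-<namespace> name with optional overrides."""
    first = index_name.find("-")
    last = index_name.rfind("-")
    if first == -1 or first == last:
        raise ValueError(f"not a <type>-<dataset>-<namespace> name: {index_name!r}")
    ds_type = index_name[:first]                  # e.g. "logs"
    current_dataset = index_name[first + 1:last]  # e.g. "generic"
    current_namespace = index_name[last + 1:]     # e.g. "default"
    return f"{ds_type}-{dataset or current_dataset}-{namespace or current_namespace}"


print(reroute_target("logs-generic-default", dataset="system.security"))
```

Computing the new name is all that happens in the output above; nothing looks up the pipelines configured for the renamed target.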

Also I somehow never use this:

_ingest/_simulate?pretty&index=my-index

I usually do this (through Kibana Dev Tools)

_ingest/_simulate
{
   "docs": [ { "_index": "logs-generic-default", "_source": { ... } ], "pipeline": ...

So I'll add the index name into the _index field. I guess that's mostly due to me knowing what a document looks like.

@masseyke
Member Author

masseyke commented Jan 8, 2024

The _simulate currently also cannot deal with the reroute processor and resolve the configured pipelines in the default pipeline configuration on the index. Do you think this fixes this as well?

This will not impact the _simulate API. It is an entirely new _simulate_ingest API. I believe the _simulate API is working as intended there -- it only works on a single pipeline, and does not look at indices at all. So as it is designed, it is correctly setting the index to the string system.security, but it never goes to look at what system.security is. I think the new _simulate_ingest API might be more what you're looking for. It is coming out in 8.12.0 -- https://www.elastic.co/guide/en/elasticsearch/reference/8.12/simulate-ingest-api.html.

@philippkahr
Contributor

philippkahr commented Jan 8, 2024

I believe the _simulate API is working as intended there -- it only works on a single pipeline,

It sort of does. If you specify a pipeline processor it will read that one as well, so you can run multiple nested pipelines.

Will the new _simulate_ingest also get a fancy UI like the current ingest pipeline simulate wrapper, showcasing what is working and what not?

Does this imply that the _ingest/pipeline/_simulate is now deprecated and shouldn't be used anymore? Will both coexist? Wasn't there a possibility to adapt the existing _ingest/pipeline/_simulate API to contain the new features, using some flags? I think I have relied on the _ingest/pipeline/_simulate for a couple of years now...

@masseyke
Member Author

masseyke commented Jan 8, 2024

Will the new _simulate_ingest also get a fancy UI like the current ingest pipeline simulate wrapper, showcasing what is working and what not?

Yes I believe so, but I don't have a timeline for that and it definitely won't be in 8.13.0.

Does this imply that the _ingest/pipeline/_simulate is now deprecated and shouldn't be used anymore?

No, definitely not. The two APIs serve different purposes. The older API is more useful for development of individual pipelines. The new API is more useful for testing the integration of multiple pipelines and their configuration on indices.

@philippkahr
Contributor

Ok perfect, just one thing that I didn't see in the tests here to run. Does it also work with dynamic mappings and runtime mappings when they are defined in the index?

@masseyke
Member Author

masseyke commented Jan 9, 2024

Ok perfect, just one thing that I didn't see in the tests here to run. Does it also work with dynamic mappings and runtime mappings when they are defined in the index?

Yes, it is using the exact same code as the _bulk API, pulling together index definitions from the cluster state in the same way. So it will correctly follow any reroute processors. Note that the feature as it exists in 8.13.0 does not actually validate that the output of your pipelines is good with respect to the mappings of the index it would be inserted into. That's part of what this draft PR is dealing with, and it will be in some future version.

@masseyke
Member Author

Closed in favor of #101409 and #106440

Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement Team:Data Management Meta label for data/management team v8.14.0