Skip to content

Commit

Permalink
Support for wildcards and override option for dot_expander processor (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny authored and elasticsearchmachine committed Jul 8, 2021
1 parent 593216e commit 52b4670
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 10 deletions.
76 changes: 72 additions & 4 deletions docs/reference/ingest/processors/dot-expand.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ Otherwise these fields can't be accessed by any processor.
.Dot Expand Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to expand into an object field
| `path` | no | - | The field that contains the field to expand. Only required if the field to expand is part another object field, because the `field` option can only understand leaf fields.
| Name | Required | Default | Description
| `field` | yes | - | The field to expand into an object field. If set to `*`, all top-level fields will be expanded.
| `path` | no | - | The field that contains the field to expand. Only required if the field to expand is part of another object field, because the `field` option can only understand leaf fields.
| `override`| no | false | Controls the behavior when there is already an existing nested object that conflicts with the expanded field. When `false`, the processor will merge conflicts by combining the old and the new values into an array. When `true`, the value from the expanded field will overwrite the existing value.
include::common-options.asciidoc[]
|======

Expand Down Expand Up @@ -79,6 +80,73 @@ is transformed by the `dot_expander` processor into:
--------------------------------------------------
// NOTCONSOLE

Contrast that with when the `override` option is set to `true`.

[source,js]
--------------------------------------------------
{
"dot_expander": {
"field": "foo.bar",
"override": true
}
}
--------------------------------------------------
// NOTCONSOLE

In that case, the value of the expanded field overrides the value of the nested object.

[source,js]
--------------------------------------------------
{
"foo" : {
"bar" : "value2"
}
}
--------------------------------------------------
// NOTCONSOLE

'''

The value of `field` can also be set to a `*` to expand all top-level dotted field names:

[source,js]
--------------------------------------------------
{
"dot_expander": {
"field": "*"
}
}
--------------------------------------------------
// NOTCONSOLE

The dot expand processor would turn this document:

[source,js]
--------------------------------------------------
{
"foo.bar" : "value",
"baz.qux" : "value"
}
--------------------------------------------------
// NOTCONSOLE

into:

[source,js]
--------------------------------------------------
{
"foo" : {
"bar" : "value"
},
"baz" : {
"qux" : "value"
}
}
--------------------------------------------------
// NOTCONSOLE

'''

If any field outside of the leaf field conflicts with a pre-existing field of the same name,
then that field needs to be renamed first.

Expand All @@ -105,7 +173,7 @@ pipeline should be used:
{
"rename" : {
"field" : "foo",
"target_field" : "foo.bar""
"target_field" : "foo.bar"
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.ArrayList;
import java.util.Map;

public final class DotExpanderProcessor extends AbstractProcessor {
Expand All @@ -21,11 +22,17 @@ public final class DotExpanderProcessor extends AbstractProcessor {

private final String path;
private final String field;
private final boolean override;

/**
 * Creates a dot expander processor with {@code override} disabled.
 * Delegates to the five-argument constructor, passing {@code false} for {@code override}.
 *
 * @param tag         forwarded unchanged to the five-argument constructor
 * @param description forwarded unchanged to the five-argument constructor
 * @param path        the field that contains the field to expand; may be {@code null}
 * @param field       the dotted field name to expand, or {@code "*"} for the wildcard
 */
DotExpanderProcessor(String tag, String description, String path, String field) {
this(tag, description, path, field, false);
}

/**
 * Creates a dot expander processor.
 *
 * @param tag         passed to the superclass constructor
 * @param description passed to the superclass constructor
 * @param path        the field that contains the field to expand; may be {@code null},
 *                    in which case the document's source and metadata map is used
 * @param field       the dotted field name to expand, or {@code "*"} to expand every
 *                    dotted key found in the target map
 * @param override    when {@code true}, a value already present at the expanded path is
 *                    replaced by the expanded value; when {@code false}, the expanded
 *                    value is appended to the existing one
 */
DotExpanderProcessor(String tag, String description, String path, String field, boolean override) {
super(tag, description);
this.path = path;
this.field = field;
this.override = override;
}

@Override
Expand All @@ -41,10 +48,29 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
map = ingestDocument.getSourceAndMetadata();
}

if (this.field.equals("*")) {
for (String key : new ArrayList<>(map.keySet())) {
if (key.indexOf('.') > 0) {
path = this.path != null ? this.path + "." + key : key;
expandDot(ingestDocument, path, key, map);
}
}
} else {
expandDot(ingestDocument, path, field, map);
}

return ingestDocument;
}

private void expandDot(IngestDocument ingestDocument, String path, String field, Map<String, Object> map) {
if (map.containsKey(field)) {
if (ingestDocument.hasField(path)) {
Object value = map.remove(field);
ingestDocument.appendFieldValue(path, value);
if (override) {
ingestDocument.setFieldValue(path, value);
} else {
ingestDocument.appendFieldValue(path, value);
}
} else {
// check whether we actually can expand the field in question into an object field.
// part of the path may already exist and if part of it would be a value field (string, integer etc.)
Expand All @@ -66,7 +92,6 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue(path, value);
}
}
return ingestDocument;
}

@Override
Expand All @@ -88,9 +113,9 @@ public static final class Factory implements Processor.Factory {
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
if (field.contains(".") == false) {
if (field.contains(".") == false && field.equals("*") == false) {
throw ConfigurationUtils.newConfigurationException(ConfigurationUtils.TAG_KEY, tag, "field",
"field does not contain a dot");
"field does not contain a dot and is not a wildcard");
}
if (field.indexOf('.') == 0 || field.lastIndexOf('.') == field.length() - 1) {
throw ConfigurationUtils.newConfigurationException(ConfigurationUtils.TAG_KEY, tag, "field",
Expand All @@ -106,7 +131,8 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
}

String path = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "path");
return new DotExpanderProcessor(tag, null, path, field);
boolean override = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", false);
return new DotExpanderProcessor(tag, null, path, field, override);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testCreate_invalidFields() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", field);
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, "_tag", null, config));
assertThat(e.getMessage(), equalTo("[field] field does not contain a dot"));
assertThat(e.getMessage(), equalTo("[field] field does not contain a dot and is not a wildcard"));
}

fields = new String[] {".a", "a.", "."};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,54 @@ public void testEscapeFields_doNothingIfFieldNotInSourceDoc() throws Exception {
assertThat(document.getFieldValue("foo.bar", String.class), equalTo("baz1"));
}

/**
 * With {@code override} enabled, expanding {@code foo.bar} must replace the value already
 * stored under {@code foo -> bar} and leave the sibling entry {@code foo -> qux} untouched.
 */
public void testOverride() throws Exception {
    // Pre-existing object under "foo" whose "bar" entry collides with the dotted key.
    Map<String, Object> existingFoo = new HashMap<>();
    existingFoo.put("bar", "baz1");
    existingFoo.put("qux", "quux");

    Map<String, Object> sourceMap = new HashMap<>();
    sourceMap.put("foo", existingFoo);
    sourceMap.put("foo.bar", "baz2");

    IngestDocument doc = new IngestDocument(sourceMap, Map.of());
    new DotExpanderProcessor("_tag", null, null, "foo.bar", true).execute(doc);

    // Still exactly two entries under "foo": the overridden "bar" and the original "qux".
    assertThat(doc.getFieldValue("foo", Map.class).size(), equalTo(2));
    assertThat(doc.getFieldValue("foo.bar", String.class), equalTo("baz2"));
    assertThat(doc.getFieldValue("foo.qux", String.class), equalTo("quux"));
}

/**
 * Exercises the {@code *} wildcard in three scenarios: expanding every dotted key at the
 * document root, expanding dotted keys inside the object named by {@code path}, and
 * confirming that without a {@code path} dotted keys nested inside an object are not expanded.
 */
public void testWildcard() throws Exception {
    // Scenario 1: no path, two dotted keys at the root are both expanded.
    Map<String, Object> rootSource = new HashMap<>();
    rootSource.put("foo.bar", "baz");
    rootSource.put("qux.quux", "corge");
    IngestDocument rootDocument = new IngestDocument(rootSource, Map.of());
    new DotExpanderProcessor("_tag", null, null, "*").execute(rootDocument);
    assertThat(rootDocument.getFieldValue("foo", Map.class).size(), equalTo(1));
    assertThat(rootDocument.getFieldValue("foo.bar", String.class), equalTo("baz"));
    assertThat(rootDocument.getFieldValue("qux", Map.class).size(), equalTo(1));
    assertThat(rootDocument.getFieldValue("qux.quux", String.class), equalTo("corge"));

    // Scenario 2: path "foo", so the dotted key inside "foo" is expanded in place.
    Map<String, Object> scopedInner = new HashMap<>();
    scopedInner.put("bar.baz", "qux");
    Map<String, Object> scopedSource = new HashMap<>();
    scopedSource.put("foo", scopedInner);
    IngestDocument scopedDocument = new IngestDocument(scopedSource, Map.of());
    new DotExpanderProcessor("_tag", null, "foo", "*").execute(scopedDocument);
    assertThat(scopedDocument.getFieldValue("foo", Map.class).size(), equalTo(1));
    assertThat(scopedDocument.getFieldValue("foo.bar", Map.class).size(), equalTo(1));
    assertThat(scopedDocument.getFieldValue("foo.bar.baz", String.class), equalTo("qux"));

    // Scenario 3: same document shape but no path; the nested dotted key is left alone,
    // so resolving "foo.bar" as a path is expected to fail.
    Map<String, Object> untouchedInner = new HashMap<>();
    untouchedInner.put("bar.baz", "qux");
    Map<String, Object> untouchedSource = new HashMap<>();
    untouchedSource.put("foo", untouchedInner);
    IngestDocument untouchedDocument = new IngestDocument(untouchedSource, Map.of());
    new DotExpanderProcessor("_tag", null, null, "*").execute(untouchedDocument);
    assertThat(untouchedDocument.getFieldValue("foo", Map.class).size(), equalTo(1));
    expectThrows(IllegalArgumentException.class, () -> untouchedDocument.getFieldValue("foo.bar", Map.class));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ teardown:
ingest.delete_pipeline:
id: "1"
ignore: 404
- do:
ingest.delete_pipeline:
id: "2"
ignore: 404

---
"Test escape_dot processor":
Expand Down Expand Up @@ -36,3 +40,40 @@ teardown:
index: test
id: 1
- match: { _source.foo.bar: "baz" }
---
# Registers pipeline "2" with a dot_expander processor using the "*" wildcard and
# override enabled, indexes a document that has both a dotted "foo.bar" key and an
# existing "foo" object, then checks the stored source.
"Test escape_dot processor with override and wildcard":
- do:
ingest.put_pipeline:
id: "2"
body: >
{
"processors": [
{
"dot_expander" : {
"field" : "*",
"override": true
}
}
]
}
- match: { acknowledged: true }

# The document carries a dotted key "foo.bar" alongside an object "foo" that already has a "bar" entry.
- do:
index:
index: test
id: 2
pipeline: "2"
body: {
foo.bar: "baz",
foo: {
bar: "override_me",
qux: "quux"
}
}

# Expect foo.bar to hold the dotted key's value and foo.qux to be preserved.
- do:
get:
index: test
id: 2
- match: { _source.foo.bar: "baz" }
- match: { _source.foo.qux: "quux" }

0 comments on commit 52b4670

Please sign in to comment.