Skip to content

Commit

Permalink
Introduce passthrough field type
Browse files Browse the repository at this point in the history
`PassthroughObjectMapper` extends `ObjectMapper` to create a container
for fields that also need to be referenced as if they were at the root
level. This is done by creating aliases for all its subfields.

It also supports an option of annotating all its subfields as
dimensions. This will be leveraged in TSDB, where dimension fields can
be dynamically defined as nested under a passthrough object - and still
referenced directly (i.e. without prefixes) in aggregation queries.

Related to elastic#103567
  • Loading branch information
kkrik-es committed Dec 21, 2023
1 parent cd6a2fd commit 3cb55c6
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ private List<String> findRoutingPaths(String indexName, Settings allSettings, Li
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, dummyShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, shardReplicas)
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
// Avoid failing because index.routing_path is missing
.put(IndexSettings.MODE.getKey(), IndexMode.STANDARD)
.putList(INDEX_ROUTING_PATH.getKey(), List.of("path"))
.build();

tmpIndexMetadata.settings(finalResolvedSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,83 @@ index without timestamp with pipeline:
pipeline: my_pipeline
body:
- '{"@timestamp": "wrong_format", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'

---
dynamic templates:
- skip:
version: " - 8.12.99"
features: "default_shards"
reason: "Support for dynamic fields was added in 8.13"
- do:
indices.put_index_template:
name: my-dynamic-template
body:
index_patterns: [k9s*]
data_stream: {}
template:
settings:
index:
number_of_shards: 1
mode: time_series
time_series:
start_time: 2023-08-31T13:03:08.138Z

mappings:
properties:
attributes:
type: passthrough
dynamic: true
time_series_dimension: true
dynamic_templates:
- counter_metric:
mapping:
type: integer
time_series_metric: counter
- keyword_dimension:
path_match: "attributes.*"
mapping:
type: keyword
time_series_dimension: true

- do:
bulk:
index: k9s
refresh: true
body:
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:08.138Z","data": "10", "attributes.dim": "A" }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:09.138Z","data": "20", "attributes.dim": "A" }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:10.138Z","data": "30", "attributes.dim": "B" }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:10.238Z","data": "40", "attributes.dim": "B" }'

- do:
search:
index: k9s
body:
size: 0

- match: { hits.total.value: 4 }

- do:
search:
index: k9s
body:
size: 0
aggs:
filterA:
filter:
term:
dim: A
aggs:
tsids:
terms:
field: _tsid
order:
_key: asc

- length: { aggregations.filterA.tsids.buckets: 1 }
- match: { aggregations.filterA.tsids.buckets.0.key: { attributes.dim: A } }
- match: { aggregations.filterA.tsids.buckets.0.doc_count: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ public static class ExtractFromSource extends IndexRouting {
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(Set.copyOf(routingPaths), null, true);
}

public boolean matchesField(String fieldName) {
return isRoutingPath.test(fieldName);
}

@Override
public void process(IndexRequest indexRequest) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ protected static void parseProperties(
+ "] which does not support subobjects"
);
}
if (objBuilder.subobjects.value() == false && type.equals(PassthroughObjectMapper.CONTENT_TYPE)) {
throw new MapperParsingException(
"Tried to add passthrough subobject ["
+ fieldName
+ "] to object ["
+ objBuilder.name()
+ "] which does not support subobjects"
);
}
Mapper.TypeParser typeParser = parserContext.typeParser(type);
if (typeParser == null) {
throw new MapperParsingException("No handler for type [" + type + "] declared on field [" + fieldName + "]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

public class PassthroughObjectMapper extends ObjectMapper {
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(PassthroughObjectMapper.class);

public static final String CONTENT_TYPE = "passthrough";

public static class Builder extends ObjectMapper.Builder {

protected Explicit<Boolean> containsDimensions = Explicit.IMPLICIT_FALSE;

public Builder(String name, Explicit<Boolean> subobjects) {
super(name, subobjects);
}

@Override
public PassthroughObjectMapper.Builder add(Mapper.Builder builder) {
if (containsDimensions.value() && builder instanceof KeywordFieldMapper.Builder keywordBuilder) {
keywordBuilder.dimension(true);
}
super.add(builder);
return this;
}

@Override
public PassthroughObjectMapper build(MapperBuilderContext context) {
return new PassthroughObjectMapper(
name,
enabled,
subobjects,
dynamic,
buildMappers(context.createChildContext(name)),
containsDimensions
);
}
}

private final Explicit<Boolean> containsDimensions;

PassthroughObjectMapper(
String name,
Explicit<Boolean> enabled,
Explicit<Boolean> subobjects,
Dynamic dynamic,
Map<String, Mapper> mappers,
Explicit<Boolean> containsDimensions
) {
super(name, name, enabled, subobjects, dynamic, mappers);
this.containsDimensions = containsDimensions;
}

@Override
public PassthroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreated) {
PassthroughObjectMapper.Builder builder = new PassthroughObjectMapper.Builder(name(), subobjects);
builder.enabled = enabled;
builder.dynamic = dynamic;
return builder;
}

@Override
public PassthroughObjectMapper merge(Mapper mergeWith, MergeReason reason, MapperBuilderContext parentBuilderContext) {
final var mergeResult = MergeResult.build(this, mergeWith, reason, parentBuilderContext);
PassthroughObjectMapper mergeWithObject = (PassthroughObjectMapper) mergeWith;

final Explicit<Boolean> containsDimensions = (mergeWithObject.containsDimensions.explicit())
? mergeWithObject.containsDimensions
: this.containsDimensions;

return new PassthroughObjectMapper(
simpleName(),
mergeResult.enabled(),
mergeResult.subObjects(),
mergeResult.dynamic(),
mergeResult.mappers(),
containsDimensions
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(simpleName());
builder.field("type", CONTENT_TYPE);
if (containsDimensions.explicit()) {
builder.field(TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM, containsDimensions.value());
}
if (dynamic != null) {
builder.field("dynamic", dynamic.name().toLowerCase(Locale.ROOT));
}
if (isEnabled() != Defaults.ENABLED) {
builder.field("enabled", enabled.value());
}
serializeMappers(builder, params);
return builder.endObject();
}

public static class TypeParser extends ObjectMapper.TypeParser {
@Override
public Mapper.Builder parse(String name, Map<String, Object> node, MappingParserContext parserContext)
throws MapperParsingException {
Explicit<Boolean> subobjects = parseSubobjects(node);
PassthroughObjectMapper.Builder builder = new Builder(name, subobjects);

parsePassthrough(name, node, builder);
parseObjectFields(node, parserContext, builder);
return builder;
}

protected static void parsePassthrough(String name, Map<String, Object> node, PassthroughObjectMapper.Builder builder) {
Object fieldNode = node.get(TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM);
if (fieldNode != null) {
builder.containsDimensions = Explicit.explicitBoolean(
nodeBooleanValue(fieldNode, name + TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM)
);
node.remove(TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,41 @@ public RootObjectMapper.Builder addRuntimeFields(Map<String, RuntimeField> runti

@Override
public RootObjectMapper build(MapperBuilderContext context) {
Map<String, Mapper> mappers = buildMappers(context);
Map<String, Mapper> aliasMappers = new HashMap<>();
for (Mapper mapper : mappers.values()) {
// Create aliases for all fields in child passthrough mappers and place them under the root object.
if (mapper instanceof PassthroughObjectMapper passthroughMapper) {
for (Mapper internalMapper : passthroughMapper.mappers.values()) {
if (internalMapper instanceof FieldMapper fieldMapper) {
Mapper conflict = mappers.get(fieldMapper.simpleName());
if (conflict != null) {
if (conflict.typeName().equals(FieldAliasMapper.CONTENT_TYPE) == false) {
throw new IllegalArgumentException(
"field ["
+ fieldMapper.simpleName()
+ "] in object ["
+ passthroughMapper.name()
+ "] with [type=passthrough] conflicts with a field at the root level"
);
}
} else {
FieldAliasMapper aliasMapper = new FieldAliasMapper.Builder(fieldMapper.simpleName()).path(
fieldMapper.mappedFieldType.name()
).build(context);
aliasMappers.put(aliasMapper.simpleName(), aliasMapper);
}
}
}
}
}
mappers.putAll(aliasMappers);
return new RootObjectMapper(
name,
enabled,
subobjects,
dynamic,
buildMappers(context),
mappers,
new HashMap<>(runtimeFields),
dynamicDateTimeFormatters,
dynamicTemplates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static void createField(DocumentParserContext context, IndexRouting.Extra
}
long timestamp = timestampFields.get(0).numericValue().longValue();
byte[] suffix = new byte[16];
String id = createId(context.getDynamicMappers().isEmpty(), routingBuilder, tsid, timestamp, suffix);
String id = createId(context.getDynamicMappers().isEmpty() == false, routingBuilder, tsid, timestamp, suffix);
/*
* Make sure that _id from extracting the tsid matches that _id
* from extracting the _source. This should be true for all valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.index.mapper.NestedPathFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.mapper.PassthroughObjectMapper;
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.RuntimeField;
Expand Down Expand Up @@ -193,6 +194,7 @@ public static Map<String, Mapper.TypeParser> getMappers(List<MapperPlugin> mappe
mappers.put(KeywordFieldMapper.CONTENT_TYPE, KeywordFieldMapper.PARSER);
mappers.put(ObjectMapper.CONTENT_TYPE, new ObjectMapper.TypeParser());
mappers.put(NestedObjectMapper.CONTENT_TYPE, new NestedObjectMapper.TypeParser());
mappers.put(PassthroughObjectMapper.CONTENT_TYPE, new PassthroughObjectMapper.TypeParser());
mappers.put(TextFieldMapper.CONTENT_TYPE, TextFieldMapper.PARSER);

mappers.put(DenseVectorFieldMapper.CONTENT_TYPE, DenseVectorFieldMapper.PARSER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.NestedLookup;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.query.AbstractQueryBuilder;
Expand Down Expand Up @@ -75,6 +76,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -907,7 +910,22 @@ public SourceLoader newSourceLoader() {
public IdLoader newIdLoader() {
if (indexService.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
var indexRouting = (IndexRouting.ExtractFromSource) indexService.getIndexSettings().getIndexRouting();
return IdLoader.createTsIdLoader(indexRouting, indexService.getMetadata().getRoutingPaths());
List<String> routingPaths = indexService.getMetadata().getRoutingPaths();
for (String routingField : routingPaths) {
if (routingField.contains("*")) {
// In case the routing fields include path matches, find any matches and add them as distinct fields
// to the routing path.
Set<String> matchingRoutingPaths = new TreeSet<>(routingPaths);
for (Mapper mapper : indexService.mapperService().mappingLookup().fieldMappers()) {
if (indexRouting.matchesField(mapper.name())) {
matchingRoutingPaths.add(mapper.name());
}
}
routingPaths = new ArrayList<>(matchingRoutingPaths);
break;
}
}
return IdLoader.createTsIdLoader(indexRouting, routingPaths);
} else {
return IdLoader.fromLeafStoredFieldLoader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.elasticsearch.index.mapper.NestedObjectMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.mapper.PassthroughObjectMapper;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.index.mapper.TextFieldMapper;
Expand Down Expand Up @@ -200,6 +201,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
SparseVectorFieldMapper.CONTENT_TYPE, // Sparse vectors are no longer supported

NestedObjectMapper.CONTENT_TYPE, // TODO support for nested
PassthroughObjectMapper.CONTENT_TYPE, // TODO support for passthrough
CompletionFieldMapper.CONTENT_TYPE, // TODO support completion
FieldAliasMapper.CONTENT_TYPE // TODO support alias
);
Expand Down

0 comments on commit 3cb55c6

Please sign in to comment.