Skip to content

Commit

Permalink
Add prefixesToRename config for renaming fields upon ingestion (#8273)
Browse files Browse the repository at this point in the history
* Allow renaming of unnested fields upon ingestion

* Rename config and add unit tests

* Refactor renaming of prefixes

* Add validation for prefixesToRename config

* Fix empty name after renaming case

* Fix code style nits

* Update pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java

Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>

* Update previous nits to pass unit tests

Co-authored-by: Jeffrey Liu <[email protected]>
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
  • Loading branch information
3 people authored Mar 17, 2022
1 parent f60bfc8 commit 360a205
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ public void testSerDe()
List<Map<String, String>> batchConfigMaps = new ArrayList<>();
batchConfigMaps.add(batchConfigMap);
List<String> fieldsToUnnest = Arrays.asList("c1, c2");
Map<String, String> prefixesToRename = new HashMap<>();
IngestionConfig ingestionConfig =
new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
new ComplexTypeConfig(fieldsToUnnest, ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE));
new ComplexTypeConfig(fieldsToUnnest, ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename));
TableConfig tableConfig = tableConfigBuilder.setIngestionConfig(ingestionConfig).build();

checkIngestionConfig(tableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,28 @@ public class ComplexTypeTransformer implements RecordTransformer {
private final List<String> _fieldsToUnnest;
private final String _delimiter;
private final ComplexTypeConfig.CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
private final Map<String, String> _prefixesToRename;

public ComplexTypeTransformer(TableConfig tableConfig) {
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), parseCollectionNotUnnestedToJson(tableConfig));
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig));
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter) {
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE);
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap());
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter,
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson) {
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map<String, String> prefixesToRename) {
_fieldsToUnnest = new ArrayList<>(fieldsToUnnest);
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
// the unnest fields are sorted to achieve the topological sort of the collections, so that the parent collection
// (e.g. foo) is unnested before the child collection (e.g. foo.bar)
Collections.sort(_fieldsToUnnest);
_prefixesToRename = prefixesToRename;
}

private static List<String> parseFieldsToUnnest(TableConfig tableConfig) {
Expand Down Expand Up @@ -149,12 +152,22 @@ private static ComplexTypeConfig.CollectionNotUnnestedToJson parseCollectionNotU
}
}

/**
 * Reads the prefixesToRename map from the complex-type section of the table's ingestion config.
 *
 * @param tableConfig table config to read from
 * @return the configured prefix-to-replacement map, or an empty map when the ingestion config, the complex-type
 *         config, or the map itself is absent
 */
private static Map<String, String> parsePrefixesToRename(TableConfig tableConfig) {
  if (tableConfig.getIngestionConfig() == null) {
    return Collections.emptyMap();
  }
  ComplexTypeConfig complexTypeConfig = tableConfig.getIngestionConfig().getComplexTypeConfig();
  if (complexTypeConfig == null) {
    return Collections.emptyMap();
  }
  Map<String, String> configuredPrefixes = complexTypeConfig.getPrefixesToRename();
  if (configuredPrefixes == null) {
    return Collections.emptyMap();
  }
  return configuredPrefixes;
}

/**
 * Transforms the given row in place in three steps: flattens the columns present on entry, unnests each configured
 * collection (in the sorted order established by the constructor), and finally renames column prefixes.
 *
 * @param record the row to transform
 * @return the same row instance that was passed in
 */
@Override
public GenericRow transform(GenericRow record) {
  // Snapshot the column names first so that flattening iterates over a stable list.
  List<String> columns = new ArrayList<>(record.getFieldToValueMap().keySet());
  flattenMap(record, columns);
  for (String fieldToUnnest : _fieldsToUnnest) {
    unnestCollection(record, fieldToUnnest);
  }
  renamePrefixes(record);
  return record;
}

Expand Down Expand Up @@ -279,6 +292,33 @@ protected void flattenMap(GenericRow record, List<String> columns) {
}
}

/**
 * Loops through all columns and renames the column's prefix with the corresponding replacement if the prefix matches.
 *
 * <p>The column names are snapshotted once, before any renaming happens. A column that has already been renamed by an
 * earlier prefix entry is no longer present in the record under its snapshotted name, so it is skipped for the
 * remaining entries. Without that check the stale name would be "renamed" a second time with a null value, either
 * inserting a phantom null-valued column or reporting a name conflict that does not exist. Prefix entries are applied
 * in the iteration order of the configured map.
 *
 * @param record the row whose columns are renamed in place
 * @throws RuntimeException if a renamed column would have an empty name, or if a column with a non-null value already
 *         exists under the new name
 */
@VisibleForTesting
protected void renamePrefixes(GenericRow record) {
  if (_prefixesToRename.isEmpty()) {
    return;
  }
  List<String> fields = new ArrayList<>(record.getFieldToValueMap().keySet());
  for (Map.Entry<String, String> entry : _prefixesToRename.entrySet()) {
    // Loop-invariant for the inner loop, so resolve them once per entry.
    String prefix = entry.getKey();
    String replacementPrefix = entry.getValue();
    for (String field : fields) {
      if (!field.startsWith(prefix)) {
        continue;
      }
      // The snapshot may be stale: an earlier prefix entry may already have renamed this column.
      if (!record.getFieldToValueMap().containsKey(field)) {
        continue;
      }
      Object value = record.removeValue(field);
      String remainingColumnName = field.substring(prefix.length());
      String newName = replacementPrefix + remainingColumnName;
      if (newName.isEmpty() || record.getValue(newName) != null) {
        throw new RuntimeException(
            String.format("Name conflict after attempting to rename field %s to %s", field, newName));
      }
      record.putValue(newName, value);
    }
  }
}

private boolean containPrimitives(Object[] value) {
if (value.length == 0) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ private static void registerPinotFS(String fileURIScheme, String fsClass, PinotC
* TableConfig and Schema
* Fields for ingestion come from 2 places:
* 1. The schema
* 2. The ingestion config in the table config. The ingestion config (e.g. filter) can have fields which are not in
* the schema.
* 2. The ingestion config in the table config. The ingestion config (e.g. filter, complexType) can have fields which
* are not in the schema.
*/
public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) {
Set<String> fieldsForRecordExtractor = new HashSet<>();
Expand Down Expand Up @@ -371,6 +371,10 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i
// transform again
}
}
ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
if (complexTypeConfig != null && complexTypeConfig.getFieldsToUnnest() != null) {
fields.addAll(complexTypeConfig.getFieldsToUnnest());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
Expand Down Expand Up @@ -343,6 +344,22 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
}
}
}

// Complex configs
ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
if (complexTypeConfig != null && schema != null) {
Map<String, String> prefixesToRename = complexTypeConfig.getPrefixesToRename();
Set<String> fieldNames = schema.getFieldSpecMap().keySet();
if (MapUtils.isNotEmpty(prefixesToRename)) {
for (String prefix : prefixesToRename.keySet()) {
for (String field : fieldNames) {
Preconditions.checkState(!field.startsWith(prefix),
"Fields in the schema may not begin with any prefix specified in the prefixesToRename"
+ " config. Name conflict with field: " + field + " and prefix: " + prefix);
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer.DEFAULT_COLLECTION_TO_JSON_MODE;


public class ComplexTypeTransformerTest {
@Test
Expand Down Expand Up @@ -296,7 +298,8 @@ public void testConvertCollectionToString() {
// {
// "array":"[1,2]"
// }
transformer = new ComplexTypeTransformer(Arrays.asList(), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.ALL);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>());
genericRow = new GenericRow();
array = new Object[]{1, 2};
genericRow.putValue("array", array);
Expand Down Expand Up @@ -341,8 +344,98 @@ public void testConvertCollectionToString() {
array1[0] = ImmutableMap.of("b", "v1");
map.put("array1", array1);
genericRow.putValue("t", map);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NONE);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>());
transformer.transform(genericRow);
Assert.assertTrue(ComplexTypeTransformer.isArray(genericRow.getValue("t.array1")));
}

@Test
public void testRenamePrefixes() {
  // Happy path: the "map1." prefix is dropped entirely and the "map2" prefix is replaced with "test"
  Map<String, String> renames = new HashMap<>();
  renames.put("map1.", "");
  renames.put("map2", "test");
  ComplexTypeTransformer renamer =
      new ComplexTypeTransformer(new ArrayList<>(), ".", DEFAULT_COLLECTION_TO_JSON_MODE, renames);

  GenericRow row = new GenericRow();
  row.putValue("a", 1L);
  row.putValue("map1.b", 2L);
  row.putValue("map2.c", "u");
  renamer.renamePrefixes(row);
  Assert.assertEquals(row.getValue("a"), 1L);
  Assert.assertEquals(row.getValue("b"), 2L);
  Assert.assertEquals(row.getValue("test.c"), "u");

  // Conflict: stripping "test." from "test.a" collides with the existing column "a"
  renames = new HashMap<>();
  renames.put("test.", "");
  renamer = new ComplexTypeTransformer(new ArrayList<>(), ".", DEFAULT_COLLECTION_TO_JSON_MODE, renames);
  row = new GenericRow();
  row.putValue("a", 1L);
  row.putValue("test.a", 2L);
  try {
    renamer.renamePrefixes(row);
    Assert.fail("Should fail due to name conflict after renaming");
  } catch (RuntimeException e) {
    // expected
  }

  // Conflict: stripping "test" from the column "test" leaves an empty column name
  renames = new HashMap<>();
  renames.put("test", "");
  renamer = new ComplexTypeTransformer(new ArrayList<>(), ".", DEFAULT_COLLECTION_TO_JSON_MODE, renames);
  row = new GenericRow();
  row.putValue("a", 1L);
  row.putValue("test", 2L);
  try {
    renamer.renamePrefixes(row);
    Assert.fail("Should fail due to empty name after renaming");
  } catch (RuntimeException e) {
    // expected
  }

  // No prefixes configured: the row must be left untouched
  renames = new HashMap<>();
  renamer = new ComplexTypeTransformer(new ArrayList<>(), ".", DEFAULT_COLLECTION_TO_JSON_MODE, renames);
  row = new GenericRow();
  row.putValue("a", 1L);
  row.putValue("test", 2L);
  renamer.renamePrefixes(row);
  Assert.assertEquals(row.getValue("a"), 1L);
  Assert.assertEquals(row.getValue("test"), 2L);
}

@Test
public void testPrefixesToRename() {
  // Build the nested input bottom-up:
  // {
  //   "a": 1,
  //   "map1": {"b": "v", "im1": {"aa": 2, "bb": "u"}},
  //   "map2": {"c": 3}
  // }
  Map<String, Object> nestedInner = new HashMap<>();
  nestedInner.put("aa", 2);
  nestedInner.put("bb", "u");
  Map<String, Object> firstMap = new HashMap<>();
  firstMap.put("b", "v");
  firstMap.put("im1", nestedInner);
  Map<String, Object> secondMap = new HashMap<>();
  secondMap.put("c", 3);

  GenericRow row = new GenericRow();
  row.putValue("a", 1L);
  row.putValue("map1", firstMap);
  row.putValue("map2", secondMap);

  // "map1." is stripped from flattened names, "map2" is replaced with "test"
  Map<String, String> renames = new HashMap<>();
  renames.put("map1.", "");
  renames.put("map2", "test");
  ComplexTypeTransformer transformer =
      new ComplexTypeTransformer(new ArrayList<>(), ".", DEFAULT_COLLECTION_TO_JSON_MODE, renames);

  // The full transform flattens the maps first and then applies the prefix renames
  transformer.transform(row);
  Assert.assertEquals(row.getValue("a"), 1L);
  Assert.assertEquals(row.getValue("b"), "v");
  Assert.assertEquals(row.getValue("im1.aa"), 2);
  Assert.assertEquals(row.getValue("im1.bb"), "u");
  Assert.assertEquals(row.getValue("test.c"), 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
Expand Down Expand Up @@ -201,5 +204,27 @@ public void testExtractFieldsIngestionConfig() {
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema));
Assert.assertEquals(extract.size(), 6);
Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn")));

// filter + transform configs + schema fields + schema transform + complex type configs
schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
.addSingleValueDimension("d2", FieldSpec.DataType.STRING)
.addMetric("m1", FieldSpec.DataType.INT)
.addDateTime("dateColumn", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS")
.build();
schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
TransformConfig transformConfig = new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')");
transformConfigs = Lists.newArrayList(transformConfig);
List<String> fieldsToUnnest = Arrays.asList("before.test", "after.test");
Map<String, String> prefixesToRename = new HashMap<>();
prefixesToRename.put("before", "after");
ComplexTypeConfig complexTypeConfigs = new ComplexTypeConfig(fieldsToUnnest, ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename);
FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)");
ingestionConfig = new IngestionConfig(null, null, filterConfig, transformConfigs, complexTypeConfigs);
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema));
Assert.assertEquals(extract.size(), 8);
List<String> expectedColumns =
Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn", "before", "after");
Assert.assertTrue(extract.containsAll(expectedColumns));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
Expand Down Expand Up @@ -401,6 +402,22 @@ public void validateIngestionConfig() {
new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("transformedCol", "reverse(x)"),
new TransformConfig("myCol", "lower(transformedCol)")), null)).build();
TableConfigUtils.validate(tableConfig, schema);

// invalid: a schema field name begins with a prefix listed in ComplexTypeConfig's prefixesToRename
HashMap<String, String> prefixesToRename = new HashMap<>();
prefixesToRename.put("after.", "");
ComplexTypeConfig complexConfig = new ComplexTypeConfig(null, ".", null, prefixesToRename);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
new IngestionConfig(null, null, null,
null, complexConfig)).build();
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addMultiValueDimension("after.test", FieldSpec.DataType.STRING).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to name conflict from field name in schema with a prefix in prefixesToRename");
} catch (IllegalStateException e) {
// expected
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;

Expand All @@ -44,13 +45,18 @@ public enum CollectionNotUnnestedToJson {
@JsonPropertyDescription("The mode of converting collection to JSON string")
private final CollectionNotUnnestedToJson _collectionNotUnnestedToJson;

@JsonPropertyDescription("Map of <prefix, replacement> so matching fields are renamed to start with the replacement")
private final Map<String, String> _prefixesToRename;

@JsonCreator
public ComplexTypeConfig(@JsonProperty("fieldsToUnnest") @Nullable List<String> fieldsToUnnest,
@JsonProperty("delimiter") @Nullable String delimiter,
@JsonProperty("collectionNotUnnestedToJson") @Nullable CollectionNotUnnestedToJson collectionNotUnnestedToJson) {
@JsonProperty("collectionNotUnnestedToJson") @Nullable CollectionNotUnnestedToJson collectionNotUnnestedToJson,
@JsonProperty("prefixesToRename") @Nullable Map<String, String> prefixesToRename) {
_fieldsToUnnest = fieldsToUnnest;
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
_prefixesToRename = prefixesToRename;
}

@Nullable
Expand All @@ -67,4 +73,9 @@ public String getDelimiter() {
public CollectionNotUnnestedToJson getCollectionNotUnnestedToJson() {
return _collectionNotUnnestedToJson;
}

/**
 * Returns the map of prefix to replacement used to rename matching fields, exactly as it was passed to the
 * constructor.
 *
 * @return the prefix-to-replacement map, or {@code null} if none was provided
 */
@Nullable
public Map<String, String> getPrefixesToRename() {
return _prefixesToRename;
}
}

0 comments on commit 360a205

Please sign in to comment.