Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefixesToRename config for renaming fields upon ingestion #8273

Merged
merged 8 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,12 @@ public void testSerDe()
List<Map<String, String>> batchConfigMaps = new ArrayList<>();
batchConfigMaps.add(batchConfigMap);
List<String> fieldsToUnnest = Arrays.asList("c1, c2");
List<String> prefixesToDropFromFields = new ArrayList<>();
IngestionConfig ingestionConfig =
new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
new ComplexTypeConfig(fieldsToUnnest, ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE));
new ComplexTypeConfig(fieldsToUnnest, ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToDropFromFields));
TableConfig tableConfig = tableConfigBuilder.setIngestionConfig(ingestionConfig).build();

checkIngestionConfig(tableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,30 @@ public class ComplexTypeTransformer implements RecordTransformer {
private final List<String> _fieldsToUnnest;
private final String _delimiter;
private final ComplexTypeConfig.CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
private final List<String> _prefixesToDropFromFields;

public ComplexTypeTransformer(TableConfig tableConfig) {
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), parseCollectionNotUnnestedToJson(tableConfig));
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToDropFromFields(tableConfig));
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter) {
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE);
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, new ArrayList<>());
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter,
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson) {
ComplexTypeTransformer(List<String> fieldsToUnnest,
String delimiter,
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson,
List<String> prefixesToDropFromFields) {
_fieldsToUnnest = new ArrayList<>(fieldsToUnnest);
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
// the unnest fields are sorted to achieve the topological sort of the collections, so that the parent collection
// (e.g. foo) is unnested before the child collection (e.g. foo.bar)
Collections.sort(_fieldsToUnnest);
_prefixesToDropFromFields = new ArrayList<>(prefixesToDropFromFields);
}

private static List<String> parseFieldsToUnnest(TableConfig tableConfig) {
Expand Down Expand Up @@ -149,12 +154,22 @@ private static ComplexTypeConfig.CollectionNotUnnestedToJson parseCollectionNotU
}
}

private static List<String> parsePrefixesToDropFromFields(TableConfig tableConfig) {
if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getComplexTypeConfig() != null
&& tableConfig.getIngestionConfig().getComplexTypeConfig().getPrefixesToDropFromFields() != null) {
return tableConfig.getIngestionConfig().getComplexTypeConfig().getPrefixesToDropFromFields();
} else {
return new ArrayList<>();
}
}

@Override
public GenericRow transform(GenericRow record) {
flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
for (String collection : _fieldsToUnnest) {
unnestCollection(record, collection);
}
dropPrefixes(record);
return record;
}

Expand Down Expand Up @@ -279,6 +294,23 @@ protected void flattenMap(GenericRow record, List<String> columns) {
}
}

/**
*
*/
@VisibleForTesting
protected void dropPrefixes(GenericRow record) {
List<String> columns = new ArrayList<>(record.getFieldToValueMap().keySet());
for (String column : columns) {
for (String prefix : _prefixesToDropFromFields) {
if (column.startsWith(prefix)) {
Object value = record.removeValue(column);
String newName = column.substring(prefix.length());
record.putValue(newName, value);
}
}
}
}

private boolean containPrimitives(Object[] value) {
if (value.length == 0) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ private static void registerPinotFS(String fileURIScheme, String fsClass, PinotC
* TableConfig and Schema
* Fields for ingestion come from 2 places:
* 1. The schema
* 2. The ingestion config in the table config. The ingestion config (e.g. filter) can have fields which are not in
* the schema.
* 2. The ingestion config in the table config. The ingestion config (e.g. filter, complexType) can have fields which
* are not in the schema.
*/
public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) {
Set<String> fieldsForRecordExtractor = new HashSet<>();
Expand Down Expand Up @@ -371,6 +371,10 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i
// transform again
}
}
ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
if (complexTypeConfig != null && complexTypeConfig.getFieldsToUnnest() != null) {
fields.addAll(complexTypeConfig.getFieldsToUnnest());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer.DEFAULT_COLLECTION_TO_JSON_MODE;


public class ComplexTypeTransformerTest {
@Test
Expand Down Expand Up @@ -296,7 +298,8 @@ public void testConvertCollectionToString() {
// {
// "array":"[1,2]"
// }
transformer = new ComplexTypeTransformer(Arrays.asList(), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.ALL);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, Arrays.asList());
genericRow = new GenericRow();
array = new Object[]{1, 2};
genericRow.putValue("array", array);
Expand Down Expand Up @@ -341,8 +344,37 @@ public void testConvertCollectionToString() {
array1[0] = ImmutableMap.of("b", "v1");
map.put("array1", array1);
genericRow.putValue("t", map);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NONE);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, Arrays.asList());
transformer.transform(genericRow);
Assert.assertTrue(ComplexTypeTransformer.isArray(genericRow.getValue("t.array1")));
}

@Test
public void testPrefixesToDropFromFields() {
List<String> prefixesToDropFromFields = Arrays.asList("map1.");
ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToDropFromFields);

// test flatten root-level tuples
GenericRow genericRow = new GenericRow();
genericRow.putValue("a", 1L);
Map<String, Object> map1 = new HashMap<>();
genericRow.putValue("map1", map1);
map1.put("b", "v");
Map<String, Object> innerMap1 = new HashMap<>();
innerMap1.put("aa", 2);
innerMap1.put("bb", "u");
map1.put("im1", innerMap1);
Map<String, Object> map2 = new HashMap<>();
map2.put("c", 3);
genericRow.putValue("map2", map2);

transformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("a"), 1L);
Assert.assertEquals(genericRow.getValue("b"), "v");
Assert.assertEquals(genericRow.getValue("im1.aa"), 2);
Assert.assertEquals(genericRow.getValue("im1.bb"), "u");
Assert.assertEquals(genericRow.getValue("map2.c"), 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
Expand Down Expand Up @@ -201,5 +202,26 @@ public void testExtractFieldsIngestionConfig() {
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema));
Assert.assertEquals(extract.size(), 6);
Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn")));

// filter + transform configs + schema fields + schema transform + complex type configs
schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
.addSingleValueDimension("d2", FieldSpec.DataType.STRING)
.addMetric("m1", FieldSpec.DataType.INT)
.addDateTime("dateColumn", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS")
.build();
schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
TransformConfig transformConfig = new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')");
transformConfigs = Lists.newArrayList(transformConfig);
List<String> fieldsToUnnest = Arrays.asList("before.test", "after.test");
List<String> prefixesToDropFromFields = Arrays.asList("before", "after");
ComplexTypeConfig complexTypeConfigs = new ComplexTypeConfig(fieldsToUnnest, ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToDropFromFields);
FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)");
ingestionConfig = new IngestionConfig(null, null, filterConfig, transformConfigs, complexTypeConfigs);
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema));
Assert.assertEquals(extract.size(), 8);
List<String> expectedColumns =
Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn", "before", "after");
Assert.assertTrue(extract.containsAll(expectedColumns));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ public enum CollectionNotUnnestedToJson {
@JsonPropertyDescription("The mode of converting collection to JSON string")
private final CollectionNotUnnestedToJson _collectionNotUnnestedToJson;

@JsonPropertyDescription("The prefixes of fields to rename so the resulting field names don't have them")
private final List<String> _prefixesToDropFromFields;

@JsonCreator
public ComplexTypeConfig(@JsonProperty("fieldsToUnnest") @Nullable List<String> fieldsToUnnest,
@JsonProperty("delimiter") @Nullable String delimiter,
@JsonProperty("collectionNotUnnestedToJson") @Nullable CollectionNotUnnestedToJson collectionNotUnnestedToJson) {
@JsonProperty("collectionNotUnnestedToJson") @Nullable CollectionNotUnnestedToJson collectionNotUnnestedToJson,
@JsonProperty("prefixesToDropFromFields") @Nullable List<String> prefixesToDropFromFields) {
_fieldsToUnnest = fieldsToUnnest;
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
_prefixesToDropFromFields = prefixesToDropFromFields;
}

@Nullable
Expand All @@ -67,4 +72,9 @@ public String getDelimiter() {
public CollectionNotUnnestedToJson getCollectionNotUnnestedToJson() {
return _collectionNotUnnestedToJson;
}

@Nullable
public List<String> getPrefixesToDropFromFields() {
return _prefixesToDropFromFields;
}
}