Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multi-stage] Optimize group key generation #12394

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,9 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
private final ValueToIdMap[] _onTheFlyDictionaries;
private final Object2IntOpenHashMap<FixedIntArray> _groupKeyMap;
private final boolean[] _isSingleValueExpressions;
private final int _globalGroupIdUpperBound;
private final int _numGroupsLimit;
private final boolean _nullHandlingEnabled;

private int _numGroups = 0;

public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled) {
_groupByExpressions = groupByExpressions;
Expand All @@ -87,12 +85,12 @@ public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOp

_groupKeyMap = new Object2IntOpenHashMap<>();
_groupKeyMap.defaultReturnValue(INVALID_ID);
_globalGroupIdUpperBound = numGroupsLimit;
_numGroupsLimit = numGroupsLimit;
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _globalGroupIdUpperBound;
return _numGroupsLimit;
}

@Override
Expand All @@ -117,6 +115,9 @@ public void generateKeysForBlock(ValueBlock valueBlock, int[] groupKeys) {
case DOUBLE:
values[i] = blockValSet.getDoubleValuesSV();
break;
case BIG_DECIMAL:
values[i] = blockValSet.getBigDecimalValuesSV();
break;
case STRING:
values[i] = blockValSet.getStringValuesSV();
break;
Expand All @@ -137,53 +138,134 @@ public void generateKeysForBlock(ValueBlock valueBlock, int[] groupKeys) {
nullBitmaps[i] = valueBlock.getBlockValueSet(_groupByExpressions[i]).getNullBitmap();
}
for (int row = 0; row < numDocs; row++) {
for (int col = 0; col < _numGroupByExpressions; col++) {
if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
keyValues[col] = ID_FOR_NULL;
} else {
int numGroups = _groupKeyMap.size();
boolean hasInvalidKeyValue = false;
if (numGroups < _numGroupsLimit) {
for (int col = 0; col < _numGroupByExpressions; col++) {
if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
keyValues[col] = ID_FOR_NULL;
} else {
Object columnValues = values[col];
ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
int keyValue;
if (columnValues instanceof int[]) {
keyValue = onTheFlyDictionary.put(((int[]) columnValues)[row]);
} else if (columnValues instanceof long[]) {
keyValue = onTheFlyDictionary.put(((long[]) columnValues)[row]);
} else if (columnValues instanceof float[]) {
keyValue = onTheFlyDictionary.put(((float[]) columnValues)[row]);
} else if (columnValues instanceof double[]) {
keyValue = onTheFlyDictionary.put(((double[]) columnValues)[row]);
} else if (columnValues instanceof byte[][]) {
keyValue = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
} else {
keyValue = onTheFlyDictionary.put(((Object[]) columnValues)[row]);
}
keyValues[col] = keyValue;
}
}
} else {
for (int col = 0; col < _numGroupByExpressions; col++) {
if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
keyValues[col] = ID_FOR_NULL;
} else {
Object columnValues = values[col];
ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
int keyValue;
if (columnValues instanceof int[]) {
keyValue = onTheFlyDictionary.getId(((int[]) columnValues)[row]);
} else if (columnValues instanceof long[]) {
keyValue = onTheFlyDictionary.getId(((long[]) columnValues)[row]);
} else if (columnValues instanceof float[]) {
keyValue = onTheFlyDictionary.getId(((float[]) columnValues)[row]);
} else if (columnValues instanceof double[]) {
keyValue = onTheFlyDictionary.getId(((double[]) columnValues)[row]);
} else if (columnValues instanceof byte[][]) {
keyValue = onTheFlyDictionary.getId(new ByteArray(((byte[][]) columnValues)[row]));
} else {
keyValue = onTheFlyDictionary.getId(((Object[]) columnValues)[row]);
}
if (keyValue == INVALID_ID) {
hasInvalidKeyValue = true;
break;
}
}
}
}
if (hasInvalidKeyValue) {
groupKeys[row] = INVALID_ID;
} else {
int groupId = getGroupIdForKey(flyweightKey);
if (groupId == numGroups) {
// When a new group is added, create a new FixedIntArray
keyValues = new int[_numGroupByExpressions];
flyweightKey = new FixedIntArray(keyValues);
}
groupKeys[row] = groupId;
}
}
} else {
for (int row = 0; row < numDocs; row++) {
int numGroups = _groupKeyMap.size();
boolean hasInvalidKeyValue = false;
if (numGroups < _numGroupsLimit) {
for (int col = 0; col < _numGroupByExpressions; col++) {
Object columnValues = values[col];
ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
int keyValue;
if (columnValues instanceof int[]) {
keyValues[col] = onTheFlyDictionary.put(((int[]) columnValues)[row]);
int columnValue = ((int[]) columnValues)[row];
keyValue = onTheFlyDictionary != null ? onTheFlyDictionary.put(columnValue) : columnValue;
} else if (columnValues instanceof long[]) {
keyValues[col] = onTheFlyDictionary.put(((long[]) columnValues)[row]);
keyValue = onTheFlyDictionary.put(((long[]) columnValues)[row]);
} else if (columnValues instanceof float[]) {
keyValues[col] = onTheFlyDictionary.put(((float[]) columnValues)[row]);
keyValue = onTheFlyDictionary.put(((float[]) columnValues)[row]);
} else if (columnValues instanceof double[]) {
keyValues[col] = onTheFlyDictionary.put(((double[]) columnValues)[row]);
} else if (columnValues instanceof String[]) {
keyValues[col] = onTheFlyDictionary.put(((String[]) columnValues)[row]);
keyValue = onTheFlyDictionary.put(((double[]) columnValues)[row]);
} else if (columnValues instanceof byte[][]) {
keyValues[col] = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
keyValue = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
} else {
keyValue = onTheFlyDictionary.put(((Object[]) columnValues)[row]);
}
keyValues[col] = keyValue;
}
}
groupKeys[row] = getGroupIdForFlyweightKey(flyweightKey);
}
} else {
for (int row = 0; row < numDocs; row++) {
for (int col = 0; col < _numGroupByExpressions; col++) {
Object columnValues = values[col];
ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
if (columnValues instanceof int[]) {
if (onTheFlyDictionary == null) {
keyValues[col] = ((int[]) columnValues)[row];
} else {
for (int col = 0; col < _numGroupByExpressions; col++) {
Object columnValues = values[col];
ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
int keyValue;
if (columnValues instanceof int[]) {
int columnValue = ((int[]) columnValues)[row];
keyValue = onTheFlyDictionary != null ? onTheFlyDictionary.getId(columnValue) : columnValue;
} else if (columnValues instanceof long[]) {
keyValue = onTheFlyDictionary.getId(((long[]) columnValues)[row]);
} else if (columnValues instanceof float[]) {
keyValue = onTheFlyDictionary.getId(((float[]) columnValues)[row]);
} else if (columnValues instanceof double[]) {
keyValue = onTheFlyDictionary.getId(((double[]) columnValues)[row]);
} else if (columnValues instanceof byte[][]) {
keyValue = onTheFlyDictionary.getId(new ByteArray(((byte[][]) columnValues)[row]));
} else {
keyValues[col] = onTheFlyDictionary.put(((int[]) columnValues)[row]);
keyValue = onTheFlyDictionary.getId(((Object[]) columnValues)[row]);
}
if (keyValue == INVALID_ID) {
hasInvalidKeyValue = true;
break;
}
} else if (columnValues instanceof long[]) {
keyValues[col] = onTheFlyDictionary.put(((long[]) columnValues)[row]);
} else if (columnValues instanceof float[]) {
keyValues[col] = onTheFlyDictionary.put(((float[]) columnValues)[row]);
} else if (columnValues instanceof double[]) {
keyValues[col] = onTheFlyDictionary.put(((double[]) columnValues)[row]);
} else if (columnValues instanceof String[]) {
keyValues[col] = onTheFlyDictionary.put(((String[]) columnValues)[row]);
} else if (columnValues instanceof byte[][]) {
keyValues[col] = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
keyValues[col] = keyValue;
}
}
if (hasInvalidKeyValue) {
groupKeys[row] = INVALID_ID;
} else {
int groupId = getGroupIdForKey(flyweightKey);
if (groupId == numGroups) {
// When a new group is added, create a new FixedIntArray
keyValues = new int[_numGroupByExpressions];
flyweightKey = new FixedIntArray(keyValues);
}
groupKeys[row] = groupId;
}
groupKeys[row] = getGroupIdForFlyweightKey(flyweightKey);
}
}
}
Expand Down Expand Up @@ -329,38 +411,19 @@ public Iterator<GroupKey> getGroupKeys() {
return new GroupKeyIterator();
}

/**
* Helper method to get or create group-id for a group key.
*
* @param flyweight Group key, that is a list of objects to be grouped, will be cloned on first occurrence
* @return Group id
*/
private int getGroupIdForFlyweightKey(FixedIntArray flyweight) {
int groupId = _groupKeyMap.getInt(flyweight);
if (groupId == INVALID_ID) {
if (_numGroups < _globalGroupIdUpperBound) {
groupId = _numGroups;
_groupKeyMap.put(flyweight.clone(), _numGroups++);
}
}
return groupId;
}

/**
* Helper method to get or create group-id for a group key.
*
* @param keyList Group key, that is a list of objects to be grouped
* @return Group id
*/
private int getGroupIdForKey(FixedIntArray keyList) {
int groupId = _groupKeyMap.getInt(keyList);
if (groupId == INVALID_ID) {
if (_numGroups < _globalGroupIdUpperBound) {
groupId = _numGroups;
_groupKeyMap.put(keyList, _numGroups++);
}
int numGroups = _groupKeyMap.size();
if (numGroups < _numGroupsLimit) {
return _groupKeyMap.computeIfAbsent(keyList, k -> numGroups);
} else {
return _groupKeyMap.getInt(keyList);
}
return groupId;
}

/**
Expand Down

This file was deleted.

Loading
Loading