Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming processing supports nullable datatype #594

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/Common/HashTable/TimeBucketHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ template <
typename Hash = DefaultHash<Key>,
typename Grower = TimeBucketHashTableGrower<>,
typename Allocator = HashTableAllocator,
template <typename...> typename ImplTable = HashMapTable>
template <typename...> typename ImplTable = HashMapTable,
size_t WindowOffset = 0>
class TimeBucketHashMapTable
: public TimeBucketHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>
: public TimeBucketHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>, WindowOffset>
{
public:
using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>;
using LookupResult = typename Impl::LookupResult;

using TimeBucketHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>::TimeBucketHashTable;
using TimeBucketHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>, WindowOffset>::TimeBucketHashTable;

template <typename Func>
void ALWAYS_INLINE forEachMapped(Func && func)
Expand Down Expand Up @@ -64,10 +65,11 @@ template <
typename Key,
typename Mapped,
typename Hash = DefaultHash<Key>,
size_t WindowOffset = 0,
typename Grower = TimeBucketHashTableGrower<>,
typename Allocator = HashTableAllocator,
template <typename...> typename ImplTable = HashMapTable>
using TimeBucketHashMap = TimeBucketHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator, ImplTable>;
using TimeBucketHashMap = TimeBucketHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator, ImplTable, WindowOffset>;

template <
typename Key,
Expand Down
22 changes: 20 additions & 2 deletions src/Common/HashTable/TimeBucketHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,30 @@ struct TimeBucketHashTableGrower : public HashTableGrower<initial_size_degree>
void increaseSize() { this->size_degree += this->size_degree >= 15 ? 1 : 2; }
};


/**
* why need WindowOffset? what is it?
* In query such as 'select ... from tumble(stream, 5s) group by window_start, col', if the toatal length of group by key are fixed,
* and the col are nullable columns, in function 'packFixed', it will put the KeysNullMap(indicates which column of this row of data is null) in the front of the key,
* then put the window time key and other group by key behind it.But in TimeBucketHashTable::windowKey, we assume the window time key is in the front of the key,
* The key's layout is like:
* | key |
* +-----------------+------------+
* | col, window time| KeysNullMap|
* +-----------------+------------+ low bit
* |WindowOffset|
*
* so we need to add a WindowOffset to indicate the length of the KeysNullMap, then we can get the window time key correctly.
* PS: The WindowOffset will only work in this situation(group by window_start and other nullable column), other situation will not be 0, and it will not affect the result.
*/
template <
typename Key,
typename Cell,
typename Hash,
typename Grower,
typename Allocator,
typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>>
typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>,
size_t WindowOffset = 0>
class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty base optimization
{
protected:
Expand Down Expand Up @@ -49,7 +66,8 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
/// window time key is always: 4 or 8 bytes
/// window time key are always lower bits of integral type of T
/// key & 0xFFFF or 0xFFFFFFFF or 0xFFFFFFFFFFFFFFFF
return key & ((0xFFull << ((win_key_size - 1) << 3)) + ((1ull << ((win_key_size - 1) << 3)) - 1));

return (key >> (8 * WindowOffset)) & ((0xFFull << ((win_key_size - 1) << 3)) + ((1ull << ((win_key_size - 1) << 3)) - 1));
}

ALWAYS_INLINE Int64 windowKey(StringRef key)
Expand Down
37 changes: 30 additions & 7 deletions src/DataTypes/Serializations/SerializationNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <DataTypes/DataTypesNumber.h>

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
Expand All @@ -14,6 +16,8 @@
#include <Common/assert_cast.h>
#include <base/scope_guard.h>

#include <typeinfo>

namespace DB
{

Expand Down Expand Up @@ -622,13 +626,32 @@ void SerializationNullable::deserializeTextJSON(IColumn & column, ReadBuffer & i
deserializeTextJSONImpl<void>(column, istr, settings, nested);
}

template<typename ReturnType>
ReturnType SerializationNullable::deserializeTextJSONImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const SerializationPtr & nested)
{
return safeDeserialize<ReturnType>(column, *nested,
[&istr] { return checkStringByFirstCharacterAndAssertTheRest("null", istr); },
[&nested, &istr, &settings] (IColumn & nested_column) { nested->deserializeTextJSON(nested_column, istr, settings); });
template <typename ReturnType>
ReturnType SerializationNullable::deserializeTextJSONImpl(
IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested)
{
return safeDeserialize<ReturnType>(
column,
*nested,
[&istr, &column] {
if (column.isNullable())
{
auto & column_nullable = dynamic_cast<ColumnNullable &>(column);
auto & nested_column = column_nullable.getNestedColumn();
if (typeid(nested_column) != typeid(ColumnString) && typeid(nested_column) != typeid(ColumnFixedString))
{
/// If the column is nullable and the value field is empty, we assume it is NULL.
/// eg. {"key": ""}
if (*istr.position() == '"' && (*(istr.position() + 1) == '"'))
{
istr.position() += 2;
return true;
}
}
}
return checkStringByFirstCharacterAndAssertTheRest("null", istr);
},
[&nested, &istr, &settings](IColumn & nested_column) { nested->deserializeTextJSON(nested_column, istr, settings); });
}

void SerializationNullable::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethodTimeBucketTwoLev
}

/// Fallback case.
return AggregatedDataVariants::Type::serialized;
return AggregatedDataVariants::Type::time_bucket_serialized_two_level;
}

/// No key has been found to be nullable.
Expand Down
9 changes: 6 additions & 3 deletions src/Interpreters/Streaming/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ using TimeBucketAggregatedDataWithStringKeyTwoLevel = TimeBucketHashMapWithSaved
using TimeBucketAggregatedDataWithKeys128TwoLevel = TimeBucketHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using TimeBucketAggregatedDataWithKeys256TwoLevel = TimeBucketHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;

using TimeBucketAggregatedDataWithKeys128TwoLevelNullable = TimeBucketHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32, getBitmapSize<UInt128>()>;
using TimeBucketAggregatedDataWithKeys256TwoLevelNullable = TimeBucketHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32, getBitmapSize<UInt256>()>;


class Aggregator;
struct AggregatedDataMetrics;

Expand Down Expand Up @@ -205,9 +209,8 @@ SERDE struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<TimeBucketAggregatedDataWithKeys256TwoLevel>> time_bucket_keys256_two_level;

/// Nullable
std::unique_ptr<AggregationMethodKeysFixed<TimeBucketAggregatedDataWithKeys128TwoLevel, true>> time_bucket_nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<TimeBucketAggregatedDataWithKeys256TwoLevel, true>> time_bucket_nullable_keys256_two_level;

std::unique_ptr<AggregationMethodKeysFixed<TimeBucketAggregatedDataWithKeys128TwoLevelNullable, true>> time_bucket_nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<TimeBucketAggregatedDataWithKeys256TwoLevelNullable, true>> time_bucket_nullable_keys256_two_level;
/// Low cardinality
// std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, StreamingAggregatedDataWithNullableUInt64KeyTwoLevel>>> streaming_low_cardinality_key32_two_level;
// std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, StreamingAggregatedDataWithNullableUInt64KeyTwoLevel>>> streaming_low_cardinality_key64_two_level;
Expand Down
Loading
Loading