support tracking updated aggregated states #465

Merged
26 changes: 23 additions & 3 deletions src/Common/HashMapsTemplate.h
@@ -4,7 +4,8 @@
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/StringHashTable.h>
#include <Common/HashTable/StringHashMap.h>
#include <Common/HashTable/TwoLevelStringHashMap.h>

namespace DB
{
@@ -24,9 +25,14 @@ void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, Wr
});
}

template <bool is_string_hash_map, typename Map, typename MappedDeserializer>
template <typename Map, typename MappedDeserializer>
void deserializeHashMap(Map & map, MappedDeserializer && mapped_deserializer, Arena & pool, ReadBuffer & rb)
{
using Mapped = std::decay_t<Map>::mapped_type;

constexpr bool is_string_hash_map
= std::is_same_v<std::decay_t<Map>, StringHashMap<Mapped>> || std::is_same_v<std::decay_t<Map>, TwoLevelStringHashMap<Mapped>>;

/// For StringHashMap or TwoLevelStringHashMap, the StringRef keys must be padded by 8 bytes on both sides (left and right).
/// The Arena's MemoryChunk already pads 15 bytes on the right, so we only need to pad 8 bytes on the left here.
if constexpr (is_string_hash_map)
@@ -60,6 +66,20 @@ void deserializeHashMap(Map & map, MappedDeserializer && mapped_deserializer, Ar
pool.setPaddingLeft(0);
}

template <typename Map, typename MappedSerializer>
void serializeTwoLevelHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb)
{
serializeHashMap<Map, MappedSerializer>(map, std::move(mapped_serializer), wb);
map.writeUpdatedBuckets(wb);
}

template <typename Map, typename MappedDeserializer>
void deserializeTwoLevelHashMap(Map & map, MappedDeserializer && mapped_deserializer, Arena & pool, ReadBuffer & rb)
{
deserializeHashMap<Map, MappedDeserializer>(map, std::move(mapped_deserializer), pool, rb);
map.readUpdatedBuckets(rb); /// recover the buckets' updated status
}

/// HashMapsTemplate is taken from the HashJoin class and made standalone
/// so that it can be shared among different components

Expand Down Expand Up @@ -187,7 +207,7 @@ struct HashMapsTemplate
#define M(NAME) \
case HashType::NAME: { \
assert(NAME); \
deserializeHashMap<false>(*NAME, mapped_deserializer, pool, rb); \
deserializeHashMap(*NAME, mapped_deserializer, pool, rb); \
return; \
}
APPLY_FOR_HASH_KEY_VARIANTS(M)
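Below is a hypothetical usage sketch, not part of this diff, showing how the two new helpers pair the existing key/value serialization with the per-bucket updated flags. The wrapper function, the mapped (de)serializer callbacks, and the buffer wiring are assumptions made for illustration only.

#include <Common/Arena.h>
#include <Common/HashMapsTemplate.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <utility>

/// Hypothetical sketch: round-trip a two-level map together with its updated-bucket flags.
/// The MappedSerializer / MappedDeserializer callback signatures are assumed, not taken from this PR.
template <typename TwoLevelMap, typename MappedSerializer, typename MappedDeserializer>
void checkpointRoundTrip(const TwoLevelMap & src, TwoLevelMap & dst, MappedSerializer && ser, MappedDeserializer && deser, DB::Arena & pool)
{
    DB::WriteBufferFromOwnString wb;
    /// Key/value pairs first, then src.writeUpdatedBuckets(wb) appends the flags
    DB::serializeTwoLevelHashMap(src, std::forward<MappedSerializer>(ser), wb);

    DB::ReadBufferFromString rb(wb.str());
    /// Pairs first, then dst.readUpdatedBuckets(rb) restores the flags; note that
    /// is_string_hash_map is now deduced from the map type instead of being passed explicitly
    DB::deserializeTwoLevelHashMap(dst, std::forward<MappedDeserializer>(deser), pool, rb);
}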
14 changes: 14 additions & 0 deletions src/Common/HashTable/TimeBucketHashMap.h
@@ -33,6 +33,20 @@ class TimeBucketHashMapTable
p.second.forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto & p : this->impls)
{
if (this->isBucketUpdated(p.first))
{
p.second.forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(p.first);
}
}
}

typename Cell::Mapped & ALWAYS_INLINE operator[](const Key & x)
{
LookupResult it;
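A hypothetical sketch, not part of this diff, of how a caller might use the new iteration helper: visit only the groups whose time bucket changed since the last pass, and clear the flags in the same call. The emit callback and the map type are illustrative assumptions.

/// Hypothetical sketch: emit only key/value pairs living in buckets updated since the
/// previous emission; reset_updated = true clears each visited bucket's flag afterwards.
template <typename TimeBucketMap, typename Emit>
void emitIncrementalResults(TimeBucketMap & map, Emit && emit)
{
    map.forEachValueOfUpdatedBuckets(
        [&](const auto & key, auto & mapped) { emit(key, mapped); },
        /*reset_updated=*/ true);
}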
57 changes: 57 additions & 0 deletions src/Common/HashTable/TimeBucketHashTable.h
@@ -108,7 +108,9 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
using ConstLookupResult = typename Impl::ConstLookupResult;

/// FIXME: choose a better-performing data structure
/// Usually we don't have too many time buckets
std::map<Int64, Impl> impls;
std::unordered_map<Int64, bool/*updated*/> updated_buckets;
Impl sentinel;

TimeBucketHashTable() { }
@@ -263,6 +265,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
auto window = windowKey(key_holder);
impls[window].emplace(key_holder, it, inserted, hash_value);
updated_buckets[window] = true; /// updated
}

LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
@@ -289,6 +292,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
DB::writeIntBinary(p.first, wb);
p.second.write(wb);
DB::writeBinary(updated_buckets[p.first], wb);
}
}

@@ -309,7 +313,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
/// Write key and key-value separator
DB::writeIntText(p.first, wb);
DB::writeChar(KEY_VALUE_SEPARATOR, wb);
/// <impl,updated>
DB::writeChar('<', wb);
p.second.writeText(wb);
DB::writeChar(',', wb);
DB::writeBoolText(updated_buckets[p.first], wb);
DB::writeChar('>', wb);
}
DB::writeChar(END_BUCKET_MARKER, wb);
}
@@ -327,6 +336,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
assert(key != 0);
assert(!impls.contains(key));
impls[key].read(rb);
DB::readBinary(updated_buckets[key], rb);
}
}

@@ -349,7 +359,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty

assert(key != 0);
assert(!impls.contains(key));
/// <impl,updated>
DB::assertChar('<', rb);
impls[key].readText(rb);
DB::assertChar(',', rb);
DB::readBoolText(updated_buckets[key], rb);
DB::assertChar('>', rb);
}
DB::assertChar(END_BUCKET_MARKER, rb);
}
@@ -402,6 +417,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
last_removed_watermark = it->first;
++removed;

updated_buckets.erase(it->first);
it = impls.erase(it);
}
else
@@ -438,4 +454,45 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty

return buckets;
}

bool isBucketUpdated(Int64 bucket_) const
{
auto it = updated_buckets.find(bucket_);
if (it != updated_buckets.end())
return it->second;

return false;
}

void resetUpdatedBucket(Int64 bucket_)
{
auto it = updated_buckets.find(bucket_);
if (it != updated_buckets.end())
it->second = false;
}

void writeUpdatedBuckets(DB::WriteBuffer & wb) const
{
DB::writeVarUInt(updated_buckets.size(), wb);
for (const auto & [bucket, updated] : updated_buckets)
{
DB::writeIntBinary(bucket, wb);
DB::writeBinary(updated, wb);
}
}

void readUpdatedBuckets(DB::ReadBuffer & rb)
{
size_t size = 0;
DB::readVarUInt(size, rb);
updated_buckets.clear();
Int64 bucket = 0;
bool updated = false;
for (size_t i = 0; i < size; ++i)
{
DB::readIntBinary(bucket, rb);
DB::readBinary(updated, rb);
updated_buckets.emplace(bucket, updated);
}
}
};
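A hypothetical sketch, not part of this diff, summarizing the lifecycle of the per-bucket flag added above: it is set by emplace(), queried and cleared through the two accessors, dropped when a bucket is garbage-collected, and persisted by the new write/read pair. The wrapper function and its parameters are illustrative.

#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>

/// Hypothetical sketch: lifecycle of the "updated" flag for one time bucket.
template <typename TimeBucketTable>
void drainBucketFlag(TimeBucketTable & table, Int64 bucket, DB::WriteBuffer & wb, DB::ReadBuffer & rb)
{
    if (table.isBucketUpdated(bucket))      /// becomes true whenever emplace() touches this bucket
        table.resetUpdatedBucket(bucket);   /// keeps the data, clears only the dirty flag

    table.writeUpdatedBuckets(wb);          /// varint count, then (bucket, flag) pairs
    table.readUpdatedBuckets(rb);           /// clears the flag map and rebuilds it from the stream
}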
14 changes: 14 additions & 0 deletions src/Common/HashTable/TwoLevelHashMap.h
@@ -38,6 +38,20 @@ class TwoLevelHashMapTable : public TwoLevelHashTable<Key, Cell, Hash, Grower, A
this->impls[i].forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
{
if (this->isBucketUpdated(i))
{
this->impls[i].forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(i);
}
}
}

template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
47 changes: 47 additions & 0 deletions src/Common/HashTable/TwoLevelHashTable.h
@@ -90,6 +90,7 @@ class TwoLevelHashTable :
using ConstLookupResult = typename Impl::ConstLookupResult;

Impl impls[NUM_BUCKETS];
bool updated_buckets[NUM_BUCKETS] = {false};


TwoLevelHashTable() = default;
@@ -119,6 +120,7 @@ class TwoLevelHashTable :
size_t hash_value = cell->getHash(src);
size_t buck = getBucketFromHash(hash_value);
impls[buck].insertUniqueNonZero(cell, hash_value);
updated_buckets[buck] = true;
}
}

@@ -271,6 +273,7 @@ class TwoLevelHashTable :
{
size_t buck = getBucketFromHash(hash_value);
impls[buck].emplace(key_holder, it, inserted, hash_value);
updated_buckets[buck] = true;
}

LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
@@ -292,7 +295,10 @@ class TwoLevelHashTable :
void write(DB::WriteBuffer & wb) const
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
{
impls[i].write(wb);
DB::writeBinary(updated_buckets[i], wb);
}
}

void writeText(DB::WriteBuffer & wb) const
@@ -301,14 +307,23 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::writeChar(',', wb);

/// <impl,updated>
DB::writeChar('<', wb);
impls[i].writeText(wb);
DB::writeChar(',', wb);
DB::writeBoolText(updated_buckets[i], wb);
DB::writeChar('>', wb);
}
}

void read(DB::ReadBuffer & rb)
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
{
impls[i].read(rb);
DB::readBinary(updated_buckets[i], rb);
}
}

void readText(DB::ReadBuffer & rb)
@@ -317,7 +332,13 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::assertChar(',', rb);

/// <impl,updated>
DB::assertChar('<', rb);
impls[i].readText(rb);
DB::assertChar(',', rb);
DB::readBoolText(updated_buckets[i], rb);
DB::assertChar('>', rb);
}
}

@@ -365,5 +386,31 @@ class TwoLevelHashTable :
std::iota(bucket_ids.begin(), bucket_ids.end(), 0);
return bucket_ids;
}

bool isBucketUpdated(Int64 bucket_) const
{
return updated_buckets[bucket_];
}

void resetUpdatedBucket(Int64 bucket_)
{
updated_buckets[bucket_] = false;
}

void writeUpdatedBuckets(DB::WriteBuffer & wb) const
{
DB::writeVarUInt(NUM_BUCKETS, wb);
for (const auto & elem : updated_buckets)
DB::writeBinary(elem, wb);
}

void readUpdatedBuckets(DB::ReadBuffer & rb)
{
size_t size = 0;
DB::readVarUInt(size, rb);
assert(size == NUM_BUCKETS);
for (auto & elem : updated_buckets)
DB::readBinary(elem, rb);
}
/// proton : ends
};
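A hypothetical sketch, not part of this diff: for the fixed fan-out two-level tables the flags live in a plain bool array of NUM_BUCKETS entries, so writeUpdatedBuckets/readUpdatedBuckets exchange exactly NUM_BUCKETS booleans and the reader asserts that the persisted count still matches. The buffer classes below are standard ClickHouse IO types, used here only as an assumption about how a caller would wire them.

#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>

/// Hypothetical sketch: copy the updated-bucket flags from one table to another of the same type.
/// The payload is a varint bucket count followed by one bool per bucket.
template <typename TwoLevelTable>
void copyUpdatedFlags(const TwoLevelTable & from, TwoLevelTable & to)
{
    DB::WriteBufferFromOwnString wb;
    from.writeUpdatedBuckets(wb);          /// writes NUM_BUCKETS, then NUM_BUCKETS flags

    DB::ReadBufferFromString rb(wb.str());
    to.readUpdatedBuckets(rb);             /// asserts the count equals NUM_BUCKETS, then refills the array
}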
14 changes: 14 additions & 0 deletions src/Common/HashTable/TwoLevelStringHashMap.h
@@ -29,6 +29,20 @@ class TwoLevelStringHashMap : public TwoLevelStringHashTable<StringHashMapSubMap
this->impls[i].forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
{
if (this->isBucketUpdated(i))
{
this->impls[i].forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(i);
}
}
}

template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{