From f4bb67a16de5b2984415b3cc3a4e04df709e378b Mon Sep 17 00:00:00 2001 From: Zachary Dremann Date: Sat, 20 Jan 2024 14:13:26 -0500 Subject: [PATCH] node48 children can have holes (#565) * fix(art): node48 children can have holes This matches the [java implementation][1] in Roaring. When removing from a node48, the child is simply set to NULL. For example, if the state is (ignoring that it actually needs at least 17 items to not be a node16): ``` keys: {[0]=0, [1]=1} children: {[0]=child1, [1]=child2} count: 2 ``` After removing key 0, the state will be: ``` keys: {[0]=48, [1]=1} children: {[0]=NULL, [1]=child2} count: 1 ``` If we then try to insert at key 3, the item at `children[count]` is already in use: we have to find and reuse the 0th child. This also has some fixes for `art_printf`: * When printing keys, don't cast to char, if char is signed, when integer promoted, a 0xFF char will be printed as `FFFFFFFF` * Correctly output keys for node48 * Include child index for node48 * Correct child output for node48 [1]: https://github.com/RoaringBitmap/RoaringBitmap/blob/1ed0242d1f298cf93d3666a754eb4bb4c5180c4c/RoaringBitmap/src/main/java/org/roaringbitmap/art/Node48.java#L181-L187 * Keep track of used children for node48 in a bitset Rather than searching for an unset child, we can just use the first set bit in a bitset of available children. Also, when freeing the node, rather than iterating through all 256 possible keys, just iterate through all set children directly. This slightly increases the size of the node48 struct, but it means: * We don't have to initialize all children to null at init time * We don't have to loop looking for a child (sometimes) on insert * Move test for node48 into `art_unit` --- include/roaring/portability.h | 6 ++-- src/art/art.c | 33 ++++++++++++------ tests/art_unit.cpp | 66 +++++++++++++++++++++++++++++------ 3 files changed, 82 insertions(+), 23 deletions(-) diff --git a/include/roaring/portability.h b/include/roaring/portability.h index 9db49c42e..bc20edb7f 100644 --- a/include/roaring/portability.h +++ b/include/roaring/portability.h @@ -183,7 +183,7 @@ extern "C" { // portability definitions are in global scope, not a namespace // specifically. /* wrappers for Visual Studio built-ins that look like gcc built-ins __builtin_ctzll */ -/* result might be undefined when input_num is zero */ +/** result might be undefined when input_num is zero */ inline int roaring_trailing_zeroes(unsigned long long input_num) { unsigned long index; #ifdef _WIN64 // highly recommended!!! @@ -200,7 +200,7 @@ inline int roaring_trailing_zeroes(unsigned long long input_num) { } /* wrappers for Visual Studio built-ins that look like gcc built-ins __builtin_clzll */ -/* result might be undefined when input_num is zero */ +/** result might be undefined when input_num is zero */ inline int roaring_leading_zeroes(unsigned long long input_num) { unsigned long index; #ifdef _WIN64 // highly recommended!!! @@ -225,7 +225,9 @@ inline int roaring_leading_zeroes(unsigned long long input_num) { #ifndef CROARING_INTRINSICS #define CROARING_INTRINSICS 1 #define roaring_unreachable __builtin_unreachable() +/** result might be undefined when input_num is zero */ inline int roaring_trailing_zeroes(unsigned long long input_num) { return __builtin_ctzll(input_num); } +/** result might be undefined when input_num is zero */ inline int roaring_leading_zeroes(unsigned long long input_num) { return __builtin_clzll(input_num); } #endif diff --git a/src/art/art.c b/src/art/art.c index bb30f179a..f9518ef94 100644 --- a/src/art/art.c +++ b/src/art/art.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -30,6 +31,8 @@ #define SET_LEAF(p) ((art_node_t *)((uintptr_t)(p) | 1)) #define CAST_LEAF(p) ((art_leaf_t *)((void *)((uintptr_t)(p) & ~1))) +#define NODE48_AVAILABLE_CHILDREN_MASK ((UINT64_C(1) << 48) - 1) + #ifdef __cplusplus extern "C" { namespace roaring { @@ -74,6 +77,9 @@ typedef struct art_node16_s { typedef struct art_node48_s { art_inner_node_t base; uint8_t count; + // Bitset where the ith bit is set if children[i] is available + // Because there are at most 48 children, only the bottom 48 bits are used. + uint64_t available_children; uint8_t keys[256]; art_node_t *children[48]; } art_node48_t; @@ -459,6 +465,7 @@ static art_node48_t *art_node48_create(const art_key_chunk_t prefix[], art_node48_t *node = (art_node48_t *)roaring_malloc(sizeof(art_node48_t)); art_init_inner_node(&node->base, ART_NODE48_TYPE, prefix, prefix_size); node->count = 0; + node->available_children = NODE48_AVAILABLE_CHILDREN_MASK; for (size_t i = 0; i < 256; ++i) { node->keys[i] = ART_NODE48_EMPTY_VAL; } @@ -466,11 +473,12 @@ static art_node48_t *art_node48_create(const art_key_chunk_t prefix[], } static void art_free_node48(art_node48_t *node) { - for (size_t i = 0; i < 256; ++i) { - uint8_t val_idx = node->keys[i]; - if (val_idx != ART_NODE48_EMPTY_VAL) { - art_free_node(node->children[val_idx]); - } + uint64_t used_children = (node->available_children) ^ NODE48_AVAILABLE_CHILDREN_MASK; + while (used_children != 0) { + // We checked above that used_children is not zero + uint8_t child_idx = roaring_trailing_zeroes(used_children); + art_free_node(node->children[child_idx]); + used_children &= ~(UINT64_C(1) << child_idx); } roaring_free(node); } @@ -487,10 +495,12 @@ static inline art_node_t *art_node48_find_child(const art_node48_t *node, static art_node_t *art_node48_insert(art_node48_t *node, art_node_t *child, uint8_t key) { if (node->count < 48) { - uint8_t val_idx = node->count; + // node->available_children is only zero when the node is full (count == 48), we just checked count < 48 + uint8_t val_idx = roaring_trailing_zeroes(node->available_children); node->keys[key] = val_idx; node->children[val_idx] = child; node->count++; + node->available_children &= ~(UINT64_C(1) << val_idx); return (art_node_t *)node; } art_node256_t *new_node = @@ -511,8 +521,8 @@ static inline art_node_t *art_node48_erase(art_node48_t *node, if (val_idx == ART_NODE48_EMPTY_VAL) { return (art_node_t *)node; } - node->children[val_idx] = NULL; node->keys[key_chunk] = ART_NODE48_EMPTY_VAL; + node->available_children |= UINT64_C(1) << val_idx; node->count--; if (node->count > 16) { return (art_node_t *)node; @@ -1182,7 +1192,7 @@ void art_node_printf(const art_node_t *node, uint8_t depth) { printf("{ type: Leaf, key: "); art_leaf_t *leaf = CAST_LEAF(node); for (size_t i = 0; i < ART_KEY_BYTES; ++i) { - printf("%02x", (char)(leaf->key[i])); + printf("%02x", leaf->key[i]); } printf(" }\n"); return; @@ -1202,7 +1212,7 @@ void art_node_printf(const art_node_t *node, uint8_t depth) { printf("%*s", depth, ""); printf("prefix: "); for (uint8_t i = 0; i < inner_node->prefix_size; ++i) { - printf("%02x", (char)inner_node->prefix[i]); + printf("%02x", inner_node->prefix[i]); } printf("\n"); @@ -1228,8 +1238,9 @@ void art_node_printf(const art_node_t *node, uint8_t depth) { for (int i = 0; i < 256; ++i) { if (node48->keys[i] != ART_NODE48_EMPTY_VAL) { printf("%*s", depth, ""); - printf("key: %02x ", node48->keys[i]); - art_node_printf(node48->children[i], depth); + printf("key: %02x ", i); + printf("child: %02x ", node48->keys[i]); + art_node_printf(node48->children[node48->keys[i]], depth); } } } break; diff --git a/tests/art_unit.cpp b/tests/art_unit.cpp index 56355d6f1..09ccab00e 100644 --- a/tests/art_unit.cpp +++ b/tests/art_unit.cpp @@ -1,11 +1,10 @@ #include #include -#include #include +#include #include #include -#include #include #include #include @@ -17,13 +16,13 @@ using namespace roaring::internal; namespace { -void print_key(art_key_chunk_t* key) { +void print_key(const art_key_chunk_t* key) { for (size_t i = 0; i < ART_KEY_BYTES; ++i) { printf("%x", *(key + i)); } } -void assert_key_eq(art_key_chunk_t* key1, art_key_chunk_t* key2) { +void assert_key_eq(const art_key_chunk_t* key1, const art_key_chunk_t* key2) { for (size_t i = 0; i < ART_KEY_BYTES; ++i) { if (*(key1 + i) != *(key2 + i)) { print_key(key1); @@ -39,8 +38,7 @@ void assert_key_eq(art_key_chunk_t* key1, art_key_chunk_t* key2) { class Key { public: Key(uint64_t key) { - // Reverse byte order of the low 6 bytes. Not portable to big-endian - // systems! + // Store the low 6 bytes of the key in big-endian order. key_[0] = key >> 40 & 0xFF; key_[1] = key >> 32 & 0xFF; key_[2] = key >> 24 & 0xFF; @@ -49,7 +47,7 @@ class Key { key_[5] = key >> 0 & 0xFF; } - Key(uint8_t* key) { + Key(const uint8_t* key) { for (size_t i = 0; i < 6; ++i) { key_[i] = *(key + i); } @@ -78,9 +76,9 @@ class Key { struct Value : art_val_t { Value() {} Value(uint64_t val_) : val(val_) {} - bool operator==(const Value& other) { return val == other.val; } + bool operator==(const Value& other) const { return val == other.val; } - uint64_t val; + uint64_t val = 0; }; class ShadowedART { @@ -121,7 +119,7 @@ class ShadowedART { break; } if (found_val->val != value.val) { - printf("Key %s: ART value %lu != shadow value %lu\n", + printf("Key %s: ART value %" PRIu64 " != shadow value %" PRIu64 "\n", key.string().c_str(), found_val->val, value.val); assert_true(*found_val == value); break; @@ -440,6 +438,53 @@ DEFINE_TEST(test_art_shadowed) { art.assertLowerBoundValid(1); } +DEFINE_TEST(test_art_shrink_grow_node48) { + art_t art{nullptr}; + std::vector values(48); + // Make a full node48. + for (int i = 0; i < 48; i++) { + auto key = Key(i); + values[i].val = i; + art_insert(&art, key.data(), &values[i]); + } + // Remove the first several containers + for (int i = 0; i < 8; i++) { + auto key = Key(i); + Value *removed_val = (Value *)(art_erase(&art, key.data())); + assert_int_equal(removed_val->val, i); + } + { + art_iterator_t iterator = art_init_iterator(&art, true); + int i = 8; + do { + auto key = Key(i); + assert_key_eq(iterator.key, key.data()); + assert_true(iterator.value == &values[i]); + ++i; + } while (art_iterator_next(&iterator)); + assert_int_equal(i, 48); + } + + // Fill the containers back up + for (int i = 0; i < 8; i++) { + auto key = Key(i); + values[i].val = i; + art_insert(&art, key.data(), &values[i]); + } + { + art_iterator_t iterator = art_init_iterator(&art, true); + int i = 0; + do { + auto key = Key(i); + assert_key_eq(iterator.key, key.data()); + assert_true(iterator.value == &values[i]); + ++i; + } while (art_iterator_next(&iterator)); + assert_int_equal(i, 48); + } + art_free(&art); +} + } // namespace int main() { @@ -455,6 +500,7 @@ int main() { cmocka_unit_test(test_art_iterator_erase), cmocka_unit_test(test_art_iterator_insert), cmocka_unit_test(test_art_shadowed), + cmocka_unit_test(test_art_shrink_grow_node48), }; return cmocka_run_group_tests(tests, NULL, NULL); }