Skip to content

Commit

Permalink
Add identity element
Browse files Browse the repository at this point in the history
  • Loading branch information
srinivasyadav18 committed Jun 28, 2024
1 parent 01fa6c3 commit 45adafc
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 68 deletions.
21 changes: 17 additions & 4 deletions benchmarks/hash_table/static_map/insert_or_apply_bench.cu
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
using namespace cuco::benchmark;
using namespace cuco::utility;

const auto USE_IDENTITY = std::vector<nvbench::int64_t>{0, 1};
const auto MULTIPLICITY_RANGE = std::vector<nvbench::int64_t>{1, 2, 4, 8, 16, 32, 64, 128};
/**
* @brief A benchmark evaluating `cuco::static_map::insert_or_apply` performance
*/
Expand All @@ -40,9 +42,13 @@ std::enable_if_t<(sizeof(Key) == sizeof(Value)), void> static_map_insert_or_appl
auto const num_keys = state.get_int64_or_default("NumInputs", defaults::N);
auto const occupancy = state.get_float64_or_default("Occupancy", defaults::OCCUPANCY);
auto const multiplicity = state.get_int64_or_default("Multiplicity", defaults::MULTIPLICITY);
auto const use_identity = state.get_int64_or_default("UseIdentity", 1);

std::size_t const size = cuco::detail::int_div_ceil(num_keys, multiplicity) / occupancy;

cuda::std::optional<Value> identity{};
if (use_identity) identity = 0;

thrust::device_vector<Key> keys(num_keys);

key_generator gen;
Expand All @@ -57,12 +63,17 @@ std::enable_if_t<(sizeof(Key) == sizeof(Value)), void> static_map_insert_or_appl

cuco::static_map map{size, cuco::empty_key<Key>{-1}, cuco::empty_value<Value>{0}};

using Map = decltype(map);

auto const op = [] __device__(cuda::atomic_ref<Value, Map::thread_scope> lhs, const Value& rhs) {
lhs.fetch_add(rhs, cuda::memory_order_relaxed);
};

state.exec(nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
map.clear_async({launch.get_stream()});

timer.start();
map.insert_or_apply_async(
pairs.begin(), pairs.end(), cuco::op::reduce::sum, {launch.get_stream()});
map.insert_or_apply_async(pairs.begin(), pairs.end(), op, identity, {launch.get_stream()});
timer.stop();
});
}
Expand All @@ -81,7 +92,8 @@ NVBENCH_BENCH_TYPES(static_map_insert_or_apply,
.set_name("static_map_insert_or_apply_uniform_multiplicity")
.set_type_axes_names({"Key", "Value", "Distribution"})
.set_max_noise(defaults::MAX_NOISE)
.add_int64_axis("Multiplicity", defaults::MULTIPLICITY_RANGE);
.add_int64_axis("Multiplicity", MULTIPLICITY_RANGE)
.add_int64_axis("UseIdentity", USE_IDENTITY);

NVBENCH_BENCH_TYPES(static_map_insert_or_apply,
NVBENCH_TYPE_AXES(defaults::KEY_TYPE_RANGE,
Expand All @@ -90,4 +102,5 @@ NVBENCH_BENCH_TYPES(static_map_insert_or_apply,
.set_name("static_set_insert_or_apply_uniform_occupancy")
.set_type_axes_names({"Key", "Value", "Distribution"})
.set_max_noise(defaults::MAX_NOISE)
.add_float64_axis("Occupancy", defaults::OCCUPANCY_RANGE);
.add_float64_axis("Occupancy", defaults::OCCUPANCY_RANGE)
.add_int64_axis("UseIdentity", USE_IDENTITY);
21 changes: 17 additions & 4 deletions include/cuco/detail/static_map/kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cub/block/block_reduce.cuh>
#include <cuda/atomic>
#include <cuda/std/optional>

#include <cooperative_groups.h>

Expand Down Expand Up @@ -82,27 +83,39 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_assign(InputIt first,
* @tparam InputIt Device accessible input iterator whose `value_type` is
* convertible to the `value_type` of the data structure
 * @tparam Op Callable type used to perform the apply operation.
 * @tparam T Type of the optional identity element which is convertible
* to `value_type` of the data structure
* @tparam Ref Type of non-owning device ref allowing access to storage
*
* @param first Beginning of the sequence of input elements
* @param n Number of input elements
 * @param op Callable object used to perform the apply operation.
 * @param identity_element An optional identity element of the binary operation
* @param ref Non-owning container device ref used to access the slot storage
*/
template <int32_t CGSize, int32_t BlockSize, typename InputIt, typename Op, typename Ref>
__global__ void insert_or_apply(InputIt first, cuco::detail::index_type n, Op op, Ref ref)
template <int32_t CGSize,
int32_t BlockSize,
typename InputIt,
typename Op,
typename T,
typename Ref>
__global__ void insert_or_apply(InputIt first,
cuco::detail::index_type n,
Op op,
cuda::std::optional<T> identity_element,
Ref ref)
{
auto const loop_stride = cuco::detail::grid_stride() / CGSize;
auto idx = cuco::detail::global_thread_id() / CGSize;

while (idx < n) {
typename std::iterator_traits<InputIt>::value_type const& insert_pair = *(first + idx);
if constexpr (CGSize == 1) {
ref.insert_or_apply(insert_pair, op);
ref.insert_or_apply(insert_pair, op, identity_element);
} else {
auto const tile =
cooperative_groups::tiled_partition<CGSize>(cooperative_groups::this_thread_block());
ref.insert_or_apply(tile, insert_pair, op);
ref.insert_or_apply(tile, insert_pair, op, identity_element);
}
idx += loop_stride;
}
Expand Down
18 changes: 14 additions & 4 deletions include/cuco/detail/static_map/static_map.inl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cuco/operator.hpp>
#include <cuco/static_map_ref.cuh>

#include <cuda/std/optional>

#include <cstddef>

namespace cuco {
Expand Down Expand Up @@ -253,9 +255,13 @@ template <class Key,
class Storage>
template <typename InputIt, typename Op>
void static_map<Key, T, Extent, Scope, KeyEqual, ProbingScheme, Allocator, Storage>::
insert_or_apply(InputIt first, InputIt last, Op op, cuda_stream_ref stream) noexcept
insert_or_apply(InputIt first,
InputIt last,
Op op,
cuda::std::optional<T> identity_element,
cuda_stream_ref stream) noexcept
{
return this->insert_or_apply_async(first, last, op, stream);
return this->insert_or_apply_async(first, last, op, identity_element, stream);
stream.synchronize();
}

Expand All @@ -269,7 +275,11 @@ template <class Key,
class Storage>
template <typename InputIt, typename Op>
void static_map<Key, T, Extent, Scope, KeyEqual, ProbingScheme, Allocator, Storage>::
insert_or_apply_async(InputIt first, InputIt last, Op op, cuda_stream_ref stream) noexcept
insert_or_apply_async(InputIt first,
InputIt last,
Op op,
cuda::std::optional<T> identity_element,
cuda_stream_ref stream) noexcept
{
auto const num = cuco::detail::distance(first, last);
if (num == 0) { return; }
Expand All @@ -278,7 +288,7 @@ void static_map<Key, T, Extent, Scope, KeyEqual, ProbingScheme, Allocator, Stora

static_map_ns::detail::insert_or_apply<cg_size, cuco::detail::default_block_size()>
<<<grid_size, cuco::detail::default_block_size(), 0, stream>>>(
first, num, op, ref(op::insert_or_apply));
first, num, op, identity_element, ref(op::insert_or_apply));
}

template <class Key,
Expand Down
138 changes: 98 additions & 40 deletions include/cuco/detail/static_map/static_map_ref.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cuco/operator.hpp>

#include <cuda/atomic>
#include <cuda/std/optional>
#include <thrust/tuple.h>

#include <cooperative_groups.h>
Expand Down Expand Up @@ -579,10 +580,13 @@ class operator_impl<
* @param value The element to insert
* @param op The callable object to perform binary operation between existing value at the slot
* and the element to insert.
* @param identity_element An optional Identity element of the binary operation
*/

template <typename Value, typename Op>
__device__ void insert_or_apply(Value const& value, Op op)
__device__ void insert_or_apply(Value const& value,
Op op,
cuda::std::optional<T> identity_element = {})
{
static_assert(cg_size == 1, "Non-CG operation is incompatible with the current probing scheme");

Expand All @@ -592,33 +596,51 @@ class operator_impl<

ref_type& ref_ = static_cast<ref_type&>(*this);

auto const val = ref_.impl_.heterogeneous_value(value);
auto const key = ref_.impl_.extract_key(val);
auto& probing_scheme = ref_.impl_.probing_scheme();
auto storage_ref = ref_.impl_.storage_ref();
auto probing_iter = probing_scheme(key, storage_ref.window_extent());
auto const val = ref_.impl_.heterogeneous_value(value);
auto const key = ref_.impl_.extract_key(val);
auto& probing_scheme = ref_.impl_.probing_scheme();
auto storage_ref = ref_.impl_.storage_ref();
auto probing_iter = probing_scheme(key, storage_ref.window_extent());
auto const empty_value = ref_.impl_.empty_slot_sentinel().second;

// optimize first insert when sentinel payload value equals identity element
auto const optimize_insert = [&]() {
if (identity_element.has_value()) {
if (identity_element.value() == empty_value) return true;
}
return false;
}();

while (true) {
auto const window_slots = storage_ref[*probing_iter];

for (auto& slot_content : window_slots) {
auto const eq_res =
ref_.impl_.predicate_.operator()<is_insert::YES>(key, slot_content.first);
auto const intra_window_index = thrust::distance(window_slots.begin(), &slot_content);
auto slot_ptr = (storage_ref.data() + *probing_iter)->data() + intra_window_index;

// If the key is already in the container, update the payload and return
if (eq_res == detail::equal_result::EQUAL) {
auto const intra_window_index = thrust::distance(window_slots.begin(), &slot_content);
op(
cuda::atomic_ref<T, Scope>{
((storage_ref.data() + *probing_iter)->data() + intra_window_index)->second},
val.second);
op(cuda::atomic_ref<T, Scope>{slot_ptr->second}, val.second);
return;
}
if (eq_res == detail::equal_result::AVAILABLE) {
auto const intra_window_index = thrust::distance(window_slots.begin(), &slot_content);
if (attempt_insert_or_apply(
(storage_ref.data() + *probing_iter)->data() + intra_window_index, value, op)) {
return;
          // if the sentinel value and identity_element are the same, perform op
          // and return; no need to wait on the payload
if (optimize_insert) {
if (attempt_insert_or_apply(slot_ptr, val, op)) return;
continue;
}
// else, attempt stable insert
switch (ref_.impl_.attempt_insert_stable(slot_ptr, slot_content, val)) {
case insert_result::CONTINUE: continue;
case insert_result::SUCCESS: return;
case insert_result::DUPLICATE: {
ref_.impl_.wait_for_payload(slot_ptr->second, empty_value);
op(cuda::atomic_ref<T, Scope>{slot_ptr->second}, val.second);
return;
}
}
}
}
Expand All @@ -627,12 +649,17 @@ class operator_impl<
}

template <typename Value>
__device__ void insert_or_apply(Value const& value, cuco::op::reduce::sum_tag)
__device__ void insert_or_apply(Value const& value,
cuco::op::reduce::sum_tag,
cuda::std::optional<T> = {})
{
auto& ref_ = static_cast<ref_type&>(*this);
ref_.insert_or_apply(value, [](cuda::atomic_ref<T, Scope> slot_ref, T const& payload) {
slot_ref.fetch_add(payload, cuda::memory_order_relaxed);
});
ref_.insert_or_apply(
value,
[](cuda::atomic_ref<T, Scope> slot_ref, T const& payload) {
slot_ref.fetch_add(payload, cuda::memory_order_relaxed);
},
static_cast<T>(0));
}

/**
Expand All @@ -648,24 +675,34 @@ class operator_impl<
* @param value The element to insert
* @param op The callable object to perform binary operation between existing value at the slot
* and the element to insert.
   * @param identity_element An optional identity element of the binary operation
*/

template <typename Value, typename Op>
__device__ void insert_or_apply(cooperative_groups::thread_block_tile<cg_size> const& group,
Value const& value,
Op op)
Op op,
cuda::std::optional<T> identity_element = {})
{
static_assert(
std::is_invocable_v<Op, cuda::atomic_ref<T, Scope>, T>,
"insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref<T, Scope>, T)`");

ref_type& ref_ = static_cast<ref_type&>(*this);

auto const val = ref_.impl_.heterogeneous_value(value);
auto const key = ref_.impl_.extract_key(val);
auto& probing_scheme = ref_.impl_.probing_scheme();
auto storage_ref = ref_.impl_.storage_ref();
auto probing_iter = probing_scheme(group, key, storage_ref.window_extent());
auto const val = ref_.impl_.heterogeneous_value(value);
auto const key = ref_.impl_.extract_key(val);
auto& probing_scheme = ref_.impl_.probing_scheme();
auto storage_ref = ref_.impl_.storage_ref();
auto probing_iter = probing_scheme(group, key, storage_ref.window_extent());
auto const empty_value = ref_.impl_.empty_slot_sentinel().second;

auto const optimize_insert = [&]() {
if (identity_element.has_value()) {
if (identity_element.value() == empty_value) return true;
}
return false;
}();

while (true) {
auto const window_slots = storage_ref[*probing_iter];
Expand All @@ -682,14 +719,13 @@ class operator_impl<
return detail::window_probing_results{res, -1};
}();

auto* slot_ptr = (storage_ref.data() + *probing_iter)->data() + intra_window_index;

auto const group_contains_equal = group.ballot(state == detail::equal_result::EQUAL);
if (group_contains_equal) {
auto const src_lane = __ffs(group_contains_equal) - 1;
if (group.thread_rank() == src_lane) {
op(
cuda::atomic_ref<T, Scope>{
((storage_ref.data() + *probing_iter)->data() + intra_window_index)->second},
val.second);
op(cuda::atomic_ref<T, Scope>{slot_ptr->second}, val.second);
}
group.sync();
return;
Expand All @@ -698,14 +734,31 @@ class operator_impl<
auto const group_contains_available = group.ballot(state == detail::equal_result::AVAILABLE);
if (group_contains_available) {
auto const src_lane = __ffs(group_contains_available) - 1;
auto const status =
(group.thread_rank() == src_lane)
? attempt_insert_or_apply(
(storage_ref.data() + *probing_iter)->data() + intra_window_index, value, op)
: false;

// Exit if inserted or assigned
if (group.shfl(status, src_lane)) { return; }
if (optimize_insert) {
auto const status = (group.thread_rank() == src_lane)
? attempt_insert_or_apply(slot_ptr, value, op)
: false;
if (group.shfl(status, src_lane)) { return; }
continue;
}
auto const status = [&, target_idx = intra_window_index]() {
if (group.thread_rank() != src_lane) { return insert_result::CONTINUE; }
return ref_.impl_.attempt_insert_stable(slot_ptr, window_slots[target_idx], val);
}();

switch (group.shfl(status, src_lane)) {
case insert_result::SUCCESS: return;
case insert_result::DUPLICATE: {
if (group.thread_rank() == src_lane) {
ref_.impl_.wait_for_payload(slot_ptr->second, empty_value);
op(cuda::atomic_ref<T, Scope>{slot_ptr->second}, val.second);
}
group.sync();
return;
}
default: continue;
}
} else {
++probing_iter;
}
Expand All @@ -715,12 +768,17 @@ class operator_impl<
template <typename Value>
__device__ void insert_or_apply(cooperative_groups::thread_block_tile<cg_size> const& group,
Value const& value,
cuco::op::reduce::sum_tag)
cuco::op::reduce::sum_tag,
cuda::std::optional<T> = {})
{
auto& ref_ = static_cast<ref_type&>(*this);
ref_.insert_or_apply(group, value, [](cuda::atomic_ref<T, Scope> slot_ref, T const& payload) {
slot_ref.fetch_add(payload, cuda::memory_order_relaxed);
});
ref_.insert_or_apply(
group,
value,
[](cuda::atomic_ref<T, Scope> slot_ref, T const& payload) {
slot_ref.fetch_add(payload, cuda::memory_order_relaxed);
},
static_cast<T>(0));
}

private:
Expand Down
Loading

0 comments on commit 45adafc

Please sign in to comment.