Use size_t to allow large conditional joins #16127

Merged
merged 2 commits on Jun 28, 2024
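
cudf::size_type is a signed 32-bit integer, so any device-side counter held in it overflows once a conditional join produces more than about 2.1 billion output rows; this PR moves those counters to std::size_t. A minimal host-only sketch of the failure mode (row counts are purely illustrative, not taken from the PR):

#include <cstdint>
#include <cstdio>

int main()
{
  // A broadly matching predicate over two modest tables can exceed INT32_MAX
  // output pairs: 60,000 x 60,000 = 3.6 billion.
  std::int64_t const left_rows  = 60'000;
  std::int64_t const right_rows = 60'000;
  std::int64_t const out_pairs  = left_rows * right_rows;

  std::printf("output pairs: %lld, INT32_MAX: %d\n",
              static_cast<long long>(out_pairs), INT32_MAX);
  // A 32-bit cudf::size_type counter would wrap here; std::size_t does not.
  return 0;
}
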
5 changes: 3 additions & 2 deletions cpp/src/join/conditional_join.cu
@@ -95,7 +95,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
join_size = size.value(stream);
}

-rmm::device_scalar<size_type> write_index(0, stream);
+rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

@@ -232,13 +232,14 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
}

-rmm::device_scalar<size_type> write_index(0, stream);
+rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

auto const& join_output_l = left_indices->data();
auto const& join_output_r = right_indices->data();

if (has_nulls) {
conditional_join<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
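The host side has to match the kernels' new counter type: write_index is the scalar whose data() pointer the kernels receive as their std::size_t* current_idx argument. A reduced sketch of that pattern, assuming RMM's documented device_scalar API (kernel launch elided):

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <cstddef>

std::size_t run_counting_kernel(rmm::cuda_stream_view stream)
{
  // Zero-initialized 64-bit counter in device memory.
  rmm::device_scalar<std::size_t> write_index(0, stream);

  // ... launch a kernel that takes write_index.data() as std::size_t* ...

  // value() synchronizes the stream and copies the final count to the host.
  return write_index.value(stream);
}
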
124 changes: 114 additions & 10 deletions cpp/src/join/conditional_join_kernels.cuh
@@ -29,6 +29,110 @@
namespace cudf {
namespace detail {

/**
* @brief Adds a pair of indices to the shared memory cache
*
* @param[in] first The first index in the pair
* @param[in] second The second index in the pair
* @param[in,out] current_idx_shared Pointer to shared index that determines
* where in the shared memory cache the pair will be written
* @param[in] warp_id The ID of the warp of the calling thread
* @param[out] joined_shared_l Pointer to the shared memory cache for left indices
* @param[out] joined_shared_r Pointer to the shared memory cache for right indices
*/
__inline__ __device__ void add_pair_to_cache(size_type const first,
size_type const second,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l,
size_type* joined_shared_r)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
// It's guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}
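
// Aside (not part of this diff): a standalone sketch of the cuda::atomic_ref
// idiom used above. atomicAdd has no overload matching std::size_t on
// platforms where size_t is `unsigned long`, while cuda::atomic_ref accepts
// any 64-bit integral type; relaxed ordering suffices for a slot counter.
__device__ std::size_t bump_counter(std::size_t* counter)
{
  cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*counter};
  return ref.fetch_add(1, cuda::memory_order_relaxed);
}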

__inline__ __device__ void add_left_to_cache(size_type const first,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type join_shared_r[num_warps][output_cache_size],
size_type* join_output_l,
size_type* join_output_r)
{
// Count how many active threads are participating here, which could be fewer than warp_size
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

// No warp sync is necessary here because we are assuming that ShuffleIndex
// is internally using post-CUDA 9.0 synchronization-safe primitives
// (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
// be safe by the compiler because it is not required by the standard to
// converge divergent branches before executing.
output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
}
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}
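
// Aside (not part of this diff): the reserve-then-broadcast idiom shared by
// both flush_output_cache overloads, reduced to a standalone helper written
// against __shfl_sync directly. The name is hypothetical; lane 0 must be set
// in activemask, since it is the broadcast source.
__device__ std::size_t reserve_output_rows(std::size_t* global_counter,
                                           std::size_t count,
                                           unsigned int activemask,
                                           int lane_id)
{
  std::size_t base = 0;
  if (lane_id == 0) {
    // One atomic per warp claims `count` contiguous output slots.
    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*global_counter};
    base = ref.fetch_add(count, cuda::memory_order_relaxed);
  }
  // __shfl_sync converges the named lanes before exchanging data, so no
  // separate warp synchronization is needed even in divergent code.
  return __shfl_sync(activemask, base, 0);
}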

/**
* @brief Computes the output size of joining the left table to the right table.
*
@@ -103,14 +207,14 @@ CUDF_KERNEL void compute_conditional_join_output_size(
}
}

-using BlockReduce = cub::BlockReduce<cudf::size_type, block_size>;
+using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

// Add block counter to global counter
if (threadIdx.x == 0) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
-ref.fetch_add(block_counter, cuda::std::memory_order_relaxed);
+ref.fetch_add(block_counter, cuda::memory_order_relaxed);
}
}
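
// Aside (not part of this diff): the size-count reduction above assembled
// into a self-contained toy kernel. All names here are hypothetical; the real
// kernel evaluates the join predicate via the AST evaluator instead of pred.
template <int block_size, typename Predicate>
CUDF_KERNEL void count_matches(std::size_t n, Predicate pred, std::size_t* output_size)
{
  std::size_t thread_counter = 0;
  for (std::size_t i = threadIdx.x + std::size_t{blockIdx.x} * block_size; i < n;
       i += std::size_t{gridDim.x} * block_size) {
    if (pred(i)) { ++thread_counter; }
  }

  // Sum the per-thread counters across the block; thread 0 gets the result.
  using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
  __shared__ typename BlockReduce::TempStorage temp_storage;
  std::size_t const block_counter = BlockReduce(temp_storage).Sum(thread_counter);

  if (threadIdx.x == 0) {
    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
    ref.fetch_add(block_counter, cuda::memory_order_relaxed);
  }
}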

@@ -143,13 +247,13 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* join_output_r,
-cudf::size_type* current_idx,
+std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
-cudf::size_type const max_size,
+std::size_t const max_size,
bool const swap_tables)
{
constexpr int num_warps = block_size / detail::warp_size;
-__shared__ cudf::size_type current_idx_shared[num_warps];
+__shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];
__shared__ cudf::size_type join_shared_r[num_warps][output_cache_size];

@@ -183,7 +287,7 @@

if (outer_row_index < outer_num_rows) {
bool found_match = false;
-for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();
auto const left_row_index = swap_tables ? inner_row_index : outer_row_index;
@@ -277,12 +381,12 @@ CUDF_KERNEL void conditional_join_anti_semi(
table_device_view right_table,
join_kind join_type,
cudf::size_type* join_output_l,
-cudf::size_type* current_idx,
+std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
-cudf::size_type const max_size)
+std::size_t const max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
-__shared__ cudf::size_type current_idx_shared[num_warps];
+__shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

extern __shared__ char raw_intermediate_storage[];
Expand Down Expand Up @@ -310,7 +414,7 @@ CUDF_KERNEL void conditional_join_anti_semi(
for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
outer_row_index += stride) {
bool found_match = false;
-for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

95 changes: 0 additions & 95 deletions cpp/src/join/join_common_utils.cuh
@@ -262,101 +262,6 @@ struct valid_range {
}
};

/**
* @brief Adds a pair of indices to the shared memory cache
*
* @param[in] first The first index in the pair
* @param[in] second The second index in the pair
* @param[in,out] current_idx_shared Pointer to shared index that determines
* where in the shared memory cache the pair will be written
* @param[in] warp_id The ID of the warp of the calling thread
* @param[out] joined_shared_l Pointer to the shared memory cache for left indices
* @param[out] joined_shared_r Pointer to the shared memory cache for right indices
*/
__inline__ __device__ void add_pair_to_cache(size_type const first,
size_type const second,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l,
size_type* joined_shared_r)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};
// it's guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

__inline__ __device__ void add_left_to_cache(size_type const first,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type join_shared_r[num_warps][output_cache_size],
size_type* join_output_l,
size_type* join_output_r)
{
// Count how many active threads are participating here, which could be fewer than warp_size
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

// No warp sync is necessary here because we are assuming that ShuffleIndex
// is internally using post-CUDA 9.0 synchronization-safe primitives
// (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
// be safe by the compiler because it is not required by the standard to
// converge divergent branches before executing.
output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
}
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}

} // namespace detail

} // namespace cudf