Skip to content

Commit

Permalink
Merge branch 'branch-23.12' into memory_resource
Browse files — browse the repository at this point in the history
  • Loading branch information
harrism authored Nov 15, 2023
2 parents c298fbc + ba99ff4 commit 41c1bea
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 45 deletions.
38 changes: 34 additions & 4 deletions include/rmm/cuda_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct cuda_device_id {
using value_type = int; ///< Integer type used for device identifier

/**
* @brief Construct a `cuda_device_id` from the specified integer value
* @brief Construct a `cuda_device_id` from the specified integer value.
*
* @param dev_id The device's integer identifier
*/
Expand All @@ -43,6 +43,35 @@ struct cuda_device_id {
/// @briefreturn{The wrapped integer value}
[[nodiscard]] constexpr value_type value() const noexcept { return id_; }

// TODO re-add doxygen comment specifier /** for these hidden friend operators once this Breathe
// bug is fixed: https://github.com/breathe-doc/breathe/issues/916
//! @cond Doxygen_Suppress
/**
 * @brief Equality comparison for two `cuda_device_id`s.
 *
 * @param lhs The left-hand `cuda_device_id` operand.
 * @param rhs The right-hand `cuda_device_id` operand.
 * @return true when both operands wrap the same integer device value, false otherwise.
 */
[[nodiscard]] constexpr friend bool operator==(cuda_device_id const& lhs,
                                               cuda_device_id const& rhs) noexcept
{
  auto const same_id = (lhs.value() == rhs.value());
  return same_id;
}

/**
 * @brief Inequality comparison for two `cuda_device_id`s.
 *
 * @param lhs The left-hand `cuda_device_id` operand.
 * @param rhs The right-hand `cuda_device_id` operand.
 * @return true when the operands wrap different integer device values, false otherwise.
 */
[[nodiscard]] constexpr friend bool operator!=(cuda_device_id const& lhs,
                                               cuda_device_id const& rhs) noexcept
{
  // Defined in terms of operator== so the two comparisons can never disagree.
  return !(lhs == rhs);
}
//! @endcond
private:
value_type id_;
};
Expand Down Expand Up @@ -84,16 +113,17 @@ struct cuda_set_device_raii {
* @param dev_id The device to set as the current CUDA device
*/
explicit cuda_set_device_raii(cuda_device_id dev_id)
  : old_device_{get_current_cuda_device()},
    // Only switch devices when `dev_id` is a valid (non-negative) id that differs
    // from the currently active device; otherwise the ctor/dtor are no-ops.
    needs_reset_{dev_id.value() >= 0 && old_device_ != dev_id}
{
  if (needs_reset_) { RMM_ASSERT_CUDA_SUCCESS(cudaSetDevice(dev_id.value())); }
}
/**
 * @brief Reactivates the previous CUDA device, if the constructor switched away from it.
 */
~cuda_set_device_raii() noexcept
{
  // Restore the device that was current at construction time; no-op if we never switched.
  if (needs_reset_) { RMM_ASSERT_CUDA_SUCCESS(cudaSetDevice(old_device_.value())); }
}

cuda_set_device_raii(cuda_set_device_raii const&) = delete;
Expand Down
19 changes: 17 additions & 2 deletions include/rmm/device_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <rmm/cuda_device.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/detail/error.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
Expand Down Expand Up @@ -113,6 +114,7 @@ class device_buffer {
async_resource_ref mr = mr::get_current_device_resource())
: _stream{stream}, _mr{mr}
{
cuda_set_device_raii dev{_device};
allocate_async(size);
}

Expand Down Expand Up @@ -141,6 +143,7 @@ class device_buffer {
async_resource_ref mr = rmm::mr::get_current_device_resource())
: _stream{stream}, _mr{mr}
{
cuda_set_device_raii dev{_device};
allocate_async(size);
copy_async(source_data, size);
}
Expand Down Expand Up @@ -189,12 +192,14 @@ class device_buffer {
_size{other._size},
_capacity{other._capacity},
_stream{other.stream()},
_mr{other._mr}
_mr{other._mr},
_device{other._device}
{
other._data = nullptr;
other._size = 0;
other._capacity = 0;
other.set_stream(cuda_stream_view{});
other._device = cuda_device_id{-1};
}

/**
Expand All @@ -214,18 +219,21 @@ class device_buffer {
device_buffer& operator=(device_buffer&& other) noexcept
{
  if (&other != this) {
    // Free any existing allocation with its owning device active, since the
    // incoming buffer may have been allocated on a different device.
    cuda_set_device_raii dev{_device};
    deallocate_async();

    // Steal the other buffer's allocation, stream, resource, and device.
    _data     = other._data;
    _size     = other._size;
    _capacity = other._capacity;
    set_stream(other.stream());
    _mr     = other._mr;
    _device = other._device;

    // Leave `other` empty: no allocation, default stream, invalid device id.
    other._data     = nullptr;
    other._size     = 0;
    other._capacity = 0;
    other.set_stream(cuda_stream_view{});
    other._device = cuda_device_id{-1};
  }
  return *this;
}
Expand All @@ -239,6 +247,7 @@ class device_buffer {
*/
~device_buffer() noexcept
{
  // Make this buffer's owning device current while the allocation is released.
  cuda_set_device_raii active_device{_device};
  deallocate_async();
  // Drop the stream reference; the buffer no longer owns device memory.
  _stream = cuda_stream_view{};
}
Expand All @@ -265,6 +274,7 @@ class device_buffer {
{
set_stream(stream);
if (new_capacity > capacity()) {
cuda_set_device_raii dev{_device};
auto tmp = device_buffer{new_capacity, stream, _mr};
auto const old_size = size();
RMM_CUDA_TRY(cudaMemcpyAsync(tmp.data(), data(), size(), cudaMemcpyDefault, stream.value()));
Expand Down Expand Up @@ -306,6 +316,7 @@ class device_buffer {
if (new_size <= capacity()) {
_size = new_size;
} else {
cuda_set_device_raii dev{_device};
auto tmp = device_buffer{new_size, stream, _mr};
RMM_CUDA_TRY(cudaMemcpyAsync(tmp.data(), data(), size(), cudaMemcpyDefault, stream.value()));
*this = std::move(tmp);
Expand All @@ -329,6 +340,7 @@ class device_buffer {
{
set_stream(stream);
if (size() != capacity()) {
cuda_set_device_raii dev{_device};
// Invoke copy ctor on self which only copies `[0, size())` and swap it
// with self. The temporary `device_buffer` will hold the old contents
// which will then be destroyed
Expand Down Expand Up @@ -414,9 +426,11 @@ class device_buffer {
std::size_t _size{}; ///< Requested size of the device memory allocation
std::size_t _capacity{}; ///< The actual size of the device memory allocation
cuda_stream_view _stream{}; ///< Stream to use for device memory deallocation

async_resource_ref _mr{
rmm::mr::get_current_device_resource()}; ///< The memory resource used to
///< allocate/deallocate device memory
cuda_device_id _device{get_current_cuda_device()};

/**
* @brief Allocates the specified amount of memory and updates the size/capacity accordingly.
Expand Down Expand Up @@ -467,6 +481,7 @@ class device_buffer {
{
if (bytes > 0) {
RMM_EXPECTS(nullptr != source, "Invalid copy from nullptr.");
RMM_EXPECTS(nullptr != _data, "Invalid copy to nullptr.");

RMM_CUDA_TRY(cudaMemcpyAsync(_data, source, bytes, cudaMemcpyDefault, stream().value()));
}
Expand Down
50 changes: 14 additions & 36 deletions include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@
#include <fmt/core.h>

#include <cstddef>
#include <functional>
#include <limits>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <unordered_map>

namespace rmm::mr::detail {
Expand Down Expand Up @@ -259,23 +255,6 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
}

private:
/**
 * @brief RAII wrapper for a CUDA event.
 *
 * Creates a timing-disabled CUDA event on construction and destroys it on
 * destruction. Copying and moving are disabled so each event handle has
 * exactly one owner.
 */
struct event_wrapper {
  event_wrapper()
  {
    RMM_ASSERT_CUDA_SUCCESS(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
  }
  ~event_wrapper() { RMM_ASSERT_CUDA_SUCCESS(cudaEventDestroy(event)); }

  event_wrapper(event_wrapper const&)            = delete;
  event_wrapper& operator=(event_wrapper const&) = delete;
  event_wrapper(event_wrapper&&) noexcept        = delete;
  event_wrapper& operator=(event_wrapper&&)      = delete;

  cudaEvent_t event{};
};

/**
* @brief get a unique CUDA event (possibly new) associated with `stream`
*
Expand All @@ -289,17 +268,20 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
stream_event_pair get_event(cuda_stream_view stream)
{
if (stream.is_per_thread_default()) {
// Create a thread-local shared event wrapper for each device. Shared pointers in the thread
// and in each MR instance ensure the wrappers are destroyed only after all are finished
// with them.
thread_local std::vector<std::shared_ptr<event_wrapper>> events_tls(
rmm::get_num_cuda_devices());
auto event = [&, device_id = this->device_id_]() {
if (events_tls[device_id.value()]) { return events_tls[device_id.value()]->event; }

auto event = std::make_shared<event_wrapper>();
this->default_stream_events.insert(event);
return (events_tls[device_id.value()] = std::move(event))->event;
// Create a thread-local event for each device. These events are
// deliberately leaked since the destructor needs to call into
// the CUDA runtime and thread_local destructors (can) run below
// main: it is undefined behaviour to call into the CUDA
// runtime below main.
thread_local std::vector<cudaEvent_t> events_tls(rmm::get_num_cuda_devices());
auto event = [device_id = this->device_id_]() {
auto& e = events_tls[device_id.value()];
if (!e) {
// These events are deliberately not destructed and therefore live until
// program exit.
RMM_ASSERT_CUDA_SUCCESS(cudaEventCreateWithFlags(&e, cudaEventDisableTiming));
}
return e;
}();
return stream_event_pair{stream.value(), event};
}
Expand Down Expand Up @@ -505,10 +487,6 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
// bidirectional mapping between non-default streams and events
std::unordered_map<cudaStream_t, stream_event_pair> stream_events_;

// shared pointers to events keeps the events alive as long as either the thread that created
// them or the MR that is using them exists.
std::set<std::shared_ptr<event_wrapper>> default_stream_events;

std::mutex mtx_; // mutex for thread-safe access

rmm::cuda_device_id device_id_{rmm::get_current_cuda_device()};
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,7 @@ ConfigureTest(BINNING_MR_TEST mr/device/binning_mr_tests.cpp)
# callback memory resource tests
ConfigureTest(CALLBACK_MR_TEST mr/device/callback_mr_tests.cpp)

# container multidevice tests (.cu so it is compiled with the CUDA compiler)
ConfigureTest(CONTAINER_MULTIDEVICE_TEST container_multidevice_tests.cu)

# Make the installed test binaries runnable from bin/gtests/librmm.
rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing DESTINATION bin/gtests/librmm)
Loading

0 comments on commit 41c1bea

Please sign in to comment.