Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Use num internal streams instead of creating cumlHandle's inside the C++ layer #1015

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4df20fe
updated our c++ api to expose a num-streams param to the cumlHandle
teju85 Aug 16, 2019
8ea1bf5
updated RF code to not create cumlHandle objects, instead use interna…
teju85 Aug 16, 2019
49857e8
update changelog
teju85 Aug 16, 2019
42e046f
Updated dev-guide to add an example of RF for using internal streams
teju85 Aug 16, 2019
8f3a971
corresponding set of changes for cumlHandle in the python world
teju85 Aug 16, 2019
ed15600
have the default number of internal streams to be 0
teju85 Aug 16, 2019
78a76ea
use ASSERT instead of warnings for num-streams mismatch
teju85 Aug 19, 2019
619cfc7
set proper default value for n-streams in the cython-side as well
teju85 Aug 19, 2019
8b3cd96
added an example for the creation and usage of internal streams
teju85 Aug 19, 2019
8b00d17
Merge branch 'branch-0.10' of https://github.com/rapidsai/cuml into f…
teju85 Sep 5, 2019
3a7c90c
fixed compilation issue due to a mistake in the previous conflict res…
teju85 Sep 5, 2019
bbe309b
Merge branch 'branch-0.10' of https://github.com/rapidsai/cuml into f…
teju85 Sep 11, 2019
0df6d55
pass explicit n_streams for cumlHandle to RF tests
teju85 Sep 11, 2019
8a6a068
flake8 style fixes
teju85 Sep 11, 2019
9e7020c
fixed test_metrics.py to also pass n_streams parameter
teju85 Sep 12, 2019
b8edc7f
added n_streams param for the dask.rf case too
teju85 Sep 12, 2019
d86cc8d
flake8 style fixes
teju85 Sep 12, 2019
9bfe8e4
Merge branch 'branch-0.10' of https://github.com/rapidsai/cuml into f…
teju85 Sep 16, 2019
eaa550e
updated fil tests as well to take care of n_streams issue
teju85 Sep 16, 2019
db818f5
Merge branch 'fea-ext-expose-num-internal-streams' of https://github.…
teju85 Sep 16, 2019
b23357f
added a section in our dev-guide about the usage of internal streams
teju85 Sep 16, 2019
14cd122
tip about the usage of getNumInternalStreams
teju85 Sep 16, 2019
4788cca
linked CUDA Resources section to the point on internal streams
teju85 Sep 16, 2019
6f3e411
Merge branch 'branch-0.10' of https://github.com/rapidsai/cuml into f…
teju85 Sep 17, 2019
dbe54b0
Remove links to PR and instead use links to sections
teju85 Sep 19, 2019
83e1f98
Merge branch 'branch-0.10' of https://github.com/rapidsai/cuml into f…
teju85 Sep 20, 2019
d77ac96
Merge branch 'fea-ext-expose-num-internal-streams' of https://github.…
teju85 Sep 20, 2019
fa8864c
removed the RF with fil unit-test as per the previous PR from Vishal
teju85 Sep 20, 2019
f4c0c25
Creating handle on workers
cjnolet Sep 26, 2019
30f77ba
Making stylechecker happy
cjnolet Sep 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
- PR #973: Use cudaDeviceGetAttribute instead of relying on cudaDeviceProp object being passed
- PR #978: Update README for 0.9
- PR #1009: Fix references to notebooks-contrib
- PR #1015: Ability to control the number of internal streams in cumlHandle_impl via cumlHandle

## Bug Fixes

Expand Down
9 changes: 7 additions & 2 deletions cpp/src/common/cumlHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@

namespace ML {

cumlHandle::cumlHandle() : _impl(new cumlHandle_impl()) {}
int cumlHandle::getDefaultNumInternalStreams() {
return _default_num_internal_streams;
}

cumlHandle::cumlHandle(int n_streams) : _impl(new cumlHandle_impl(n_streams)) {}
cumlHandle::~cumlHandle() {}

void cumlHandle::setStream(cudaStream_t stream) { _impl->setStream(stream); }
Expand Down Expand Up @@ -171,12 +175,13 @@ cumlHandle_impl& cumlHandle::getImpl() { return *_impl.get(); }
using MLCommon::defaultDeviceAllocator;
using MLCommon::defaultHostAllocator;

cumlHandle_impl::cumlHandle_impl()
cumlHandle_impl::cumlHandle_impl(int n_streams)
: _dev_id([]() -> int {
int cur_dev = -1;
CUDA_CHECK(cudaGetDevice(&cur_dev));
return cur_dev;
}()),
_num_streams(n_streams),
_deviceAllocator(std::make_shared<defaultDeviceAllocator>()),
_hostAllocator(std::make_shared<defaultHostAllocator>()),
_userStream(NULL) {
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/common/cumlHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace ML {
*/
class cumlHandle_impl {
public:
cumlHandle_impl();
cumlHandle_impl(int n_streams = cumlHandle::getDefaultNumInternalStreams());
~cumlHandle_impl();
int getDevice() const;
void setStream(cudaStream_t stream);
Expand Down Expand Up @@ -65,9 +65,8 @@ class cumlHandle_impl {
const cudaDeviceProp& getDeviceProperties() const;

private:
//TODO: What is the right number?
static constexpr int _num_streams = 3;
const int _dev_id;
const int _num_streams;
std::vector<cudaStream_t> _streams;
cublasHandle_t _cublas_handle;
cusolverDnHandle_t _cusolverDn_handle;
Expand Down
10 changes: 9 additions & 1 deletion cpp/src/cuML.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,21 @@ class cumlHandle_impl;
* @brief Handle to manage resources needed by cuML algorithms.
*/
class cumlHandle {
private:
//TODO: What is the right number?
static constexpr int _default_num_internal_streams = 3;
teju85 marked this conversation as resolved.
Show resolved Hide resolved

public:
/**
* @brief construct a cumlHandle with default parameters.
* @param n_streams number of internal streams to be setup
*
* The default parameters are
* - stream: default or NULL stream
* - DeviceAllocator: cudaMalloc
* - HostAllocator: cudaMallocHost
*/
cumlHandle();
cumlHandle(int n_streams = _default_num_internal_streams);
/**
* @brief releases all resources internally managed by cumlHandle.
*/
Expand Down Expand Up @@ -94,6 +99,9 @@ class cumlHandle {
*/
cumlHandle_impl& getImpl();

/** for internal use only */
static int getDefaultNumInternalStreams();

private:
std::unique_ptr<cumlHandle_impl> _impl;
};
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/decisiontree/decisiontree_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ void DecisionTreeBase<T, L>::plant(
tempmem = in_tempmem;
} else {
tempmem = std::make_shared<TemporaryMemory<T, L>>(
handle, nrows, ncols, unique_labels, n_bins, split_algo, maxdepth);
handle, nrows, ncols, unique_labels, n_bins, split_algo, maxdepth,
handle.getStream());
quantile_per_tree = true;
}
if (split_algo == SPLIT_ALGO::GLOBAL_QUANTILE && quantile_per_tree) {
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/decisiontree/memory.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
template <class T, class L>
TemporaryMemory<T, L>::TemporaryMemory(const ML::cumlHandle_impl& handle, int N,
int Ncols, int n_unique, int n_bins,
const int split_algo, int depth)
const int split_algo, int depth,
cudaStream_t _stream)
: ml_handle(handle) {
//Assign Stream from cumlHandle
stream = ml_handle.getStream();
stream = _stream;
splitalgo = split_algo;

max_shared_mem = MLCommon::getSharedMemPerBlock();
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/decisiontree/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ struct TemporaryMemory {
int max_nodes_per_level = 0;

TemporaryMemory(const ML::cumlHandle_impl &handle, int N, int Ncols,
int n_unique, int n_bins, const int split_algo, int depth);
int n_unique, int n_bins, const int split_algo, int depth,
cudaStream_t _stream);
~TemporaryMemory();
void NodeMemAllocator(int N, int Ncols, int n_unique, int n_bins,
const int split_algo);
Expand Down
104 changes: 47 additions & 57 deletions cpp/src/randomforest/randomforest_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -189,53 +189,56 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,
RandomForestMetaData<T, int>*& forest) {
this->error_checking(input, labels, n_rows, n_cols, false);

const cumlHandle_impl& handle = user_handle.getImpl();
int n_sampled_rows = this->rf_params.rows_sample * n_rows;
int n_streams = this->rf_params.n_streams;

const cumlHandle_impl& handle = user_handle.getImpl();
cudaStream_t stream = user_handle.getStream();
cumlHandle local_handle[n_streams];
cudaStream_t local_stream[n_streams];
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamCreate(&local_stream[i]));
local_handle[i].setStream(local_stream[i]);
if (n_streams > handle.getNumInternalStreams()) {
teju85 marked this conversation as resolved.
Show resolved Hide resolved
std::cerr << "Warning: rf_params.n_streams=" << n_streams
<< " cumlHandle.n_streams=" << handle.getNumInternalStreams()
<< " limiting num-streams to the one from cumlHandle!"
<< std::endl;
n_streams = handle.getNumInternalStreams();
}

cudaStream_t stream = handle.getStream();
// Select n_sampled_rows (with replacement) numbers from [0, n_rows) per tree.
// selected_rows: randomly generated IDs for bootstrapped samples (w/ replacement); a device ptr.
MLCommon::device_buffer<unsigned int>* selected_rows[n_streams];
MLCommon::device_buffer<unsigned int>* sorted_selected_rows[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
sorted_selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
}
}

// Will sort selected_rows (row IDs), prior to fit, to improve access patterns
MLCommon::device_buffer<char>* rows_temp_storage[n_streams];
size_t temp_storage_bytes[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
rows_temp_storage[i] = nullptr;
temp_storage_bytes[i] = 0;
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
CUDA_CHECK(cub::DeviceRadixSort::SortKeys(
rows_temp_storage[i], temp_storage_bytes[i], selected_rows[i]->data(),
sorted_selected_rows[i]->data(), n_sampled_rows, 0,
8 * sizeof(unsigned int), stream));
8 * sizeof(unsigned int), s));
// Allocate temporary storage
rows_temp_storage[i] = new MLCommon::device_buffer<char>(
handle.getDeviceAllocator(), stream, temp_storage_bytes[i]);
handle.getDeviceAllocator(), s, temp_storage_bytes[i]);
}
}
std::shared_ptr<TemporaryMemory<T, int>> tempmem[n_streams];
for (int i = 0; i < n_streams; i++) {
tempmem[i] = std::make_shared<TemporaryMemory<T, int>>(
local_handle[i].getImpl(), n_rows, n_cols, n_unique_labels,
handle, n_rows, n_cols, n_unique_labels,
this->rf_params.tree_params.n_bins,
this->rf_params.tree_params.split_algo,
this->rf_params.tree_params.max_depth);
this->rf_params.tree_params.max_depth, handle.getInternalStream(i));
}
//Preprocess once only per forest
if ((this->rf_params.tree_params.split_algo == SPLIT_ALGO::GLOBAL_QUANTILE) &&
Expand Down Expand Up @@ -269,8 +272,8 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,
this->prepare_fit_per_tree(
i, n_rows, n_sampled_rows, selected_rows[stream_id]->data(), selected_ptr,
temp_storage_ptr, temp_storage_bytes[stream_id],
tempmem[stream_id]->num_sms, local_handle[stream_id].getStream(),
local_handle[stream_id].getDeviceAllocator());
tempmem[stream_id]->num_sms, tempmem[stream_id]->stream,
handle.getDeviceAllocator());

/* Build individual tree in the forest.
- input is a pointer to orig data that have n_cols features and n_rows rows.
Expand All @@ -281,30 +284,23 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,
(b) a pointer to a list of row numbers w.r.t original data.
*/
DecisionTree::TreeMetaDataNode<T, int>* tree_ptr = &(forest->trees[i]);
trees[i].fit(local_handle[stream_id], input, n_cols, n_rows, labels, rowids,
trees[i].fit(user_handle, input, n_cols, n_rows, labels, rowids,
n_sampled_rows, n_unique_labels, tree_ptr,
this->rf_params.tree_params, tempmem[stream_id]);
}
//Cleanup
for (int i = 0; i < n_streams; i++) {
selected_rows[i]->release(stream);
auto s = handle.getInternalStream(i);
selected_rows[i]->release(s);
tempmem[i].reset();
delete selected_rows[i];
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
rows_temp_storage[i]->release(stream);
sorted_selected_rows[i]->release(stream);
rows_temp_storage[i]->release(s);
sorted_selected_rows[i]->release(s);
delete rows_temp_storage[i];
delete sorted_selected_rows[i];
}
}

for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamSynchronize(local_handle[i].getStream()));
}
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamDestroy(local_stream[i]));
}
CUDA_CHECK(cudaStreamSynchronize(user_handle.getStream()));
}

/**
Expand Down Expand Up @@ -496,54 +492,55 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
RandomForestMetaData<T, T>*& forest) {
this->error_checking(input, labels, n_rows, n_cols, false);

const cumlHandle_impl& handle = user_handle.getImpl();
int n_sampled_rows = this->rf_params.rows_sample * n_rows;
int n_streams = this->rf_params.n_streams;
if (n_streams > handle.getNumInternalStreams()) {
std::cerr << "Warning: rf_params.n_streams=" << n_streams
<< " cumlHandle.n_streams=" << handle.getNumInternalStreams()
<< " limiting num-streams to the one from cumlHandle!"
<< std::endl;
n_streams = handle.getNumInternalStreams();
}

const cumlHandle_impl& handle = user_handle.getImpl();
cudaStream_t stream = user_handle.getStream();
cumlHandle local_handle[n_streams];
cudaStream_t local_stream[n_streams];
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamCreate(&local_stream[i]));
local_handle[i].setStream(local_stream[i]);
}
// Select n_sampled_rows (with replacement) numbers from [0, n_rows) per tree.
// selected_rows: randomly generated IDs for bootstrapped samples (w/ replacement); a device ptr.
MLCommon::device_buffer<unsigned int>* selected_rows[n_streams];
MLCommon::device_buffer<unsigned int>* sorted_selected_rows[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
sorted_selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
}
}

// Will sort selected_rows (row IDs), prior to fit, to improve access patterns
MLCommon::device_buffer<char>* rows_temp_storage[n_streams];
size_t temp_storage_bytes[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
rows_temp_storage[i] = nullptr;
temp_storage_bytes[i] = 0;
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
CUDA_CHECK(cub::DeviceRadixSort::SortKeys(
rows_temp_storage[i], temp_storage_bytes[i], selected_rows[i]->data(),
sorted_selected_rows[i]->data(), n_sampled_rows, 0,
8 * sizeof(unsigned int), stream));
8 * sizeof(unsigned int), s));
// Allocate temporary storage
rows_temp_storage[i] = new MLCommon::device_buffer<char>(
handle.getDeviceAllocator(), stream, temp_storage_bytes[i]);
handle.getDeviceAllocator(), s, temp_storage_bytes[i]);
}
}

std::shared_ptr<TemporaryMemory<T, T>> tempmem[n_streams];
for (int i = 0; i < n_streams; i++) {
tempmem[i] = std::make_shared<TemporaryMemory<T, T>>(
local_handle[i].getImpl(), n_rows, n_cols, 1,
this->rf_params.tree_params.n_bins,
handle, n_rows, n_cols, 1, this->rf_params.tree_params.n_bins,
this->rf_params.tree_params.split_algo,
this->rf_params.tree_params.max_depth);
this->rf_params.tree_params.max_depth, handle.getInternalStream(i));
}
//Preprocess once only per forest
if ((this->rf_params.tree_params.split_algo == SPLIT_ALGO::GLOBAL_QUANTILE) &&
Expand Down Expand Up @@ -577,8 +574,8 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
this->prepare_fit_per_tree(
i, n_rows, n_sampled_rows, selected_rows[stream_id]->data(), selected_ptr,
temp_storage_ptr, temp_storage_bytes[stream_id],
tempmem[stream_id]->num_sms, local_handle[stream_id].getStream(),
local_handle[stream_id].getDeviceAllocator());
tempmem[stream_id]->num_sms, tempmem[stream_id]->stream,
handle.getDeviceAllocator());

/* Build individual tree in the forest.
- input is a pointer to orig data that have n_cols features and n_rows rows.
Expand All @@ -588,30 +585,23 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
(a) # n_sampled_rows and (b) a pointer to a list of row numbers w.r.t original data.
*/
DecisionTree::TreeMetaDataNode<T, T>* tree_ptr = &(forest->trees[i]);
trees[i].fit(local_handle[stream_id], input, n_cols, n_rows, labels, rowids,
trees[i].fit(user_handle, input, n_cols, n_rows, labels, rowids,
n_sampled_rows, tree_ptr, this->rf_params.tree_params,
tempmem[stream_id]);
}
//Cleanup
for (int i = 0; i < n_streams; i++) {
selected_rows[i]->release(stream);
auto s = handle.getInternalStream(i);
selected_rows[i]->release(s);
tempmem[i].reset();
delete selected_rows[i];
if (this->rf_params.tree_params.split_algo != SPLIT_ALGO::GLOBAL_QUANTILE) {
rows_temp_storage[i]->release(stream);
sorted_selected_rows[i]->release(stream);
rows_temp_storage[i]->release(s);
sorted_selected_rows[i]->release(s);
delete rows_temp_storage[i];
delete sorted_selected_rows[i];
}
}

for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamSynchronize(local_handle[i].getStream()));
}
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamDestroy(local_stream[i]));
}
CUDA_CHECK(cudaStreamSynchronize(user_handle.getStream()));
}

/**
Expand Down