
Commit

Merge pull request rapidsai#1015 from teju85/fea-ext-expose-num-internal-streams

[REVIEW] Use num internal streams instead of creating cumlHandle's inside the C++ layer
cjnolet authored Sep 26, 2019
2 parents d09f26f + 3b0df08 commit e0e4f80
Showing 19 changed files with 116 additions and 102 deletions.
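
As an illustration of what the change exposes (a minimal sketch, not code from this commit; the include paths and the stream count of 4 are assumptions), a caller can now size the handle's internal stream pool at construction time and reach those streams through the implementation object:

#include <cuda_runtime.h>
#include <cuML.hpp>                // assumed include path for ML::cumlHandle
#include <common/cumlHandle.hpp>   // assumed include path for ML::cumlHandle_impl

int main() {
  // Default-constructed handles fall back to
  // cumlHandle::getDefaultNumInternalStreams(), which this commit sets to 0.
  ML::cumlHandle handle(4);  // request 4 internal worker streams

  cudaStream_t user_stream;
  cudaStreamCreate(&user_stream);
  handle.setStream(user_stream);  // the user stream is still set separately

  // Internal streams are reached through the implementation object.
  ML::cumlHandle_impl& impl = handle.getImpl();
  int n_internal = impl.getNumInternalStreams();     // 4 in this sketch
  cudaStream_t worker0 = impl.getInternalStream(0);  // first worker stream
  (void)n_internal;
  (void)worker0;

  cudaStreamDestroy(user_stream);
  return 0;
}
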
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -95,6 +95,7 @@
- PR #973: Use cudaDeviceGetAttribute instead of relying on cudaDeviceProp object being passed
- PR #978: Update README for 0.9
- PR #1009: Fix references to notebooks-contrib
- PR #1015: Ability to control the number of internal streams in cumlHandle_impl via cumlHandle

## Bug Fixes

8 changes: 7 additions & 1 deletion cpp/src/common/cumlHandle.cpp
@@ -136,6 +136,11 @@

namespace ML {

int cumlHandle::getDefaultNumInternalStreams() {
return _default_num_internal_streams;
}

cumlHandle::cumlHandle(int n_streams) : _impl(new cumlHandle_impl(n_streams)) {}
cumlHandle::cumlHandle() : _impl(new cumlHandle_impl()) {}
cumlHandle::~cumlHandle() {}

@@ -171,12 +176,13 @@ cumlHandle_impl& cumlHandle::getImpl() { return *_impl.get(); }
using MLCommon::defaultDeviceAllocator;
using MLCommon::defaultHostAllocator;

cumlHandle_impl::cumlHandle_impl()
cumlHandle_impl::cumlHandle_impl(int n_streams)
: _dev_id([]() -> int {
int cur_dev = -1;
CUDA_CHECK(cudaGetDevice(&cur_dev));
return cur_dev;
}()),
_num_streams(n_streams),
_deviceAllocator(std::make_shared<defaultDeviceAllocator>()),
_hostAllocator(std::make_shared<defaultHostAllocator>()),
_userStream(NULL) {
5 changes: 2 additions & 3 deletions cpp/src/common/cumlHandle.hpp
@@ -37,7 +37,7 @@ namespace ML {
*/
class cumlHandle_impl {
public:
cumlHandle_impl();
cumlHandle_impl(int n_streams = cumlHandle::getDefaultNumInternalStreams());
~cumlHandle_impl();
int getDevice() const;
void setStream(cudaStream_t stream);
@@ -65,9 +65,8 @@ class cumlHandle_impl {
const cudaDeviceProp& getDeviceProperties() const;

private:
//TODO: What is the right number?
static constexpr int _num_streams = 3;
const int _dev_id;
const int _num_streams;
std::vector<cudaStream_t> _streams;
cublasHandle_t _cublas_handle;
cusolverDnHandle_t _cusolverDn_handle;
8 changes: 8 additions & 0 deletions cpp/src/cuML.hpp
@@ -36,13 +36,17 @@ class cumlHandle {
public:
/**
* @brief construct a cumlHandle with default parameters.
* @param n_streams number of internal streams to be setup
*
* The default parameters are
* - stream: default or NULL stream
* - DeviceAllocator: cudaMalloc
* - HostAllocator: cudaMallocHost
* @{
*/
cumlHandle(int n_streams);
cumlHandle();
/** @} */
/**
* @brief releases all resources internally managed by cumlHandle.
*/
@@ -94,7 +98,11 @@
*/
cumlHandle_impl& getImpl();

/** for internal use only */
static int getDefaultNumInternalStreams();

private:
static constexpr int _default_num_internal_streams = 0;
std::unique_ptr<cumlHandle_impl> _impl;
};

11 changes: 6 additions & 5 deletions cpp/src/decisiontree/memory.cuh
@@ -38,12 +38,13 @@ TemporaryMemory<T, L>::TemporaryMemory(
}

template <class T, class L>
TemporaryMemory<T, L>::TemporaryMemory(const ML::cumlHandle_impl& handle, int N,
int Ncols, float colper, int n_unique,
int n_bins, const int split_algo,
int depth, bool col_shuffle) {
TemporaryMemory<T, L>::TemporaryMemory(const ML::cumlHandle_impl& handle,
cudaStream_t stream_in, int N, int Ncols,
float colper, int n_unique, int n_bins,
const int split_algo, int depth,
bool col_shuffle) {
//Assign Stream from cumlHandle
stream = handle.getStream();
stream = stream_in;
splitalgo = split_algo;

max_shared_mem = MLCommon::getSharedMemPerBlock();
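
TemporaryMemory previously pulled its stream from handle.getStream(); with this change the caller passes the stream explicitly. A hypothetical helper (not part of this commit, include paths assumed) that allocates per-worker scratch space on one of the handle's internal streams would look like:

#include <memory>
#include <common/cumlHandle.hpp>   // assumed include path
#include <decisiontree/memory.h>   // assumed include path

// Hypothetical helper: build per-worker scratch on a chosen internal stream,
// following the new constructor's parameter order.
std::shared_ptr<TemporaryMemory<float, int>> make_worker_scratch(
    const ML::cumlHandle_impl& handle, int worker_id, int n_rows, int n_cols,
    float colper, int n_unique, int n_bins, int split_algo, int depth,
    bool col_shuffle) {
  return std::make_shared<TemporaryMemory<float, int>>(
      handle, handle.getInternalStream(worker_id), n_rows, n_cols, colper,
      n_unique, n_bins, split_algo, depth, col_shuffle);
}
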
6 changes: 3 additions & 3 deletions cpp/src/decisiontree/memory.h
@@ -99,9 +99,9 @@ struct TemporaryMemory {
const std::shared_ptr<MLCommon::hostAllocator> host_allocator_in,
const cudaStream_t stream_in, int N, int Ncols, float colper, int n_unique,
int n_bins, const int split_algo, int depth, bool col_shuffle);
TemporaryMemory(const ML::cumlHandle_impl &handle, int N, int Ncols,
float colper, int n_unique, int n_bins, const int split_algo,
int depth, bool col_shuffle);
TemporaryMemory(const ML::cumlHandle_impl &handle, cudaStream_t stream_in,
int N, int Ncols, float colper, int n_unique, int n_bins,
const int split_algo, int depth, bool colshuffle);
~TemporaryMemory();
void LevelMemAllocator(int nrows, int ncols, float colper, int n_unique,
int nbins, int depth, const int split_algo,
81 changes: 33 additions & 48 deletions cpp/src/randomforest/randomforest_impl.cuh
@@ -172,29 +172,27 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,
RandomForestMetaData<T, int>*& forest) {
this->error_checking(input, labels, n_rows, n_cols, false);

const cumlHandle_impl& handle = user_handle.getImpl();
int n_sampled_rows = this->rf_params.rows_sample * n_rows;
int n_streams = this->rf_params.n_streams;
ASSERT(n_streams <= handle.getNumInternalStreams(),
"rf_params.n_streams (=%d) should be <= cumlHandle.n_streams (=%d)",
n_streams, handle.getNumInternalStreams());

const cumlHandle_impl& handle = user_handle.getImpl();
cudaStream_t stream = user_handle.getStream();
cumlHandle local_handle[n_streams];
cudaStream_t local_stream[n_streams];
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamCreate(&local_stream[i]));
local_handle[i].setStream(local_stream[i]);
}
cudaStream_t stream = handle.getStream();
// Select n_sampled_rows (with replacement) numbers from [0, n_rows) per tree.
// selected_rows: randomly generated IDs for bootstrapped samples (w/ replacement); a device ptr.
MLCommon::device_buffer<unsigned int>* selected_rows[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
}

std::shared_ptr<TemporaryMemory<T, int>> tempmem[n_streams];
for (int i = 0; i < n_streams; i++) {
tempmem[i] = std::make_shared<TemporaryMemory<T, int>>(
local_handle[i].getImpl(), n_rows, n_cols,
handle, handle.getInternalStream(i), n_rows, n_cols,
this->rf_params.tree_params.max_features, n_unique_labels,
this->rf_params.tree_params.n_bins,
this->rf_params.tree_params.split_algo,
@@ -225,8 +223,8 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,

this->prepare_fit_per_tree(i, n_rows, n_sampled_rows, rowids,
tempmem[stream_id]->num_sms,
local_handle[stream_id].getStream(),
local_handle[stream_id].getDeviceAllocator());
tempmem[stream_id]->stream,
handle.getDeviceAllocator());

/* Build individual tree in the forest.
- input is a pointer to orig data that have n_cols features and n_rows rows.
@@ -236,28 +234,23 @@ void rfClassifier<T>::fit(const cumlHandle& user_handle, const T* input,
Expectation: Each tree node will contain (a) # n_sampled_rows and
(b) a pointer to a list of row numbers w.r.t original data.
*/
cumlHandle& myhandle = local_handle[stream_id];
DecisionTree::TreeMetaDataNode<T, int>* tree_ptr = &(forest->trees[i]);
tree_ptr->treeid = i;
trees[i].fit(myhandle.getImpl().getDeviceAllocator(),
myhandle.getImpl().getHostAllocator(),
myhandle.getImpl().getStream(), input, n_cols, n_rows, labels,
trees[i].fit(handle.getDeviceAllocator(),
handle.getHostAllocator(),
tempmem[stream_id]->stream, input, n_cols, n_rows, labels,
rowids, n_sampled_rows, n_unique_labels, tree_ptr,
this->rf_params.tree_params, tempmem[stream_id]);
}
//Cleanup
for (int i = 0; i < n_streams; i++) {
selected_rows[i]->release(stream);
auto s = tempmem[i]->stream;
CUDA_CHECK(cudaStreamSynchronize(s));
selected_rows[i]->release(s);
tempmem[i].reset();
delete selected_rows[i];
}

for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamSynchronize(local_handle[i].getStream()));
}
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamDestroy(local_stream[i]));
}
CUDA_CHECK(cudaStreamSynchronize(user_handle.getStream()));
}
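
The rewritten fit() above (and rfRegressor<T>::fit below) replaces the per-thread cumlHandle objects and cudaStreamCreate calls with streams drawn from the handle itself. A distilled sketch of that dispatch pattern follows; the helper name and include path are illustrative only:

#include <omp.h>
#include <cuda_runtime.h>
#include <common/cumlHandle.hpp>  // assumed include path

// Hypothetical distillation (not the verbatim cuML code): one internal stream
// per OpenMP thread, no locally created handles or streams.
void build_trees_on_internal_streams(const ML::cumlHandle_impl& handle,
                                     int n_trees, int n_streams) {
#pragma omp parallel for num_threads(n_streams)
  for (int i = 0; i < n_trees; i++) {
    int stream_id = omp_get_thread_num();
    cudaStream_t s = handle.getInternalStream(stream_id);
    // ... enqueue the per-tree work on s, allocating temporaries through
    //     handle.getDeviceAllocator() with this same stream ...
    (void)s;
  }
  // Cleanup mirrors fit(): synchronize each worker stream, then the user
  // stream, before results are consumed.
  for (int i = 0; i < n_streams; i++) {
    cudaStreamSynchronize(handle.getInternalStream(i));
  }
  cudaStreamSynchronize(handle.getStream());
}
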

@@ -445,29 +438,27 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
RandomForestMetaData<T, T>*& forest) {
this->error_checking(input, labels, n_rows, n_cols, false);

const cumlHandle_impl& handle = user_handle.getImpl();
int n_sampled_rows = this->rf_params.rows_sample * n_rows;
int n_streams = this->rf_params.n_streams;
ASSERT(n_streams <= handle.getNumInternalStreams(),
"rf_params.n_streams (=%d) should be <= cumlHandle.n_streams (=%d)",
n_streams, handle.getNumInternalStreams());

const cumlHandle_impl& handle = user_handle.getImpl();
cudaStream_t stream = user_handle.getStream();
cumlHandle local_handle[n_streams];
cudaStream_t local_stream[n_streams];
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamCreate(&local_stream[i]));
local_handle[i].setStream(local_stream[i]);
}
// Select n_sampled_rows (with replacement) numbers from [0, n_rows) per tree.
// selected_rows: randomly generated IDs for bootstrapped samples (w/ replacement); a device ptr.
MLCommon::device_buffer<unsigned int>* selected_rows[n_streams];
for (int i = 0; i < n_streams; i++) {
auto s = handle.getInternalStream(i);
selected_rows[i] = new MLCommon::device_buffer<unsigned int>(
handle.getDeviceAllocator(), stream, n_sampled_rows);
handle.getDeviceAllocator(), s, n_sampled_rows);
}

std::shared_ptr<TemporaryMemory<T, T>> tempmem[n_streams];
for (int i = 0; i < n_streams; i++) {
tempmem[i] = std::make_shared<TemporaryMemory<T, T>>(
local_handle[i].getImpl(), n_rows, n_cols,
handle, handle.getInternalStream(i), n_rows, n_cols,
this->rf_params.tree_params.max_features, 1,
this->rf_params.tree_params.n_bins,
this->rf_params.tree_params.split_algo,
@@ -493,12 +484,11 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
#pragma omp parallel for num_threads(n_streams)
for (int i = 0; i < this->rf_params.n_trees; i++) {
int stream_id = omp_get_thread_num();
unsigned int* rowids;
rowids = selected_rows[stream_id]->data();
unsigned int* rowids = selected_rows[stream_id]->data();
this->prepare_fit_per_tree(i, n_rows, n_sampled_rows, rowids,
tempmem[stream_id]->num_sms,
local_handle[stream_id].getStream(),
local_handle[stream_id].getDeviceAllocator());
tempmem[stream_id]->stream,
handle.getDeviceAllocator());

/* Build individual tree in the forest.
- input is a pointer to orig data that have n_cols features and n_rows rows.
Expand All @@ -507,29 +497,24 @@ void rfRegressor<T>::fit(const cumlHandle& user_handle, const T* input,
used to build the bootstrapped sample. Expectation: Each tree node will contain
(a) # n_sampled_rows and (b) a pointer to a list of row numbers w.r.t original data.
*/
cumlHandle& myhandle = local_handle[stream_id];
DecisionTree::TreeMetaDataNode<T, T>* tree_ptr = &(forest->trees[i]);
tree_ptr->treeid = i;
trees[i].fit(myhandle.getImpl().getDeviceAllocator(),
myhandle.getImpl().getHostAllocator(),
myhandle.getImpl().getStream(), input, n_cols, n_rows, labels,
trees[i].fit(handle.getDeviceAllocator(),
handle.getHostAllocator(),
tempmem[stream_id]->stream, input, n_cols, n_rows, labels,
rowids, n_sampled_rows, tree_ptr, this->rf_params.tree_params,
tempmem[stream_id]);
}
//Cleanup
for (int i = 0; i < n_streams; i++) {
selected_rows[i]->release(stream);
auto s = tempmem[i]->stream;
CUDA_CHECK(cudaStreamSynchronize(s));
selected_rows[i]->release(s);
tempmem[i].reset();
delete selected_rows[i];
}

for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamSynchronize(local_handle[i].getStream()));
}
for (int i = 0; i < n_streams; i++) {
CUDA_CHECK(cudaStreamDestroy(local_stream[i]));
}
CUDA_CHECK(cudaStreamSynchronize(user_handle.getStream()));
CUDA_CHECK(cudaStreamSynchronize(handle.getStream()));
}

/**
4 changes: 2 additions & 2 deletions cpp/test/sg/rf_test.cu
@@ -90,7 +90,7 @@ class RfClassifierTest : public ::testing::TestWithParam<RfInputs<T>> {
forest = new typename ML::RandomForestMetaData<T, int>;
null_trees_ptr(forest);

cumlHandle handle;
cumlHandle handle(rf_params.n_streams);
handle.setStream(stream);

fit(handle, forest, data, params.n_rows, params.n_cols, labels,
@@ -188,7 +188,7 @@ class RfRegressorTest : public ::testing::TestWithParam<RfInputs<T>> {
forest = new typename ML::RandomForestMetaData<T, T>;
null_trees_ptr(forest);

cumlHandle handle;
cumlHandle handle(rf_params.n_streams);
handle.setStream(stream);

fit(handle, forest, data, params.n_rows, params.n_cols, labels, rf_params);
23 changes: 13 additions & 10 deletions cpp/test/sg/rf_treelite_test.cu
@@ -146,8 +146,8 @@ class RfTreeliteTestCommon : public ::testing::TestWithParam<RfInputs<T>> {
void getResultAndCheck() {
// Predict and compare against known labels
RF_metrics tmp =
score(handle, forest, inference_data_d, labels_d, params.n_inference_rows,
params.n_cols, predicted_labels_d, false);
score(*handle, forest, inference_data_d, labels_d,
params.n_inference_rows, params.n_cols, predicted_labels_d, false);
CUDA_CHECK(cudaStreamSynchronize(stream));

predicted_labels_h.resize(params.n_inference_rows);
@@ -183,6 +183,7 @@ class RfTreeliteTestCommon : public ::testing::TestWithParam<RfInputs<T>> {
set_all_rf_params(rf_params, params.n_trees, params.bootstrap,
params.rows_sample, params.n_streams, tree_params);
// print(rf_params);
handle.reset(new cumlHandle(rf_params.n_streams));

data_len = params.n_rows * params.n_cols;
inference_data_len = params.n_inference_rows * params.n_cols;
@@ -197,7 +198,7 @@ class RfTreeliteTestCommon : public ::testing::TestWithParam<RfInputs<T>> {
ref_predicted_labels.resize(params.n_inference_rows);

CUDA_CHECK(cudaStreamCreate(&stream));
handle.setStream(stream);
handle->setStream(stream);

forest = new typename ML::RandomForestMetaData<T, L>;
null_trees_ptr(forest);
@@ -254,7 +255,7 @@ class RfTreeliteTestCommon : public ::testing::TestWithParam<RfInputs<T>> {
int inference_data_len;

cudaStream_t stream;
cumlHandle handle;
std::shared_ptr<cumlHandle> handle;

std::vector<float> treelite_predicted_labels;
std::vector<float> ref_predicted_labels;
@@ -295,11 +296,12 @@ class RfTreeliteTestClf : public RfTreeliteTestCommon<T, L> {

LinAlg::transpose<float>(
this->data_d, temp_data_d, this->params.n_rows, this->params.n_cols,
this->handle.getImpl().getCublasHandle(), this->stream);
this->handle->getImpl().getCublasHandle(), this->stream);

LinAlg::gemv<float>(temp_data_d, this->params.n_cols, this->params.n_rows,
weight, temp_label_d, true, 1.f, 1.f,
this->handle.getImpl().getCublasHandle(), this->stream);
this->handle->getImpl().getCublasHandle(),
this->stream);

temp_label_h.resize(this->params.n_rows);
updateHost(temp_label_h.data(), temp_label_d, this->params.n_rows,
@@ -323,7 +325,7 @@ class RfTreeliteTestClf : public RfTreeliteTestCommon<T, L> {

preprocess_labels(this->params.n_rows, this->labels_h, labels_map);

fit(this->handle, this->forest, this->data_d, this->params.n_rows,
fit(*(this->handle), this->forest, this->data_d, this->params.n_rows,
this->params.n_cols, this->labels_d, labels_map.size(),
this->rf_params);

@@ -371,18 +373,19 @@ class RfTreeliteTestReg : public RfTreeliteTestCommon<T, L> {

LinAlg::transpose<float>(
this->data_d, temp_data_d, this->params.n_rows, this->params.n_cols,
this->handle.getImpl().getCublasHandle(), this->stream);
this->handle->getImpl().getCublasHandle(), this->stream);

LinAlg::gemv<float>(temp_data_d, this->params.n_cols, this->params.n_rows,
weight, this->labels_d, true, 1.f, 1.f,
this->handle.getImpl().getCublasHandle(), this->stream);
this->handle->getImpl().getCublasHandle(),
this->stream);

this->labels_h.resize(this->params.n_rows);
updateHost(this->labels_h.data(), this->labels_d, this->params.n_rows,
this->stream);
CUDA_CHECK(cudaStreamSynchronize(this->stream));

fit(this->handle, this->forest, this->data_d, this->params.n_rows,
fit(*(this->handle), this->forest, this->data_d, this->params.n_rows,
this->params.n_cols, this->labels_d, this->rf_params);

CUDA_CHECK(cudaStreamSynchronize(this->stream));
(The remaining changed files in this commit are not shown here.)
