Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

[MXNET-331] Single machine All Reduce Topology-aware Communication #11357

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9678143
add multiroot all-reduce communication pattern
Jun 4, 2018
d5e51d6
fix bug with UpdateWeight
Jun 4, 2018
0708dbc
fix PCI-E links appearing in weight matrix bug
Jun 4, 2018
5590920
optimization to skip CopyFromTo in ReduceInner gains a bit of throughput
Jun 4, 2018
4f8f58b
remove unnecessary if statement
Jun 5, 2018
908534a
Add tests
Jun 15, 2018
25cbbdc
add more tests, 6 tests left to add
Jun 16, 2018
310ee4d
get rid of some dead code
Jun 16, 2018
9cce8ea
Add comments
Jun 18, 2018
4d2790d
Add randomized tests for backtrack and kernighan-lin
Jun 18, 2018
b5b42bc
Fix Postprocess
Jun 18, 2018
6327ceb
Add switch for first valid tree when num_gpus > 8, and for maximum we…
Jun 18, 2018
8694fe7
Kernighan-Lin seems to find better trees
Jun 18, 2018
c6cd67a
get rid of printfs
Jun 20, 2018
7466c4d
change defaults
Jun 21, 2018
153ec0b
Merge branch 'feature_multirootv9' of https://github.com/ctcyang/incu…
Jun 21, 2018
7c61b6c
Merge branch 'master' of https://github.com/apache/incubator-mxnet in…
Jun 21, 2018
cc935a2
inherit from CommDevice instead of Comm
Jun 22, 2018
ba60aaa
Fix lint errors
Jun 22, 2018
972e9c0
Add Python test using MXNET_KVSTORE_USETREE, fix CMake compilation pr…
Jun 27, 2018
6627dcf
fix lint errors
Jun 27, 2018
4de89a7
better header guard that works for tests
Jun 27, 2018
317c66b
get rid of unused variable warning
Jun 27, 2018
c364fd3
retrigger jenkins
Jun 28, 2018
3241d71
resolve 2 comments
Jun 29, 2018
bd926bf
address comment using Class to do test, get rid of extraneous test, u…
Jul 2, 2018
0e1a704
resolve merge conflicts
Jul 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 68 additions & 53 deletions src/kvstore/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,31 @@ class CommDevice : public Comm {
}
}

const NDArray& ReduceRowSparse(int key, const std::vector<NDArray>& src,
int priority) {
auto& buf = merge_buf_[key];
std::vector<NDArray> reduce(src.size());

const NDArrayStorageType stype = src[0].storage_type();
NDArray& buf_merged = buf.merged_buf(stype);
if (buf.copy_buf.empty()) {
// initialize buffer for copying during reduce
buf.copy_buf.resize(src.size());
for (size_t j = 0; j < src.size(); ++j) {
buf.copy_buf[j] = NDArray(stype, src[0].shape(), buf_merged.ctx(), true, src[0].dtype());
}
}
CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
<< "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
<< buf.copy_buf[0].storage_type() << "(buf.copy_buf)";
for (size_t i = 0; i < src.size(); ++i) {
CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
reduce[i] = buf.copy_buf[i];
}
ElementwiseSum(reduce, &buf_merged, priority);
return buf_merged;
}

const NDArray& Reduce(int key, const std::vector<NDArray>& src,
int priority) override {
// when this reduce is called from kvstore_dist, gc is not set
Expand All @@ -490,13 +515,14 @@ class CommDevice : public Comm {

InitBuffersAndComm(src);
auto& buf = merge_buf_[key];
std::vector<NDArray> reduce(src.size());

const NDArrayStorageType stype = src[0].storage_type();
NDArray& buf_merged = buf.merged_buf(stype);
// normal dense reduce
if (stype == kDefaultStorage) {
CopyFromTo(src[0], &buf_merged, priority);

std::vector<NDArray> reduce(src.size());
reduce[0] = buf_merged;

if (buf.copy_buf.empty()) {
Expand All @@ -514,24 +540,11 @@ class CommDevice : public Comm {
CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority);
reduce[i+1] = buf.copy_buf[i];
}
ElementwiseSum(reduce, &buf_merged, priority);
} else {
// sparse reduce
if (buf.copy_buf.empty()) {
// initialize buffer for copying during reduce
buf.copy_buf.resize(src.size());
for (size_t j = 0; j < src.size(); ++j) {
buf.copy_buf[j] = NDArray(stype, src[0].shape(), buf_merged.ctx(), true, src[0].dtype());
}
}
CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
<< "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
<< buf.copy_buf[0].storage_type() << "(buf.copy_buf)";
for (size_t i = 0; i < src.size(); ++i) {
CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
reduce[i] = buf.copy_buf[i];
}
buf_merged = ReduceRowSparse(key, src, priority);
}
ElementwiseSum(reduce, &buf_merged, priority);
return buf_merged;
}

Expand Down Expand Up @@ -658,6 +671,42 @@ class CommDevice : public Comm {
}
}

using KeyAttrs = std::tuple<int, TShape, int>;
// try to allocate buff on device evenly
void InitMergeBuffer(const std::vector<Context>& devs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm did you make any change to this function? Asking because the move makes it hard to see the diff for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. I made no changes to the existing --kv-store device.

std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), [](
const KeyAttrs& a, const KeyAttrs& b) {
return std::get<1>(a).Size() > std::get<1>(b).Size();
});

std::unordered_map<int, std::pair<Context, size_t>> ctx_info;
for (auto d : devs) {
ctx_info[d.dev_id] = std::make_pair(d, 0);
}

for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
const int key = std::get<0>(sorted_key_attrs_[i]);
const TShape& shape = std::get<1>(sorted_key_attrs_[i]);
const int type = std::get<2>(sorted_key_attrs_[i]);
auto& buf = merge_buf_[key];
Context ctx;
size_t min_size = std::numeric_limits<size_t>::max();
for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) {
size_t size = it->second.second;
if (size <= min_size) {
ctx = it->second.first;
min_size = size;
}
}
// Delayed allocation - as the dense merged buffer might not be used at all if push()
// only sees sparse arrays
bool delay_alloc = true;
buf.merged = NDArray(shape, ctx, delay_alloc, type);
ctx_info[ctx.dev_id].second += shape.Size();
}
inited_ = true;
}

private:
void EnableP2P(const std::vector<Context>& devs) {
#if MXNET_USE_CUDA
Expand Down Expand Up @@ -701,43 +750,6 @@ class CommDevice : public Comm {
#endif
}

using KeyAttrs = std::tuple<int, TShape, int>;
// try to allocate buff on device evenly
void InitMergeBuffer(const std::vector<Context>& devs) {
std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), [](
const KeyAttrs& a, const KeyAttrs& b) {
return std::get<1>(a).Size() > std::get<1>(b).Size();
});

std::unordered_map<int, std::pair<Context, size_t>> ctx_info;
for (auto d : devs) {
ctx_info[d.dev_id] = std::make_pair(d, 0);
}

for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
const int key = std::get<0>(sorted_key_attrs_[i]);
const TShape& shape = std::get<1>(sorted_key_attrs_[i]);
const int type = std::get<2>(sorted_key_attrs_[i]);
auto& buf = merge_buf_[key];
Context ctx;
size_t min_size = std::numeric_limits<size_t>::max();
for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) {
size_t size = it->second.second;
if (size <= min_size) {
ctx = it->second.first;
min_size = size;
}
}
// Delayed allocation - as the dense merged buffer might not be used at all if push()
// only sees sparse arrays
bool delay_alloc = true;
buf.merged = NDArray(shape, ctx, delay_alloc, type);
ctx_info[ctx.dev_id].second += shape.Size();
}
inited_ = true;
}

std::vector<KeyAttrs> sorted_key_attrs_;
/// \brief temporal space for pushing and pulling
struct BufferEntry {
/// \brief the dense merged value for reduce and broadcast operations
Expand Down Expand Up @@ -772,7 +784,10 @@ class CommDevice : public Comm {
NDArray sparse_merged;
};
std::unordered_map<int, BufferEntry> merge_buf_;

public:
bool inited_;
std::vector<KeyAttrs> sorted_key_attrs_;
};

} // namespace kvstore
Expand Down
Loading