Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

[MXNET-331] Single machine All Reduce Topology-aware Communication #11357

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select a commit. Hold Shift + click to select a range.
9678143
add multiroot all-reduce communication pattern
Jun 4, 2018
d5e51d6
fix bug with UpdateWeight
Jun 4, 2018
0708dbc
fix PCI-E links appearing in weight matrix bug
Jun 4, 2018
5590920
optimization to skip CopyFromTo in ReduceInner gains a bit of throughput
Jun 4, 2018
4f8f58b
remove unnecessary if statement
Jun 5, 2018
908534a
Add tests
Jun 15, 2018
25cbbdc
add more tests, 6 tests left to add
Jun 16, 2018
310ee4d
get rid of some dead code
Jun 16, 2018
9cce8ea
Add comments
Jun 18, 2018
4d2790d
Add randomized tests for backtrack and kernighan-lin
Jun 18, 2018
b5b42bc
Fix Postprocess
Jun 18, 2018
6327ceb
Add switch for first valid tree when num_gpus > 8, and for maximum we…
Jun 18, 2018
8694fe7
Kernighan-Lin seems to find better trees
Jun 18, 2018
c6cd67a
get rid of printfs
Jun 20, 2018
7466c4d
change defaults
Jun 21, 2018
153ec0b
Merge branch 'feature_multirootv9' of https://github.com/ctcyang/incu…
Jun 21, 2018
7c61b6c
Merge branch 'master' of https://github.com/apache/incubator-mxnet in…
Jun 21, 2018
cc935a2
inherit from CommDevice instead of Comm
Jun 22, 2018
ba60aaa
Fix lint errors
Jun 22, 2018
972e9c0
Add Python test using MXNET_KVSTORE_USETREE, fix CMake compilation pr…
Jun 27, 2018
6627dcf
fix lint errors
Jun 27, 2018
4de89a7
better header guard that works for tests
Jun 27, 2018
317c66b
get rid of unused variable warning
Jun 27, 2018
c364fd3
retrigger jenkins
Jun 28, 2018
3241d71
resolve 2 comments
Jun 29, 2018
bd926bf
address comment using Class to do test, get rid of extraneous test, u…
Jul 2, 2018
0e1a704
resolve merge conflicts
Jul 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions src/kvstore/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,31 @@ class CommDevice : public Comm {
}
}

const NDArray& ReduceRowSparse(int key, const std::vector<NDArray>& src,
                               int priority) {
  // Reduce the per-device arrays in `src` for `key` into a single merged
  // buffer and return it. The sources are first staged into per-device
  // copy buffers, then summed elementwise into the merged buffer.
  // NOTE(review): assumes all entries of `src` share src[0]'s shape/dtype —
  // staging buffers are sized from src[0]; confirm against callers.
  auto& buf = merge_buf_[key];
  const NDArrayStorageType stype = src[0].storage_type();
  NDArray& buf_merged = buf.merged_buf(stype);

  // Lazily allocate one staging buffer per source device on first use;
  // later calls for the same key reuse them.
  if (buf.copy_buf.empty()) {
    buf.copy_buf.resize(src.size());
    for (size_t i = 0; i < buf.copy_buf.size(); ++i) {
      buf.copy_buf[i] = NDArray(stype, src[0].shape(), buf_merged.ctx(),
                                true, src[0].dtype());
    }
  }
  CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
      << "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
      << buf.copy_buf[0].storage_type() << "(buf.copy_buf)";

  // Stage every source into its copy buffer, then sum all staged arrays.
  std::vector<NDArray> reduce(src.size());
  for (size_t i = 0; i < src.size(); ++i) {
    CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
    reduce[i] = buf.copy_buf[i];
  }
  ElementwiseSum(reduce, &buf_merged, priority);
  return buf_merged;
}

const NDArray& Reduce(int key, const std::vector<NDArray>& src,
int priority) override {
// when this reduce is called from kvstore_dist, gc is not set
Expand All @@ -490,13 +515,14 @@ class CommDevice : public Comm {

InitBuffersAndComm(src);
auto& buf = merge_buf_[key];
std::vector<NDArray> reduce(src.size());

const NDArrayStorageType stype = src[0].storage_type();
NDArray& buf_merged = buf.merged_buf(stype);
// normal dense reduce
if (stype == kDefaultStorage) {
CopyFromTo(src[0], &buf_merged, priority);

std::vector<NDArray> reduce(src.size());
reduce[0] = buf_merged;

if (buf.copy_buf.empty()) {
Expand All @@ -514,24 +540,11 @@ class CommDevice : public Comm {
CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority);
reduce[i+1] = buf.copy_buf[i];
}
ElementwiseSum(reduce, &buf_merged, priority);
} else {
// sparse reduce
if (buf.copy_buf.empty()) {
// initialize buffer for copying during reduce
buf.copy_buf.resize(src.size());
for (size_t j = 0; j < src.size(); ++j) {
buf.copy_buf[j] = NDArray(stype, src[0].shape(), buf_merged.ctx(), true, src[0].dtype());
}
}
CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
<< "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
<< buf.copy_buf[0].storage_type() << "(buf.copy_buf)";
for (size_t i = 0; i < src.size(); ++i) {
CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
reduce[i] = buf.copy_buf[i];
}
buf_merged = ReduceRowSparse(key, src, priority);
}
ElementwiseSum(reduce, &buf_merged, priority);
return buf_merged;
}

Expand Down Expand Up @@ -772,6 +785,8 @@ class CommDevice : public Comm {
NDArray sparse_merged;
};
std::unordered_map<int, BufferEntry> merge_buf_;

public:
bool inited_;
};

Expand Down
Loading