This repository has been archived by the owner on Nov 17, 2023. It is now read-only.
[MXNET-331] Single machine All Reduce Topology-aware Communication #11357
Closed
27 commits
9678143 add multiroot all-reduce communication pattern
d5e51d6 fix bug with UpdateWeight
0708dbc fix PCI-E links appearing in weight matrix bug
5590920 optimization to skip CopyFromTo in ReduceInner gains a bit of throughput
4f8f58b remove unnecessary if statement
908534a Add tests
25cbbdc add more tests, 6 tests left to add
310ee4d get rid of some dead code
9cce8ea Add comments
4d2790d Add randomized tests for backtrack and kernighan-lin
b5b42bc Fix Postprocess
6327ceb Add switch for first valid tree when num_gpus > 8, and for maximum we…
8694fe7 Kernighan-Lin seems to find better trees
c6cd67a get rid of printfs
7466c4d change defaults
153ec0b Merge branch 'feature_multirootv9' of https://github.com/ctcyang/incu…
7c61b6c Merge branch 'master' of https://github.com/apache/incubator-mxnet in…
cc935a2 inherit from CommDevice instead of Comm
ba60aaa Fix lint errors
972e9c0 Add Python test using MXNET_KVSTORE_USETREE, fix CMake compilation pr…
6627dcf fix lint errors
4de89a7 better header guard that works for tests
317c66b get rid of unused variable warning
c364fd3 retrigger jenkins
3241d71 resolve 2 comments
bd926bf address comment using Class to do test, get rid of extraneous test, u…
0e1a704 resolve merge conflicts
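Several commits above (4d2790d, 8694fe7) reference a Kernighan-Lin pass for improving the reduction trees built from the GPU link-weight matrix. For reference, here is a minimal sketch of the classic Kernighan-Lin swap heuristic on a symmetric weight matrix; it is illustrative only — the names and the simplified cost model are assumptions, not the PR's actual implementation:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// One Kernighan-Lin-style refinement pass over a symmetric link-weight
// matrix `w`: repeatedly swap the cross-partition pair of nodes whose
// exchange most reduces the total weight crossing the cut.
// `side[i]` is 0 or 1 and encodes the current bipartition.
void KLRefine(const std::vector<std::vector<double>>& w,
              std::vector<int>* side) {
  const size_t n = w.size();
  // D(i) = external minus internal edge weight for node i.
  auto d = [&](size_t i) {
    double g = 0.0;
    for (size_t j = 0; j < n; ++j) {
      if (j == i) continue;
      g += ((*side)[j] != (*side)[i]) ? w[i][j] : -w[i][j];
    }
    return g;
  };
  bool improved = true;
  while (improved) {
    improved = false;
    double best_gain = 0.0;
    size_t best_i = 0, best_j = 0;
    for (size_t i = 0; i < n; ++i) {
      for (size_t j = 0; j < n; ++j) {
        if ((*side)[i] != 0 || (*side)[j] != 1) continue;
        // Classic KL swap gain: D(i) + D(j) - 2*w(i,j).
        const double gain = d(i) + d(j) - 2.0 * w[i][j];
        if (gain > best_gain) {
          best_gain = gain; best_i = i; best_j = j; improved = true;
        }
      }
    }
    if (improved) std::swap((*side)[best_i], (*side)[best_j]);
  }
}
```

Minimizing the cut weight keeps high-bandwidth links (e.g. NVLink rather than PCI-E) inside each partition, which is the property a good reduction tree wants.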
@@ -474,6 +474,31 @@ class CommDevice : public Comm {
     }
   }
 
+  const NDArray& ReduceRowSparse(int key, const std::vector<NDArray>& src,
+                                 int priority) {
+    auto& buf = merge_buf_[key];
+    std::vector<NDArray> reduce(src.size());
+
+    const NDArrayStorageType stype = src[0].storage_type();
+    NDArray& buf_merged = buf.merged_buf(stype);
+    if (buf.copy_buf.empty()) {
+      // initialize buffer for copying during reduce
+      buf.copy_buf.resize(src.size());
+      for (size_t j = 0; j < src.size(); ++j) {
+        buf.copy_buf[j] = NDArray(stype, src[0].shape(), buf_merged.ctx(), true, src[0].dtype());
+      }
+    }
+    CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
+        << "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
+        << buf.copy_buf[0].storage_type() << "(buf.copy_buf)";
+    for (size_t i = 0; i < src.size(); ++i) {
+      CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
+      reduce[i] = buf.copy_buf[i];
+    }
+    ElementwiseSum(reduce, &buf_merged, priority);
+    return buf_merged;
+  }
+
   const NDArray& Reduce(int key, const std::vector<NDArray>& src,
                         int priority) override {
     // when this reduce is called from kvstore_dist, gc is not set
@@ -490,13 +515,14 @@ class CommDevice : public Comm {
 
     InitBuffersAndComm(src);
     auto& buf = merge_buf_[key];
-    std::vector<NDArray> reduce(src.size());
 
     const NDArrayStorageType stype = src[0].storage_type();
     NDArray& buf_merged = buf.merged_buf(stype);
     // normal dense reduce
     if (stype == kDefaultStorage) {
       CopyFromTo(src[0], &buf_merged, priority);
 
+      std::vector<NDArray> reduce(src.size());
       reduce[0] = buf_merged;
 
       if (buf.copy_buf.empty()) {
@@ -514,24 +540,11 @@ class CommDevice : public Comm {
         CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority);
         reduce[i+1] = buf.copy_buf[i];
       }
+      ElementwiseSum(reduce, &buf_merged, priority);
     } else {
       // sparse reduce
-      if (buf.copy_buf.empty()) {
-        // initialize buffer for copying during reduce
-        buf.copy_buf.resize(src.size());
-        for (size_t j = 0; j < src.size(); ++j) {
-          buf.copy_buf[j] = NDArray(stype, src[0].shape(), buf_merged.ctx(), true, src[0].dtype());
-        }
-      }
-      CHECK(src[0].storage_type() == buf.copy_buf[0].storage_type())
-          << "Storage type mismatch detected. " << src[0].storage_type() << "(src) vs. "
-          << buf.copy_buf[0].storage_type() << "(buf.copy_buf)";
-      for (size_t i = 0; i < src.size(); ++i) {
-        CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
-        reduce[i] = buf.copy_buf[i];
-      }
+      buf_merged = ReduceRowSparse(key, src, priority);
     }
-    ElementwiseSum(reduce, &buf_merged, priority);
     return buf_merged;
   }
 
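The net effect of the two hunks above is that `Reduce` keeps only the dense path inline and delegates row-sparse inputs to the new `ReduceRowSparse` helper. A toy, MXNet-free sketch of that control flow (plain vectors stand in for NDArray; all names here are illustrative, not the library's API):

```cpp
#include <cstddef>
#include <vector>

enum StorageType { kDense, kRowSparse };

struct Grad {
  StorageType stype;
  std::vector<float> data;
};

// Stand-in for the sparse path: sum every copy into a fresh buffer.
std::vector<float> ReduceRowSparseToy(const std::vector<Grad>& src) {
  std::vector<float> out(src[0].data.size(), 0.0f);
  for (const auto& g : src)
    for (size_t i = 0; i < out.size(); ++i) out[i] += g.data[i];
  return out;
}

// Mirrors the refactored dispatch: the dense branch seeds the merged
// buffer with the first copy and accumulates the rest; anything else
// is handed off to the sparse helper.
std::vector<float> ReduceToy(const std::vector<Grad>& src) {
  if (src[0].stype == kDense) {
    std::vector<float> out = src[0].data;  // like CopyFromTo(src[0], &buf_merged)
    for (size_t k = 1; k < src.size(); ++k)
      for (size_t i = 0; i < out.size(); ++i) out[i] += src[k].data[i];
    return out;
  }
  return ReduceRowSparseToy(src);
}
```

Factoring the sparse branch out into a named method is what lets the new tree-based comm class (which inherits from CommDevice, per commit cc935a2) reuse it without duplicating the dense logic.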
@@ -658,6 +671,42 @@ class CommDevice : public Comm {
     }
   }
 
+  using KeyAttrs = std::tuple<int, TShape, int>;
+  // try to allocate buff on device evenly
+  void InitMergeBuffer(const std::vector<Context>& devs) {
+    std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), [](
+              const KeyAttrs& a, const KeyAttrs& b) {
+      return std::get<1>(a).Size() > std::get<1>(b).Size();
+    });
+
+    std::unordered_map<int, std::pair<Context, size_t>> ctx_info;
+    for (auto d : devs) {
+      ctx_info[d.dev_id] = std::make_pair(d, 0);
+    }
+
+    for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
+      const int key = std::get<0>(sorted_key_attrs_[i]);
+      const TShape& shape = std::get<1>(sorted_key_attrs_[i]);
+      const int type = std::get<2>(sorted_key_attrs_[i]);
+      auto& buf = merge_buf_[key];
+      Context ctx;
+      size_t min_size = std::numeric_limits<size_t>::max();
+      for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) {
+        size_t size = it->second.second;
+        if (size <= min_size) {
+          ctx = it->second.first;
+          min_size = size;
+        }
+      }
+      // Delayed allocation - as the dense merged buffer might not be used at all if push()
+      // only sees sparse arrays
+      bool delay_alloc = true;
+      buf.merged = NDArray(shape, ctx, delay_alloc, type);
+      ctx_info[ctx.dev_id].second += shape.Size();
+    }
+    inited_ = true;
+  }
+
  private:
   void EnableP2P(const std::vector<Context>& devs) {
 #if MXNET_USE_CUDA
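InitMergeBuffer balances the merge buffers across devices greedily: keys are sorted by size, largest first, and each is placed on whichever device currently holds the least total data. A self-contained sketch of that balancing loop, with plain sizes in place of NDArray/Context (the data values are made up for illustration):

```cpp
#include <algorithm>
#include <cstdio>
#include <functional>
#include <vector>

// Greedy least-loaded placement: largest arrays first, each assigned to
// the device with the smallest total size so far (the same idea as
// InitMergeBuffer above, with size_t standing in for NDArray/Context).
int main() {
  std::vector<size_t> sizes = {8, 5, 4, 3};  // gradient sizes, arbitrary
  std::sort(sizes.begin(), sizes.end(), std::greater<size_t>());
  std::vector<size_t> load(2, 0);            // two devices
  for (size_t s : sizes) {
    size_t dev = (load[0] <= load[1]) ? 0 : 1;
    load[dev] += s;
    std::printf("size %zu -> dev %zu\n", s, dev);
  }
  // Final loads: dev0 = 8 + 3 = 11, dev1 = 5 + 4 = 9.
  return 0;
}
```

Sorting largest-first before the least-loaded assignment is the standard longest-processing-time heuristic: it prevents one device from ending up with several large buffers that arrived early.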
@@ -701,43 +750,6 @@ class CommDevice : public Comm {
 #endif
   }
 
-  using KeyAttrs = std::tuple<int, TShape, int>;
-  // try to allocate buff on device evenly
-  void InitMergeBuffer(const std::vector<Context>& devs) {
-    std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), [](
-              const KeyAttrs& a, const KeyAttrs& b) {
-      return std::get<1>(a).Size() > std::get<1>(b).Size();
-    });
-
-    std::unordered_map<int, std::pair<Context, size_t>> ctx_info;
-    for (auto d : devs) {
-      ctx_info[d.dev_id] = std::make_pair(d, 0);
-    }
-
-    for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
-      const int key = std::get<0>(sorted_key_attrs_[i]);
-      const TShape& shape = std::get<1>(sorted_key_attrs_[i]);
-      const int type = std::get<2>(sorted_key_attrs_[i]);
-      auto& buf = merge_buf_[key];
-      Context ctx;
-      size_t min_size = std::numeric_limits<size_t>::max();
-      for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) {
-        size_t size = it->second.second;
-        if (size <= min_size) {
-          ctx = it->second.first;
-          min_size = size;
-        }
-      }
-      // Delayed allocation - as the dense merged buffer might not be used at all if push()
-      // only sees sparse arrays
-      bool delay_alloc = true;
-      buf.merged = NDArray(shape, ctx, delay_alloc, type);
-      ctx_info[ctx.dev_id].second += shape.Size();
-    }
-    inited_ = true;
-  }
-
-  std::vector<KeyAttrs> sorted_key_attrs_;
   /// \brief temporal space for pushing and pulling
   struct BufferEntry {
     /// \brief the dense merged value for reduce and broadcast operations
@@ -750,6 +762,8 @@ class CommDevice : public Comm {
     std::vector<NDArray> compressed_send_buf;
     /// \brief the small buffer for compressed data in receiver
     std::vector<NDArray> compressed_recv_buf;
+    /// \brief size of allocation in case we do not actually allocate merged
+    TShape merged_size;
 
     /// \brief the merged buffer for the given storage type (could be either dense or row_sparse)
     inline NDArray& merged_buf(NDArrayStorageType stype) {

Review comment on `TShape merged_size;`: Is this being used?

@@ -772,7 +786,10 @@ class CommDevice : public Comm {
     NDArray sparse_merged;
   };
   std::unordered_map<int, BufferEntry> merge_buf_;
+
+ public:
+  bool inited_;
+  std::vector<KeyAttrs> sorted_key_attrs_;
 };
 
 }  // namespace kvstore
Review comment: Just to confirm, did you make any change to this function? Asking because the move makes it hard to see the diff for this.
Reply: Nope. I made no changes to the existing `--kv-store device`.
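As the reply notes, the default `--kv-store device` path is untouched; the tree-based code is opt-in via the MXNET_KVSTORE_USETREE environment variable (see commit 972e9c0). A sketch of how such a switch could be read on the C++ side — the variable name comes from the PR, but this exact helper and its semantics are hypothetical:

```cpp
#include <cstdlib>
#include <cstring>

// Hypothetical helper: treat any value other than unset/"0" as enabling
// the tree-based communication path.
bool UseTreeReduce() {
  const char* v = std::getenv("MXNET_KVSTORE_USETREE");
  return v != nullptr && std::strcmp(v, "0") != 0;
}
```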