Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support storage fallback with mutable inputs #147

Merged
merged 4 commits into from
Aug 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/mxnet/ndarray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
# pylint: disable=wildcard-import
from .ndarray import *
from .ndarray_utils import load, save, zeros, empty, array
from .sparse_ndarray import _ndarray_cls
from .sparse_ndarray import csr, row_sparse, BaseSparseNDArray, todense, RowSparseNDArray, CSRNDArray
from .sparse_ndarray import _ndarray_cls, todense
from .sparse_ndarray import csr, row_sparse, BaseSparseNDArray, RowSparseNDArray, CSRNDArray
8 changes: 4 additions & 4 deletions python/mxnet/ndarray/sparse_ndarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,11 @@ def copyto(self, other):
value and ``other`` will point to the same ``NDArray`` or ``CSRNDArray``.
"""
if isinstance(other, Context):
super(CSRNDArray, self).copyto(other)
return super(CSRNDArray, self).copyto(other)
elif isinstance(other, NDArray):
stype = other.stype
if stype == 'default' or stype == 'csr':
super(CSRNDArray, self).copyto(other)
return super(CSRNDArray, self).copyto(other)
else:
raise TypeError('copyto does not support destination NDArray stype ' + str(stype))
else:
Expand Down Expand Up @@ -597,11 +597,11 @@ def copyto(self, other):
return value and ``other`` will point to the same ``NDArray`` or ``RowSparseNDArray``.
"""
if isinstance(other, Context):
super(RowSparseNDArray, self).copyto(other)
return super(RowSparseNDArray, self).copyto(other)
elif isinstance(other, NDArray):
stype = other.stype
if stype == 'default' or stype == 'row_sparse':
super(RowSparseNDArray, self).copyto(other)
return super(RowSparseNDArray, self).copyto(other)
else:
raise TypeError('copyto does not support destination NDArray stype ' + str(stype))
else:
Expand Down
14 changes: 8 additions & 6 deletions python/mxnet/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,10 @@ def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8,
self.epsilon = epsilon

def create_state(self, index, weight):
return (zeros(weight.shape, weight.context, dtype=weight.dtype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype)) # variance
return (zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype)) # variance

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
Expand Down Expand Up @@ -649,11 +651,11 @@ def __init__(self, learning_rate=0.001, gamma1=0.9, gamma2=0.9,
def create_state(self, index, weight):
if self.centered:
return (
zeros(weight.shape, weight.context), # n
zeros(weight.shape, weight.context), # g
zeros(weight.shape, weight.context)) # delta
zeros(weight.shape, weight.context, stype=weight.stype), # n
zeros(weight.shape, weight.context, stype=weight.stype), # g
zeros(weight.shape, weight.context, stype=weight.stype)) # delta
else:
return (zeros(weight.shape, weight.context),) # n
return (zeros(weight.shape, weight.context, stype=weight.stype),) # n

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
Expand Down
79 changes: 52 additions & 27 deletions src/c_api/c_api_ndarray.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void SetShapeType(const nnvm::Op* op,
void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle> *p_write_vars,
std::vector<Resource> *p_requested,
std::vector<uint32_t> *p_auxidx,
std::vector<uint32_t> *p_mutate_idx,
const nnvm::Op* op,
const nnvm::NodeAttrs& attrs,
const Context& ctx,
Expand All @@ -235,7 +235,7 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle>& read_vars = *p_read_vars;
std::vector<engine::VarHandle>& write_vars = *p_write_vars;
std::vector<Resource>& requested = *p_requested;
std::vector<uint32_t>& auxidx = *p_auxidx;
std::vector<uint32_t>& mutate_idx = *p_mutate_idx;

if (tmp_resource.count(op)) {
int ntmp = 0;
Expand All @@ -261,9 +261,9 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
write_vars.push_back(i.var());
}
if (mutate.count(op)) {
auxidx = mutate[op](attrs);
std::sort(auxidx.begin(), auxidx.end());
for (auto & i : auxidx) {
mutate_idx = mutate[op](attrs);
std::sort(mutate_idx.begin(), mutate_idx.end());
for (auto & i : mutate_idx) {
write_vars.push_back(ndinputs[i].var());
}
}
Expand Down Expand Up @@ -293,36 +293,49 @@ void PushFCompute(const FCompute& fn,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
bool is_train = AutogradRuntime::Get()->IsTraining();
Engine::Get()->PushAsync(
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train](
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
auto map_iter = in_temp_idx_map.find(idx);
if (map_iter != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[map_iter->second]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
OpContext opctx{is_train, rctx,
engine::CallbackOnComplete(),
requested};
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (ctx.dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
rctx.get_stream<gpu>()->Wait();
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
on_complete();
}, ctx, read_vars, write_vars, FnProperty::kNormal,
Expand Down Expand Up @@ -365,7 +378,8 @@ void PushOperator(const OpStatePtr& state,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
static auto& fexec_type = nnvm::Op::GetAttr<FExecType>("FExecType");

Expand All @@ -379,28 +393,39 @@ void PushOperator(const OpStatePtr& state,
if (fcompute != nullptr) {
CHECK(exec_type == ExecType::kSync || exec_type == ExecType::kAsync);
Engine::Get()->PushAsync(
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type](
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
OpContext opctx{is_train, rctx, on_complete, requested};

std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
if (in_temp_idx_map.find(idx) != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[in_temp_idx_map[idx]]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
if (exec_type == ExecType::kSync) {
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
Expand Down Expand Up @@ -463,8 +488,8 @@ void ImperativeInvokeImpl(const Context& default_ctx,

std::vector<engine::VarHandle> read_vars, write_vars;
std::vector<Resource> requested;
std::vector<uint32_t> auxidx;
SetDependency(&read_vars, &write_vars, &requested, &auxidx,
std::vector<uint32_t> mutate_idx;
SetDependency(&read_vars, &write_vars, &requested, &mutate_idx,
op, attrs, ctx, ndinputs, ndoutputs);

FCompute fn = common::GetFCompute<FCompute>(op, "FCompute", ctx);
Expand All @@ -482,7 +507,7 @@ void ImperativeInvokeImpl(const Context& default_ctx,
attrs, &ndinputs, &ndoutputs);
}
PushFCompute(fn, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else if (createop.count(op)) {
auto state =
createop[op](attrs, ctx, ret->arg_shapes, ret->arg_types);
Expand All @@ -492,7 +517,7 @@ void ImperativeInvokeImpl(const Context& default_ctx,
}
write_vars.push_back(state.get_var());
PushOperator(state, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else {
LOG(FATAL)
<< "Operator " << op->name << " is not implemented for "
Expand Down
36 changes: 27 additions & 9 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,30 @@ template<typename xpu>
void CastStorageDispatch(const OpContext& ctx, const NDArray& input, const NDArray& output);

/*
* \brief get the corresponding tensor blobs from default storage NDArrays.
* If any NDArray is of non-default storage, it will be added to `temp_src`
* \return true if any input storage needs to be casted
* \brief setup default-storage tblobs from source NDArrays. If any source NDArray has non-default
* storage, it creates a temp NDArray with default storage and uses the temp tblob. The
* function also records the indices of non-default source NDArrays and the indices of
* their corresponding temporary NDArrays in the temp array.
* \param src list of source NDArray
* \param blobs list of tblobs to return
* \param temp_src list of source NDArrays which requires temporary default storage representation
* \param temp_dst list of temporary destination NDArrays for default storage representation
* \param idx_map mapping from indices in source NDArrays to indices in temp_dst. When not set,
indices are not recorded
* \return true if any source NDArray need to cast storage
*/
inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst) {
inline bool SetupDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst,
std::unordered_map<uint32_t, uint32_t> *idx_map = nullptr) {
bool require_cast = false;
for (size_t i = 0; i < src.size(); i++) {
auto& nd = src[i];
if (nd.storage_type() != kDefaultStorage) {
if (idx_map != nullptr) {
(*idx_map)[i] = temp_dst->size();
}
NDArray temp(nd.shape(), nd.ctx(), false);
temp_src->emplace_back(nd);
temp_dst->emplace_back(temp);
Expand All @@ -56,10 +68,15 @@ inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
}

/*
* \brief cast the NDArrays in `src` to NDArrays in `dst`. This is only used
* for storage fallback mechanism in executor.
* \brief cast the NDArrays in `src` and store the result in NDArrays in `dst`.
* This is only used for storage fallback in executor.
* When storage_fallback is false, and `MXNET_EXEC_STORAGE_FALLBACK` == 0,
* storage fallback is disallowed.
* \param src list of source NDArray to cast
* \param dst list of destionation NDArray which hold the result of cast_storage operation
* \param ctx operator context for cast_storage operation
* \param storage_fallback whether storage_fallback is allowed. When set to false,
* its value depends on `MXNET_EXEC_STORAGE_FALLBACK`.
*/
template <typename xpu>
inline void CastNonDefaultStorage(const std::vector<NDArray>& src,
Expand Down Expand Up @@ -89,6 +106,7 @@ inline bool ContainsNonDefaultStorage(const StorageTypeVector& vstorage) {
return false;
}

// Check if any NDArray in the list has default storage
inline bool ContainsDefaultStorage(const std::vector<NDArray>& ndarrays) {
for (const auto &nd : ndarrays) {
if (nd.storage_type() == kDefaultStorage) {
Expand Down
Loading