construct row_sparse ndarray for dist-async
fix bug in rsp add

rsp sync push

race condition for push

fix bug in rsp pull. refactor test

cleanup comments

refactor dist server

fix lint

fix storage shape issue with the new ndarray constructor

data sharding draft;

fix lint. add comment

add support for zeros gradients

use std::upper_bound/lower_bound

remove special init function for rowsparse dist kvstore

temporary support for inplace operators for sparse

add test. fix return type

store kRowSparseNDArray in kv server

remove fcomp_ex sgd with dns weight and rsp gradient

bug fix in sparse retain

sparse pull c_api

revise rowsparse pull api

use engine to compute unique to ensure thread safety

add rowsparse pull to dist-kv

fix lint

add example for rsp_pull

remove name2idx;

add sparse_pull_dict param to module

fix unit test and C rowid conversion
eric-haibin-lin committed Jul 26, 2017
1 parent 6644d22 commit 6acd61a
Showing 18 changed files with 812 additions and 335 deletions.
20 changes: 20 additions & 0 deletions include/mxnet/c_api.h
@@ -1505,6 +1505,26 @@ MXNET_DLL int MXKVStorePullEx(KVStoreHandle handle,
                              const char** keys,
                              NDArrayHandle* vals,
                              int priority);

/*!
 * \brief pull a list of (key, value) pairs from the kvstore, where each key is a string.
 *        The NDArray pulled back will be in row_sparse storage with only the rows
 *        specified by row_ids present (other rows are zeros).
 * \param handle handle to the kvstore
 * \param num the number of key-value pairs
 * \param keys the list of keys
 * \param vals the list of values
 * \param row_ids the list of row_id NDArrays
 * \param priority the priority of the action
 * \return 0 when success, -1 when failure happens
 */
MXNET_DLL int MXKVStorePullRowSparse(KVStoreHandle handle,
                                     mx_uint num,
                                     const char** keys,
                                     NDArrayHandle* vals,
                                     const NDArrayHandle* row_ids,
                                     int priority);

/*!
* \brief user-defined updater for the kvstore
* It's this updater's responsibility to delete \a recv and \a local
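A minimal ctypes-level sketch of how the MXKVStorePullRowSparse entry point declared above is expected to be driven from Python for the simple one-buffer-per-key case. The helpers _LIB, check_call, c_array, c_str, mx_uint and NDArrayHandle are the ones imported in python/mxnet/kvstore.py further down; the multi-device key/value flattening done by _ctype_str_key_value is omitted, so treat this as an illustration rather than the actual frontend wrapper:

import ctypes
from mxnet.base import _LIB, check_call, c_array, c_str, mx_uint, NDArrayHandle

def pull_row_sparse_raw(kv_handle, keys, outs, row_ids, priority=0):
    """Sketch: one destination NDArray and one row_id NDArray per string key."""
    ckeys = c_array(ctypes.c_char_p, [c_str(k) for k in keys])
    cvals = c_array(NDArrayHandle, [out.handle for out in outs])
    crow_ids = c_array(NDArrayHandle, [rid.handle for rid in row_ids])
    check_call(_LIB.MXKVStorePullRowSparse(
        kv_handle, mx_uint(len(keys)), ckeys, cvals, crow_ids,
        ctypes.c_int(priority)))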
24 changes: 24 additions & 0 deletions include/mxnet/kvstore.h
@@ -7,6 +7,7 @@
#define MXNET_KVSTORE_H_
#include <dmlc/io.h>
#include <vector>
#include <utility>
#include <unordered_map>
#include <string>
#include <functional>
@@ -155,6 +156,29 @@ class KVStore {
                    const std::vector<NDArray*>& values,
                    int priority = 0) = 0;

  /*!
   * \brief pull a list of key-value pairs from the store.
   *        The NDArray pulled back will be in row_sparse storage with only the
   *        specified row_ids present (other rows are zeros).
   * \param str_keys the list of keys
   * \param val_rowids the list of (buffer, row_id) pairs
   * \param priority the priority of the action.
   */
  virtual void PullRowSparse(const std::vector<int>& str_keys,
                             const std::vector<std::pair<NDArray*, NDArray>>& val_rowids,
                             const int priority = 0) = 0;

  /*!
   * \brief pull a list of key-value pairs from the store, where each key is a string.
   *        The NDArray pulled back will be in row_sparse storage with only the
   *        specified row_ids present (other rows are zeros).
   * \param str_keys the list of keys in string format
   * \param val_rowids the list of (buffer, row_id) pairs
   * \param priority the priority of the action.
   */
  virtual void PullRowSparse(const std::vector<std::string>& str_keys,
                             const std::vector<std::pair<NDArray*, NDArray>>& val_rowids,
                             const int priority = 0) = 0;

/**
* \brief the prototype of user-defined updater
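Each val_rowids entry above couples the destination buffer for a key with the NDArray of row ids requested for that key; the Python frontend builds the same pairing positionally from parallel out and row_ids lists, and asserts that the two lists have equal length. A sketch for two keys, modeled on the kvstore.py docstring example further down (the key names are made up and the _to_rsp() conversion is the one used in that docstring, so treat the buffer construction as an assumption):

import mxnet as mx

kv = mx.kv.create('local')
shape = (4, 3)
kv.init('w0', mx.nd.ones(shape)._to_rsp())   # 'w0', 'w1' are hypothetical keys
kv.init('w1', mx.nd.ones(shape)._to_rsp())

# destination buffers; row_sparse_pull checks that they use row_sparse storage
out0 = mx.nd.ones(shape)._to_rsp()
out1 = mx.nd.ones(shape)._to_rsp()
rid0 = mx.nd.array([0, 2], dtype='int64')    # rows wanted for 'w0'
rid1 = mx.nd.array([1, 3], dtype='int64')    # rows wanted for 'w1'

# out[i] is paired with row_ids[i]; each pair corresponds to one
# std::pair<NDArray*, NDArray> entry of val_rowids on the C++ side
kv.row_sparse_pull(['w0', 'w1'], out=[out0, out1], row_ids=[rid0, rid1])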
11 changes: 8 additions & 3 deletions include/mxnet/ndarray.h
@@ -182,12 +182,13 @@ class NDArray {
return shape_;
}
/*!
* \return the shape of underlying chunk which stores the NDArray values.
* For default storage, it is the same as shape(). For row-sparse storage, it is the shape of
* \return the shape of underlying chunk which stores the NDArray data/value.
* It is only intended for non-default storage. For row-sparse storage, it is the shape of
* the tensor which stores the non-zero values.
*/
inline const TShape &storage_shape() const {
CHECK(ptr_ != nullptr);
CHECK_NE(storage_type(), kDefaultStorage);
return ptr_->storage_shape;
}

@@ -271,7 +272,11 @@
if (is_none()) return false;
auto stype = storage_type();
CHECK_NE(stype, kDefaultStorage);
if (stype == kRowSparseStorage || stype == kCSRStorage) {
if (stype == kRowSparseStorage) {
CHECK_EQ(aux_shape(rowsparse::kIdx)[0], storage_shape()[0]);
return aux_shape(0).Size() != 0;
} else if (stype == kCSRStorage) {
CHECK_EQ(aux_shape(csr::kIdx)[0], storage_shape()[0]);
return aux_shape(0).Size() != 0;
} else {
LOG(FATAL) << "Unknown storage type";
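In frontend terms, shape() stays the full dense shape while storage_shape() only covers the rows that are actually stored, and the check added above ties storage_shape()[0] to the number of stored row ids. A small illustration; the _to_rsp() helper is the one used in the kvstore.py docstring below, and whether it compacts all-zero rows at this commit is an assumption:

import mxnet as mx

dense = mx.nd.array([[1, 1, 1],
                     [0, 0, 0],
                     [2, 2, 2]])
rsp = dense._to_rsp()   # row_sparse view of the same data

# shape()          -> (3, 3): the full dense shape
# storage_shape()  -> (2, 3): one row per stored row id, assuming the
#                    all-zero middle row is not stored
# aux_shape(kIdx)  -> (2,):   the stored int64 row ids, here [0, 2]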
93 changes: 87 additions & 6 deletions python/mxnet/kvstore.py
@@ -5,6 +5,7 @@
import ctypes
import pickle
from .ndarray import NDArray
from .sparse_ndarray import _ndarray_cls
from .base import _LIB
from .base import check_call, c_array, c_str, string_types, mx_uint, py_str
from .base import NDArrayHandle, KVStoreHandle
@@ -48,15 +49,15 @@ def _updater_wrapper(updater):
"""A wrapper for the user-defined handle."""
def updater_handle(key, lhs_handle, rhs_handle, _):
""" ctypes function """
lhs = NDArray(NDArrayHandle(lhs_handle))
rhs = NDArray(NDArrayHandle(rhs_handle))
lhs = _ndarray_cls(NDArrayHandle(lhs_handle))
rhs = _ndarray_cls(NDArrayHandle(rhs_handle))
updater(key, lhs, rhs)
return updater_handle


class KVStore(object):
"""A key-value store for synchronization of values, over multiple devices."""
def __init__(self, handle, name2idx=None):
def __init__(self, handle):
"""Initializes a new KVStore.
Parameters
@@ -66,7 +67,6 @@ def __init__(self, handle, name2idx=None):
"""
assert isinstance(handle, KVStoreHandle)
self.handle = handle
self.name2idx = name2idx if name2idx is not None else {}
self._updater = None
self._updater_func = None

@@ -184,6 +184,8 @@ def pull(self, key, out=None, priority=0):
        The returned values are guaranteed to be the latest values in the store.
        For row_sparse values, please use `row_sparse_pull` instead.

        Parameters
        ----------
        key : int or list of int
@@ -229,12 +231,91 @@ def pull(self, key, out=None, priority=0):
        [ 2. 2. 2.]]
        """
        assert(out is not None)
        if not isinstance(out, (list, tuple)):
            out = [out]
        for val in out:
            if not isinstance(val, (list, tuple)):
                assert(val.storage_type == 'default')
            else:
                for v in val:
                    assert(v.storage_type == 'default')
        key = _cast_to_str_keys(key)
        ckeys, cvals = _ctype_str_key_value(key, out)
        check_call(_LIB.MXKVStorePullEx(
            self.handle, mx_uint(len(ckeys)), ckeys, cvals,
            ctypes.c_int(priority)))
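The checks above make the dense/sparse split explicit: pull only accepts default-storage outputs, while row_sparse values go through row_sparse_pull below. A small sketch of the dense path (the key name is made up):

import mxnet as mx

kv = mx.kv.create('local')
kv.init('dense_w', mx.nd.ones((2, 3)))

out = mx.nd.zeros((2, 3))      # default storage, as the assert requires
kv.pull('dense_w', out=out)
# a row_sparse output would trip the assert; use row_sparse_pull for it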

    def row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
        """ Pulls a single row_sparse value or a sequence of row_sparse values from the store
        with the specified row_ids.

        `row_sparse_pull` is executed asynchronously after all previous
        `push`/`pull`/`row_sparse_pull` calls for the same input key(s) are finished.

        The returned values are guaranteed to be the latest values in the store.

        Parameters
        ----------
        key : str or list of str
            Keys.
        out: NDArray or list of NDArray or list of list of NDArray
            Values corresponding to the keys. The storage_type is expected to be row_sparse.
        priority : int, optional
            The priority of the pull operation.
            Higher priority pull operations are likely to be executed before
            other pull actions.
        row_ids : NDArray or list of NDArray
            The row_ids to pull for each value. The row_ids don't have to be unique
            or sorted.

        Examples
        --------
        >>> shape = (3, 3)
        >>> kv.init('3', mx.nd.ones(shape)._to_rsp())
        >>> a = mx.nd.zeros(shape)
        >>> row_ids = mx.nd.array([0, 2], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=row_ids)
        >>> print a.asnumpy()
        [[ 1. 1. 1.]
         [ 0. 0. 0.]
         [ 1. 1. 1.]]
        >>> duplicate_row_ids = mx.nd.array([2, 2], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=duplicate_row_ids)
        >>> print a.asnumpy()
        [[ 0. 0. 0.]
         [ 0. 0. 0.]
         [ 1. 1. 1.]]
        >>> unsorted_row_ids = mx.nd.array([1, 0], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=unsorted_row_ids)
        >>> print a.asnumpy()
        [[ 1. 1. 1.]
         [ 1. 1. 1.]
         [ 0. 0. 0.]]
        """
        assert(out is not None)
        assert(row_ids is not None)
        if isinstance(row_ids, NDArray):
            row_ids = [row_ids]
        if not isinstance(out, (list, tuple)):
            out = [out]
        for val in out:
            if not isinstance(val, (list, tuple)):
                assert(val.storage_type == 'row_sparse')
            else:
                for v in val:
                    assert(v.storage_type == 'row_sparse')
        key = _cast_to_str_keys(key)
        ckeys, cvals = _ctype_str_key_value(key, out)
        _, crow_ids = _ctype_str_key_value(key, row_ids)
        assert(len(crow_ids) == len(cvals)), (len(crow_ids), len(cvals))
        # TODO(haibin) pick up upstream changes which removed `_cast_to_str_keys`

        check_call(_LIB.MXKVStorePullRowSparse(
            self.handle, mx_uint(len(ckeys)), ckeys, cvals, crow_ids, ctypes.c_int(priority)))


def set_optimizer(self, optimizer):
""" Registers an optimizer with the kvstore.
@@ -407,7 +488,7 @@ def _send_command_to_servers(self, head, body):
check_call(_LIB.MXKVStoreSendCommmandToServers(
self.handle, mx_uint(head), c_str(body)))

def create(name='local', name2idx=None):
def create(name='local'):
"""Creates a new KVStore.
For single machine training, there are two commonly used types:
Expand Down Expand Up @@ -447,4 +528,4 @@ def create(name='local', name2idx=None):
handle = KVStoreHandle()
check_call(_LIB.MXKVStoreCreate(c_str(name),
ctypes.byref(handle)))
return KVStore(handle, name2idx=name2idx)
return KVStore(handle)
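The switch from NDArray to _ndarray_cls in _updater_wrapper above means a user-defined updater now receives the proper subclass, so it can inspect storage_type and treat row_sparse gradients specially. A minimal sketch; the storage_type attribute is the one the asserts in this file rely on, _set_updater is the standard KVStore registration method (not shown in this excerpt), and the update rule itself is only illustrative:

import mxnet as mx

def inspect_update(key, grad, weight):
    # with _ndarray_cls the pushed gradient keeps its storage type
    print('key %s: grad stype=%s, weight stype=%s'
          % (key, grad.storage_type, weight.storage_type))
    if grad.storage_type == 'default':
        weight[:] += grad      # plain dense accumulation
    # a row_sparse gradient could be applied row by row here instead

kv = mx.kv.create('local')
kv._set_updater(inspect_update)
kv.init('w', mx.nd.zeros((2, 3)))
kv.push('w', mx.nd.ones((2, 3)))   # triggers inspect_update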
31 changes: 22 additions & 9 deletions python/mxnet/model.py
@@ -37,7 +37,7 @@
'eval_metric',
'locals'])

def _create_kvstore(kvstore, num_device, arg_params, name2idx=None):
def _create_kvstore(kvstore, num_device, arg_params):
"""Create kvstore
This function selects and creates a proper kvstore given the kvstore type.
@@ -61,7 +61,7 @@ def _create_kvstore(kvstore, num_device, arg_params, name2idx=None):
# no need to use kv for single device and single machine
kv = None
else:
kv = kvs.create(kvstore, name2idx=name2idx)
kv = kvs.create(kvstore)
if kvstore == 'local':
# automatically select a proper local
max_size = max(np.prod(param.shape) for param in
@@ -77,16 +77,21 @@ def _create_kvstore(kvstore, num_device, arg_params, name2idx=None):
return (kv, update_on_kvstore)

def _initialize_kvstore(kvstore, param_arrays, arg_params, param_names,
update_on_kvstore):
update_on_kvstore, sparse_pull_dict=None):
"""Initialize kvstore"""
for idx, param_on_devs in enumerate(param_arrays):
name = param_names[idx]
kvstore.init(name, arg_params[name])

if update_on_kvstore:
kvstore.pull(name, param_on_devs, priority=-idx)
if sparse_pull_dict is not None and name in sparse_pull_dict:
kvstore.row_sparse_pull(name, param_on_devs, priority=-idx,
row_ids=sparse_pull_dict[name])
else:
kvstore.pull(name, param_on_devs, priority=-idx)

def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names):
def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names,
sparse_pull_dict=None):
"""Perform update of param_arrays from grad_arrays on kvstore."""
for index, pair in enumerate(zip(param_arrays, grad_arrays)):
arg_list, grad_list = pair
@@ -96,10 +101,14 @@ def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names):
# push gradient, priority is negative index
kvstore.push(name, grad_list, priority=-index)
# pull back the weights
kvstore.pull(name, arg_list, priority=-index)
if sparse_pull_dict is not None and name in sparse_pull_dict:
kvstore.row_sparse_pull(name, arg_list, priority=-index,
row_ids=sparse_pull_dict[name])
else:
kvstore.pull(name, arg_list, priority=-index)

def _update_params(param_arrays, grad_arrays, updater, num_device,
kvstore=None, param_names=None):
kvstore=None, param_names=None, sparse_pull_dict=None):
"""Perform update of param_arrays from grad_arrays not on kvstore."""
for i, pair in enumerate(zip(param_arrays, grad_arrays)):
arg_list, grad_list = pair
@@ -110,8 +119,12 @@ def _update_params(param_arrays, grad_arrays, updater, num_device,
name = param_names[index]
# push gradient, priority is negative index
kvstore.push(name, grad_list, priority=-index)
# pull back the sum gradients, to the same locations.
kvstore.pull(name, grad_list, priority=-index)
if sparse_pull_dict is not None and name in sparse_pull_dict:
kvstore.row_sparse_pull(name, grad_list, priority=-index,
row_ids=sparse_pull_dict[name])
else:
# pull back the sum gradients, to the same locations.
kvstore.pull(name, grad_list, priority=-index)
for k, p in enumerate(zip(arg_list, grad_list)):
# faked an index here, to make optimizer create diff
# state for the same index but on diff devs, TODO(mli)
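sparse_pull_dict maps a parameter name to the row ids that should be pulled for it; any parameter missing from the dict keeps the dense pull path shown in the else branches above. A sketch of how such a dict might be built and handed to these helpers (the parameter name and row ids are made up; only the keyword argument itself comes from this change):

import mxnet as mx

# rows of a hypothetical 'embed_weight' parameter touched by the current batch
sparse_pull_dict = {
    'embed_weight': mx.nd.array([0, 2, 5], dtype='int64'),
}

# the training helpers then pick the pull path per parameter name, e.g.
# _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names,
#                           sparse_pull_dict=sparse_pull_dict)
# uses row_sparse_pull for 'embed_weight' and plain pull for everything else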
6 changes: 4 additions & 2 deletions python/mxnet/module/base_module.py
@@ -932,8 +932,10 @@ def bind(self, data_shapes, label_shapes=None, for_training=True,
raise NotImplementedError()

def init_optimizer(self, kvstore='local', optimizer='sgd',
optimizer_params=(('learning_rate', 0.01),), force_init=False):
"""Installs and initializes optimizers.
optimizer_params=(('learning_rate', 0.01),), force_init=False,
sparse_pull_dict=None):
"""Installs and initializes optimizers, as well as initialize kvstore for
distributed training
Parameters
----------
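At the user level the new keyword is passed straight through init_optimizer; a sketch assuming a bound Module mod with a row_sparse parameter named 'embed_weight' (the module construction is omitted and the names are made up; only the sparse_pull_dict argument comes from this change):

import mxnet as mx

# row ids needed for the sparse parameter in the upcoming batches
row_ids = mx.nd.array([0, 2, 5], dtype='int64')

# mod is a bound mx.mod.Module whose 'embed_weight' parameter is row_sparse
mod.init_optimizer(kvstore='dist_async', optimizer='sgd',
                   optimizer_params=(('learning_rate', 0.01),),
                   sparse_pull_dict={'embed_weight': row_ids})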
