From d76dab48edb6020e622ca5e7f5c651789031d92f Mon Sep 17 00:00:00 2001 From: Boris Karasev Date: Thu, 26 Jan 2017 19:54:18 +0600 Subject: [PATCH] dstore: Added locking by pthread. Added alternative locking via `pthread`, using a shared memory region to hold the lock. The old `flock`-based locking code path has been kept so that its performance can be evaluated against `pthread` locking, and for other purposes. To change the locking type, enable the corresponding code path with these macros: #define PTHREAD_LOCK_ENABLE 1 #define FCNTL_LOCK_ENABLE 1 When both are enabled, `pthread` locking takes priority. Thanks to @artpol84 for the proof of concept. - Results for `pthread` locking: https://github.com/pmix/master/pull/260#issuecomment-273945190 - Proof-of-concept code base: https://github.com/artpol84/poc/tree/master/benchmarks/shmem_locking Signed-off-by: Boris Karasev (cherry picked from commit 289e30bf06ce05b1e1ee1732c7be9019d7a6cb24) --- src/dstore/pmix_esh.c | 165 +++++++++++++++++++++++++++++++++++++++--- src/dstore/pmix_esh.h | 22 ++++++ 2 files changed, 177 insertions(+), 10 deletions(-) diff --git a/src/dstore/pmix_esh.c b/src/dstore/pmix_esh.c index 191485cce0..4d513de1b3 100644 --- a/src/dstore/pmix_esh.c +++ b/src/dstore/pmix_esh.c @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -35,6 +34,14 @@ #include "pmix_dstore.h" #include "pmix_esh.h" +#ifdef FCNTL_LOCK +#include +#endif + +#ifdef PTHREAD_LOCK +#include +#endif + static int _esh_init(pmix_info_t info[], size_t ninfo); static int _esh_finalize(void); static int _esh_store(const char *nspace, int rank, pmix_kval_t *kv); @@ -119,6 +126,43 @@ __extension__ ({ \ buffer, size); \ }) +#ifdef PTHREAD_LOCK +#define _ESH_LOCK(rwlock, operation) \ +__extension__ ({ \ + pmix_status_t ret = PMIX_SUCCESS; \ + int rc; \ + switch (operation) { \ + case F_WRLCK: \ + rc = pthread_rwlock_wrlock(rwlock); \ + break; \ + case F_RDLCK: \ + rc = pthread_rwlock_rdlock(rwlock); \ + break; \ + case F_UNLCK: \ + rc = 
pthread_rwlock_unlock(rwlock); \ + break; \ + default: \ + rc = PMIX_ERR_BAD_PARAM; \ + } \ + if (0 != rc) { \ + switch (errno) { \ + case EINVAL: \ + ret = PMIX_ERR_INIT; \ + break; \ + case EPERM: \ + ret = PMIX_ERR_NO_PERMISSIONS; \ + break; \ + } \ + } \ + if (ret) { \ + pmix_output(0, "%s %d:%s lock failed: %s", \ + __FILE__, __LINE__, __func__, strerror(errno)); \ + } \ + ret; \ +}) +#endif + +#ifdef FCNTL_LOCK #define _ESH_LOCK(lockfd, operation) \ __extension__ ({ \ pmix_status_t ret = PMIX_SUCCESS; \ @@ -156,10 +200,11 @@ __extension__ ({ \ } \ ret; \ }) +#endif -#define _ESH_WRLOCK(lockfd) _ESH_LOCK(lockfd, F_WRLCK) -#define _ESH_RDLOCK(lockfd) _ESH_LOCK(lockfd, F_RDLCK) -#define _ESH_UNLOCK(lockfd) _ESH_LOCK(lockfd, F_UNLCK) +#define _ESH_WRLOCK(lock) _ESH_LOCK(lock, F_WRLCK) +#define _ESH_RDLOCK(lock) _ESH_LOCK(lock, F_RDLCK) +#define _ESH_UNLOCK(lock) _ESH_LOCK(lock, F_UNLCK) #define ESH_INIT_SESSION_TBL_SIZE 2 #define ESH_INIT_NS_MAP_TBL_SIZE 2 @@ -198,6 +243,7 @@ static size_t _max_ns_num; static size_t _meta_segment_size = 0; static size_t _max_meta_elems; static size_t _data_segment_size = 0; +static size_t _lock_segment_size = 0; static uid_t _jobuid; static char _setjobuid = 0; @@ -210,11 +256,21 @@ ns_map_data_t * (*_esh_session_map_search)(const char *nspace) = NULL; #define _ESH_SESSION_path(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].nspace_path) #define _ESH_SESSION_lockfile(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfile) #define _ESH_SESSION_jobuid(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].jobuid) -#define _ESH_SESSION_lockfd(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd) #define _ESH_SESSION_sm_seg_first(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].sm_seg_first) #define _ESH_SESSION_sm_seg_last(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].sm_seg_last) #define 
_ESH_SESSION_ns_info(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].ns_info) +#ifdef PTHREAD_LOCK +#define _ESH_SESSION_pthread_rwlock(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock) +#define _ESH_SESSION_pthread_seg(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock_seg) +#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_pthread_rwlock(tbl_idx) +#endif + +#ifdef FCNTL_LOCK +#define _ESH_SESSION_lockfd(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd) +#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_lockfd(tbl_idx) +#endif + /* If _direct_mode is set, it means that we use linear search * along the array of rank meta info objects inside a meta segment * to find the requested rank. Otherwise, we do a fast lookup @@ -252,6 +308,85 @@ static inline void _esh_session_map_clean(ns_map_t *m) { m->data.track_idx = -1; } +#ifdef PTHREAD_LOCK +static inline int _rwlock_init(size_t idx, char *lockfile) { + pmix_status_t rc = PMIX_SUCCESS; + size_t size = _lock_segment_size; + pthread_rwlockattr_t attr; + + if ((NULL != _ESH_SESSION_pthread_seg(idx)) || (NULL != _ESH_SESSION_pthread_rwlock(idx))) { + rc = PMIX_ERR_INIT; + return rc; + } + _ESH_SESSION_pthread_seg(idx) = (pmix_sm_seg_t *)malloc(sizeof(pmix_sm_seg_t)); + if (NULL == _ESH_SESSION_pthread_seg(idx)) { + rc = PMIX_ERR_OUT_OF_RESOURCE; + return rc; + } + + if (_is_server()) { + if (PMIX_SUCCESS != (rc = pmix_sm_segment_create(_ESH_SESSION_pthread_seg(idx), lockfile, size))) { + return rc; + } + memset(_ESH_SESSION_pthread_seg(idx)->seg_base_addr, 0, size); + _ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr; + + if (0 != pthread_rwlockattr_init(&attr)) { + rc = PMIX_ERR_INIT; + pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx)); + return rc; + } + if (0 != pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) { + rc = PMIX_ERR_INIT; + 
pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx)); + pthread_rwlockattr_destroy(&attr); + return rc; + } + if (0 != pthread_rwlock_init(_ESH_SESSION_pthread_rwlock(idx), &attr)) { + rc = PMIX_ERR_INIT; + pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx)); + pthread_rwlockattr_destroy(&attr); + return rc; + } + if (0 != pthread_rwlockattr_destroy(&attr)) { + rc = PMIX_ERR_INIT; + return rc; + } + + } + else { + _ESH_SESSION_pthread_seg(idx)->seg_size = size; + snprintf(_ESH_SESSION_pthread_seg(idx)->seg_name, PMIX_PATH_MAX, "%s", lockfile); + if (PMIX_SUCCESS != (rc = pmix_sm_segment_attach(_ESH_SESSION_pthread_seg(idx), PMIX_SM_RW))) { + return rc; + } + _ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr; + } + + return rc; +} + +static inline void _rwlock_release(session_t *s) { + pmix_status_t rc; + + if (0 != pthread_rwlock_destroy(s->rwlock)) { + rc = PMIX_ERROR; + PMIX_ERROR_LOG(rc); + return; + } + + /* detach & unlink from current desc */ + if (s->rwlock_seg->seg_cpid == getpid()) { + pmix_sm_segment_unlink(s->rwlock_seg); + } + pmix_sm_segment_detach(s->rwlock_seg); + + free(s->rwlock_seg); + s->rwlock_seg = NULL; + s->rwlock = NULL; +} +#endif + static inline const char *_unique_id(void) { static const char *str = NULL; @@ -656,6 +791,12 @@ static inline int _esh_session_init(size_t idx, ns_map_data_t *m, size_t jobuid, } } +#ifdef PTHREAD_LOCK + if ( PMIX_SUCCESS != (rc = _rwlock_init(m->tbl_idx, s->lockfile))) { + PMIX_ERROR_LOG(rc); + return rc; + } +#endif s->sm_seg_first = seg; s->sm_seg_last = s->sm_seg_first; return PMIX_SUCCESS; @@ -682,6 +823,9 @@ static inline void _esh_session_release(session_t *s) } free(s->nspace_path); } +#ifdef PTHREAD_LOCK + _rwlock_release(s); +#endif memset ((char *) s, 0, sizeof(*s)); } @@ -885,7 +1029,7 @@ int _esh_store(const char *nspace, int rank, pmix_kval_t *kv) } /* set exclusive lock */ - if (PMIX_SUCCESS != (rc = 
_ESH_WRLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) { + if (PMIX_SUCCESS != (rc = _ESH_WRLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { PMIX_ERROR_LOG(rc); return rc; } @@ -950,14 +1094,14 @@ int _esh_store(const char *nspace, int rank, pmix_kval_t *kv) } /* unset lock */ - if (PMIX_SUCCESS != (rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) { + if (PMIX_SUCCESS != (rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { PMIX_ERROR_LOG(rc); } return rc; err_exit: /* unset lock */ - if (PMIX_SUCCESS != (tmp_rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) { + if (PMIX_SUCCESS != (tmp_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { PMIX_ERROR_LOG(tmp_rc); } return rc; @@ -1018,7 +1162,7 @@ int _esh_fetch(const char *nspace, int rank, const char *key, pmix_value_t **kvs } /* grab shared lock */ - if (PMIX_SUCCESS != (lock_rc = _ESH_RDLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) { + if (PMIX_SUCCESS != (lock_rc = _ESH_RDLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { /* Something wrong with the lock. 
The error is fatal */ rc = PMIX_ERROR; PMIX_ERROR_LOG(lock_rc); @@ -1177,7 +1321,7 @@ int _esh_fetch(const char *nspace, int rank, const char *key, pmix_value_t **kvs done: /* unset lock */ - if (PMIX_SUCCESS != (lock_rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) { + if (PMIX_SUCCESS != (lock_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { PMIX_ERROR_LOG(lock_rc); } @@ -1377,6 +1521,7 @@ static void _set_constants_from_env() } } + _lock_segment_size = page_size; _max_ns_num = (_initial_segment_size - sizeof(size_t) * 2) / sizeof(ns_seg_info_t); _max_meta_elems = (_meta_segment_size - sizeof(size_t)) / sizeof(rank_meta_info); diff --git a/src/dstore/pmix_esh.h b/src/dstore/pmix_esh.h index 47ad97103c..9d6e410a85 100644 --- a/src/dstore/pmix_esh.h +++ b/src/dstore/pmix_esh.h @@ -24,6 +24,23 @@ BEGIN_C_DECLS #define PMIX_DSTORE_ESH_BASE_PATH "PMIX_DSTORE_ESH_BASE_PATH" +#define PTHREAD_LOCK_ENABLE 1 +#define FCNTL_LOCK_ENABLE 0 + +#if defined(PTHREAD_LOCK_ENABLE) || defined(FCNTL_LOCK_ENABLE) +#if (PTHREAD_LOCK_ENABLE == 1) +#undef FCNTL_LOCK_ENABLE +#define PTHREAD_LOCK +#else +#undef PTHREAD_LOCK_ENABLE +#define FCNTL_LOCK +#endif +#elif !(defined(PTHREAD_LOCK_ENABLE) || defined(FCNTL_LOCK_ENABLE)) +#error not enabled any one type of locking +#else +#define PTHREAD_LOCK +#endif + /* this structs are used to store information about * shared segments addresses locally at each process, * so they are common for different types of segments @@ -47,12 +64,17 @@ struct seg_desc_t { typedef struct ns_map_data_s ns_map_data_t; typedef struct session_s session_t; typedef struct ns_map_s ns_map_t; +typedef struct rwlock_map_s rwlock_map_t; struct session_s { int in_use; uid_t jobuid; char *nspace_path; char *lockfile; +#ifdef PTHREAD_LOCK + pmix_sm_seg_t *rwlock_seg; + pthread_rwlock_t *rwlock; +#endif int lockfd; seg_desc_t *sm_seg_first; seg_desc_t *sm_seg_last;