Skip to content

Commit

Permalink
dstore: Added locking by pthread.
Browse files Browse the repository at this point in the history
Added alternative locking by `pthread` with use shared memory region
for the lock. The code path of old locking by `flock` has been saved
for evaluate performance against locking by `pthread` and other purposes.

To change the locking type needs to enable the code paths by macro:
  #define PTHREAD_LOCK_ENABLE  1
  #define FCNTL_LOCK_ENABLE    1
In priority uses the locking by 'pthread'

Thanks to @artpol84 for this proof of concept.
- The  results of locking by `pthread` here: openpmix#260 (comment)
- Code base of concept: https://github.com/artpol84/poc/tree/master/benchmarks/shmem_locking

Signed-off-by: Boris Karasev <[email protected]>
(cherry picked from commit 289e30b)
  • Loading branch information
karasevb authored and artpol84 committed Feb 7, 2017
1 parent 0ff5c94 commit d76dab4
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 10 deletions.
165 changes: 155 additions & 10 deletions src/dstore/pmix_esh.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/file.h>
#include <dirent.h>
#include <errno.h>
Expand All @@ -35,6 +34,14 @@
#include "pmix_dstore.h"
#include "pmix_esh.h"

#ifdef FCNTL_LOCK
#include <fcntl.h>
#endif

#ifdef PTHREAD_LOCK
#include <pthread.h>
#endif

static int _esh_init(pmix_info_t info[], size_t ninfo);
static int _esh_finalize(void);
static int _esh_store(const char *nspace, int rank, pmix_kval_t *kv);
Expand Down Expand Up @@ -119,6 +126,43 @@ __extension__ ({ \
buffer, size); \
})

#ifdef PTHREAD_LOCK
#define _ESH_LOCK(rwlock, operation) \
__extension__ ({ \
pmix_status_t ret = PMIX_SUCCESS; \
int rc; \
switch (operation) { \
case F_WRLCK: \
rc = pthread_rwlock_wrlock(rwlock); \
break; \
case F_RDLCK: \
rc = pthread_rwlock_rdlock(rwlock); \
break; \
case F_UNLCK: \
rc = pthread_rwlock_unlock(rwlock); \
break; \
default: \
rc = PMIX_ERR_BAD_PARAM; \
} \
if (0 != rc) { \
switch (errno) { \
case EINVAL: \
ret = PMIX_ERR_INIT; \
break; \
case EPERM: \
ret = PMIX_ERR_NO_PERMISSIONS; \
break; \
} \
} \
if (ret) { \
pmix_output(0, "%s %d:%s lock failed: %s", \
__FILE__, __LINE__, __func__, strerror(errno)); \
} \
ret; \
})
#endif

#ifdef FCNTL_LOCK
#define _ESH_LOCK(lockfd, operation) \
__extension__ ({ \
pmix_status_t ret = PMIX_SUCCESS; \
Expand Down Expand Up @@ -156,10 +200,11 @@ __extension__ ({ \
} \
ret; \
})
#endif

#define _ESH_WRLOCK(lockfd) _ESH_LOCK(lockfd, F_WRLCK)
#define _ESH_RDLOCK(lockfd) _ESH_LOCK(lockfd, F_RDLCK)
#define _ESH_UNLOCK(lockfd) _ESH_LOCK(lockfd, F_UNLCK)
#define _ESH_WRLOCK(lock) _ESH_LOCK(lock, F_WRLCK)
#define _ESH_RDLOCK(lock) _ESH_LOCK(lock, F_RDLCK)
#define _ESH_UNLOCK(lock) _ESH_LOCK(lock, F_UNLCK)

#define ESH_INIT_SESSION_TBL_SIZE 2
#define ESH_INIT_NS_MAP_TBL_SIZE 2
Expand Down Expand Up @@ -198,6 +243,7 @@ static size_t _max_ns_num;
static size_t _meta_segment_size = 0;
static size_t _max_meta_elems;
static size_t _data_segment_size = 0;
static size_t _lock_segment_size = 0;
static uid_t _jobuid;
static char _setjobuid = 0;

Expand All @@ -210,11 +256,21 @@ ns_map_data_t * (*_esh_session_map_search)(const char *nspace) = NULL;
#define _ESH_SESSION_path(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].nspace_path)
#define _ESH_SESSION_lockfile(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfile)
#define _ESH_SESSION_jobuid(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].jobuid)
#define _ESH_SESSION_lockfd(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd)
#define _ESH_SESSION_sm_seg_first(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].sm_seg_first)
#define _ESH_SESSION_sm_seg_last(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].sm_seg_last)
#define _ESH_SESSION_ns_info(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].ns_info)

#ifdef PTHREAD_LOCK
#define _ESH_SESSION_pthread_rwlock(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock)
#define _ESH_SESSION_pthread_seg(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock_seg)
#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_pthread_rwlock(tbl_idx)
#endif

#ifdef FCNTL_LOCK
#define _ESH_SESSION_lockfd(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd)
#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_lockfd(tbl_idx)
#endif

/* If _direct_mode is set, it means that we use linear search
* along the array of rank meta info objects inside a meta segment
* to find the requested rank. Otherwise, we do a fast lookup
Expand Down Expand Up @@ -252,6 +308,85 @@ static inline void _esh_session_map_clean(ns_map_t *m) {
m->data.track_idx = -1;
}

#ifdef PTHREAD_LOCK
static inline int _rwlock_init(size_t idx, char *lockfile) {
pmix_status_t rc = PMIX_SUCCESS;
size_t size = _lock_segment_size;
pthread_rwlockattr_t attr;

if ((NULL != _ESH_SESSION_pthread_seg(idx)) || (NULL != _ESH_SESSION_pthread_rwlock(idx))) {
rc = PMIX_ERR_INIT;
return rc;
}
_ESH_SESSION_pthread_seg(idx) = (pmix_sm_seg_t *)malloc(sizeof(pmix_sm_seg_t));
if (NULL == _ESH_SESSION_pthread_seg(idx)) {
rc = PMIX_ERR_OUT_OF_RESOURCE;
return rc;
}

if (_is_server()) {
if (PMIX_SUCCESS != (rc = pmix_sm_segment_create(_ESH_SESSION_pthread_seg(idx), lockfile, size))) {
return rc;
}
memset(_ESH_SESSION_pthread_seg(idx)->seg_base_addr, 0, size);
_ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr;

if (0 != pthread_rwlockattr_init(&attr)) {
rc = PMIX_ERR_INIT;
pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx));
return rc;
}
if (0 != pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
rc = PMIX_ERR_INIT;
pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx));
pthread_rwlockattr_destroy(&attr);
return rc;
}
if (0 != pthread_rwlock_init(_ESH_SESSION_pthread_rwlock(idx), &attr)) {
rc = PMIX_ERR_INIT;
pmix_sm_segment_detach(_ESH_SESSION_pthread_seg(idx));
pthread_rwlockattr_destroy(&attr);
return rc;
}
if (0 != pthread_rwlockattr_destroy(&attr)) {
rc = PMIX_ERR_INIT;
return rc;
}

}
else {
_ESH_SESSION_pthread_seg(idx)->seg_size = size;
snprintf(_ESH_SESSION_pthread_seg(idx)->seg_name, PMIX_PATH_MAX, "%s", lockfile);
if (PMIX_SUCCESS != (rc = pmix_sm_segment_attach(_ESH_SESSION_pthread_seg(idx), PMIX_SM_RW))) {
return rc;
}
_ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr;
}

return rc;
}

static inline void _rwlock_release(session_t *s) {
pmix_status_t rc;

if (0 != pthread_rwlock_destroy(s->rwlock)) {
rc = PMIX_ERROR;
PMIX_ERROR_LOG(rc);
return;
}

/* detach & unlink from current desc */
if (s->rwlock_seg->seg_cpid == getpid()) {
pmix_sm_segment_unlink(s->rwlock_seg);
}
pmix_sm_segment_detach(s->rwlock_seg);

free(s->rwlock_seg);
s->rwlock_seg = NULL;
s->rwlock = NULL;
}
#endif

static inline const char *_unique_id(void)
{
static const char *str = NULL;
Expand Down Expand Up @@ -656,6 +791,12 @@ static inline int _esh_session_init(size_t idx, ns_map_data_t *m, size_t jobuid,
}
}

#ifdef PTHREAD_LOCK
if ( PMIX_SUCCESS != (rc = _rwlock_init(m->tbl_idx, s->lockfile))) {
PMIX_ERROR_LOG(rc);
return rc;
}
#endif
s->sm_seg_first = seg;
s->sm_seg_last = s->sm_seg_first;
return PMIX_SUCCESS;
Expand All @@ -682,6 +823,9 @@ static inline void _esh_session_release(session_t *s)
}
free(s->nspace_path);
}
#ifdef PTHREAD_LOCK
_rwlock_release(s);
#endif
memset ((char *) s, 0, sizeof(*s));
}

Expand Down Expand Up @@ -885,7 +1029,7 @@ int _esh_store(const char *nspace, int rank, pmix_kval_t *kv)
}

/* set exclusive lock */
if (PMIX_SUCCESS != (rc = _ESH_WRLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) {
if (PMIX_SUCCESS != (rc = _ESH_WRLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) {
PMIX_ERROR_LOG(rc);
return rc;
}
Expand Down Expand Up @@ -950,14 +1094,14 @@ int _esh_store(const char *nspace, int rank, pmix_kval_t *kv)
}

/* unset lock */
if (PMIX_SUCCESS != (rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) {
if (PMIX_SUCCESS != (rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) {
PMIX_ERROR_LOG(rc);
}
return rc;

err_exit:
/* unset lock */
if (PMIX_SUCCESS != (tmp_rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) {
if (PMIX_SUCCESS != (tmp_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) {
PMIX_ERROR_LOG(tmp_rc);
}
return rc;
Expand Down Expand Up @@ -1018,7 +1162,7 @@ int _esh_fetch(const char *nspace, int rank, const char *key, pmix_value_t **kvs
}

/* grab shared lock */
if (PMIX_SUCCESS != (lock_rc = _ESH_RDLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) {
if (PMIX_SUCCESS != (lock_rc = _ESH_RDLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) {
/* Something wrong with the lock. The error is fatal */
rc = PMIX_ERROR;
PMIX_ERROR_LOG(lock_rc);
Expand Down Expand Up @@ -1177,7 +1321,7 @@ int _esh_fetch(const char *nspace, int rank, const char *key, pmix_value_t **kvs

done:
/* unset lock */
if (PMIX_SUCCESS != (lock_rc = _ESH_UNLOCK(_ESH_SESSION_lockfd(ns_map->tbl_idx)))) {
if (PMIX_SUCCESS != (lock_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) {
PMIX_ERROR_LOG(lock_rc);
}

Expand Down Expand Up @@ -1377,6 +1521,7 @@ static void _set_constants_from_env()
}
}

_lock_segment_size = page_size;
_max_ns_num = (_initial_segment_size - sizeof(size_t) * 2) / sizeof(ns_seg_info_t);
_max_meta_elems = (_meta_segment_size - sizeof(size_t)) / sizeof(rank_meta_info);

Expand Down
22 changes: 22 additions & 0 deletions src/dstore/pmix_esh.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ BEGIN_C_DECLS

#define PMIX_DSTORE_ESH_BASE_PATH "PMIX_DSTORE_ESH_BASE_PATH"

#define PTHREAD_LOCK_ENABLE 1
#define FCNTL_LOCK_ENABLE 0

#if defined(PTHREAD_LOCK_ENABLE) || defined(FCNTL_LOCK_ENABLE)
#if (PTHREAD_LOCK_ENABLE == 1)
#undef FCNTL_LOCK_ENABLE
#define PTHREAD_LOCK
#else
#undef PTHREAD_LOCK_ENABLE
#define FCNTL_LOCK
#endif
#elif !(defined(PTHREAD_LOCK_ENABLE) || defined(FCNTL_LOCK_ENABLE))
#error not enabled any one type of locking
#else
#define PTHREAD_LOCK
#endif

/* this structs are used to store information about
* shared segments addresses locally at each process,
* so they are common for different types of segments
Expand All @@ -47,12 +64,17 @@ struct seg_desc_t {
typedef struct ns_map_data_s ns_map_data_t;
typedef struct session_s session_t;
typedef struct ns_map_s ns_map_t;
typedef struct rwlock_map_s rwlock_map_t;

struct session_s {
int in_use;
uid_t jobuid;
char *nspace_path;
char *lockfile;
#ifdef PTHREAD_LOCK
pmix_sm_seg_t *rwlock_seg;
pthread_rwlock_t *rwlock;
#endif
int lockfd;
seg_desc_t *sm_seg_first;
seg_desc_t *sm_seg_last;
Expand Down

0 comments on commit d76dab4

Please sign in to comment.