Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ch4/posix: decrease shm_limit_counter when freeing comm obj #4864

Merged
merged 3 commits into from
Nov 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mpi/coll/src/coll_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- name : MPIR_CVAR_COLLECTIVE_FALLBACK
category : COLLECTIVE
type : enum
default : error
default : silent
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
Expand Down
6 changes: 6 additions & 0 deletions src/mpid/ch4/shm/posix/posix_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ int MPIDI_POSIX_coll_init(int rank, int size)
int MPIDI_POSIX_coll_finalize(void)
{
int mpi_errno = MPI_SUCCESS;
static MPL_atomic_uint64_t MPIDI_POSIX_dummy_shm_limit_counter;

MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_POSIX_COLL_FINALIZE);
MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_POSIX_COLL_FINALIZE);
Expand All @@ -265,6 +266,11 @@ int MPIDI_POSIX_coll_finalize(void)
* per node for intra-node collectives */
mpi_errno = MPIDU_Init_shm_free(MPIDI_POSIX_global.shm_ptr);

/* MPIDI_POSIX_global.shm_ptr has been freed but is still referenced when the
 * builtin comms are freed; point MPIDI_POSIX_shm_limit_counter at a dummy
 * counter here to avoid a segmentation fault */
MPIDI_POSIX_shm_limit_counter = &MPIDI_POSIX_dummy_shm_limit_counter;

if (MPIDI_global.shm.posix.csel_root) {
mpi_errno = MPIR_Csel_free(MPIDI_global.shm.posix.csel_root);
MPIR_ERR_CHECK(mpi_errno);
Expand Down
29 changes: 27 additions & 2 deletions src/mpid/ch4/shm/posix/release_gather/release_gather.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
MPIR_Bcast_impl(&fallback, 1, MPI_INT, 0, comm_ptr, &errflag);
MPIR_ERR_SETANDJUMP(mpi_errno_ret, MPI_ERR_NO_MEM, "**nomem");
} else {
/* More shm can be created, update the shared counter */
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter, memory_to_be_allocated);
fallback = 0;
mpi_errno = MPIR_Bcast_impl(&fallback, 1, MPI_INT, 0, comm_ptr, &errflag);
if (mpi_errno) {
Expand Down Expand Up @@ -376,6 +374,10 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
release_gather_info_ptr->reduce_buf_addr = NULL;
release_gather_info_ptr->child_reduce_buf_addr = NULL;

RELEASE_GATHER_FIELD(comm_ptr, flags_shm_size) = flags_shm_size;
/* update the shared counter */
if (rank == 0)
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter, flags_shm_size);
mpi_errno =
MPIDU_shm_alloc(comm_ptr, flags_shm_size,
(void **) &(release_gather_info_ptr->flags_addr), &mapfail_flag);
Expand Down Expand Up @@ -411,6 +413,12 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
}

if (initialize_bcast_buf) {
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size) =
MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
if (rank == 0)
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size));

/* Allocate the shared memory for bcast buffer */
mpi_errno =
MPIDU_shm_alloc(comm_ptr, MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE,
Expand All @@ -432,6 +440,12 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
RELEASE_GATHER_FIELD(comm_ptr, child_reduce_buf_addr) =
MPL_malloc(num_ranks * sizeof(void *), MPL_MEM_COLL);

RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size) =
MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
if (rank == 0)
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size));

mpi_errno =
MPIDU_shm_alloc(comm_ptr, num_ranks * MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE,
(void **) &(RELEASE_GATHER_FIELD(comm_ptr, reduce_buf_addr)),
Expand Down Expand Up @@ -480,6 +494,11 @@ int MPIDI_POSIX_mpi_release_gather_comm_free(MPIR_Comm * comm_ptr)
goto fn_exit;
}

/* decrease shm memory limit counter */
if (comm_ptr->rank == 0)
MPL_atomic_fetch_sub_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, flags_shm_size));

/* destroy and detach shared memory used for flags */
mpi_errno = MPIDU_shm_free(RELEASE_GATHER_FIELD(comm_ptr, flags_addr));
if (mpi_errno) {
Expand All @@ -492,6 +511,9 @@ int MPIDI_POSIX_mpi_release_gather_comm_free(MPIR_Comm * comm_ptr)
}

if (RELEASE_GATHER_FIELD(comm_ptr, bcast_buf_addr) != NULL) {
if (comm_ptr->rank == 0)
MPL_atomic_fetch_sub_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size));
/* destroy and detach shared memory used for bcast buffer */
mpi_errno = MPIDU_shm_free(RELEASE_GATHER_FIELD(comm_ptr, bcast_buf_addr));
if (mpi_errno) {
Expand All @@ -505,6 +527,9 @@ int MPIDI_POSIX_mpi_release_gather_comm_free(MPIR_Comm * comm_ptr)
}

if (RELEASE_GATHER_FIELD(comm_ptr, reduce_buf_addr) != NULL) {
if (comm_ptr->rank == 0)
MPL_atomic_fetch_sub_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size));
/* destroy and detach shared memory used for reduce buffers */
mpi_errno = MPIDU_shm_free(RELEASE_GATHER_FIELD(comm_ptr, reduce_buf_addr));
if (mpi_errno) {
Expand Down
5 changes: 4 additions & 1 deletion src/mpid/ch4/shm/posix/release_gather/release_gather_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ typedef struct MPIDI_POSIX_release_gather_comm_t {
int num_collective_calls;

MPIR_Treealgo_tree_t bcast_tree, reduce_tree;
int flags_shm_size;
/* sizes of the shared memory regions allocated to this comm */
uint64_t flags_shm_size;
uint64_t bcast_shm_size;
uint64_t reduce_shm_size;
uint64_t gather_state, release_state;

void *flags_addr, *bcast_buf_addr, *reduce_buf_addr;
Expand Down