master: Bull 2020 update of coll/han #8462

Merged
1 change: 1 addition & 0 deletions ompi/mca/coll/han/Makefile.am
@@ -14,6 +14,7 @@ coll_han.h \
coll_han_trigger.h \
coll_han_dynamic.h \
coll_han_dynamic_file.h \
coll_han_barrier.c \
coll_han_bcast.c \
coll_han_reduce.c \
coll_han_scatter.c \
43 changes: 40 additions & 3 deletions ompi/mca/coll/han/coll_han.h
@@ -10,6 +10,24 @@
* $HEADER$
*/

/**
* @file
*
* This component provides hierarchical implementations of MPI collectives.
* The hierarchical approach is efficient when many processes want remote
* access to the same local or remote resource (high message rate).
* Some components are also better at local scale (for example with shared
* memory) while others provide scalable implementations. The hierarchical
* implementation falls back on other components for the intermediate
* operations. For example, an MPI_Bcast is divided into a sequence of bcasts
* from the highest to the lowest topological level.
* Some algorithms introduce more advanced features (such as noise resiliency)
* while others just link topological levels; the latter are called 'simple'.
* To perform sub-communications, extra communicators are initialised for
* each topological level.
*/


#ifndef MCA_COLL_HAN_EXPORT_H
#define MCA_COLL_HAN_EXPORT_H

@@ -198,7 +216,7 @@ typedef struct mca_coll_han_component_t {
/* whether we need reproducible results
* (but disables topological optimisations)
*/
uint32_t han_reproducible;
bool han_reproducible;
bool use_simple_algorithm[COLLCOUNT];

/* Dynamic configuration rules */
@@ -214,7 +232,6 @@
int max_dynamic_errors;
} mca_coll_han_component_t;

typedef void (*previous_dummy_fn_t) (void);

/*
* Structure used to store what is necessary for the collective operations
@@ -225,11 +242,11 @@ typedef struct mca_coll_han_single_collective_fallback_s {
mca_coll_base_module_allgather_fn_t allgather;
mca_coll_base_module_allgatherv_fn_t allgatherv;
mca_coll_base_module_allreduce_fn_t allreduce;
mca_coll_base_module_barrier_fn_t barrier;
mca_coll_base_module_bcast_fn_t bcast;
mca_coll_base_module_gather_fn_t gather;
mca_coll_base_module_reduce_fn_t reduce;
mca_coll_base_module_scatter_fn_t scatter;
previous_dummy_fn_t dummy;
};
mca_coll_base_module_t* module;
} mca_coll_han_single_collective_fallback_t;
@@ -243,6 +260,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
mca_coll_han_single_collective_fallback_t allgather;
mca_coll_han_single_collective_fallback_t allgatherv;
mca_coll_han_single_collective_fallback_t allreduce;
mca_coll_han_single_collective_fallback_t barrier;
mca_coll_han_single_collective_fallback_t bcast;
mca_coll_han_single_collective_fallback_t reduce;
mca_coll_han_single_collective_fallback_t gather;
@@ -256,7 +274,9 @@ typedef struct mca_coll_han_module_t {

/* Whether this module has been lazily initialized or not yet */
bool enabled;
int recursive_free_depth;

struct ompi_communicator_t *cached_comm;
struct ompi_communicator_t **cached_low_comms;
struct ompi_communicator_t **cached_up_comms;
int *cached_vranks;
@@ -305,6 +325,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
#define previous_allreduce fallback.allreduce.allreduce
#define previous_allreduce_module fallback.allreduce.module

#define previous_barrier fallback.barrier.barrier
#define previous_barrier_module fallback.barrier.module

#define previous_bcast fallback.bcast.bcast
#define previous_bcast_module fallback.bcast.module

@@ -333,6 +356,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
/* macro to correctly load /all/ fallback collectives */
#define HAN_LOAD_FALLBACK_COLLECTIVES(HANM, COMM) \
do { \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, barrier); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
@@ -404,6 +428,9 @@ int
mca_coll_han_allreduce_intra_dynamic(ALLREDUCE_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_barrier_intra_dynamic(BARRIER_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_bcast_intra_dynamic(BCAST_BASE_ARGS,
mca_coll_base_module_t *module);
int
@@ -416,6 +443,8 @@ int
mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS,
mca_coll_base_module_t *module);

int mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Bcast */
int mca_coll_han_bcast_intra_simple(void *buff,
int count,
@@ -494,6 +523,14 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int
mca_coll_han_scatter_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t * module);

/* Gather */
int
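The new header comment in coll_han.h describes how a collective is split into one call per topological level. As an illustration only (this sketch is not part of the PR), a hierarchical broadcast over the two cached sub-communicators could look as follows; low_comm/up_comm and the root_low_rank/root_up_rank coordinates follow the naming used elsewhere in coll/han, and error handling, topology lookup and module bookkeeping are left out:

/* Minimal sketch, assuming the usual coll_han.h includes:
 * one bcast per topological level, highest level first. */
static int han_bcast_sketch(void *buff, int count, struct ompi_datatype_t *dtype,
                            struct ompi_communicator_t *low_comm,
                            struct ompi_communicator_t *up_comm,
                            int root_low_rank, int root_up_rank)
{
    /* Inter-node level: the node leaders obtain the data from the root's node */
    if (ompi_comm_rank(low_comm) == root_low_rank) {
        up_comm->c_coll->coll_bcast(buff, count, dtype, root_up_rank,
                                    up_comm, up_comm->c_coll->coll_bcast_module);
    }
    /* Intra-node level: each leader forwards the data within its node */
    low_comm->c_coll->coll_bcast(buff, count, dtype, root_low_rank,
                                 low_comm, low_comm->c_coll->coll_bcast_module);
    return OMPI_SUCCESS;
}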
16 changes: 15 additions & 1 deletion ompi/mca/coll/han/coll_han_allgather.c
@@ -10,6 +10,12 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of allgather
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
@@ -57,6 +63,10 @@ mca_coll_han_set_allgather_args(mca_coll_han_allgather_t * args,
args->req = req;
}


/**
* Main function for the taskified allgather: issues the lg task, a gather on the low communicator
*/
int
mca_coll_han_allgather_intra(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
@@ -91,7 +101,7 @@ mca_coll_han_allgather_intra(const void *sbuf, int scount,
comm, comm->c_coll->coll_allgather_module);
}

ompi_request_t *temp_request = NULL;
ompi_request_t *temp_request;
/* Set up request */
temp_request = OBJ_NEW(ompi_request_t);
temp_request->req_state = OMPI_REQUEST_ACTIVE;
@@ -276,6 +286,10 @@ int mca_coll_han_allgather_lb_task(void *task_args)

}

/**
* Short implementation of allgather that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_allgather_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
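For orientation (again not part of the diff), the 'simple' allgather added in this file reduces to three phases: a gather on the node-local communicator, an allgather across the node leaders, and a node-local bcast of the assembled buffer. A hedged sketch assuming contiguous datatypes and block rank placement; the real mca_coll_han_allgather_intra_simple also handles reordering and uses the cached sub-communicators:

/* Sketch fragment, assuming the usual coll_han.h / ompi_datatype.h includes. */
static int han_allgather_sketch(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
                                void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
                                struct ompi_communicator_t *low_comm,
                                struct ompi_communicator_t *up_comm)
{
    int low_rank = ompi_comm_rank(low_comm);
    int low_size = ompi_comm_size(low_comm);
    const int root_low_rank = 0;
    ptrdiff_t lb, extent;
    char *node_buf = NULL;

    ompi_datatype_get_extent(rdtype, &lb, &extent);
    if (low_rank == root_low_rank) {
        node_buf = malloc((size_t)low_size * rcount * extent);
    }

    /* 1. Node leaders collect the contributions of their node */
    low_comm->c_coll->coll_gather(sbuf, scount, sdtype, node_buf, rcount, rdtype,
                                  root_low_rank, low_comm,
                                  low_comm->c_coll->coll_gather_module);

    /* 2. Leaders exchange the per-node blocks across nodes */
    if (low_rank == root_low_rank) {
        up_comm->c_coll->coll_allgather(node_buf, low_size * rcount, rdtype,
                                        rbuf, low_size * rcount, rdtype,
                                        up_comm, up_comm->c_coll->coll_allgather_module);
        free(node_buf);
    }

    /* 3. Leaders forward the complete result inside their node */
    low_comm->c_coll->coll_bcast(rbuf, ompi_comm_size(up_comm) * low_size * rcount,
                                 rdtype, root_low_rank, low_comm,
                                 low_comm->c_coll->coll_bcast_module);
    return OMPI_SUCCESS;
}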
31 changes: 21 additions & 10 deletions ompi/mca/coll/han/coll_han_allreduce.c
@@ -12,6 +12,13 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of allreduce.
* They only work in the regular case (each node has the same number of processes)
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
@@ -172,16 +179,16 @@ mca_coll_han_allreduce_intra(const void *sbuf,
issue_task(t3);

while (t->completed[0] != t->num_segments) {
/* Create t3 tasks for the current segment */
mca_coll_task_t *t3 = OBJ_NEW(mca_coll_task_t);
/* Setup up t3 task arguments */
t->cur_task = t3;
/* Create t_next_seg tasks for the current segment */
mca_coll_task_t *t_next_seg = OBJ_NEW(mca_coll_task_t);
/* Setup up t_next_seg task arguments */
t->cur_task = t_next_seg;
t->sbuf = (char *) t->sbuf + extent * t->seg_count;
t->rbuf = (char *) t->rbuf + extent * t->seg_count;
t->cur_seg = t->cur_seg + 1;
/* Init t3 task */
init_task(t3, mca_coll_han_allreduce_t3_task, (void *) t);
issue_task(t3);
/* Init t_next_seg task */
init_task(t_next_seg, mca_coll_han_allreduce_t3_task, (void *) t);
issue_task(t_next_seg);
}
free(t->completed);
t->completed = NULL;
@@ -194,7 +201,7 @@
comm, han_module->previous_allreduce_module);
}

/* t0 task */
/* t0 task that performs a local reduction */
int mca_coll_han_allreduce_t0_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -224,7 +231,7 @@ int mca_coll_han_allreduce_t0_task(void *task_args)
return OMPI_SUCCESS;
}

/* t1 task */
/* t1 task that performs a ireduce on top communicator */
int mca_coll_han_allreduce_t1_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -326,7 +333,7 @@ int mca_coll_han_allreduce_t2_task(void *task_args)
return OMPI_SUCCESS;
}

/* t3 task */
/* t3 task that performs broadcasts */
int mca_coll_han_allreduce_t3_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -397,6 +404,10 @@ int mca_coll_han_allreduce_t3_task(void *task_args)
return OMPI_SUCCESS;
}

/*
* Short implementation of allreduce that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_allreduce_intra_simple(const void *sbuf,
void *rbuf,
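Purely as an illustration of the structure (not code from the PR), the non-taskified 'simple' allreduce follows the same three-level pattern: a node-local reduce onto each leader, an allreduce among the leaders, and a node-local bcast of the result. The sketch assumes the 'regular' case noted above and that the underlying allreduce module handles MPI_IN_PLACE:

/* Sketch fragment, assuming the usual coll_han.h includes. */
static int han_allreduce_sketch(const void *sbuf, void *rbuf, int count,
                                struct ompi_datatype_t *dtype, struct ompi_op_t *op,
                                struct ompi_communicator_t *low_comm,
                                struct ompi_communicator_t *up_comm)
{
    const int root_low_rank = 0;

    /* 1. Reduce each node's contributions onto the node leader */
    low_comm->c_coll->coll_reduce(sbuf, rbuf, count, dtype, op, root_low_rank,
                                  low_comm, low_comm->c_coll->coll_reduce_module);

    /* 2. Leaders combine the per-node partial results across nodes */
    if (ompi_comm_rank(low_comm) == root_low_rank) {
        up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, rbuf, count, dtype, op,
                                        up_comm, up_comm->c_coll->coll_allreduce_module);
    }

    /* 3. Leaders broadcast the final result inside their node */
    low_comm->c_coll->coll_bcast(rbuf, count, dtype, root_low_rank,
                                 low_comm, low_comm->c_coll->coll_bcast_module);
    return OMPI_SUCCESS;
}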
62 changes: 62 additions & 0 deletions ompi/mca/coll/han/coll_han_barrier.c
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2018-2020 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2020 Bull S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of barrier
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"


/**
* Short implementation of barrier that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module;
ompi_communicator_t *low_comm, *up_comm;

/* create the subcommunicators */
if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle barrier with this communicator. Fall back on another component\n"));
/* Put back the fallback collective support and call it once. All
* future calls will then be automatically redirected.
*/
HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm);
return comm->c_coll->coll_barrier(comm, comm->c_coll->coll_barrier_module);
}

low_comm = han_module->sub_comm[INTRA_NODE];
up_comm = han_module->sub_comm[INTER_NODE];

int low_rank = ompi_comm_rank(low_comm);
int root_low_rank = 0; /* rank leader will be 0 on each node */

/* TODO: extend coll interface with half barrier */
low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module);

if (low_rank == root_low_rank) {
up_comm->c_coll->coll_barrier(up_comm, up_comm->c_coll->coll_barrier_module);
}

low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module);

return OMPI_SUCCESS;
}
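Nothing changes on the application side: any MPI_Barrier on a communicator for which han is selected goes through mca_coll_han_barrier_intra_simple (intra-node barrier, inter-node barrier among the node leaders, intra-node barrier again). A minimal test program, hypothetical and not part of the PR, that exercises that path on a multi-node run:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Resolves to the hierarchical barrier when the han component is
     * selected for MPI_COMM_WORLD (selection depends on the MCA
     * configuration, which is outside the scope of this diff). */
    MPI_Barrier(MPI_COMM_WORLD);

    if (0 == rank) {
        printf("all ranks passed the barrier\n");
    }
    MPI_Finalize();
    return 0;
}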
14 changes: 12 additions & 2 deletions ompi/mca/coll/han/coll_han_bcast.c
@@ -11,6 +11,12 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of bcast
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
@@ -71,7 +77,7 @@ mca_coll_han_bcast_intra(void *buff,

/* Create the subcommunicators */
err = mca_coll_han_comm_create(comm, han_module);
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
if( OMPI_SUCCESS != err ) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle bcast with this communicator. Fall back on another component\n"));
/* Put back the fallback collective support and call it once. All
@@ -211,6 +217,10 @@ int mca_coll_han_bcast_t1_task(void *task_args)
return OMPI_SUCCESS;
}

/*
* Short implementation of bcast that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_bcast_intra_simple(void *buff,
int count,
@@ -229,7 +239,7 @@

/* Create the subcommunicators */
err = mca_coll_han_comm_create_new(comm, han_module);
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
if( OMPI_SUCCESS != err ) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle bcast with this communicator. Fall back on another component\n"));
/* Put back the fallback collective support and call it once. All