Skip to content

Commit

Permalink
Bull update of coll/han
Browse files Browse the repository at this point in the history
This completes and fixes current code for coll/Han:
- a barrier is added
- a "simple" scatter is added
- some Doxygen documentation is added

Fix:
- compilation and cppcheck warnings
- corner case errors when parsing a rule file

Signed-off-by: Emmanuel Brelle <[email protected]>
  • Loading branch information
EmmanuelBRELLE committed Feb 3, 2021
1 parent 2823d97 commit a1c5295
Show file tree
Hide file tree
Showing 18 changed files with 605 additions and 87 deletions.
1 change: 1 addition & 0 deletions ompi/mca/coll/han/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ coll_han.h \
coll_han_trigger.h \
coll_han_dynamic.h \
coll_han_dynamic_file.h \
coll_han_barrier.c \
coll_han_bcast.c \
coll_han_reduce.c \
coll_han_scatter.c \
Expand Down
44 changes: 41 additions & 3 deletions ompi/mca/coll/han/coll_han.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@
* $HEADER$
*/

/**
* @file
*
* This component provides hierarchical implementations of MPI collectives.
* A hierarchical approach is efficient when too many processes want remote
* access to the same local or remote resource (high message rate).
* Some components are also better at local scale (for example with shared memory)
* whereas others provide scalable implementations. Hierarchical implementations
* enable a fallback on other components for intermediary operations.
* For example, an MPI_Bcast will be divided into a sequence of bcasts from the
* highest to the lowest topological level.
* Some algorithms introduce more advanced features (such as noise resiliency);
* others just link topological levels. The latter are called 'simple'.
* To perform sub-communications, extra communicators are initialised for
* each topological level.
*/


#ifndef MCA_COLL_HAN_EXPORT_H
#define MCA_COLL_HAN_EXPORT_H

Expand Down Expand Up @@ -198,7 +216,7 @@ typedef struct mca_coll_han_component_t {
/* whether we need reproducible results
* (but disables topological optimisations)
*/
uint32_t han_reproducible;
bool han_reproducible;
bool use_simple_algorithm[COLLCOUNT];

/* Dynamic configuration rules */
Expand All @@ -209,12 +227,12 @@ typedef struct mca_coll_han_component_t {
mca_coll_han_dynamic_rules_t dynamic_rules;
/* Dynamic rules from mca parameter */
COMPONENT_T mca_rules[COLLCOUNT][NB_TOPO_LVL];
int topo_level;

/* Define maximum dynamic errors printed by rank 0 with a 0 verbosity level */
int max_dynamic_errors;
} mca_coll_han_component_t;

typedef void (*previous_dummy_fn_t) (void);

/*
* Structure used to store what is necessary for the collective operations
Expand All @@ -225,11 +243,11 @@ typedef struct mca_coll_han_single_collective_fallback_s {
mca_coll_base_module_allgather_fn_t allgather;
mca_coll_base_module_allgatherv_fn_t allgatherv;
mca_coll_base_module_allreduce_fn_t allreduce;
mca_coll_base_module_barrier_fn_t barrier;
mca_coll_base_module_bcast_fn_t bcast;
mca_coll_base_module_gather_fn_t gather;
mca_coll_base_module_reduce_fn_t reduce;
mca_coll_base_module_scatter_fn_t scatter;
previous_dummy_fn_t dummy;
};
mca_coll_base_module_t* module;
} mca_coll_han_single_collective_fallback_t;
Expand All @@ -243,6 +261,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
mca_coll_han_single_collective_fallback_t allgather;
mca_coll_han_single_collective_fallback_t allgatherv;
mca_coll_han_single_collective_fallback_t allreduce;
mca_coll_han_single_collective_fallback_t barrier;
mca_coll_han_single_collective_fallback_t bcast;
mca_coll_han_single_collective_fallback_t reduce;
mca_coll_han_single_collective_fallback_t gather;
Expand All @@ -256,7 +275,9 @@ typedef struct mca_coll_han_module_t {

/* Whether this module has been lazily initialized or not yet */
bool enabled;
int recursive_free_depth;

struct ompi_communicator_t *cached_comm;
struct ompi_communicator_t **cached_low_comms;
struct ompi_communicator_t **cached_up_comms;
int *cached_vranks;
Expand Down Expand Up @@ -305,6 +326,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
#define previous_allreduce fallback.allreduce.allreduce
#define previous_allreduce_module fallback.allreduce.module

#define previous_barrier fallback.barrier.barrier
#define previous_barrier_module fallback.barrier.module

#define previous_bcast fallback.bcast.bcast
#define previous_bcast_module fallback.bcast.module

Expand Down Expand Up @@ -333,6 +357,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
/* macro to correctly load /all/ fallback collectives */
#define HAN_LOAD_FALLBACK_COLLECTIVES(HANM, COMM) \
do { \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, barrier); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
Expand Down Expand Up @@ -404,6 +429,9 @@ int
mca_coll_han_allreduce_intra_dynamic(ALLREDUCE_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_barrier_intra_dynamic(BARRIER_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_bcast_intra_dynamic(BCAST_BASE_ARGS,
mca_coll_base_module_t *module);
int
Expand All @@ -416,6 +444,8 @@ int
mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS,
mca_coll_base_module_t *module);

int mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Bcast */
int mca_coll_han_bcast_intra_simple(void *buff,
int count,
Expand Down Expand Up @@ -494,6 +524,14 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int
mca_coll_han_scatter_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t * module);

/* Gather */
int
Expand Down
16 changes: 15 additions & 1 deletion ompi/mca/coll/han/coll_han_allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of allgather.
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
Expand Down Expand Up @@ -57,6 +63,10 @@ mca_coll_han_set_allgather_args(mca_coll_han_allgather_t * args,
args->req = req;
}


/**
* Main function for taskified allgather: calls lg task, a gather on low comm
*/
int
mca_coll_han_allgather_intra(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
Expand Down Expand Up @@ -91,7 +101,7 @@ mca_coll_han_allgather_intra(const void *sbuf, int scount,
comm, comm->c_coll->coll_allgather_module);
}

ompi_request_t *temp_request = NULL;
ompi_request_t *temp_request;
/* Set up request */
temp_request = OBJ_NEW(ompi_request_t);
temp_request->req_state = OMPI_REQUEST_ACTIVE;
Expand Down Expand Up @@ -276,6 +286,10 @@ int mca_coll_han_allgather_lb_task(void *task_args)

}

/**
* Short implementation of allgather that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_allgather_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
Expand Down
31 changes: 21 additions & 10 deletions ompi/mca/coll/han/coll_han_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of allreduce.
* They only work in regular situations (each node has an equal number of processes).
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
Expand Down Expand Up @@ -172,16 +179,16 @@ mca_coll_han_allreduce_intra(const void *sbuf,
issue_task(t3);

while (t->completed[0] != t->num_segments) {
/* Create t3 tasks for the current segment */
mca_coll_task_t *t3 = OBJ_NEW(mca_coll_task_t);
/* Setup up t3 task arguments */
t->cur_task = t3;
/* Create t_next_seg tasks for the current segment */
mca_coll_task_t *t_next_seg = OBJ_NEW(mca_coll_task_t);
/* Set up t_next_seg task arguments */
t->cur_task = t_next_seg;
t->sbuf = (char *) t->sbuf + extent * t->seg_count;
t->rbuf = (char *) t->rbuf + extent * t->seg_count;
t->cur_seg = t->cur_seg + 1;
/* Init t3 task */
init_task(t3, mca_coll_han_allreduce_t3_task, (void *) t);
issue_task(t3);
/* Init t_next_seg task */
init_task(t_next_seg, mca_coll_han_allreduce_t3_task, (void *) t);
issue_task(t_next_seg);
}
free(t->completed);
t->completed = NULL;
Expand All @@ -194,7 +201,7 @@ mca_coll_han_allreduce_intra(const void *sbuf,
comm, han_module->previous_allreduce_module);
}

/* t0 task */
/* t0 task that performs a local reduction */
int mca_coll_han_allreduce_t0_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
Expand Down Expand Up @@ -224,7 +231,7 @@ int mca_coll_han_allreduce_t0_task(void *task_args)
return OMPI_SUCCESS;
}

/* t1 task */
/* t1 task that performs an ireduce on the top communicator */
int mca_coll_han_allreduce_t1_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
Expand Down Expand Up @@ -326,7 +333,7 @@ int mca_coll_han_allreduce_t2_task(void *task_args)
return OMPI_SUCCESS;
}

/* t3 task */
/* t3 task that performs broadcasts */
int mca_coll_han_allreduce_t3_task(void *task_args)
{
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
Expand Down Expand Up @@ -397,6 +404,10 @@ int mca_coll_han_allreduce_t3_task(void *task_args)
return OMPI_SUCCESS;
}

/*
* Short implementation of allreduce that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_allreduce_intra_simple(const void *sbuf,
void *rbuf,
Expand Down
62 changes: 62 additions & 0 deletions ompi/mca/coll/han/coll_han_barrier.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2018-2020 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2020 Bull S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of barrier.
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"


/**
 * Short implementation of barrier that only does hierarchical
 * communications without tasks.
 *
 * The barrier is performed in three steps:
 *   1. a barrier on the intra-node (low) communicator,
 *   2. a barrier on the inter-node (up) communicator, executed only by the
 *      process whose rank in the low communicator is 0,
 *   3. a second barrier on the low communicator, so that no local process
 *      leaves before its rank-0 process has completed step 2.
 *
 * If the sub-communicators cannot be created, the fallback collectives are
 * reloaded on the communicator and its barrier is called instead.
 *
 * @param comm    communicator on which the barrier is performed
 * @param module  han module attached to comm (used as mca_coll_han_module_t)
 *
 * @return OMPI_SUCCESS when all sub-barriers succeed, otherwise the error
 *         code of the first failing sub-barrier; on the fallback path, the
 *         return value of the fallback barrier.
 */
int
mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
                                  mca_coll_base_module_t *module)
{
    mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module;
    ompi_communicator_t *low_comm, *up_comm;
    int err;

    /* create the subcommunicators */
    if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "han cannot handle barrier with this communicator. Fall back on another component\n"));
        /* Put back the fallback collective support and call it once. All
         * future calls will then be automatically redirected.
         */
        HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm);
        /* The barrier function must be given the barrier module
         * (the bcast module was passed here by mistake). */
        return comm->c_coll->coll_barrier(comm, comm->c_coll->coll_barrier_module);
    }

    low_comm = han_module->sub_comm[INTRA_NODE];
    up_comm = han_module->sub_comm[INTER_NODE];

    int low_rank = ompi_comm_rank(low_comm);
    int root_low_rank = 0; /* rank leader will be 0 on each node */

    /* TODO: extend coll interface with half barrier */
    /* Step 1: every local process reaches the barrier */
    err = low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module);
    if (OMPI_SUCCESS != err) {
        return err;
    }

    /* Step 2: node leaders synchronise across nodes */
    if (low_rank == root_low_rank) {
        err = up_comm->c_coll->coll_barrier(up_comm, up_comm->c_coll->coll_barrier_module);
        if (OMPI_SUCCESS != err) {
            return err;
        }
    }

    /* Step 3: release the local processes once the leader is done */
    err = low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module);

    return err;
}
14 changes: 12 additions & 2 deletions ompi/mca/coll/han/coll_han_bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
* $HEADER$
*/

/**
* @file
*
* This file contains all the hierarchical implementations of bcast.
*/

#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
Expand Down Expand Up @@ -71,7 +77,7 @@ mca_coll_han_bcast_intra(void *buff,

/* Create the subcommunicators */
err = mca_coll_han_comm_create(comm, han_module);
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
if( OMPI_SUCCESS != err ) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle bcast with this communicator. Fall back on another component\n"));
/* Put back the fallback collective support and call it once. All
Expand Down Expand Up @@ -211,6 +217,10 @@ int mca_coll_han_bcast_t1_task(void *task_args)
return OMPI_SUCCESS;
}

/*
* Short implementation of bcast that only does hierarchical
* communications without tasks.
*/
int
mca_coll_han_bcast_intra_simple(void *buff,
int count,
Expand All @@ -229,7 +239,7 @@ mca_coll_han_bcast_intra_simple(void *buff,

/* Create the subcommunicators */
err = mca_coll_han_comm_create_new(comm, han_module);
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
if( OMPI_SUCCESS != err ) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle bcast with this communicator. Fall back on another component\n"));
/* Put back the fallback collective support and call it once. All
Expand Down
Loading

0 comments on commit a1c5295

Please sign in to comment.