From 508e283e902ea3046e504d9517bb1f12e664168e Mon Sep 17 00:00:00 2001
From: Brelle Emmanuel
Date: Tue, 2 Feb 2021 13:53:46 +0100
Subject: [PATCH] Bull update of coll/han

This completes and fixes the current code of coll/han:
- a barrier is added
- a "simple" scatter is added
- some Doxygen documentation is added

Fixes:
- compilation and cppcheck warnings
- corner-case errors when parsing a rule file

Signed-off-by: Emmanuel Brelle
(cherry picked from commit ca663de22cc96d560716ef35ffd149bc9dbad18f)
---
 ompi/mca/coll/han/Makefile.am             |   1 +
 ompi/mca/coll/han/coll_han.h              |  43 +++++-
 ompi/mca/coll/han/coll_han_allgather.c    |  16 +-
 ompi/mca/coll/han/coll_han_allreduce.c    |  31 ++--
 ompi/mca/coll/han/coll_han_barrier.c      |  62 ++++++++
 ompi/mca/coll/han/coll_han_bcast.c        |  14 +-
 ompi/mca/coll/han/coll_han_component.c    |  26 +++-
 ompi/mca/coll/han/coll_han_dynamic.c      | 147 +++++++++++++++---
 ompi/mca/coll/han/coll_han_dynamic.h      |   1 +
 ompi/mca/coll/han/coll_han_dynamic_file.c |  26 +++-
 ompi/mca/coll/han/coll_han_dynamic_file.h |   2 +
 ompi/mca/coll/han/coll_han_gather.c       |  32 ++--
 ompi/mca/coll/han/coll_han_module.c       |  26 +++-
 ompi/mca/coll/han/coll_han_reduce.c       |  21 ++-
 ompi/mca/coll/han/coll_han_scatter.c      | 180 ++++++++++++++++++++--
 ompi/mca/coll/han/coll_han_subcomms.c     |   2 +-
 ompi/mca/coll/han/coll_han_topo.c         |  33 ++--
 ompi/mca/coll/han/coll_han_trigger.h      |   9 ++
 18 files changed, 574 insertions(+), 98 deletions(-)
 create mode 100644 ompi/mca/coll/han/coll_han_barrier.c

diff --git a/ompi/mca/coll/han/Makefile.am b/ompi/mca/coll/han/Makefile.am
index 61b40d97c51..78136d73090 100644
--- a/ompi/mca/coll/han/Makefile.am
+++ b/ompi/mca/coll/han/Makefile.am
@@ -14,6 +14,7 @@ coll_han.h \
 coll_han_trigger.h \
 coll_han_dynamic.h \
 coll_han_dynamic_file.h \
+coll_han_barrier.c \
 coll_han_bcast.c \
 coll_han_reduce.c \
 coll_han_scatter.c \
diff --git a/ompi/mca/coll/han/coll_han.h b/ompi/mca/coll/han/coll_han.h
index 16efcbe8e5a..7880021978e 100644
--- a/ompi/mca/coll/han/coll_han.h
+++ b/ompi/mca/coll/han/coll_han.h
@@ -10,6 +10,24 @@
  * $HEADER$
  */
 
+/**
+ * @file
+ *
+ * This component provides hierarchical implementations of MPI collectives.
+ * A hierarchical approach is efficient when many processes want remote
+ * access to the same local or remote resource (high message rate).
+ * Some components are also better at the local scale (for example with
+ * shared memory) whereas others provide scalable implementations. The
+ * hierarchical implementation enables a fallback on other components for
+ * the intermediate operations. For example, an MPI_Bcast is divided into
+ * a sequence of bcasts from the highest to the lowest topological level.
+ * Some algorithms introduce more advanced features (such as noise
+ * resiliency), while others simply chain the topological levels; the
+ * latter are called 'simple'. To perform the sub-communications, extra
+ * communicators are initialised for each topological level.
+ */ + + #ifndef MCA_COLL_HAN_EXPORT_H #define MCA_COLL_HAN_EXPORT_H @@ -198,7 +216,7 @@ typedef struct mca_coll_han_component_t { /* whether we need reproducible results * (but disables topological optimisations) */ - uint32_t han_reproducible; + bool han_reproducible; bool use_simple_algorithm[COLLCOUNT]; /* Dynamic configuration rules */ @@ -214,7 +232,6 @@ typedef struct mca_coll_han_component_t { int max_dynamic_errors; } mca_coll_han_component_t; -typedef void (*previous_dummy_fn_t) (void); /* * Structure used to store what is necessary for the collective operations @@ -225,11 +242,11 @@ typedef struct mca_coll_han_single_collective_fallback_s { mca_coll_base_module_allgather_fn_t allgather; mca_coll_base_module_allgatherv_fn_t allgatherv; mca_coll_base_module_allreduce_fn_t allreduce; + mca_coll_base_module_barrier_fn_t barrier; mca_coll_base_module_bcast_fn_t bcast; mca_coll_base_module_gather_fn_t gather; mca_coll_base_module_reduce_fn_t reduce; mca_coll_base_module_scatter_fn_t scatter; - previous_dummy_fn_t dummy; }; mca_coll_base_module_t* module; } mca_coll_han_single_collective_fallback_t; @@ -243,6 +260,7 @@ typedef struct mca_coll_han_collectives_fallback_s { mca_coll_han_single_collective_fallback_t allgather; mca_coll_han_single_collective_fallback_t allgatherv; mca_coll_han_single_collective_fallback_t allreduce; + mca_coll_han_single_collective_fallback_t barrier; mca_coll_han_single_collective_fallback_t bcast; mca_coll_han_single_collective_fallback_t reduce; mca_coll_han_single_collective_fallback_t gather; @@ -256,7 +274,9 @@ typedef struct mca_coll_han_module_t { /* Whether this module has been lazily initialized or not yet */ bool enabled; + int recursive_free_depth; + struct ompi_communicator_t *cached_comm; struct ompi_communicator_t **cached_low_comms; struct ompi_communicator_t **cached_up_comms; int *cached_vranks; @@ -305,6 +325,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t); #define previous_allreduce fallback.allreduce.allreduce #define previous_allreduce_module fallback.allreduce.module +#define previous_barrier fallback.barrier.barrier +#define previous_barrier_module fallback.barrier.module + #define previous_bcast fallback.bcast.bcast #define previous_bcast_module fallback.bcast.module @@ -333,6 +356,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t); /* macro to correctly load /all/ fallback collectives */ #define HAN_LOAD_FALLBACK_COLLECTIVES(HANM, COMM) \ do { \ + HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, barrier); \ HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \ HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \ HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \ @@ -404,6 +428,9 @@ int mca_coll_han_allreduce_intra_dynamic(ALLREDUCE_BASE_ARGS, mca_coll_base_module_t *module); int +mca_coll_han_barrier_intra_dynamic(BARRIER_BASE_ARGS, + mca_coll_base_module_t *module); +int mca_coll_han_bcast_intra_dynamic(BCAST_BASE_ARGS, mca_coll_base_module_t *module); int @@ -416,6 +443,8 @@ int mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS, mca_coll_base_module_t *module); +int mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); /* Bcast */ int mca_coll_han_bcast_intra_simple(void *buff, int count, @@ -494,6 +523,14 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *rdtype, int root, struct ompi_communicator_t *comm, mca_coll_base_module_t * module); +int +mca_coll_han_scatter_intra_simple(const void *sbuf, int scount, + struct ompi_datatype_t *sdtype, + void 
*rbuf, int rcount, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); /* Gather */ int diff --git a/ompi/mca/coll/han/coll_han_allgather.c b/ompi/mca/coll/han/coll_han_allgather.c index cc7dfaff266..4255df23faa 100644 --- a/ompi/mca/coll/han/coll_han_allgather.c +++ b/ompi/mca/coll/han/coll_han_allgather.c @@ -10,6 +10,12 @@ * $HEADER$ */ +/** + * @file + * + * This files contains all the hierarchical implementations of allgather + */ + #include "coll_han.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_tags.h" @@ -57,6 +63,10 @@ mca_coll_han_set_allgather_args(mca_coll_han_allgather_t * args, args->req = req; } + +/** + * Main function for taskified allgather: calls lg task, a gather on low comm + */ int mca_coll_han_allgather_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype, @@ -91,7 +101,7 @@ mca_coll_han_allgather_intra(const void *sbuf, int scount, comm, comm->c_coll->coll_allgather_module); } - ompi_request_t *temp_request = NULL; + ompi_request_t *temp_request; /* Set up request */ temp_request = OBJ_NEW(ompi_request_t); temp_request->req_state = OMPI_REQUEST_ACTIVE; @@ -276,6 +286,10 @@ int mca_coll_han_allgather_lb_task(void *task_args) } +/** + * Short implementation of allgather that only does hierarchical + * communications without tasks. + */ int mca_coll_han_allgather_intra_simple(const void *sbuf, int scount, struct ompi_datatype_t *sdtype, diff --git a/ompi/mca/coll/han/coll_han_allreduce.c b/ompi/mca/coll/han/coll_han_allreduce.c index 3082ac6e05b..5bc1f2f8bfe 100644 --- a/ompi/mca/coll/han/coll_han_allreduce.c +++ b/ompi/mca/coll/han/coll_han_allreduce.c @@ -12,6 +12,13 @@ * $HEADER$ */ +/** + * @file + * + * This files contains all the hierarchical implementations of allreduce + * Only work with regular situation (each node has equal number of processes) + */ + #include "coll_han.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_tags.h" @@ -172,16 +179,16 @@ mca_coll_han_allreduce_intra(const void *sbuf, issue_task(t3); while (t->completed[0] != t->num_segments) { - /* Create t3 tasks for the current segment */ - mca_coll_task_t *t3 = OBJ_NEW(mca_coll_task_t); - /* Setup up t3 task arguments */ - t->cur_task = t3; + /* Create t_next_seg tasks for the current segment */ + mca_coll_task_t *t_next_seg = OBJ_NEW(mca_coll_task_t); + /* Setup up t_next_seg task arguments */ + t->cur_task = t_next_seg; t->sbuf = (char *) t->sbuf + extent * t->seg_count; t->rbuf = (char *) t->rbuf + extent * t->seg_count; t->cur_seg = t->cur_seg + 1; - /* Init t3 task */ - init_task(t3, mca_coll_han_allreduce_t3_task, (void *) t); - issue_task(t3); + /* Init t_next_seg task */ + init_task(t_next_seg, mca_coll_han_allreduce_t3_task, (void *) t); + issue_task(t_next_seg); } free(t->completed); t->completed = NULL; @@ -194,7 +201,7 @@ mca_coll_han_allreduce_intra(const void *sbuf, comm, han_module->previous_allreduce_module); } -/* t0 task */ +/* t0 task that performs a local reduction */ int mca_coll_han_allreduce_t0_task(void *task_args) { mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args; @@ -224,7 +231,7 @@ int mca_coll_han_allreduce_t0_task(void *task_args) return OMPI_SUCCESS; } -/* t1 task */ +/* t1 task that performs a ireduce on top communicator */ int mca_coll_han_allreduce_t1_task(void *task_args) { mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args; @@ -326,7 +333,7 @@ 
int mca_coll_han_allreduce_t2_task(void *task_args) return OMPI_SUCCESS; } -/* t3 task */ +/* t3 task that performs broadcasts */ int mca_coll_han_allreduce_t3_task(void *task_args) { mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args; @@ -397,6 +404,10 @@ int mca_coll_han_allreduce_t3_task(void *task_args) return OMPI_SUCCESS; } +/* + * Short implementation of allreduce that only does hierarchical + * communications without tasks. + */ int mca_coll_han_allreduce_intra_simple(const void *sbuf, void *rbuf, diff --git a/ompi/mca/coll/han/coll_han_barrier.c b/ompi/mca/coll/han/coll_han_barrier.c new file mode 100644 index 00000000000..01a930692fe --- /dev/null +++ b/ompi/mca/coll/han/coll_han_barrier.c @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2018-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2020 Bull S.A.S. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/** + * @file + * + * This files contains all the hierarchical implementations of barrier + */ + +#include "coll_han.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" + + +/** + * Short implementation of barrier that only does hierarchical + * communications without tasks. + */ +int +mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module; + ompi_communicator_t *low_comm, *up_comm; + + /* create the subcommunicators */ + if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "han cannot handle barrier with this communicator. Fall back on another component\n")); + /* Put back the fallback collective support and call it once. All + * future calls will then be automatically redirected. + */ + HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm); + return comm->c_coll->coll_barrier(comm, comm->c_coll->coll_bcast_module); + } + + low_comm = han_module->sub_comm[INTRA_NODE]; + up_comm = han_module->sub_comm[INTER_NODE]; + + int low_rank = ompi_comm_rank(low_comm); + int root_low_rank = 0; /* rank leader will be 0 on each node */ + + /* TODO: extend coll interface with half barrier */ + low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module); + + if (low_rank == root_low_rank) { + up_comm->c_coll->coll_barrier(up_comm, up_comm->c_coll->coll_barrier_module); + } + + low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module); + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/coll/han/coll_han_bcast.c b/ompi/mca/coll/han/coll_han_bcast.c index 1e79834d1bb..36bd42cb378 100644 --- a/ompi/mca/coll/han/coll_han_bcast.c +++ b/ompi/mca/coll/han/coll_han_bcast.c @@ -11,6 +11,12 @@ * $HEADER$ */ +/** + * @file + * + * This files contains all the hierarchical implementations of bcast + */ + #include "coll_han.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_tags.h" @@ -71,7 +77,7 @@ mca_coll_han_bcast_intra(void *buff, /* Create the subcommunicators */ err = mca_coll_han_comm_create(comm, han_module); - if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */ + if( OMPI_SUCCESS != err ) { OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "han cannot handle bcast with this communicator. 
Fall back on another component\n")); /* Put back the fallback collective support and call it once. All @@ -211,6 +217,10 @@ int mca_coll_han_bcast_t1_task(void *task_args) return OMPI_SUCCESS; } +/* + * Short implementation of bcast that only does hierarchical + * communications without tasks. + */ int mca_coll_han_bcast_intra_simple(void *buff, int count, @@ -229,7 +239,7 @@ mca_coll_han_bcast_intra_simple(void *buff, /* Create the subcommunicators */ err = mca_coll_han_comm_create_new(comm, han_module); - if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */ + if( OMPI_SUCCESS != err ) { OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "han cannot handle bcast with this communicator. Fall back on another component\n")); /* Put back the fallback collective support and call it once. All diff --git a/ompi/mca/coll/han/coll_han_component.c b/ompi/mca/coll/han/coll_han_component.c index f45f4de0441..09be6480ca8 100644 --- a/ompi/mca/coll/han/coll_han_component.c +++ b/ompi/mca/coll/han/coll_han_component.c @@ -111,6 +111,9 @@ static int han_close(void) return OMPI_SUCCESS; } +/* + * @return true if the collective has a simple version that does not use tasks. + */ static bool is_simple_implemented(COLLTYPE_T coll) { switch(coll) { @@ -119,12 +122,16 @@ static bool is_simple_implemented(COLLTYPE_T coll) case BCAST: case GATHER: case REDUCE: + case SCATTER: return true; default: return false; } } +/* + * Stringifier for topological level + */ const char* mca_coll_han_topo_lvl_to_str(TOPO_LVL_T topo_lvl) { switch(topo_lvl) { @@ -271,10 +278,16 @@ static int han_register(void) "whether we need reproducible results " "(enabling this disables optimisations using topology)" "0 disable 1 enable, default 0", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_READONLY, &cs->han_reproducible); - /* Simple algorithms MCA parameters */ + + /* + * Simple algorithms MCA parameters : + * using simple algorithms will just perform hierarchical communications. 
+ * By default communications are also splitted into tasks + * to handle thread noise + */ for(coll = 0 ; coll < COLLCOUNT ; coll++) { cs->use_simple_algorithm[coll] = false; if(is_simple_implemented(coll)) { @@ -294,6 +307,7 @@ static int han_register(void) /* Dynamic rules MCA parameters */ memset(cs->mca_rules, 0, COLLCOUNT * (GLOBAL_COMMUNICATOR+1) * sizeof(COMPONENT_T)); + for(coll = 0; coll < COLLCOUNT; coll++) { if(!mca_coll_han_is_coll_dynamic_implemented(coll)) { continue; @@ -304,7 +318,15 @@ static int han_register(void) cs->mca_rules[coll][INTRA_NODE] = TUNED; cs->mca_rules[coll][INTER_NODE] = BASIC; cs->mca_rules[coll][GLOBAL_COMMUNICATOR] = HAN; + } + /* Specific default values */ + cs->mca_rules[BARRIER][INTER_NODE] = TUNED; + /* Dynamic rule MCA var registration */ + for(coll = 0; coll < COLLCOUNT; coll++) { + if(!mca_coll_han_is_coll_dynamic_implemented(coll)) { + continue; + } for(topo_lvl = 0; topo_lvl < NB_TOPO_LVL; topo_lvl++) { snprintf(param_name, sizeof(param_name), "%s_dynamic_%s_module", diff --git a/ompi/mca/coll/han/coll_han_dynamic.c b/ompi/mca/coll/han/coll_han_dynamic.c index d32b12fbcd7..9e3469b0160 100644 --- a/ompi/mca/coll/han/coll_han_dynamic.c +++ b/ompi/mca/coll/han/coll_han_dynamic.c @@ -9,6 +9,14 @@ * $HEADER$ */ + +/* + * @file + * This files contains all functions to dynamically select for each collective + * the coll module based on given MCA parameters, configuration file and + * messages characteristics +*/ + #include "opal/class/opal_list.h" #include "ompi/mca/coll/han/coll_han.h" #include "ompi/mca/coll/han/coll_han_dynamic.h" @@ -26,6 +34,7 @@ bool mca_coll_han_is_coll_dynamic_implemented(COLLTYPE_T coll_id) case ALLGATHER: case ALLGATHERV: case ALLREDUCE: + case BARRIER: case BCAST: case GATHER: case REDUCE: @@ -130,7 +139,7 @@ get_dynamic_rule(COLLTYPE_T collective, int conf_idx, msg_size_idx; /* Aliases */ - const mca_coll_han_dynamic_rules_t *dynamic_rules = NULL; + const mca_coll_han_dynamic_rules_t *dynamic_rules; const collective_rule_t *coll_rule = NULL; const topologic_rule_t *topo_rule = NULL; const configuration_rule_t *conf_rule = NULL; @@ -150,7 +159,8 @@ get_dynamic_rule(COLLTYPE_T collective, break; } } - if(coll_idx < 0) { /* No dynamic rules for this collective */ + if(coll_idx < 0 || NULL == coll_rule) { + /* No dynamic rules for this collective */ opal_output_verbose(60, mca_coll_han_component.han_output, "coll:han:get_dynamic_rule HAN searched for collective %d (%s) " "but did not find any rule for this collective\n", @@ -166,7 +176,8 @@ get_dynamic_rule(COLLTYPE_T collective, break; } } - if(topo_idx < 0) { /* No topologic level rules for this collective */ + if(topo_idx < 0 || NULL == topo_rule) { + /* No topologic level rules for this collective */ opal_output_verbose(60, mca_coll_han_component.han_output, "coll:han:get_dynamic_rule HAN searched for topologic level %d (%s) rule " "for collective %d (%s) but did not find any rule\n", @@ -183,7 +194,7 @@ get_dynamic_rule(COLLTYPE_T collective, break; } } - if(conf_idx < 0) { + if(conf_idx < 0 || NULL == conf_rule) { /* No corresponding configuration. Should not have happen with a correct file */ opal_output_verbose(60, mca_coll_han_component.han_output, "coll:han:get_dynamic_rule HAN searched a rule for collective %d (%s) " @@ -204,7 +215,7 @@ get_dynamic_rule(COLLTYPE_T collective, break; } } - if(msg_size_idx < 0) { + if(msg_size_idx < 0 || NULL == msg_size_rule) { /* No corresponding message size. 
Should not happen with a correct file */ opal_output_verbose(60, mca_coll_han_component.han_output, "coll:han:get_dynamic_rule HAN searched a rule for collective %d (%s) " @@ -482,10 +493,6 @@ mca_coll_han_allgatherv_intra_dynamic(const void *sbuf, int scount, "Falling back to another component\n")); allgatherv = han_module->previous_allgatherv; sub_module = han_module->previous_allgatherv_module; - return han_module->previous_allgatherv(sbuf, scount, sdtype, - rbuf, rcounts, displs, - rdtype, comm, - han_module->previous_allgatherv_module); } else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) { /* * No fallback mechanism activated for this configuration @@ -630,6 +637,96 @@ mca_coll_han_allreduce_intra_dynamic(const void *sbuf, } +/* + * Barrier selector: + * On a sub-communicator, checks the stored rules to find the module to use + * On the global communicator, calls the han collective implementation, or + * calls the correct module if fallback mechanism is activated + */ +int +mca_coll_han_barrier_intra_dynamic(struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module; + TOPO_LVL_T topo_lvl = han_module->topologic_level; + mca_coll_base_module_barrier_fn_t barrier; + mca_coll_base_module_t *sub_module; + int rank, verbosity = 0; + + + /* Compute configuration information for dynamic rules */ + sub_module = get_module(BARRIER, + 0, + comm, + han_module); + + /* First errors are always printed by rank 0 */ + rank = ompi_comm_rank(comm); + if( (0 == rank) && (han_module->dynamic_errors < mca_coll_han_component.max_dynamic_errors) ) { + verbosity = 30; + } + + if(NULL == sub_module) { + /* + * No valid collective module from dynamic rules + * nor from mca parameter + */ + han_module->dynamic_errors++; + opal_output_verbose(verbosity, mca_coll_han_component.han_output, + "coll:han:mca_coll_han_barrier_intra_dynamic " + "Han did not find any valid module for collective %d (%s) " + "with topological level %d (%s) on communicator (%d/%s). " + "Please check dynamic file/mca parameters\n", + BARRIER, mca_coll_base_colltype_to_str(BARRIER), + topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl), + comm->c_contextid, comm->c_name); + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "HAN/BARRIER: No module found for the sub-communicator. " + "Falling back to another component\n")); + barrier = han_module->previous_barrier; + sub_module = han_module->previous_barrier_module; + } else if (NULL == sub_module->coll_barrier) { + /* + * No valid collective from dynamic rules + * nor from mca parameter + */ + han_module->dynamic_errors++; + opal_output_verbose(verbosity, mca_coll_han_component.han_output, + "coll:han:mca_coll_han_barrier_intra_dynamic " + "Han found valid module for collective %d (%s) " + "with topological level %d (%s) on communicator (%d/%s) " + "but this module cannot handle this collective. " + "Please check dynamic file/mca parameters\n", + BARRIER, mca_coll_base_colltype_to_str(BARRIER), + topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl), + comm->c_contextid, comm->c_name); + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "HAN/BARRIER: the module found for the sub-" + "communicator cannot handle the BARRIER operation. 
" + "Falling back to another component\n")); + barrier = han_module->previous_barrier; + sub_module = han_module->previous_barrier_module; + } else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) { + /* + * No fallback mechanism activated for this configuration + * sub_module is valid + * sub_module->coll_barrier is valid and point to this function + * Call han topological collective algorithm + */ + barrier = mca_coll_han_barrier_intra_simple; + } else { + /* + * If we get here: + * sub_module is valid + * sub_module->coll_barrier is valid + * They points to the collective to use, according to the dynamic rules + * Selector's job is done, call the collective + */ + barrier = sub_module->coll_barrier; + } + return barrier(comm, sub_module); +} + /* * Bcast selector: * On a sub-communicator, checks the stored rules to find the module to use @@ -923,9 +1020,7 @@ mca_coll_han_reduce_intra_dynamic(const void *sbuf, "Falling back to another component\n")); reduce = han_module->previous_reduce; sub_module = han_module->previous_reduce_module; - } - - if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) { + } else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) { /* Reproducibility: fallback on reproducible algo */ if (mca_coll_han_component.han_reproducible) { reduce = mca_coll_han_reduce_reproducible; @@ -981,8 +1076,13 @@ mca_coll_han_scatter_intra_dynamic(const void *sbuf, int scount, int rank, verbosity = 0; /* Compute configuration information for dynamic rules */ - ompi_datatype_type_size(rdtype, &dtype_size); - dtype_size = dtype_size * rcount; + if( MPI_IN_PLACE != rbuf ) { + ompi_datatype_type_size(rdtype, &dtype_size); + dtype_size = dtype_size * rcount; + } else { + ompi_datatype_type_size(sdtype, &dtype_size); + dtype_size = dtype_size * scount; + } sub_module = get_module(SCATTER, dtype_size, @@ -1042,16 +1142,19 @@ mca_coll_han_scatter_intra_dynamic(const void *sbuf, int scount, * sub_module->coll_scatter is valid and point to this function * Call han topological collective algorithm */ - scatter = mca_coll_han_scatter_intra; + if(mca_coll_han_component.use_simple_algorithm[SCATTER]) { + scatter = mca_coll_han_scatter_intra_simple; + } else { + scatter = mca_coll_han_scatter_intra; + } + } else { /* - * TODO: Uncomment when scatter simple is merged - * if(mca_coll_han_component.use_simple_algorithm[SCATTER]) { - * scatter = mca_coll_han_scatter_intra_simple; - * } else { - * scatter = mca_coll_han_scatter_intra; - * } + * If we get here: + * sub_module is valid + * sub_module->coll_scatter is valid + * They points to the collective to use, according to the dynamic rules + * Selector's job is done, call the collective */ - } else { scatter = sub_module->coll_scatter; } diff --git a/ompi/mca/coll/han/coll_han_dynamic.h b/ompi/mca/coll/han/coll_han_dynamic.h index f95fdae6180..88e512f1d2c 100644 --- a/ompi/mca/coll/han/coll_han_dynamic.h +++ b/ompi/mca/coll/han/coll_han_dynamic.h @@ -25,6 +25,7 @@ /* + * @file * ################################################# * # Dynamic rules global architecture description # * ################################################# diff --git a/ompi/mca/coll/han/coll_han_dynamic_file.c b/ompi/mca/coll/han/coll_han_dynamic_file.c index 47a54f8ac79..850462f102b 100644 --- a/ompi/mca/coll/han/coll_han_dynamic_file.c +++ b/ompi/mca/coll/han/coll_han_dynamic_file.c @@ -12,6 +12,11 @@ * $HEADER$ */ +/* + *@file + * Implementation of configuration file parser to set collective components to use. 
+ */ + #ifdef HAVE_STDLIB_H #include #endif @@ -35,6 +40,12 @@ static void check_dynamic_rules(void); /* Current file line for verbose message */ static int fileline = 1; +/* + * File parsing function. Allocated memory depending on the number of rules. + * This functions expects a file formatted as describbed in coll_han_dynamic_file.h. + * The configuration is then used by coll/han component to determine which module to + * use at each topological level. + */ int mca_coll_han_init_dynamic_rules(void) { @@ -69,7 +80,6 @@ mca_coll_han_init_dynamic_rules(void) /* If the dynamic rules are not used, do not even read the file */ if(!mca_coll_han_component.use_dynamic_file_rules) { - nb_coll = 0; return OMPI_SUCCESS; } @@ -416,6 +426,9 @@ mca_coll_han_init_dynamic_rules(void) return OMPI_SUCCESS; } +/* + * Memory free all the rules parsed in the file + */ void mca_coll_han_free_dynamic_rules(void) { @@ -423,7 +436,7 @@ mca_coll_han_free_dynamic_rules(void) int i, j, k; /* Loop ranges */ - int nb_coll, nb_topo, nb_conf; + int nb_coll, nb_conf; /* Aliases */ collective_rule_t *coll_rules; @@ -434,7 +447,7 @@ mca_coll_han_free_dynamic_rules(void) coll_rules = mca_coll_han_component.dynamic_rules.collective_rules; for(i=0 ; ireq = req; } + +/* + * Main function for taskified gather: calls lg task, a gather on low comm + */ int mca_coll_han_gather_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype, @@ -71,12 +82,12 @@ mca_coll_han_gather_intra(const void *sbuf, int scount, int w_rank, w_size; /* information about the global communicator */ int root_low_rank, root_up_rank; /* root ranks for both sub-communicators */ char *reorder_buf = NULL, *reorder_rbuf = NULL; - int i, err, *vranks, low_rank, low_size, *topo; + int err, *vranks, low_rank, low_size, *topo; ompi_request_t *temp_request = NULL; /* Create the subcommunicators */ err = mca_coll_han_comm_create(comm, han_module); - if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */ + if( OMPI_SUCCESS != err ) { OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "han cannot handle gather with this communicator. 
Fall back on another component\n")); /* HAN cannot work with this communicator so fallback on all collectives */ @@ -155,12 +166,11 @@ mca_coll_han_gather_intra(const void *sbuf, int scount, ptrdiff_t rextent; ompi_datatype_type_extent(rdtype, &rextent); ptrdiff_t block_size = rextent * (ptrdiff_t)rcount; - ptrdiff_t src_shift = block_size * w_rank; - ptrdiff_t dest_shift = block_size * w_rank; + ptrdiff_t shift = block_size * w_rank; ompi_datatype_copy_content_same_ddt(rdtype, (ptrdiff_t)rcount, - (char *)rbuf + dest_shift, - reorder_rbuf + src_shift); + (char *)rbuf + shift, + reorder_rbuf + shift); } } } @@ -192,7 +202,7 @@ mca_coll_han_gather_intra(const void *sbuf, int scount, if (w_rank == root && !han_module->is_mapbycore) { ptrdiff_t rextent; ompi_datatype_type_extent(rdtype, &rextent); - for (i=0; ireproducible_allreduce_module = NULL; } +/* + * Module constructor + */ static void mca_coll_han_module_construct(mca_coll_han_module_t * module) { int i; module->enabled = true; + module->recursive_free_depth = 0; module->super.coll_module_disable = mca_coll_han_module_disable; module->cached_low_comms = NULL; module->cached_up_comms = NULL; @@ -90,7 +100,16 @@ mca_coll_han_module_destruct(mca_coll_han_module_t * module) { int i; + module->recursive_free_depth++; module->enabled = false; + /* If the current module is in its caches during its destruction + * (i.e. last collective used HAN on a subcomm with a fallback + * on previous components) + */ + if (module->recursive_free_depth > 1){ + return; + } + if (module->cached_low_comms != NULL) { for (i = 0; i < COLL_HAN_LOW_MODULES; i++) { ompi_comm_free(&(module->cached_low_comms[i])); @@ -160,7 +179,6 @@ mca_coll_base_module_t * mca_coll_han_comm_query(struct ompi_communicator_t * comm, int *priority) { int flag; - char info_val[OPAL_MAX_INFO_VAL+1]; mca_coll_han_module_t *han_module; /* @@ -204,6 +222,8 @@ mca_coll_han_comm_query(struct ompi_communicator_t * comm, int *priority) han_module->topologic_level = GLOBAL_COMMUNICATOR; if (NULL != comm->super.s_info) { + char info_val[OPAL_MAX_INFO_VAL+1]; + /* Get the info value disaqualifying coll components */ opal_info_get(comm->super.s_info, "ompi_comm_coll_han_topo_level", sizeof(info_val), info_val, &flag); @@ -222,12 +242,12 @@ mca_coll_han_comm_query(struct ompi_communicator_t * comm, int *priority) han_module->super.coll_alltoall = NULL; han_module->super.coll_alltoallv = NULL; han_module->super.coll_alltoallw = NULL; - han_module->super.coll_barrier = NULL; han_module->super.coll_exscan = NULL; han_module->super.coll_gatherv = NULL; han_module->super.coll_reduce_scatter = NULL; han_module->super.coll_scan = NULL; han_module->super.coll_scatterv = NULL; + han_module->super.coll_barrier = mca_coll_han_barrier_intra_dynamic; han_module->super.coll_scatter = mca_coll_han_scatter_intra_dynamic; han_module->super.coll_reduce = mca_coll_han_reduce_intra_dynamic; han_module->super.coll_gather = mca_coll_han_gather_intra_dynamic; @@ -281,6 +301,7 @@ han_module_enable(mca_coll_base_module_t * module, HAN_SAVE_PREV_COLL_API(allgather); HAN_SAVE_PREV_COLL_API(allgatherv); HAN_SAVE_PREV_COLL_API(allreduce); + HAN_SAVE_PREV_COLL_API(barrier); HAN_SAVE_PREV_COLL_API(bcast); HAN_SAVE_PREV_COLL_API(gather); HAN_SAVE_PREV_COLL_API(reduce); @@ -316,6 +337,7 @@ mca_coll_han_module_disable(mca_coll_base_module_t * module, OBJ_RELEASE_IF_NOT_NULL(han_module->previous_allgather_module); OBJ_RELEASE_IF_NOT_NULL(han_module->previous_allgatherv_module); 
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_allreduce_module); + OBJ_RELEASE_IF_NOT_NULL(han_module->previous_barrier_module); OBJ_RELEASE_IF_NOT_NULL(han_module->previous_bcast_module); OBJ_RELEASE_IF_NOT_NULL(han_module->previous_gather_module); OBJ_RELEASE_IF_NOT_NULL(han_module->previous_reduce_module); diff --git a/ompi/mca/coll/han/coll_han_reduce.c b/ompi/mca/coll/han/coll_han_reduce.c index 03968b6f475..e47b9d29466 100644 --- a/ompi/mca/coll/han/coll_han_reduce.c +++ b/ompi/mca/coll/han/coll_han_reduce.c @@ -10,6 +10,11 @@ * $HEADER$ */ +/* + * @file + * This files contains all the hierarchical implementations of reduce + */ + #include "coll_han.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/pml/pml.h" @@ -48,7 +53,7 @@ mca_coll_han_set_reduce_args(mca_coll_han_reduce_args_t * args, mca_coll_task_t /* * Each segment of the messsage needs to go though 2 steps to perform MPI_Reduce: * lb: low level (shared-memory or intra-node) reduce. -* ub: upper level (inter-node) reduce + * ub: upper level (inter-node) reduce * Hence, in each iteration, there is a combination of collective operations which is called a task. * | seg 0 | seg 1 | seg 2 | seg 3 | * iter 0 | lr | | | | task: t0, contains lr @@ -162,18 +167,18 @@ mca_coll_han_reduce_intra(const void *sbuf, issue_task(t1); while (t->cur_seg <= t->num_segments - 2) { - /* Create t1 task */ - mca_coll_task_t *t1 = OBJ_NEW(mca_coll_task_t); - /* Setup up t1 task arguments */ - t->cur_task = t1; + /* Create t_next_seg task */ + mca_coll_task_t *t_next_seg = OBJ_NEW(mca_coll_task_t); + /* Setup up t_next_seg task arguments */ + t->cur_task = t_next_seg; t->sbuf = (char *) t->sbuf + extent * t->seg_count; if (up_rank == root_up_rank) { t->rbuf = (char *) t->rbuf + extent * t->seg_count; } t->cur_seg = t->cur_seg + 1; - /* Init the t1 task */ - init_task(t1, mca_coll_han_reduce_t1_task, (void *) t); - issue_task(t1); + /* Init the t_next_seg task */ + init_task(t_next_seg, mca_coll_han_reduce_t1_task, (void *) t); + issue_task(t_next_seg); } free(t); diff --git a/ompi/mca/coll/han/coll_han_scatter.c b/ompi/mca/coll/han/coll_han_scatter.c index c52cc1911ac..31ec78f5a6b 100644 --- a/ompi/mca/coll/han/coll_han_scatter.c +++ b/ompi/mca/coll/han/coll_han_scatter.c @@ -15,6 +15,12 @@ #include "ompi/mca/pml/pml.h" #include "coll_han_trigger.h" +/* + * @file + * + * This files contains all the hierarchical implementations of scatter + */ + static int mca_coll_han_scatter_us_task(void *task_args); static int mca_coll_han_scatter_ls_task(void *task_args); @@ -57,6 +63,10 @@ mca_coll_han_set_scatter_args(mca_coll_han_scatter_args_t * args, args->req = req; } +/* + * Main function for taskified scatter: + * after data reordring, calls us task, a scatter on up communicator + */ int mca_coll_han_scatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype, @@ -66,12 +76,12 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount, struct ompi_communicator_t *comm, mca_coll_base_module_t * module) { mca_coll_han_module_t *han_module = (mca_coll_han_module_t *) module; - int i, j, w_rank, w_size; + int w_rank, w_size; w_rank = ompi_comm_rank(comm); w_size = ompi_comm_size(comm); /* Create the subcommunicators */ - if( OMPI_SUCCESS != mca_coll_han_comm_create(comm, han_module) ) { /* Let's hope the error is consistently returned across the entire communicator */ + if( OMPI_SUCCESS != mca_coll_han_comm_create(comm, han_module) ) { OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "han cannot handle scatter 
with this communicator. Fall back on another component\n")); /* HAN cannot work with this communicator so fallback on all collectives */ @@ -139,8 +149,8 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount, ssize = opal_datatype_span(&sdtype->super, (int64_t) scount * w_size, &sgap); reorder_buf = (char *) malloc(ssize); reorder_sbuf = reorder_buf - sgap; - for (i = 0; i < up_size; i++) { - for (j = 0; j < low_size; j++) { + for (int i = 0; i < up_size; i++) { + for (int j = 0; j < low_size; j++) { OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "[%d]: Han Scatter copy from %d %d\n", w_rank, (i * low_size + j) * 2 + 1, @@ -158,22 +168,12 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount, } } - - void *dest_buf = rbuf; - int dest_count = rcount; - ompi_datatype_t *dest_dtype = rdtype; - if (MPI_IN_PLACE == rbuf) { - dest_buf = (void*)sbuf; - dest_count = scount; - dest_dtype = sdtype; - } - /* Create us task */ mca_coll_task_t *us = OBJ_NEW(mca_coll_task_t); /* Setup us task arguments */ mca_coll_han_scatter_args_t *us_args = malloc(sizeof(mca_coll_han_scatter_args_t)); mca_coll_han_set_scatter_args(us_args, us, reorder_sbuf, NULL, reorder_buf, scount, sdtype, - (char *) dest_buf, dest_count, dest_dtype, root, root_up_rank, root_low_rank, + (char *) rbuf, rcount, rdtype, root, root_up_rank, root_low_rank, up_comm, low_comm, w_rank, low_rank != root_low_rank, temp_request); /* Init us task */ @@ -213,7 +213,7 @@ int mca_coll_han_scatter_us_task(void *task_args) "[%d] Han Scatter: us scatter\n", t->w_rank)); /* Inter node scatter */ t->up_comm->c_coll->coll_scatter((char *) t->sbuf, t->scount * low_size, t->sdtype, - tmp_rbuf, t->rcount * low_size, t->rdtype, t->root_up_rank, + tmp_rbuf, count * low_size, dtype, t->root_up_rank, t->up_comm, t->up_comm->c_coll->coll_scatter_module); t->sbuf = tmp_rbuf; t->sbuf_inter_free = tmp_buf; @@ -256,3 +256,151 @@ int mca_coll_han_scatter_ls_task(void *task_args) ompi_request_complete(temp_req, 1); return OMPI_SUCCESS; } + + +int +mca_coll_han_scatter_intra_simple(const void *sbuf, int scount, + struct ompi_datatype_t *sdtype, + void *rbuf, int rcount, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + int w_rank, w_size; + struct ompi_datatype_t * dtype; + int count; + + w_rank = ompi_comm_rank(comm); + w_size = ompi_comm_size(comm); + + mca_coll_han_module_t *han_module = (mca_coll_han_module_t *) module; + /* create the subcommunicators */ + if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "han cannot handle allgather within this communicator." + " Fall back on another component\n")); + /* HAN cannot work with this communicator so fallback on all collectives */ + HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm); + return comm->c_coll->coll_scatter(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, + comm, han_module->previous_scatter_module); + } + /* Topo must be initialized to know rank distribution which then is used to + * determine if han can be used */ + int *topo = mca_coll_han_topo_init(comm, han_module, 2); + if (han_module->are_ppn_imbalanced){ + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "han cannot handle scatter with this communicator. 
It needs to fall back on another component\n")); + HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm); + return comm->c_coll->coll_scatter(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, + comm, han_module->previous_scatter_module); + } + ompi_communicator_t *low_comm = han_module->sub_comm[INTRA_NODE]; + ompi_communicator_t *up_comm = han_module->sub_comm[INTER_NODE]; + + /* Get the 'virtual ranks' mapping corresponding to the communicators */ + int *vranks = han_module->cached_vranks; + /* information about sub-communicators */ + int low_rank = ompi_comm_rank(low_comm); + int low_size = ompi_comm_size(low_comm); + /* Get root ranks for low and up comms */ + int root_low_rank, root_up_rank; /* root ranks for both sub-communicators */ + mca_coll_han_get_ranks(vranks, root, low_size, &root_low_rank, &root_up_rank); + + if (w_rank == root) { + dtype = sdtype; + count = scount; + } else { + dtype = rdtype; + count = rcount; + } + + /* allocate buffer to store unordered result on root + * if the processes are mapped-by core, no need to reorder: + * distribution of ranks on core first and node next, + * in a increasing order for both patterns */ + char *reorder_buf = NULL; // allocated memory + size_t block_size; + + ompi_datatype_type_size(dtype, &block_size); + block_size *= count; + + if (w_rank == root) { + int is_contiguous = ompi_datatype_is_contiguous_memory_layout(dtype, count); + + if (han_module->is_mapbycore && is_contiguous) { + /* The copy of the data is avoided */ + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "[%d]: Han scatter: no need to reorder: ", w_rank)); + reorder_buf = (char *)sbuf; + } else { + /* Data must be copied, let's be efficient packing it */ + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "[%d]: Han scatter: needs reordering or compacting: ", w_rank)); + + reorder_buf = malloc(block_size * w_size); + if ( NULL == reorder_buf){ + return OMPI_ERROR; + } + + /** Reorder and packing: + * Suppose, the message is 0 1 2 3 4 5 6 7 but the processes are + * mapped on 2 nodes, for example |0 2 4 6| |1 3 5 7|. The messages to + * leaders must be 0 2 4 6 and 1 3 5 7. + * So the upper scatter must send 0 2 4 6 1 3 5 7. + * In general, the topo[i*topolevel +1] must be taken. + */ + ptrdiff_t extent, block_extent; + ompi_datatype_type_extent(dtype, &extent); + block_extent = extent * (ptrdiff_t)count; + + for(int i = 0 ; i < w_size ; ++i){ + ompi_datatype_sndrcv((char*)sbuf + block_extent*topo[2*i+1], count, dtype, + reorder_buf + block_size*i, block_size, MPI_BYTE); + } + dtype = MPI_BYTE; + count = block_size; + } + } + + /* allocate the intermediary buffer + * to scatter from leaders on the low sub communicators */ + char *tmp_buf = NULL; // allocated memory + if (low_rank == root_low_rank) { + tmp_buf = (char *) malloc(block_size * low_size); + + /* 1. up scatter (internode) between node leaders */ + up_comm->c_coll->coll_scatter((char*) reorder_buf, + count * low_size, + dtype, + (char *)tmp_buf, + block_size * low_size, + MPI_BYTE, + root_up_rank, + up_comm, + up_comm->c_coll->coll_scatter_module); + if(reorder_buf != sbuf){ + free(reorder_buf); + reorder_buf = NULL; + } + } + + /* 2. 
low scatter on nodes leaders */ + low_comm->c_coll->coll_scatter((char *)tmp_buf, + block_size, + MPI_BYTE, + (char*)rbuf, + rcount, + rdtype, + root_low_rank, + low_comm, + low_comm->c_coll->coll_scatter_module); + + if (low_rank == root_low_rank) { + free(tmp_buf); + tmp_buf = NULL; + } + + return OMPI_SUCCESS; + +} diff --git a/ompi/mca/coll/han/coll_han_subcomms.c b/ompi/mca/coll/han/coll_han_subcomms.c index 27e17caa217..580940ec6ac 100644 --- a/ompi/mca/coll/han/coll_han_subcomms.c +++ b/ompi/mca/coll/han/coll_han_subcomms.c @@ -83,7 +83,7 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, /** * HAN is not yet optimized for a single process per node case, we should * avoid selecting it for collective communication support in such cases. - * However, in order to decide if this is tru, we need to know how many + * However, in order to decide if this is true, we need to know how many * local processes are on each node, a condition that cannot be verified * outside the MPI support (with PRRTE the info will be eventually available, * but we don't want to delay anything until then). We can achieve the same diff --git a/ompi/mca/coll/han/coll_han_topo.c b/ompi/mca/coll/han/coll_han_topo.c index e25e37207e2..e60d8fd819d 100644 --- a/ompi/mca/coll/han/coll_han_topo.c +++ b/ompi/mca/coll/han/coll_han_topo.c @@ -1,8 +1,8 @@ /* - * Copyright (c) 2018-2020 The University of Tennessee and The University + * Copyright (c) 2018-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2020 Bull S.A.S. All rights reserved. + * Copyright (c) 2020-2021 Bull S.A.S. All rights reserved. * * $COPYRIGHT$ * @@ -13,6 +13,13 @@ /** * @file * + * This file provides information about current run rank mapping in the shape + * of a integer array where each rank will provides a set of contiguous integer : + * its rank and its location at the different topological levels (from the + * highest to the lowest). + * At the end, the order for these data chunks uses the topological level as keys: + * the ranks are sorted first by the top level, then by the next level, ... etc. + * * Warning: this is not for the faint of heart -- don't even bother * reading this source code if you don't have a strong understanding * of nested data structures and pointer math (remember that @@ -90,7 +97,7 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, int *topo = (int *)malloc(sizeof(int) * size * num_topo_level); int is_imbalanced = 1; - int ranks_consecutive = 1; + int ranks_non_consecutive = 0; /* node leaders translate the node-local ranks to global ranks and check whether they are placed consecutively */ if (0 == low_rank) { @@ -104,22 +111,22 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, int rank = my_low_rank_map[0] + 1; for (int i = 1; i < low_size; ++i, ++rank) { if (my_low_rank_map[i] != rank) { - ranks_consecutive = 0; + ranks_non_consecutive = 1; break; } } - int reduce_vals[] = {ranks_consecutive, -ranks_consecutive, low_size, -low_size}; + int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size}; - up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 4, + up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 3, MPI_INT, MPI_MAX, up_comm, up_comm->c_coll->coll_allreduce_module); /* is the distribution of processes balanced per node? */ - is_imbalanced = (reduce_vals[2] == -reduce_vals[3]) ? 0 : 1; - ranks_consecutive = (reduce_vals[0] == -reduce_vals[1]) ? 
1 : 0; + is_imbalanced = (reduce_vals[1] == -reduce_vals[2]) ? 0 : 1; + ranks_non_consecutive = reduce_vals[0]; - if ( !ranks_consecutive && !is_imbalanced ) { + if ( ranks_non_consecutive && !is_imbalanced ) { /* kick off up_comm allgather to collect non-consecutive rank information at node leaders */ ranks_map = malloc(sizeof(int)*size); up_comm->c_coll->coll_iallgather(my_low_rank_map, low_size, MPI_INT, @@ -130,11 +137,11 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, /* broadcast balanced and consecutive properties from node leaders to remaining ranks */ - int bcast_vals[] = {is_imbalanced, ranks_consecutive}; + int bcast_vals[] = {is_imbalanced, ranks_non_consecutive}; low_comm->c_coll->coll_bcast(bcast_vals, 2, MPI_INT, 0, low_comm, low_comm->c_coll->coll_bcast_module); is_imbalanced = bcast_vals[0]; - ranks_consecutive = bcast_vals[1]; + ranks_non_consecutive = bcast_vals[1]; /* error out if the rank distribution is not balanced */ if (is_imbalanced) { @@ -148,7 +155,7 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, han_module->are_ppn_imbalanced = false; - if (ranks_consecutive) { + if (!ranks_non_consecutive) { /* fast-path: all ranks are consecutive and balanced so fill topology locally */ for (int i = 0; i < size; ++i) { topo[2*i] = (i/low_size); // node leader is node ID @@ -156,6 +163,7 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, } han_module->is_mapbycore = true; } else { + han_module->is_mapbycore = false; /* * Slow path: gather global-to-node-local rank mappings at node leaders * @@ -192,4 +200,3 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm, return topo; } - diff --git a/ompi/mca/coll/han/coll_han_trigger.h b/ompi/mca/coll/han/coll_han_trigger.h index 413e393be61..ac4a017b98f 100644 --- a/ompi/mca/coll/han/coll_han_trigger.h +++ b/ompi/mca/coll/han/coll_han_trigger.h @@ -2,6 +2,7 @@ * Copyright (c) 2018-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. + * Copyright (c) 2020 Bull S.A.S. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -9,6 +10,14 @@ * $HEADER$ */ +/* + * @file + * + * This file defines the API for tasks: a collective operation may be + * splitted in tasks to balance compute load on all the resources. + * This solution provide some noise resiliency. + */ + #ifndef MCA_COLL_HAN_TRIGGER_EXPORT_H #define MCA_COLL_HAN_TRIGGER_EXPORT_H
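
Editor's note, not part of the patch above: a minimal, standalone sketch of the
reordering index math documented inside mca_coll_han_scatter_intra_simple(). It
assumes the illustrative 2-node mapping used in that comment (node 0 holds world
ranks 0 2 4 6, node 1 holds 1 3 5 7) and a topo[] array laid out as
(node id, world rank) pairs sorted by node, as built in coll_han_topo.c; the
main() harness and buffer contents are purely hypothetical.

#include <stdio.h>

int main(void)
{
    enum { W_SIZE = 8 };
    /* (node id, world rank) pairs, sorted first by node, then by rank */
    const int topo[2 * W_SIZE] = { 0,0, 0,2, 0,4, 0,6, 1,1, 1,3, 1,5, 1,7 };
    int sbuf[W_SIZE], reorder_buf[W_SIZE];

    for (int r = 0; r < W_SIZE; ++r) {
        sbuf[r] = r;                       /* block destined to world rank r */
    }

    /* Same index math as the reordering loop in the simple scatter: slot i of
     * the buffer handed to the internode scatter receives the block of the
     * i-th process in node-major order, i.e. world rank topo[2*i + 1]. */
    for (int i = 0; i < W_SIZE; ++i) {
        reorder_buf[i] = sbuf[topo[2*i + 1]];
    }

    for (int i = 0; i < W_SIZE; ++i) {
        printf("%d ", reorder_buf[i]);     /* prints: 0 2 4 6 1 3 5 7 */
    }
    printf("\n");
    return 0;
}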