Skip to content

Commit

Permalink
Decouple device flow count from parameters
Browse files Browse the repository at this point in the history
Make PARSEC_MAX_DEVICE_FLOWS configurable and select a proper integer
type, up to int128_t. Make sure the flow mask is properly checked.


Signed-off-by: Joseph Schuchart <[email protected]>
  • Loading branch information
devreal committed Feb 14, 2025
1 parent a9ab33d commit e3a7829
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 40 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ set(PARSEC_MAX_LOCAL_COUNT 20 CACHE STRING "Number of local variables for tasks
set(PARSEC_MAX_PARAM_COUNT 20 CACHE STRING "Number of parameters for tasks (default 20)")
set(PARSEC_MAX_DEP_IN_COUNT 10 CACHE STRING "Number of input flows for each task (default 10)")
set(PARSEC_MAX_DEP_OUT_COUNT 10 CACHE STRING "Number of output flows for each task (default 10)")
set(PARSEC_MAX_DEVICE_FLOWS ${PARSEC_MAX_PARAM_COUNT} CACHE STRING "Number of flows for device tasks (default: same as PARSEC_MAX_PARAM_COUNT)")

### PaRSEC PP options
set(PARSEC_PTGPP_FLAGS "--noline" CACHE STRING "Additional parsec-ptgpp precompiling flags (separate flags with ';')" )
Expand Down
18 changes: 18 additions & 0 deletions parsec/include/parsec/parsec_config_bottom.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ typedef int32_t parsec_dependency_t;
typedef int32_t parsec_dependency_t;
#endif

/*
 * Select the smallest unsigned integer type wide enough to hold one bit per
 * flow, covering both host-side task parameters (MAX_PARAM_COUNT) and device
 * task flows (PARSEC_MAX_DEVICE_FLOWS).
 */
#if ((MAX_PARAM_COUNT <= 16) && (PARSEC_MAX_DEVICE_FLOWS <= 16))
typedef uint16_t parsec_flow_mask_t;
#elif ((MAX_PARAM_COUNT <= 32) && (PARSEC_MAX_DEVICE_FLOWS <= 32))
typedef uint32_t parsec_flow_mask_t;
#elif ((MAX_PARAM_COUNT <= 64) && (PARSEC_MAX_DEVICE_FLOWS <= 64))
typedef uint64_t parsec_flow_mask_t;
#elif ((MAX_PARAM_COUNT <= 128) && (PARSEC_MAX_DEVICE_FLOWS <= 128)) && defined(PARSEC_HAVE_INT128)
/* Use the unsigned variant: left-shifting a 1 into the sign bit of the signed
 * __int128_t (flow index 127) would be undefined behavior, and all the other
 * branches above are unsigned as well. */
typedef __uint128_t parsec_flow_mask_t;
#else
#error Failed to find a proper type for the PaRSEC flow mask. \
       Make sure MAX_PARAM_COUNT and PARSEC_MAX_DEVICE_FLOWS are at most \
       128 (or at most 64 if 128-bit integers are not supported).
#endif

/* Bit corresponding to flow _id. Arguments are parenthesized so the macros
 * stay correct when callers pass expressions (ternaries, bitwise ops, ...). */
#define PARSEC_FLOW_MASK(_id) (((parsec_flow_mask_t)1) << (_id))
/* Evaluates to 1 if flow _id is set in _mask, 0 otherwise. */
#define PARSEC_CHECK_FLOW_MASK(_mask, _id) (!!((_mask) & PARSEC_FLOW_MASK(_id)))


/*
* A set of constants defining the capabilities of the underlying
* runtime.
Expand Down
3 changes: 3 additions & 0 deletions parsec/include/parsec/parsec_options.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@
/* The max number of output dependencies (not flows) for each task */
#define MAX_DEP_OUT_COUNT @PARSEC_MAX_DEP_OUT_COUNT@

/* The max number of flows handled by device tasks */
#define PARSEC_MAX_DEVICE_FLOWS @PARSEC_MAX_DEVICE_FLOWS@

#include "parsec/parsec_config_bottom.h"

#endif /* PARSEC_CONFIG_H_HAS_BEEN_INCLUDED */
Expand Down
20 changes: 10 additions & 10 deletions parsec/mca/device/device_gpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device,
parsec_gpu_task_t *gpu_task )
{
parsec_task_t *this_task = gpu_task->ec;
parsec_gpu_data_copy_t* temp_loc[MAX_PARAM_COUNT], *gpu_elem, *lru_gpu_elem;
parsec_gpu_data_copy_t* temp_loc[PARSEC_MAX_DEVICE_FLOWS], *gpu_elem, *lru_gpu_elem;
parsec_data_t* master, *oldmaster;
const parsec_flow_t *flow;
int i, j, data_avail_epoch = 0, copy_readers_update = 0;
Expand Down Expand Up @@ -1163,7 +1163,7 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device,
*/
int
parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream)
{
int ret;
Expand All @@ -1176,7 +1176,7 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
parsec_device_transfer_direction_t dir;

for(int i = 0; i < task->task_class->nb_flows; i++) {
if( !(flow_mask & (1U << i)) ) continue;
if( !PARSEC_CHECK_FLOW_MASK(flow_mask, i) ) continue;
source = gtask->sources[i];
dest = task->data[i].data_out;
src_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(source->device_index);
Expand Down Expand Up @@ -1213,7 +1213,7 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
*/
int
parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream)
{
int ret;
Expand All @@ -1225,7 +1225,7 @@ parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask,
parsec_device_transfer_direction_t dir;
int i;
for(i = 0; i < task->task_class->nb_flows; i++){
if(flow_mask & (1U << i)){
if( PARSEC_CHECK_FLOW_MASK(flow_mask, i) ){
source = task->data[i].data_out;
dest = source->original->device_copies[0];
dst_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(dest->device_index);
Expand Down Expand Up @@ -1497,7 +1497,7 @@ parsec_device_data_stage_in( parsec_device_gpu_module_t* gpu_device,
#endif
gpu_task->sources[flow->flow_index] = candidate; /* save the candidate for release on transfer completion */
/* Push data into the GPU from the source device */
int rc = gpu_task->stage_in ? gpu_task->stage_in(gpu_task, (1U << flow->flow_index), gpu_stream): PARSEC_SUCCESS;
int rc = gpu_task->stage_in ? gpu_task->stage_in(gpu_task, PARSEC_FLOW_MASK(flow->flow_index), gpu_stream): PARSEC_SUCCESS;
if(PARSEC_SUCCESS != rc) {
parsec_warning( "GPU[%d:%s]: gpu_task->stage_in to device rc=%d @%s:%d\n"
"\t<<%p on device %d:%s>> -> <<%p on device %d:%s>> [%zu, %s]",
Expand Down Expand Up @@ -2117,7 +2117,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device,
/* If the gpu copy is not owned by parsec, we don't manage it at all */
if( 0 == (gpu_copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ) continue;
original = gpu_copy->original;
rc = gpu_task->stage_out? gpu_task->stage_out(gpu_task, (1U << i), gpu_stream): PARSEC_SUCCESS;
rc = gpu_task->stage_out? gpu_task->stage_out(gpu_task, PARSEC_FLOW_MASK(i), gpu_stream): PARSEC_SUCCESS;
if(PARSEC_SUCCESS != rc) {
parsec_warning( "GPU[%d:%s]: gpu_task->stage_out from device rc=%d @%s:%d\n"
"\tdata %s <<%p>> -> <<%p>>\n",
Expand Down Expand Up @@ -2206,7 +2206,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device,
assert( ((parsec_list_item_t*)gpu_copy)->list_prev == (parsec_list_item_t*)gpu_copy );

assert( PARSEC_DATA_COHERENCY_OWNED == gpu_copy->coherency_state );
if( gpu_task->pushout & (1 << i) ) {
if( PARSEC_CHECK_FLOW_MASK(gpu_task->pushout, i) ) {
/* TODO: make sure no readers are working on the CPU version */
original = gpu_copy->original;
PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream,
Expand Down Expand Up @@ -2238,7 +2238,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device,
}
#endif
/* Move the data back into main memory */
rc = gpu_task->stage_out? gpu_task->stage_out(gpu_task, (1U << flow->flow_index), gpu_stream): PARSEC_SUCCESS;
rc = gpu_task->stage_out? gpu_task->stage_out(gpu_task, PARSEC_FLOW_MASK(flow->flow_index), gpu_stream): PARSEC_SUCCESS;
if(PARSEC_SUCCESS != rc) {
parsec_warning( "GPU[%d:%s]: gpu_task->stage_out from device rc=%d @%s:%d\n"
"\tdata %s <<%p>> -> <<%p>>\n",
Expand Down Expand Up @@ -2342,7 +2342,7 @@ parsec_device_kernel_epilog( parsec_device_gpu_module_t *gpu_device,

assert( 0 <= gpu_copy->readers );

if( gpu_task->pushout & (1 << i) ) {
if( PARSEC_CHECK_FLOW_MASK(gpu_task->pushout, i) ) {
PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream,
"GPU copy %p [ref_count %d] moved to the read LRU in %s",
gpu_copy, gpu_copy->super.super.obj_reference_count, __func__);
Expand Down
42 changes: 21 additions & 21 deletions parsec/mca/device/device_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ typedef int (*parsec_advance_task_function_t)(parsec_device_gpu_module_t *gpu_d
*
*/
typedef int (parsec_stage_in_function_t)(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream);


Expand All @@ -74,7 +74,7 @@ typedef int (parsec_stage_in_function_t)(parsec_gpu_task_t *gtask,
*
*/
typedef int (parsec_stage_out_function_t)(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream);

/* Function type for releasing a device task. The DSL is responsible for allocating such tasks,
Expand All @@ -86,8 +86,8 @@ typedef void (*parsec_release_device_task_function_t)(void*);
struct parsec_gpu_task_s {
parsec_list_item_t list_item;
uint16_t task_type;
uint16_t pushout;
int32_t last_status;
parsec_flow_mask_t pushout;
parsec_advance_task_function_t submit;
parsec_complete_stage_function_t complete_stage;
parsec_stage_in_function_t *stage_in;
Expand All @@ -102,23 +102,23 @@ struct parsec_gpu_task_s {
struct {
parsec_task_t *ec;
uint64_t last_data_check_epoch;
const parsec_flow_t *flow[MAX_PARAM_COUNT]; /* There is no consistent way to access the flows from the task_class,
* so the DSL need to provide these flows here.
*/
size_t flow_nb_elts[MAX_PARAM_COUNT]; /* for each flow, size of the data to be allocated
* on the GPU.
*/
parsec_data_collection_t *flow_dc[MAX_PARAM_COUNT]; /* for each flow, data collection from which the data
* to be transferred logically belongs to.
* This gives the user the chance to indicate on the JDF
* a data collection to inspect during GPU transfer.
* User may want info from the DC (e.g. mtype),
* & otherwise remote copies don't have any info.
*/
const parsec_flow_t *flow[PARSEC_MAX_DEVICE_FLOWS]; /* There is no consistent way to access the flows from the task_class,
* so the DSL need to provide these flows here.
*/
size_t flow_nb_elts[PARSEC_MAX_DEVICE_FLOWS]; /* for each flow, size of the data to be allocated
* on the GPU.
*/
parsec_data_collection_t *flow_dc[PARSEC_MAX_DEVICE_FLOWS]; /* for each flow, data collection from which the data
* to be transferred logically belongs to.
* This gives the user the chance to indicate on the JDF
* a data collection to inspect during GPU transfer.
* User may want info from the DC (e.g. mtype),
* & otherwise remote copies don't have any info.
*/
/* These are private and should not be used outside the device driver */
parsec_data_copy_t *sources[MAX_PARAM_COUNT]; /* If the driver decides to acquire the data from a different
* source, it will temporary store the best candidate here.
*/
parsec_data_copy_t *sources[PARSEC_MAX_DEVICE_FLOWS]; /* If the driver decides to acquire the data from a different
* source, it will temporary store the best candidate here.
*/
};
struct {
parsec_data_copy_t *copy;
Expand Down Expand Up @@ -376,7 +376,7 @@ parsec_device_kernel_scheduler( parsec_device_module_t *module,
*/
int
parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream);

/* Default stage_out function to transfer data from the GPU device.
Expand All @@ -390,7 +390,7 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
*/
int
parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream);

END_C_DECLS
Expand Down
10 changes: 5 additions & 5 deletions parsec/mca/device/transfer_gpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static int
datatype_lookup_of_gpu_d2h_task( parsec_execution_stream_t * es,
const parsec_gpu_d2h_task_t* this_task,
const parsec_task_t * parent_task,
uint32_t * flow_mask,
parsec_flow_mask_t * flow_mask,
parsec_dep_data_description_t * data)
{
(void)es; (void)this_task; (void)parent_task; (void)flow_mask; (void)data;
Expand Down Expand Up @@ -183,9 +183,9 @@ int32_t parsec_gpu_d2h_max_flows = 0;
static const parsec_task_class_t parsec_gpu_d2h_task_class = {
.name = "GPU D2H data transfer",
.task_class_id = 0,
.nb_flows = MAX_PARAM_COUNT, /* This value will have an impact on the duration of the
* search for additional data to move. As this search is linear
* we need to keep this upper bound set to a reasonable value. */
.nb_flows = PARSEC_MAX_DEVICE_FLOWS, /* This value will have an impact on the duration of the
* search for additional data to move. As this search is linear
* we need to keep this upper bound set to a reasonable value. */
.nb_parameters = 1,
.nb_locals = 0,
.params = {&symb_gpu_d2h_task_param},
Expand Down Expand Up @@ -217,7 +217,7 @@ static const parsec_task_class_t parsec_gpu_d2h_task_class = {


/**
* Transfer at most the MAX_PARAM_COUNT oldest data from the GPU back
* Transfer at most the PARSEC_MAX_DEVICE_FLOWS oldest data from the GPU back
* to main memory. Create a single task to move them all out, then switch the
* GPU data copy in shared mode.
*/
Expand Down
8 changes: 4 additions & 4 deletions tests/runtime/cuda/stage_custom.jdf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern "C" %{

static int
stage_stride_in(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream){
parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
cudaError_t ret = 0;
Expand All @@ -39,7 +39,7 @@ stage_stride_in(parsec_gpu_task_t *gtask,
int elem_sz;
int i;
for(i = 0; i < task->task_class->nb_flows; i++){
if(flow_mask & (1U << i)){
if(PARSEC_CHECK_FLOW_MASK(flow_mask, i)){
copy_in = task->data[i].data_in;
copy_out = task->data[i].data_out;
dc = (parsec_tiled_matrix_t*)gtask->flow_dc[i];
Expand Down Expand Up @@ -75,7 +75,7 @@ stage_stride_in(parsec_gpu_task_t *gtask,

static int
stage_stride_out(parsec_gpu_task_t *gtask,
uint32_t flow_mask,
parsec_flow_mask_t flow_mask,
parsec_gpu_exec_stream_t *gpu_stream){
parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t*)gpu_stream;
cudaError_t ret;
Expand All @@ -86,7 +86,7 @@ stage_stride_out(parsec_gpu_task_t *gtask,
int elem_sz;
int i;
for(i = 0; i < task->task_class->nb_flows; i++){
if(flow_mask & (1U << i)){
if(PARSEC_CHECK_FLOW_MASK(flow_mask, i)){
copy_in = task->data[i].data_out;
copy_out = copy_in->original->device_copies[0];
dc = (parsec_tiled_matrix_t*)gtask->flow_dc[i];
Expand Down

0 comments on commit e3a7829

Please sign in to comment.