Merge pull request #711 from devreal/data_copy_alloc_callbacks
Allocate host copy on demand once data is moved to the host
abouteiller authored Feb 19, 2025
2 parents ad7a882 + 04f24df commit 758dc3c
Showing 5 changed files with 69 additions and 6 deletions.
6 changes: 6 additions & 0 deletions parsec/data.c
@@ -33,6 +33,8 @@ static void parsec_data_copy_construct(parsec_data_copy_t* obj)
obj->arena_chunk = NULL;
obj->data_transfer_status = PARSEC_DATA_STATUS_NOT_TRANSFER;
obj->dtt = PARSEC_DATATYPE_NULL;
obj->alloc_cb = NULL;
obj->release_cb = NULL;
PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Allocate data copy %p", obj);
}

@@ -52,6 +54,10 @@ static void parsec_data_copy_destruct(parsec_data_copy_t* obj)
* obj is already detached from obj->original, but this frees the arena chunk */
parsec_arena_release(obj);
}

if (NULL != obj->release_cb) {
obj->release_cb(obj, obj->device_index);
}
}

PARSEC_OBJ_CLASS_INSTANCE(parsec_data_copy_t, parsec_list_item_t,
4 changes: 4 additions & 0 deletions parsec/data.h
@@ -55,9 +55,13 @@ typedef uint8_t parsec_data_status_t;
typedef uint8_t parsec_data_flag_t;
#define PARSEC_DATA_FLAG_ARENA ((parsec_data_flag_t)1<<0)
#define PARSEC_DATA_FLAG_TRANSIT ((parsec_data_flag_t)1<<1)
#define PARSEC_DATA_FLAG_EVICTED ((parsec_data_flag_t)1<<5)
#define PARSEC_DATA_FLAG_PARSEC_MANAGED ((parsec_data_flag_t)1<<6)
#define PARSEC_DATA_FLAG_PARSEC_OWNED ((parsec_data_flag_t)1<<7)

typedef void (parsec_data_copy_alloc_cb) (parsec_data_copy_t*, int device);
typedef void (parsec_data_copy_release_cb)(parsec_data_copy_t*, int device);

/**
* Initialize the PaRSEC data infrastructure
*/
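The two typedefs introduced in data.h above only fix the callback signatures; the diff does not prescribe how the host buffer is obtained. A minimal sketch of a matching callback pair, assuming the payload size is reachable as copy->original->nb_elts and that malloc/free are an acceptable host allocator:

#include <stdlib.h>
#include <assert.h>
#include "parsec/data_internal.h"  /* parsec_data_copy_t field layout (internal header) */

/* Hypothetical allocation callback: materialize the host buffer on demand.
 * The stage-out path in device_gpu.c calls this with device == 0 and returns
 * PARSEC_HOOK_RETURN_ERROR if copy->device_private is still NULL afterwards,
 * so the callback either sets the pointer or leaves it NULL to signal failure. */
static void host_alloc_cb(parsec_data_copy_t *copy, int device)
{
    assert(0 == device);  /* in this sketch only the host copy is allocated lazily */
    if (NULL == copy->device_private) {
        copy->device_private = malloc(copy->original->nb_elts);
    }
}

/* Hypothetical release callback: hand the host buffer back.
 * parsec_data_copy_destruct() invokes it whenever it is set, possibly after an
 * eager release already cleared device_private, so it must tolerate NULL. */
static void host_release_cb(parsec_data_copy_t *copy, int device)
{
    (void)device;
    free(copy->device_private);  /* free(NULL) is a no-op */
    copy->device_private = NULL;
}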
4 changes: 3 additions & 1 deletion parsec/data_internal.h
@@ -76,7 +76,9 @@ struct parsec_data_copy_s {
* so for many reasons, not necessarily because a transfer is ongoing.
* We use this transfer_status to guard scheduling multiple transfers
* on the same data. */
parsec_datatype_t dtt; /**< the appropriate type for the network engine to send an element */
parsec_datatype_t dtt; /**< the appropriate type for the network engine to send an element */
parsec_data_copy_alloc_cb *alloc_cb; /**< callback to allocate data copy memory */
parsec_data_copy_release_cb *release_cb; /**< callback to release data copy memory */
};

#define PARSEC_DATA_CREATE_ON_DEMAND ((parsec_data_copy_t*)(intptr_t)(-1))
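With the alloc_cb/release_cb fields added to parsec_data_copy_s, a layer built on top of PaRSEC can opt a host copy into on-demand management by filling them in. The diff adds no public setter, so this hypothetical helper assigns the internal fields directly and reuses the callbacks sketched above:

#include "parsec/data_internal.h"

/* Hypothetical helper: enable on-demand host allocation for an existing copy. */
static void enable_on_demand_host_copy(parsec_data_copy_t *cpu_copy)
{
    cpu_copy->alloc_cb   = host_alloc_cb;    /* sketched after the data.h hunk */
    cpu_copy->release_cb = host_release_cb;
    /* Once alloc_cb is set, parsec_device_kernel_push() (device_gpu.c below)
     * accepts a copy whose device_private is still NULL; the host buffer is
     * only allocated by parsec_default_gpu_stage_out() when data actually
     * moves back from the device. */
}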
56 changes: 51 additions & 5 deletions parsec/mca/device/device_gpu.c
@@ -1178,6 +1178,7 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask,
for(int i = 0; i < task->task_class->nb_flows; i++) {
if( !(flow_mask & (1U << i)) ) continue;
source = gtask->sources[i];
assert(source->device_private != NULL);
dest = task->data[i].data_out;
src_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(source->device_index);
dst_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(dest->device_index);
@@ -1238,6 +1239,12 @@ parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask,
dir = parsec_device_gpu_transfer_direction_d2d;
} else {
dir = parsec_device_gpu_transfer_direction_d2h;
if (dest->device_private == NULL && dest->alloc_cb != NULL) {
dest->alloc_cb(dest, 0); // allocate on host
}
if (dest->device_private == NULL) {
return PARSEC_HOOK_RETURN_ERROR;
}
}
ret = src_dev->memcpy_async( src_dev, gpu_stream,
dest->device_private,
@@ -1662,6 +1669,48 @@ parsec_device_callback_complete_push(parsec_device_gpu_module_t *gpu_device,
parsec_data_end_transfer_ownership_to_copy(task->data[i].data_out->original,
gpu_device->super.device_index,
flow->flow_flags);

parsec_data_copy_t* source = gtask->sources[i];
parsec_device_gpu_module_t *src_device =
(parsec_device_gpu_module_t*)parsec_mca_device_get( source->device_index );
if (task->data[i].data_in->flags & PARSEC_DATA_FLAG_EVICTED) {
/**
* The device copy had been evicted to the host and brought back in.
* If this is the only device on which that data is used we can release
* the host memory back to the application. If there are other devices
* we cannot release the host memory because the data may actually be used
* by a host task (e.g., after being sent there from a different device)
* or be used as input to the other device.
*/
parsec_data_copy_t *cpu_copy = task->data[i].data_out->original->device_copies[0];
parsec_data_copy_t *gpu_copy = task->data[i].data_out;
parsec_data_t *original = task->data[i].data_out->original;
/* release host memory if requested */
if (cpu_copy->device_private != NULL &&
cpu_copy->release_cb != NULL) {
bool may_release = true;
/* check if there are any other device copies */
for (uint32_t i = 1; i < parsec_nb_devices; ++i) {
parsec_data_copy_t *copy = original->device_copies[i];
if (NULL != copy && copy != gpu_copy) {
may_release = false;
break;
}
}
if (may_release) {
PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream,
"GPU[%d:%s]: CPU copy %p [ref_count %d] memory %p eager release",
gpu_device->super.device_index, gpu_device->super.name,
cpu_copy, cpu_copy->super.super.obj_reference_count, cpu_copy->device_private);
cpu_copy->release_cb(cpu_copy, 0);
cpu_copy->device_private = NULL;
}
}
task->data[i].data_in->flags ^= PARSEC_DATA_FLAG_EVICTED;
}

parsec_atomic_unlock(&task->data[i].data_out->original->lock);

#if defined(PARSEC_PROF_TRACE)
if(gpu_device->trackable_events & PARSEC_PROFILE_GPU_TRACK_DATA_IN) {
PARSEC_PROFILING_TRACE(gpu_stream->profiling,
@@ -1671,10 +1720,6 @@ parsec_device_callback_complete_push(parsec_device_gpu_module_t *gpu_device,
NULL);
}
#endif
parsec_atomic_unlock(&task->data[i].data_out->original->lock);
parsec_data_copy_t* source = gtask->sources[i];
parsec_device_gpu_module_t *src_device =
(parsec_device_gpu_module_t*)parsec_mca_device_get( source->device_index );
if( PARSEC_DEV_IS_GPU(src_device->super.type) ) {
int om;
while(1) {
@@ -2011,7 +2056,8 @@ parsec_device_kernel_push( parsec_device_gpu_module_t *gpu_device,
if( NULL != this_task->data[i].data_out &&
(0 == (this_task->data[i].data_out->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ) ) continue;

assert( NULL != parsec_data_copy_get_ptr(this_task->data[i].data_in) );
assert( NULL != parsec_data_copy_get_ptr(this_task->data[i].data_in)
|| NULL != this_task->data[i].data_in->alloc_cb );

PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream,
"GPU[%d:%s]:\t\tIN Data of %s <%x> on GPU",
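The eager-release branch added to parsec_device_callback_complete_push() above is easier to follow when its core condition is restated on its own. A standalone sketch using only fields that already appear in the hunk (device index 0 is the host, so the scan starts at 1):

#include <stdbool.h>
#include <stdint.h>
#include "parsec/data_internal.h"
#include "parsec/mca/device/device.h"  /* parsec_nb_devices */

/* The host buffer may be handed back only if no device other than the GPU
 * that just received the data still holds a copy of it. */
static bool host_copy_may_be_released(parsec_data_t *original,
                                      parsec_data_copy_t *gpu_copy)
{
    for (uint32_t d = 1; d < parsec_nb_devices; ++d) {
        parsec_data_copy_t *copy = original->device_copies[d];
        if (NULL != copy && copy != gpu_copy) {
            return false;  /* the data still lives on another device */
        }
    }
    return true;
}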
5 changes: 5 additions & 0 deletions parsec/mca/device/transfer_gpu.c
@@ -321,10 +321,15 @@ int parsec_gpu_complete_w2r_task(parsec_device_gpu_module_t *gpu_device,
PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream,
"D2H[%d:%s] task %p:%i GPU data copy %p [%p] has a backup in memory",
gpu_device->super.device_index, gpu_device->super.name, (void*)task, i, gpu_copy, gpu_copy->original);
if (cpu_copy->release_cb != NULL) {
/* the data is used again so release the host copy */
cpu_copy->release_cb(cpu_copy, 0);
}
} else {
gpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED;
cpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED;
cpu_copy->version = gpu_copy->version;
cpu_copy->flags |= PARSEC_DATA_FLAG_EVICTED;
PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream,
"D2H[%d:%s]: CPU copy %p gets the same version %d as GPU copy %p at %s:%d",
gpu_device->super.device_index, gpu_device->super.name,