Skip to content

Commit

Permalink
Minor changes to CUDA support and Google trace
Browse files Browse the repository at this point in the history
The Google trace support needs to be refactored, but otherwise
this seems to be working.
  • Loading branch information
khuck committed Jul 27, 2020
1 parent 1b116fe commit cae5e2c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 26 deletions.
44 changes: 36 additions & 8 deletions src/apex/activity_trace_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
#include <stack>
#include <unordered_map>
#include <mutex>
#include <atomic>
#include "apex.hpp"
#include "profiler.hpp"
#include "thread_instance.hpp"
#include "apex_options.hpp"
#include "trace_event_listener.hpp"

static void __attribute__((constructor)) initTrace(void);
//static void __attribute__((destructor)) flushTrace(void);
Expand Down Expand Up @@ -43,6 +46,11 @@ static uint64_t startTimestamp;
/* The callback subscriber */
CUpti_SubscriberHandle subscriber;

/* The buffer count */
std::atomic<uint64_t> num_buffers{0};
std::atomic<uint64_t> num_buffers_processed{0};
bool flushing{false};

/* The map that holds correlation IDs and matches them to GUIDs */
std::unordered_map<uint32_t, std::shared_ptr<apex::task_wrapper>> correlation_map;
std::mutex map_mutex;
Expand Down Expand Up @@ -77,7 +85,7 @@ class foo : public std::set<std::string> {
*/

void store_profiler_data(const std::string &name, uint32_t correlationId,
uint64_t start, uint64_t end) {
uint64_t start, uint64_t end, uint32_t device, uint32_t context, uint32_t stream) {
// Get the singleton APEX instance
static apex::apex* instance = apex::apex::instance();
//static foo kernels;
Expand All @@ -103,6 +111,12 @@ void store_profiler_data(const std::string &name, uint32_t correlationId,
//prof->tt_ptr = tt;
// fake out the profiler_listener
instance->the_profiler_listener->push_profiler_public(prof);
if (apex::apex_options::use_trace_event()) {
apex::trace_event_listener * tel =
(apex::trace_event_listener*)instance->the_trace_event_listener;
tel->on_async_event(device, context, stream, prof);
}

// have the listeners handle the end of this task
instance->complete_task(tt);
}
Expand Down Expand Up @@ -238,6 +252,7 @@ printActivity(CUpti_Activity *record)
{
switch (record->kind)
{
#if 0
case CUPTI_ACTIVITY_KIND_DEVICE:
{
CUpti_ActivityDevice2 *device = (CUpti_ActivityDevice2 *) record;
Expand All @@ -251,7 +266,6 @@ printActivity(CUpti_Activity *record)
device->numMultiprocessors, (unsigned int) (device->coreClockRate / 1000));
break;
}
#if 0
case CUPTI_ACTIVITY_KIND_DEVICE_ATTRIBUTE:
{
CUpti_ActivityDeviceAttribute *attribute = (CUpti_ActivityDeviceAttribute *)record;
Expand Down Expand Up @@ -282,7 +296,7 @@ printActivity(CUpti_Activity *record)
memcpy->correlationId, memcpy->runtimeCorrelationId);
#endif
std::string name{getMemcpyKindString((CUpti_ActivityMemcpyKind) memcpy->copyKind)};
store_profiler_data(name, memcpy->correlationId, memcpy->start, memcpy->end);
store_profiler_data(name, memcpy->correlationId, memcpy->start, memcpy->end, memcpy->deviceId, memcpy->contextId, 0);
if (apex::apex_options::use_cuda_counters()) {
store_counter_data("GPU: Bytes", name, memcpy->end, memcpy->bytes);
uint64_t duration = memcpy->end - memcpy->start;
Expand All @@ -302,7 +316,7 @@ printActivity(CUpti_Activity *record)
(unsigned long long) (memcpy->end - startTimestamp),
memcpy->deviceId, memcpy->contextId, memcpy->streamId,
memcpy->correlationId, memcpy->runtimeCorrelationId);
store_profiler_data(getMemcpyKindString((CUpti_ActivityMemcpyKind) memcpy->copyKind),
store_profiler_data(getMemcpyKindString((CUpti_ActivityMemcpyKind) memcpy->copyKind, memcpy->deviceId, memcpy->contextId, 0),
memcpy->correlationId, memcpy->start, memcpy->end);
break;
}
Expand All @@ -319,7 +333,7 @@ printActivity(CUpti_Activity *record)
memset->correlationId);
#endif
static std::string name{"Memset"};
store_profiler_data(name, memset->correlationId, memset->start, memset->end);
store_profiler_data(name, memset->correlationId, memset->start, memset->end, memset->deviceId, memset->contextId, 0);
break;
}
case CUPTI_ACTIVITY_KIND_KERNEL:
Expand All @@ -344,7 +358,7 @@ printActivity(CUpti_Activity *record)
#endif
//std::string * tmp = apex::demangle(kernel->name);
std::string tmp = std::string(kernel->name);
store_profiler_data(tmp, kernel->correlationId, kernel->start, kernel->end);
store_profiler_data(tmp, kernel->correlationId, kernel->start, kernel->end, kernel->deviceId, kernel->contextId, kernel->streamId);
if (apex::apex_options::use_cuda_counters()) {
//store_counter_data("GPU: Block X", tmp, kernel->end, kernel->blockX);
//store_counter_data("GPU: Block Y", tmp, kernel->end, kernel->blockY);
Expand Down Expand Up @@ -453,6 +467,7 @@ printActivity(CUpti_Activity *record)

void CUPTIAPI bufferRequested(uint8_t **buffer, size_t *size, size_t *maxNumRecords)
{
num_buffers++;
uint8_t *bfr = (uint8_t *) malloc(BUF_SIZE + ALIGN_SIZE);
if (bfr == NULL) {
printf("Error: out of memory\n");
Expand All @@ -467,6 +482,8 @@ void CUPTIAPI bufferRequested(uint8_t **buffer, size_t *size, size_t *maxNumReco
void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t *buffer, size_t size, size_t validSize)
{
static bool registered = register_myself();
num_buffers_processed++;
if (flushing) { std::cout << "." << std::flush; }
APEX_UNUSED(registered);
CUptiResult status;
CUpti_Activity *record = NULL;
Expand Down Expand Up @@ -612,19 +629,19 @@ void initTrace() {
// e.g. to be applied to all device buffer allocations (see documentation).
CUPTI_CALL(cuptiActivityGetAttribute(
CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE, &attrValueSize, &attrValue));
attrValue = attrValue / 4;
printf("%s = %llu\n",
"CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE",
(long long unsigned)attrValue);
attrValue *= 2;
CUPTI_CALL(cuptiActivitySetAttribute(
CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE, &attrValueSize, &attrValue));

CUPTI_CALL(cuptiActivityGetAttribute(
CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT, &attrValueSize, &attrValue));
attrValue = attrValue * 4;
printf("%s = %llu\n",
"CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT",
(long long unsigned)attrValue);
attrValue *= 2;
CUPTI_CALL(cuptiActivitySetAttribute(
CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT, &attrValueSize, &attrValue));

Expand All @@ -647,6 +664,17 @@ void initTrace() {
* that APEX will call directly. */
namespace apex {
void flushTrace(void) {
flushing = true;
bool progress{false};
if (num_buffers_processed < num_buffers) {
progress = true;
std::cout << "Flushing remaining " << std::fixed
<< num_buffers-num_buffers_processed << " of " << num_buffers
<< " CUDA/CUPTI buffers..." << std::endl;
}
cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_NONE);
if (progress) {
std::cout << std::endl;
}
}
}
5 changes: 3 additions & 2 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,9 @@ void flushTrace(void);

void finalize()
{
apex* instance = apex::instance(); // get the Apex static instance
if (!instance) { FUNCTION_EXIT return; } // protect against calls after finalization
instance->the_profiler_listener->stop_main_timer();
#ifdef APEX_WITH_CUDA
flushTrace();
#endif
Expand Down Expand Up @@ -1390,8 +1393,6 @@ void finalize()
dump(false);
// if not done already...
shutdown_throttling();
apex* instance = apex::instance(); // get the Apex static instance
if (!instance) { FUNCTION_EXIT return; } // protect against calls after finalization
stop_all_async_threads();
exit_thread();
if (!_measurement_stopped)
Expand Down
8 changes: 6 additions & 2 deletions src/apex/profiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,18 @@ class profiler {
uint64_t stamp = duration_cast<nanoseconds>(MYCLOCK::now().time_since_epoch()).count();
return stamp;
}
double elapsed(void) {
double elapsed(bool scaled = false) {
if(is_counter) {
return value;
} else {
using namespace std::chrono;
duration<double> time_span =
duration_cast<duration<double>>(end - start);
return time_span.count();
if (scaled) {
return time_span.count()*get_cpu_mhz();
} else {
return time_span.count();
}
}
}
double exclusive_elapsed(void) {
Expand Down
16 changes: 15 additions & 1 deletion src/apex/profiler_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,13 @@ std::unordered_set<profile*> free_profiles;
#ifdef APEX_HAVE_HPX
num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
#endif
/*
std::chrono::duration<double> time_span =
std::chrono::duration_cast<std::chrono::duration<double>>
(MYCLOCK::now() - main_timer->start);
double total_main = time_span.count() *
*/
double total_main = main_timer->elapsed(true) *
fmin(hardware_concurrency(), num_worker_threads);
double elapsed = total_main - non_idle_time;
elapsed = elapsed > 0.0 ? elapsed : 0.0;
Expand All @@ -204,10 +207,13 @@ std::unordered_set<profile*> free_profiles;
#ifdef APEX_HAVE_HPX
num_worker_threads = num_worker_threads - num_non_worker_threads_registered;
#endif
/*
std::chrono::duration<double> time_span =
std::chrono::duration_cast<std::chrono::duration<double>>
(MYCLOCK::now() - main_timer->start);
double total_main = time_span.count() *
*/
double total_main = main_timer->elapsed(true) *
fmin(hardware_concurrency(), num_worker_threads);
double elapsed = total_main - non_idle_time;
double rate = elapsed > 0.0 ? ((elapsed/total_main)) : 0.0;
Expand Down Expand Up @@ -1338,7 +1344,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
if (_done) { return; }

// stop the main timer, and process that profile?
main_timer->stop(true);
stop_main_timer();
push_profiler((unsigned int)thread_instance::get_id(), main_timer);

// trigger statistics updating
Expand Down Expand Up @@ -1740,6 +1746,14 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl

}

void profiler_listener::stop_main_timer(void) {
static bool stopped{false};
if (!stopped) {
main_timer->stop(true);
stopped = true;
}
}

}

#ifdef APEX_HAVE_HPX
Expand Down
1 change: 1 addition & 0 deletions src/apex/profiler_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class profiler_listener : public event_listener {
void push_profiler_public(std::shared_ptr<profiler> &p) {
push_profiler(0, p);
}
void stop_main_timer(void);
};

}
Expand Down
53 changes: 42 additions & 11 deletions src/apex/trace_event_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ namespace apex {
bool trace_event_listener::_initialized(false);

trace_event_listener::trace_event_listener (void) : _terminate(false),
saved_node_id(apex::instance()->get_node_id()) {
saved_node_id(apex::instance()->get_node_id()), num_events(0) {
std::stringstream ss;
ss << "trace_events." << saved_node_id << ".json";
trace.open(ss.str());
trace_file.open(ss.str());
trace << fixed << "{\n";
trace << "\"displayTimeUnit\": \"ns\",\n";
trace << "\"traceEvents\": [\n";
Expand All @@ -33,7 +33,8 @@ trace_event_listener::~trace_event_listener (void) {
trace << "{\"name\": \"program\", \"cat\": \"PERF\", \"ph\": \"E\", \"pid\": \"" <<
saved_node_id << "\", \"tid\": \"0\", \"ts\": \"" << profiler::get_time() << "\"}\n";
trace << "]\n}\n" << std::endl;
trace.close();
flush_trace();
trace_file.close();
_initialized = true;
}

Expand Down Expand Up @@ -81,12 +82,15 @@ void trace_event_listener::on_exit_thread(event_data &data) {

inline bool trace_event_listener::_common_start(std::shared_ptr<task_wrapper> &tt_ptr) {
if (!_terminate) {
_mutex.lock();
trace << "{\"name\": \"" << tt_ptr->task_id->get_name()
std::stringstream ss;
ss << "{\"name\": \"" << tt_ptr->task_id->get_name()
<< "\", \"cat\": \"PERF\", \"ph\": \"B\", \"pid\": \""
<< saved_node_id << "\", \"tid\": " << thread_instance::get_id()
<< ", \"ts\": \"" << tt_ptr->prof->get_start() << "\"},\n";
_mutex.lock();
trace << ss.rdbuf();
_mutex.unlock();
flush_trace_if_necessary();
} else {
return false;
}
Expand All @@ -103,12 +107,15 @@ bool trace_event_listener::on_resume(std::shared_ptr<task_wrapper> &tt_ptr) {

inline void trace_event_listener::_common_stop(std::shared_ptr<profiler> &p) {
if (!_terminate) {
_mutex.lock();
trace << "{\"name\": \"" << p->get_task_id()->get_name()
std::stringstream ss;
ss << "{\"name\": \"" << p->get_task_id()->get_name()
<< "\", \"cat\": \"PERF\", \"ph\": \"E\", \"pid\": \"" << saved_node_id
<< "\", \"tid\": " << thread_instance::get_id()
<< ", \"ts\": \"" << p->get_stop() << "\"},\n";
_mutex.lock();
trace << ss.rdbuf();
_mutex.unlock();
flush_trace_if_necessary();
}
return;
}
Expand Down Expand Up @@ -159,25 +166,49 @@ std::string trace_event_listener::make_tid (uint32_t device, uint32_t context, u
vthread_map.insert(std::pair<cuda_thread_node, size_t>(tmp,vthread_map.size()));
}
tid = vthread_map[tmp];
APEX_UNUSED(tid);
std::stringstream ss;
ss << "GPU: " << tid;
ss << "\"GPU Dev:" << device << " Ctx:" << context;
if (stream > 0) {
ss << " Str:" << stream;
}
ss << "\"";
std::string label{ss.str()};
return label;
}

void trace_event_listener::on_async_event(uint32_t device, uint32_t context,
uint32_t stream, std::shared_ptr<profiler> &p) {
if (!_terminate) {
_mutex.lock();
trace << "{\"name\": \"" << p->get_task_id()->get_name()
std::stringstream ss;
ss << "{\"name\": \"" << p->get_task_id()->get_name()
<< "\", \"cat\": \"PERF\", \"ph\": \"B\", \"pid\": \""
<< saved_node_id << "\", \"tid\": " << make_tid(device, context, stream)
<< ", \"ts\": \"" << p->get_start() << "\"},\n";
trace << "{\"name\": \"" << p->get_task_id()->get_name()
ss << "{\"name\": \"" << p->get_task_id()->get_name()
<< "\", \"cat\": \"PERF\", \"ph\": \"E\", \"pid\": \"" << saved_node_id
<< "\", \"tid\": " << make_tid(device, context, stream)
<< ", \"ts\": \"" << p->get_stop() << "\"},\n";
_mutex.lock();
trace << ss.rdbuf();
_mutex.unlock();
flush_trace_if_necessary();
}
}

void trace_event_listener::flush_trace(void) {
_mutex.lock();
// flush the trace
trace_file << trace.rdbuf() << std::flush;
// reset the buffer
trace.str("");
_mutex.unlock();
}

void trace_event_listener::flush_trace_if_necessary(void) {
auto tmp = ++num_events;
if (tmp % 1000000 == 0) {
flush_trace();
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/apex/trace_event_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <iostream>
#include <fstream>
#include <map>
#include <atomic>

namespace apex {

Expand Down Expand Up @@ -74,13 +75,17 @@ class trace_event_listener : public event_listener {
private:
void _init(void);
bool _terminate;
void flush_trace(void);
void flush_trace_if_necessary(void);
bool _common_start(std::shared_ptr<task_wrapper> &tt_ptr);
void _common_stop(std::shared_ptr<profiler> &p);
static bool _initialized;
std::ofstream trace;
int saved_node_id;
std::atomic<size_t> num_events;
std::ofstream trace_file;
std::stringstream trace;
std::mutex _mutex;
std::map<cuda_thread_node, size_t> vthread_map;
int saved_node_id;
};

int initialize_worker_thread_for_tau(void);
Expand Down

0 comments on commit cae5e2c

Please sign in to comment.