Skip to content

Commit

Permalink
--device autoでのGPU自動選択をさらに改善。 ( #225 )
Browse files Browse the repository at this point in the history
デバイス使用状況の情報を取得してから実際に使用するデバイスを登録するまで、ロックを解除しないようにした。そうしないと、情報取得後に一斉に同じGPUを使用することが発生しうる。
  • Loading branch information
rigaya committed Dec 2, 2024
1 parent 8a198fd commit 3bfa298
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 40 deletions.
38 changes: 24 additions & 14 deletions QSVPipeline/qsv_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3374,24 +3374,23 @@ RGY_ERR CQSVPipeline::checkGPUListByEncoder(const sInputParams *prm, std::vector
return RGY_ERR_NONE;
}

RGY_ERR CQSVPipeline::deviceAutoSelect(const sInputParams *prm, std::vector<std::unique_ptr<QSVDevice>>& gpuList) {
RGY_ERR CQSVPipeline::deviceAutoSelect(const sInputParams *prm, std::vector<std::unique_ptr<QSVDevice>>& gpuList, const RGYDeviceUsageLockManager *devUsageLock) {
if (gpuList.size() <= 1) {
return RGY_ERR_NONE;
}
int maxDeviceUsageCount = 1;
std::vector<std::pair<int, int64_t>> deviceUsage;
if (gpuList.size() > 1) {
RGYDeviceUsage devUsage;
deviceUsage = devUsage.getUsage();
deviceUsage = m_deviceUsage->getUsage(devUsageLock);
for (size_t i = 0; i < deviceUsage.size(); i++) {
maxDeviceUsageCount = std::max(maxDeviceUsageCount, deviceUsage[i].first);
if (deviceUsage[i].first > 0) {
PrintMes(RGY_LOG_DEBUG, _T("Device #%d: %d usage.\n"), i, deviceUsage[i].first);
PrintMes(RGY_LOG_INFO, _T("Device #%d: %d usage.\n"), i, deviceUsage[i].first);
}
}
}
#if ENABLE_PERF_COUNTER
PrintMes(RGY_LOG_DEBUG, _T("Auto select device from %d devices.\n"), (int)gpuList.size());
PrintMes(RGY_LOG_INFO, _T("Auto select device from %d devices.\n"), (int)gpuList.size());
bool counterIsIntialized = m_pPerfMonitor->isPerfCounterInitialized();
for (int i = 0; i < 4 && !counterIsIntialized; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
Expand Down Expand Up @@ -3431,7 +3430,7 @@ RGY_ERR CQSVPipeline::deviceAutoSelect(const sInputParams *prm, std::vector<std:
double usage_score = 100.0 * (maxDeviceUsageCount - deviceUsageCount) / (double)maxDeviceUsageCount;

gpuscore[gpu->deviceNum()] = usage_score + cc_score + ve_score + gpu_score + core_score + cl_score;
PrintMes(RGY_LOG_DEBUG, _T("GPU #%d (%s) score: %.1f: Use: %.1f, VE %.1f, GPU %.1f, CC %.1f, Core %.1f, CL %.1f.\n"), gpu->deviceNum(), gpu->name().c_str(),
PrintMes(RGY_LOG_INFO, _T("GPU #%d (%s) score: %.1f: Use: %.1f, VE %.1f, GPU %.1f, CC %.1f, Core %.1f, CL %.1f.\n"), gpu->deviceNum(), gpu->name().c_str(),
gpuscore[gpu->deviceNum()], usage_score, ve_score, gpu_score, cc_score, core_score, cl_score);
}
std::sort(gpuList.begin(), gpuList.end(), [&](const std::unique_ptr<QSVDevice> &a, const std::unique_ptr<QSVDevice> &b) {
Expand All @@ -3441,16 +3440,20 @@ RGY_ERR CQSVPipeline::deviceAutoSelect(const sInputParams *prm, std::vector<std:
return a->deviceNum() < b->deviceNum();
});

PrintMes(RGY_LOG_DEBUG, _T("GPU Priority\n"));
PrintMes(RGY_LOG_INFO, _T("GPU Priority\n"));
for (const auto &gpu : gpuList) {
PrintMes(RGY_LOG_DEBUG, _T("GPU #%d (%s): score %.1f\n"), gpu->deviceNum(), gpu->name().c_str(), gpuscore[gpu->deviceNum()]);
PrintMes(RGY_LOG_INFO, _T("GPU #%d (%s): score %.1f\n"), gpu->deviceNum(), gpu->name().c_str(), gpuscore[gpu->deviceNum()]);
}
return RGY_ERR_NONE;
}

RGY_ERR CQSVPipeline::InitSession(const sInputParams *inputParam, std::vector<std::unique_ptr<QSVDevice>>& deviceList) {
auto err = RGY_ERR_NONE;
const int deviceCount = (int)deviceList.size();
std::unique_ptr<RGYDeviceUsageLockManager> m_devUsageLock;
if (deviceList.size() > 1) {
m_deviceUsage = std::make_unique<RGYDeviceUsage>();
m_devUsageLock = m_deviceUsage->lock(); // ロックは親プロセス側でとる
}
if (deviceList.size() == 0) {
PrintMes(RGY_LOG_DEBUG, _T("No device found for QSV encoding!\n"));
return RGY_ERR_DEVICE_NOT_FOUND;
Expand All @@ -3460,16 +3463,21 @@ RGY_ERR CQSVPipeline::InitSession(const sInputParams *inputParam, std::vector<st
if ((err = checkGPUListByEncoder(inputParam, deviceList)) != RGY_ERR_NONE) {
return err;
}
if ((err = deviceAutoSelect(inputParam, deviceList)) != RGY_ERR_NONE) {
if ((err = deviceAutoSelect(inputParam, deviceList, m_devUsageLock.get())) != RGY_ERR_NONE) {
return err;
}
m_device = std::move(deviceList.front());
PrintMes(RGY_LOG_DEBUG, _T("InitSession: selected device #%d: %s.\n"), (int)m_device->deviceNum(), m_device->name().c_str());
}
if (deviceCount > 1) {
m_deviceUsage = std::make_unique<RGYDeviceUsage>();
m_deviceUsage->startProcessMonitor((int)m_device->deviceNum());
if (m_deviceUsage) {
// 登録を解除するプロセスを起動
const auto [err_run_proc, child_pid] = m_deviceUsage->startProcessMonitor((int)m_device->deviceNum());
if (err_run_proc == RGY_ERR_NONE) {
// プロセスが起動できたら、その子プロセスのIDを登録する
m_deviceUsage->add((int)m_device->deviceNum(), child_pid, m_devUsageLock.get());
}
}
m_devUsageLock.reset();

//使用できる最大のversionをチェック
m_device->mfxSession().QueryVersion(&m_mfxVer);
Expand Down Expand Up @@ -4427,7 +4435,9 @@ RGY_ERR CQSVPipeline::RunEncode2() {
PrintMes(RGY_LOG_DEBUG, _T("Write video quality metric results...\n"));
m_videoQualityMetric->showResult();
}
m_deviceUsage->close();
if (m_deviceUsage) {
m_deviceUsage->close();
}
m_pStatus->WriteResults();
if (filter_result.size()) {
PrintMes(RGY_LOG_INFO, _T("\nVpp Filter Performance\n"));
Expand Down
2 changes: 1 addition & 1 deletion QSVPipeline/qsv_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class CQSVPipeline
virtual RGY_ERR InitMfxVpp();
virtual RGY_ERR InitMfxEncode();
RGY_ERR checkGPUListByEncoder(const sInputParams *inputParam, std::vector<std::unique_ptr<QSVDevice>>& deviceList);
RGY_ERR deviceAutoSelect(const sInputParams *inputParam, std::vector<std::unique_ptr<QSVDevice>>& deviceList);
RGY_ERR deviceAutoSelect(const sInputParams *inputParam, std::vector<std::unique_ptr<QSVDevice>>& deviceList, const RGYDeviceUsageLockManager *lock);
virtual RGY_ERR InitSession(const sInputParams *inputParam, std::vector<std::unique_ptr<QSVDevice>>& deviceList);
virtual RGY_ERR InitVideoQualityMetric(sInputParams *pParams);
void applyInputVUIToColorspaceParams(sInputParams *inputParam);
Expand Down
46 changes: 25 additions & 21 deletions QSVPipeline/rgy_device_usage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ RGYDeviceUsageLockManager::~RGYDeviceUsageLockManager() {
m_header->lock = 0;
}

RGYDeviceUsage::RGYDeviceUsage() : m_sharedMem(), m_header(nullptr), m_entries(nullptr), m_monitorProcess(), m_addedEntry(false) {
RGYDeviceUsage::RGYDeviceUsage() : m_sharedMem(), m_header(nullptr), m_entries(nullptr), m_monitorProcess() {
}


Expand Down Expand Up @@ -120,6 +120,13 @@ RGY_ERR RGYDeviceUsage::open() {
return RGY_ERR_NONE;
}

std::unique_ptr<RGYDeviceUsageLockManager> RGYDeviceUsage::lock() {
if (!m_sharedMem) {
open();
}
return std::make_unique<RGYDeviceUsageLockManager>(m_header);
}

void RGYDeviceUsage::check(const time_t now_time_from_epoch) {
bool removed = false;
for (int i = 0; i < RGY_DEVICE_USAGE_MAX_ENTRY; i++) {
Expand All @@ -145,26 +152,22 @@ void RGYDeviceUsage::check(const time_t now_time_from_epoch) {
}
}

RGY_ERR RGYDeviceUsage::add(int32_t device_id) {
if (!m_sharedMem) {
open();
RGY_ERR RGYDeviceUsage::add(const int32_t device_id, const int pid, const RGYDeviceUsageLockManager *lock) {
if (!lock) {
return RGY_ERR_NOT_INITIALIZED;
}
if (m_header == nullptr || m_entries == nullptr) {
return RGY_ERR_DEVICE_NOT_FOUND;
}
const auto time_from_epoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
const auto process_id = GetCurrentProcessId();

RGYDeviceUsageLockManager lock(m_header);

check(time_from_epoch);

for (int i = 0; i < RGY_DEVICE_USAGE_MAX_ENTRY; i++) {
if (m_entries[i].process_id == 0) {
m_entries[i].process_id = process_id;
m_entries[i].process_id = pid;
m_entries[i].device_id = device_id;
m_entries[i].start_time = time_from_epoch;
m_addedEntry = true;
return RGY_ERR_NONE;
}
}
Expand All @@ -175,19 +178,21 @@ void RGYDeviceUsage::resetEntry() {
if (!m_sharedMem) {
open();
}
RGYDeviceUsageLockManager lock(m_header);
char header[RGY_DEVICE_USAGE_HEADER_STR_SIZE] = { 0 };
memcpy(header, RGY_DEVICE_USAGE_SHARED_MEM_NAME, strlen(RGY_DEVICE_USAGE_SHARED_MEM_NAME) + 1);
memset(m_header, 0, sizeof(RGYDeviceUsageHeader));
memcpy(m_header->header, header, sizeof(header));
memset(m_entries, 0, sizeof(RGYDeviceUsageEntry) * RGY_DEVICE_USAGE_MAX_ENTRY);
}

std::vector<std::pair<int, int64_t>> RGYDeviceUsage::getUsage() {
std::vector<std::pair<int, int64_t>> RGYDeviceUsage::getUsage(const RGYDeviceUsageLockManager *lock) {
std::vector<std::pair<int, int64_t>> usage;
if (!m_sharedMem) {
open();
if (!lock) {
return usage;
}
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
const auto time_from_epoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();

RGYDeviceUsageLockManager lock(m_header);
check(time_from_epoch);
for (int i = 0; i < RGY_DEVICE_USAGE_MAX_ENTRY; i++) {
if (m_entries[i].process_id == 0) {
Expand All @@ -203,7 +208,7 @@ std::vector<std::pair<int, int64_t>> RGYDeviceUsage::getUsage() {
}

void RGYDeviceUsage::release() {
if (!m_addedEntry || !m_entries) {
if (!m_entries) {
return;
}
const auto process_id = GetCurrentProcessId();
Expand All @@ -229,10 +234,9 @@ void RGYDeviceUsage::release() {
break;
}
}
m_addedEntry = false;
}

RGY_ERR RGYDeviceUsage::startProcessMonitor(int32_t device_id) {
std::pair<RGY_ERR, int> RGYDeviceUsage::startProcessMonitor(int32_t device_id) {
m_monitorProcess = createRGYPipeProcess();
m_monitorProcess->init(PIPE_MODE_ENABLE | PIPE_MODE_ENABLE_FP, PIPE_MODE_DISABLE, PIPE_MODE_DISABLE);

Expand All @@ -250,21 +254,21 @@ RGY_ERR RGYDeviceUsage::startProcessMonitor(int32_t device_id) {
};

if (auto err = m_monitorProcess->run(args, nullptr, 0, true, true); err != 0) {
return RGY_ERR_UNKNOWN;
return { RGY_ERR_UNKNOWN, 0 };
}
return RGY_ERR_NONE;
return { RGY_ERR_NONE, m_monitorProcess->pid() };
}

int processMonitorRGYDeviceUsage(const int32_t deviceID) {
int ret = 0;
RGYDeviceUsage deviceUsage;
if (deviceUsage.open() != RGY_ERR_NONE) {
fprintf(stderr, "Failed to open shared memory\n"); ret = 1;
} else if (deviceUsage.add(deviceID) != RGY_ERR_NONE) {
fprintf(stderr, "Failed to add entry\n"); ret = 1;
} else {
char buf = 0;
ret = (int)fread(&buf, 1, 1, stdin);
// 親プロセスが行った登録を解除 (子プロセスのIDで登録されている)
deviceUsage.release();
}
return ret;
}
Expand Down
8 changes: 4 additions & 4 deletions QSVPipeline/rgy_device_usage.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,19 @@ class RGYDeviceUsage {
~RGYDeviceUsage();

RGY_ERR open();
RGY_ERR add(int32_t device_id);
RGY_ERR add(const int32_t device_id, const int pid, const RGYDeviceUsageLockManager *lock);
void check(const time_t now_time_from_epoch);
void release();
void close();
void resetEntry();
RGY_ERR startProcessMonitor(int32_t device_id);
std::vector<std::pair<int, int64_t>> getUsage();
std::pair<RGY_ERR, int> startProcessMonitor(int32_t device_id);
std::vector<std::pair<int, int64_t>> getUsage(const RGYDeviceUsageLockManager *lock);
std::unique_ptr<RGYDeviceUsageLockManager> lock();
protected:
std::unique_ptr<RGYSharedMem> m_sharedMem;
RGYDeviceUsageHeader *m_header;
RGYDeviceUsageEntry *m_entries;
std::unique_ptr<RGYPipeProcess> m_monitorProcess;
bool m_addedEntry;
};

int processMonitorRGYDeviceUsage(const int32_t deviceID);
Expand Down
6 changes: 6 additions & 0 deletions QSVPipeline/rgy_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,15 @@ void RGYPipeProcessWin::close() {
memset(&m_pi, 0, sizeof(m_pi));
}

int RGYPipeProcessWin::pid() const {
return m_pi.dwProcessId;
}

bool RGYPipeProcessWin::processAlive() {
return WAIT_TIMEOUT == WaitForSingleObject(m_phandle, 0);
}


#endif //defined(_WIN32) || defined(_WIN64)


Expand Down
3 changes: 3 additions & 0 deletions QSVPipeline/rgy_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RGYPipeProcess {
virtual int stdErrFpClose() = 0;
virtual int wait(uint32_t timeout) = 0;
virtual int waitAndGetExitCode() = 0;
virtual int pid() const = 0;
protected:
virtual int startPipes() = 0;
PROCESS_HANDLE m_phandle;
Expand Down Expand Up @@ -141,6 +142,7 @@ class RGYPipeProcessWin : public RGYPipeProcess {
virtual int stdErrFpClose() override;
virtual int wait(uint32_t timeout) override;
virtual int waitAndGetExitCode() override;
virtual int pid() const override;
const PROCESS_INFORMATION& getProcessInfo();
protected:
virtual int startPipes() override;
Expand Down Expand Up @@ -171,6 +173,7 @@ class RGYPipeProcessLinux : public RGYPipeProcess {
virtual int stdErrFpClose() override;
virtual int wait(uint32_t timeout) override;
virtual int waitAndGetExitCode() override;
virtual int pid() const override;
protected:
virtual int startPipes() override;
};
Expand Down
5 changes: 5 additions & 0 deletions QSVPipeline/rgy_pipe_linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,9 @@ int RGYPipeProcessLinux::wait(uint32_t timeout) {
return status;

}

int RGYPipeProcessLinux::pid() const {
return (int)m_phandle;
}

#endif //#if !(defined(_WIN32) || defined(_WIN64))

0 comments on commit 3bfa298

Please sign in to comment.