Skip to content

Commit

Permalink
Move common multi-threading code specific to capture projects to capt…
Browse files Browse the repository at this point in the history
…urelib library from screencapture RecordingThread, WiP #73
  • Loading branch information
vmdocua committed Feb 7, 2024
1 parent b9a4d31 commit 033dfbc
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 96 deletions.
196 changes: 196 additions & 0 deletions Capture/capturelib/include/CaptureThreading.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#ifndef CAPTURE_WORKERTHREAD_H
#define CAPTURE_WORKERTHREAD_H

#include <iostream>
#include <atomic>
#include <thread>
#include "CaptureLib.h"

namespace reprostim {

//////////////////////////////////////////////////////////////////////////
// WorkerThread template

template<typename T>
class WorkerThread {

private:
void runInternal();

protected:
const T m_params;
std::atomic<bool> m_running;
std::atomic<bool> m_terminated;
bool verbose;

public:
WorkerThread(const T &params);
virtual ~WorkerThread();

const T& getParams() const;
bool isRunning() const;
bool isTerminated() const;
void run();
void start();
void stop();

// static helpers
static WorkerThread<T>* newInstance(const T &params);
static void deleteInstance(WorkerThread<T>* p);
};

//////////////////////////////////////////////////////////////////////////
// WorkerThread Implementation

template<typename T>
WorkerThread<T>::WorkerThread(const T &params) : m_params(params) {
verbose = params.verbose;
m_running = false;
m_terminated = false;
}

template<typename T>
WorkerThread<T>::~WorkerThread() {
}

template<typename T>
inline const T& WorkerThread<T>::getParams() const {
return m_params;
}

template<typename T>
inline bool WorkerThread<T>::isRunning() const {
return m_running;
}

template<typename T>
inline bool WorkerThread<T>::isTerminated() const {
return m_terminated;
}

template<typename T>
void WorkerThread<T>::run() {
// provide own template specialization
_INFO("WorkerThread::run: not implemented");
}

template<typename T>
void WorkerThread<T>::runInternal() {
m_running = true;
try {
run();
} catch (std::exception &e) {
_ERROR("WorkerThread::runInternal: " << e.what());
} catch (...) {
_ERROR("WorkerThread::runInternal: unknown exception");
}
m_running = false;
}

template<typename T>
void WorkerThread<T>::start() {
m_running = false;
m_terminated = false;
std::thread t(&WorkerThread::runInternal, this);
for (int i = 0; i < 10; i++) {
SLEEP_MS(100);
if (m_running) {
break;
}
}
t.detach();
}

template<typename T>
void WorkerThread<T>::stop() {
m_terminated = true;
for (int i = 0; i < 10; i++) {
SLEEP_MS(100);
if (!m_running) {
break;
}
}
}

template<typename T>
inline WorkerThread<T>* WorkerThread<T>::newInstance(const T &params) {
return new WorkerThread<T>(params);
}

template<typename T>
inline void WorkerThread<T>::deleteInstance(WorkerThread<T>* p) {
if( p!= nullptr ) {
delete p;
}
}


//////////////////////////////////////////////////////////////////////////
// SingleThreadExecutor

template<typename T>
class SingleThreadExecutor {
private:
T* m_pCur;
T* m_pPrev;

void safeDelete(T* &p);

public:
SingleThreadExecutor();
~SingleThreadExecutor();

void schedule(T* pThread);
void shutdown();
};

template<typename T>
inline SingleThreadExecutor<T>::SingleThreadExecutor() {
m_pCur = nullptr;
m_pPrev = nullptr;
}

template<typename T>
inline SingleThreadExecutor<T>::~SingleThreadExecutor() {
safeDelete(m_pCur);
safeDelete(m_pPrev);
}

template<typename T>
inline void SingleThreadExecutor<T>::safeDelete(T* &p) {
if( p ) {
if( p->isRunning() ) {
p->stop();
}
if( !p->isRunning() ) {
T::deleteInstance(p);
} else {
// NOTE: memory-leaks are possible in rare conditions
_INFO("Failed to stop worker thread: " << p);
}
p = nullptr;
}
}

template<typename T>
void SingleThreadExecutor<T>::schedule(T* pThread) {
safeDelete(m_pPrev);
m_pPrev = m_pCur;
safeDelete(m_pPrev);

m_pCur = pThread;

if( m_pCur!= nullptr && !m_pCur->isRunning()) {
m_pCur->start();
}
}

template<typename T>
inline void SingleThreadExecutor<T>::shutdown() {
schedule(nullptr);
schedule(nullptr);
}

}

#endif //CAPTURE_WORKERTHREAD_H
48 changes: 7 additions & 41 deletions Capture/screencapture/RecordingThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ inline int calcFrameDiff(unsigned char *f1, unsigned char *f2, size_t len) {
return difference;
}

int recordScreens(const RecordingParams& rp, std::atomic<bool>& terminated) {
int recordScreens(const RecordingParams& rp, std::function<bool()> isTerminated) {
bool verbose = rp.verbose;
_VERBOSE("recordScreens enter, sessionId=" << rp.sessionId);
int fd = open(rp.videoDevPath.c_str(), O_RDWR);
Expand Down Expand Up @@ -125,7 +125,7 @@ int recordScreens(const RecordingParams& rp, std::atomic<bool>& terminated) {

// Capturing and comparing loop
while (true) {
if( terminated ) {
if( isTerminated() ) {
_INFO("Capture terminated for session " << rp.sessionId);
break;
}
Expand Down Expand Up @@ -230,46 +230,12 @@ int recordScreens(const RecordingParams& rp, std::atomic<bool>& terminated) {


////////////////////////////////////////////////////////////////////////
RecordingThread::RecordingThread(const RecordingParams &params) : m_params(params) {
verbose = params.verbose;
m_running = false;
m_terminate = false;
}

RecordingThread::~RecordingThread() {

}

// specialization/override for default WorkerThread::run
template<>
void RecordingThread::run() {
m_running = true;
recordScreens(m_params, m_terminate);
m_running = false;
}

void RecordingThread::start() {
m_running = false;
m_terminate = false;
std::thread t(&RecordingThread::run, this);
for (int i = 0; i < 10; i++) {
SLEEP_MS(100);
if (m_running) {
break;
}
}
t.detach();
}

void RecordingThread::stop() {
m_terminate = true;
for (int i = 0; i < 10; i++) {
SLEEP_MS(100);
if (!m_running) {
break;
}
}
}

bool RecordingThread::isRunning() const {
return m_running;
recordScreens(m_params,
[this]() { return isTerminated(); }
);
}

26 changes: 4 additions & 22 deletions Capture/screencapture/RecordingThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

#include <iostream>
#include <atomic>
#include "CaptureThreading.h"

using namespace reprostim;

// Screen capture params shared between threads
// make sure it's thread-safe in usage
struct RecordingParams {
bool verbose;
int sessionId;
Expand All @@ -18,27 +21,6 @@ struct RecordingParams {
int intervalMs;
};


class RecordingThread {

private:
const RecordingParams m_params;
std::atomic<bool> m_running;
std::atomic<bool> m_terminate;
bool verbose;

void run();

public:
RecordingThread(const RecordingParams &params);

virtual ~RecordingThread();

bool isRunning() const;

void start();

void stop();
};
using RecordingThread = WorkerThread<RecordingParams>;

#endif //CAPTURE_RECORDINGTHREAD_H
33 changes: 4 additions & 29 deletions Capture/screencapture/ScreenCapture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,16 @@ using namespace reprostim;

static std::atomic<int> g_activeSessionId(0);

inline void rtSafeDelete(RecordingThread* &prt) {
if( prt ) {
if( prt->isRunning() ) {
prt->stop();
}
if( !prt->isRunning() ) {
delete prt;
} else {
// NOTE: memory-leaks are possible in rare conditions
_INFO("Failed to stop recording thread: " << prt);
}
prt = nullptr;
}
}

////////////////////////////////////////////////////////////////////////////
// ScreenCaptureApp class

ScreenCaptureApp::ScreenCaptureApp() {
appName = "reprostim-screencapture";
audioEnabled = false;
m_prtCur = nullptr;
m_prtPrev = nullptr;
}

ScreenCaptureApp::~ScreenCaptureApp() {
rtSafeDelete(m_prtCur);
rtSafeDelete(m_prtPrev);
m_recExec.shutdown();
}

void ScreenCaptureApp::onCaptureStart() {
Expand All @@ -48,11 +30,7 @@ void ScreenCaptureApp::onCaptureStart() {
SLEEP_MS(200);
recording = 1;

rtSafeDelete(m_prtPrev);
m_prtPrev = m_prtCur;
rtSafeDelete(m_prtPrev);

m_prtCur = new RecordingThread(RecordingParams{
RecordingThread* pt = RecordingThread::newInstance(RecordingParams{
opts.verbose,
sessionId,
vssCur.cx, vssCur.cy,
Expand All @@ -63,16 +41,13 @@ void ScreenCaptureApp::onCaptureStart() {
m_scOpts.interval_ms
});

m_prtCur->start();
m_recExec.schedule(pt);
}

void ScreenCaptureApp::onCaptureStop(const std::string& message) {
if( recording>0 ) {
_INFO("Stop recording snapshots for session " << g_activeSessionId );
rtSafeDelete(m_prtPrev);
m_prtPrev = m_prtCur;
rtSafeDelete(m_prtPrev);
m_prtCur = nullptr;
m_recExec.schedule(nullptr);
recording = 0;
SLEEP_SEC(1);
}
Expand Down
5 changes: 2 additions & 3 deletions Capture/screencapture/ScreenCapture.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ struct ScreenCaptureOpts {

class ScreenCaptureApp: public CaptureApp {
private:
RecordingThread* m_prtCur;
RecordingThread* m_prtPrev;
ScreenCaptureOpts m_scOpts;
SingleThreadExecutor<RecordingThread> m_recExec;
ScreenCaptureOpts m_scOpts;
public:
ScreenCaptureApp();
~ScreenCaptureApp();
Expand Down
2 changes: 1 addition & 1 deletion Capture/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.0.24
1.1.0.27

0 comments on commit 033dfbc

Please sign in to comment.