Skip to content

Commit

Permalink
Merge pull request #713 from cjcliffe/vso_use_spin_locks
Browse files Browse the repository at this point in the history
Let's go to production and ask for forgiveness later.
  • Loading branch information
vsonnier authored Mar 3, 2019
2 parents 792bf20 + 6afa3a6 commit 8b0d4c2
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 51 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ SET (cubicsdr_headers
src/util/GLExt.h
src/util/GLFont.h
src/util/DataTree.h
src/util/SpinMutex.h
src/panel/ScopePanel.h
src/panel/SpectrumPanel.h
src/panel/WaterfallPanel.h
Expand Down
7 changes: 4 additions & 3 deletions src/IOThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <climits>
#include "ThreadBlockingQueue.h"
#include "Timer.h"
#include "SpinMutex.h"

struct map_string_less : public std::binary_function<std::string,std::string,bool>
{
Expand Down Expand Up @@ -62,7 +63,7 @@ class ReBuffer {
/// Return a new ReBuffer_ptr usable by the application.
ReBufferPtr getBuffer() {

std::lock_guard < std::mutex > lock(m_mutex);
std::lock_guard < SpinMutex > lock(m_mutex);

// iterate the ReBufferAge list: if the std::shared_ptr count == 1, it means
//it is only referenced in outputBuffers itself, so available for re-use.
Expand Down Expand Up @@ -131,7 +132,7 @@ class ReBuffer {

/// Purge the cache.
void purge() {
std::lock_guard < std::mutex > lock(m_mutex);
std::lock_guard < SpinMutex > lock(m_mutex);

// since outputBuffers are full std::shared_ptr,
//purging if will effectively loose the local reference,
Expand All @@ -152,7 +153,7 @@ class ReBuffer {
typedef typename std::deque< ReBufferAge < ReBufferPtr > >::iterator outputBuffersI;

//mutex protecting access to outputBuffers.
std::mutex m_mutex;
SpinMutex m_mutex;
};


Expand Down
8 changes: 4 additions & 4 deletions src/demod/DemodulatorThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBasePtr thread
if (name == "AudioVisualOutput") {

//protects because it may be changed at runtime
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);

audioVisOutputQueue = std::static_pointer_cast<DemodulatorThreadOutputQueue>(threadQueue);
}

if (name == "AudioSink") {
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);

audioSinkOutputQueue = std::static_pointer_cast<AudioThreadInputQueue>(threadQueue);
}
Expand Down Expand Up @@ -247,7 +247,7 @@ void DemodulatorThread::run() {
//variable, and works with it with now on until the next while-turn.
DemodulatorThreadOutputQueuePtr localAudioVisOutputQueue = nullptr;
{
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
localAudioVisOutputQueue = audioVisOutputQueue;
}

Expand Down Expand Up @@ -336,7 +336,7 @@ void DemodulatorThread::run() {
// Capture audioSinkOutputQueue state in a local variable
DemodulatorThreadOutputQueuePtr localAudioSinkOutputQueue = nullptr;
{
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
localAudioSinkOutputQueue = audioSinkOutputQueue;
}

Expand Down
3 changes: 2 additions & 1 deletion src/demod/DemodulatorThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "DemodDefs.h"
#include "AudioThread.h"
#include "Modem.h"
#include "SpinMutex.h"

#define DEMOD_VIS_SIZE 2048
#define DEMOD_SIGNAL_MIN -30
Expand Down Expand Up @@ -70,5 +71,5 @@ class DemodulatorThread : public IOThread {
DemodulatorThreadOutputQueuePtr audioSinkOutputQueue = nullptr;

//protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr)
std::mutex m_mutexAudioVisOutputQueue;
SpinMutex m_mutexAudioVisOutputQueue;
};
40 changes: 20 additions & 20 deletions src/process/SpectrumVisualProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ SpectrumVisualProcessor::~SpectrumVisualProcessor() {

bool SpectrumVisualProcessor::isView() {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return is_view;
}

void SpectrumVisualProcessor::setView(bool bView) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

is_view = bView;
}

void SpectrumVisualProcessor::setView(bool bView, long long centerFreq_in, long bandwidth_in) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);
is_view = bView;
bandwidth = bandwidth_in;
centerFreq = centerFreq_in;
Expand All @@ -72,49 +72,49 @@ void SpectrumVisualProcessor::setView(bool bView, long long centerFreq_in, long

void SpectrumVisualProcessor::setFFTAverageRate(float fftAverageRate) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

this->fft_average_rate = fftAverageRate;
}

float SpectrumVisualProcessor::getFFTAverageRate() {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return this->fft_average_rate;
}

void SpectrumVisualProcessor::setCenterFrequency(long long centerFreq_in) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

centerFreq = centerFreq_in;
}

long long SpectrumVisualProcessor::getCenterFrequency() {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return centerFreq;
}

void SpectrumVisualProcessor::setBandwidth(long bandwidth_in) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

bandwidth = bandwidth_in;
}

long SpectrumVisualProcessor::getBandwidth() {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return bandwidth;
}

void SpectrumVisualProcessor::setPeakHold(bool peakHold_in) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

if (peakHold && peakHold_in) {
peakReset = PEAK_RESET_COUNT;
Expand All @@ -126,20 +126,20 @@ void SpectrumVisualProcessor::setPeakHold(bool peakHold_in) {

bool SpectrumVisualProcessor::getPeakHold() {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return peakHold;
}

int SpectrumVisualProcessor::getDesiredInputSize() {
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

return desiredInputSize;
}

void SpectrumVisualProcessor::setup(unsigned int fftSize_in) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

fftSize = fftSize_in;
fftSizeInternal = fftSize_in * SPECTRUM_VZM;
Expand Down Expand Up @@ -180,7 +180,7 @@ void SpectrumVisualProcessor::setup(unsigned int fftSize_in) {
void SpectrumVisualProcessor::setFFTSize(unsigned int fftSize_in) {

//then get the busy_lock
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

if (fftSize_in == fftSize) {
return;
Expand All @@ -192,7 +192,7 @@ void SpectrumVisualProcessor::setFFTSize(unsigned int fftSize_in) {
unsigned int SpectrumVisualProcessor::getFFTSize() {

//then get the busy_lock
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

if (fftSizeChanged) {
return newFFTSize;
Expand All @@ -203,7 +203,7 @@ unsigned int SpectrumVisualProcessor::getFFTSize() {

void SpectrumVisualProcessor::setHideDC(bool hideDC) {

std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

this->hideDC = hideDC;
}
Expand All @@ -220,7 +220,7 @@ void SpectrumVisualProcessor::process() {
bool executeSetup = false;

{ // scoped lock here
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);
if (fftSizeChanged) {
executeSetup = true;
fftSizeChanged = false;
Expand All @@ -242,7 +242,7 @@ void SpectrumVisualProcessor::process() {
}

//then get the busy_lock for the rest of the processing.
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

bool doPeak = peakHold && (peakReset == 0);

Expand Down Expand Up @@ -638,14 +638,14 @@ void SpectrumVisualProcessor::process() {


void SpectrumVisualProcessor::setScaleFactor(float sf) {
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);

scaleFactor = sf;
}


float SpectrumVisualProcessor::getScaleFactor() {
std::lock_guard < std::mutex > busy_lock(busy_run);
std::lock_guard < SpinMutex > busy_lock(busy_run);
return scaleFactor;
}

3 changes: 2 additions & 1 deletion src/process/SpectrumVisualProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "DemodDefs.h"
#include <cmath>
#include <memory>
#include "SpinMutex.h"

#define SPECTRUM_VZM 2
#define PEAK_RESET_COUNT 30
Expand Down Expand Up @@ -65,7 +66,7 @@ class SpectrumVisualProcessor : public VisualProcessor<DemodulatorThreadIQData,

private:
//protects all access to fields below
std::mutex busy_run;
SpinMutex busy_run;

bool is_view;
size_t fftSize, newFFTSize;
Expand Down
31 changes: 19 additions & 12 deletions src/process/VisualProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <algorithm>
#include <vector>
#include <typeinfo>
#include "SpinMutex.h"

template<typename InputDataType, typename OutputDataType>
class VisualProcessor {
Expand All @@ -28,7 +29,7 @@ class VisualProcessor {
}

bool isInputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);

if (input) {
return input->empty();
Expand All @@ -38,7 +39,7 @@ class VisualProcessor {
}

bool isOutputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);

for (VisualOutputQueueTypePtr single_output : outputs) {
if (single_output->full()) {
Expand All @@ -49,7 +50,7 @@ class VisualProcessor {
}

bool isAnyOutputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);

for (VisualOutputQueueTypePtr single_output : outputs) {
if (!(single_output)->full()) {
Expand All @@ -61,7 +62,7 @@ class VisualProcessor {

//Set a (new) 'input' queue for incoming data.
void setInput(VisualInputQueueTypePtr vis_in) {
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);
input = vis_in;

}
Expand All @@ -70,14 +71,14 @@ class VisualProcessor {
//dispatched by distribute().
void attachOutput(VisualOutputQueueTypePtr vis_out) {
// attach an output queue
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);
outputs.push_back(vis_out);
}

//reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(VisualOutputQueueTypePtr vis_out) {
// remove an output queue
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);

auto it = std::find(outputs.begin(), outputs.end(), vis_out);
if (it != outputs.end()) {
Expand All @@ -98,7 +99,7 @@ class VisualProcessor {
//scoped-lock: create a local copy of outputs, and work with it.
std::vector<VisualOutputQueueTypePtr> local_outputs;
{
std::lock_guard < std::mutex > busy_lock(busy_update);
std::lock_guard < SpinMutex > busy_lock(busy_update);
local_outputs = outputs;
}

Expand Down Expand Up @@ -132,11 +133,17 @@ class VisualProcessor {
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) {

std::lock_guard < std::mutex > busy_lock(busy_update);
//We will try to distribute 'output' among all 'outputs',
//so 'output' will a-priori be shared among all 'outputs'.
//scoped-lock: create a local copy of outputs, and work with it.
std::vector<VisualOutputQueueTypePtr> local_outputs;
{
std::lock_guard < SpinMutex > busy_lock(busy_update);
local_outputs = outputs;
}

//We will try to distribute 'output' among all 'local_outputs',
//so 'output' will a-priori be shared among all 'local_outputs'.

for (VisualOutputQueueTypePtr single_output : outputs) {
for (VisualOutputQueueTypePtr single_output : local_outputs) {
//'output' can fail to be given to an single_output,
//using a blocking push, with a timeout
if (!(single_output)->push(item, timeout, errorMessage)) {
Expand All @@ -152,7 +159,7 @@ class VisualProcessor {
std::vector<VisualOutputQueueTypePtr> outputs;

//protects input and outputs
std::mutex busy_update;
SpinMutex busy_update;
};

//Specialization much like VisualDataReDistributor, except
Expand Down
4 changes: 2 additions & 2 deletions src/util/GLFont.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ void GLFont::drawString(const std::wstring& str, int pxHeight, float xpos, float
if (cacheable) {
gcCounter++;

std::lock_guard<std::mutex> lock(cache_busy);
std::lock_guard<SpinMutex> lock(cache_busy);

if (gcCounter > GC_DRAW_COUNT_PERIOD) {

Expand Down Expand Up @@ -793,7 +793,7 @@ void GLFont::doCacheGC() {

void GLFont::clearCache() {

std::lock_guard<std::mutex> lock(cache_busy);
std::lock_guard<SpinMutex> lock(cache_busy);

std::map<std::wstring, GLFontStringCache * >::iterator cache_iter;

Expand Down
Loading

0 comments on commit 8b0d4c2

Please sign in to comment.