From 43cda2524c73868d185abdc59f6583adfd42545e Mon Sep 17 00:00:00 2001 From: Vincent Sonnier Date: Mon, 4 Mar 2019 18:49:28 +0100 Subject: [PATCH] Streaming improvements (#14) * Removed additional refill_thread plus misc. - Simplified getting Stream* using the same tricks as SoapySDRPlay, - Revised some scoped locks BEWARE: TX has not been modified accordingly yet. * Replaced rx/tx_sreamer shared_ptr by unique_ptr, restore rx/tx independent closeStream * Return two SoapySDR::Stream for either RX or TX streams, and add guard code to use them properly * Removed unused function * Replaced device_mutex by a custom spin-lock pluto_splin_mutex, and protect everything including streaming with this spn-lock * Use independent RX and TX locks, plus misc cleanups * std::make_unique is only C++14+, let's stick ti c++11 for now --- PlutoSDR_Settings.cpp | 75 ++++++----- PlutoSDR_Streaming.cpp | 293 ++++++++++++++++++++++------------------- SoapyPlutoSDR.hpp | 48 +++++-- 3 files changed, 231 insertions(+), 185 deletions(-) diff --git a/PlutoSDR_Settings.cpp b/PlutoSDR_Settings.cpp index c0939cf..7a4da0e 100644 --- a/PlutoSDR_Settings.cpp +++ b/PlutoSDR_Settings.cpp @@ -4,6 +4,7 @@ #endif static iio_context *ctx = nullptr; + SoapyPlutoSDR::SoapyPlutoSDR( const SoapySDR::Kwargs &args ): dev(nullptr), rx_dev(nullptr),tx_dev(nullptr), decimation(false), interpolation(false), rx_stream(nullptr) { @@ -157,14 +158,13 @@ std::vector SoapyPlutoSDR::listAntennas( const int direction, const void SoapyPlutoSDR::setAntenna( const int direction, const size_t channel, const std::string &name ) { - std::lock_guard lock(device_mutex); - if (direction == SOAPY_SDR_RX) { - + if (direction == SOAPY_SDR_RX) { + std::lock_guard lock(rx_device_mutex); iio_channel_attr_write(iio_device_find_channel(dev, "voltage0", false), "rf_port_select", name.c_str()); } - if (direction == SOAPY_SDR_TX) { - + else if (direction == SOAPY_SDR_TX) { + std::lock_guard lock(tx_device_mutex); iio_channel_attr_write(iio_device_find_channel(dev, "voltage0", true), "rf_port_select", name.c_str()); } @@ -178,7 +178,7 @@ std::string SoapyPlutoSDR::getAntenna( const int direction, const size_t channel if (direction == SOAPY_SDR_RX) { options = "A_BALANCED"; } - if (direction == SOAPY_SDR_TX) { + else if (direction == SOAPY_SDR_TX) { options = "A"; } @@ -214,10 +214,10 @@ bool SoapyPlutoSDR::hasGainMode(const int direction, const size_t channel) const void SoapyPlutoSDR::setGainMode( const int direction, const size_t channel, const bool automatic ) { - std::lock_guard lock(device_mutex); + gainMode = automatic; if(direction==SOAPY_SDR_RX){ - + std::lock_guard lock(rx_device_mutex); if (gainMode) { iio_channel_attr_write(iio_device_find_channel(dev, "voltage0", false), "gain_control_mode", "slow_attack"); @@ -237,16 +237,15 @@ bool SoapyPlutoSDR::getGainMode(const int direction, const size_t channel) const void SoapyPlutoSDR::setGain( const int direction, const size_t channel, const double value ) { - std::lock_guard lock(device_mutex); long long gain = (long long) value; if(direction==SOAPY_SDR_RX){ - + std::lock_guard lock(rx_device_mutex); iio_channel_attr_write_longlong(iio_device_find_channel(dev, "voltage0", false),"hardwaregain", gain); } - if(direction==SOAPY_SDR_TX){ - + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); gain = gain - 89; iio_channel_attr_write_longlong(iio_device_find_channel(dev, "voltage0", true),"hardwaregain", gain); @@ -256,25 +255,25 @@ void SoapyPlutoSDR::setGain( const int direction, const size_t channel, const do void SoapyPlutoSDR::setGain( const int direction, const size_t channel, const std::string &name, const double value ) { - this->setGain(direction,channel,value); - } double SoapyPlutoSDR::getGain( const int direction, const size_t channel, const std::string &name ) const { - std::lock_guard lock(device_mutex); long long gain = 0; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); + if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "voltage0", false),"hardwaregain",&gain )!=0) return 0; } - if(direction==SOAPY_SDR_TX){ + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "voltage0", true),"hardwaregain",&gain )!=0) return 0; @@ -297,15 +296,15 @@ SoapySDR::Range SoapyPlutoSDR::getGainRange( const int direction, const size_t c void SoapyPlutoSDR::setFrequency( const int direction, const size_t channel, const std::string &name, const double frequency, const SoapySDR::Kwargs &args ) { - std::lock_guard lock(device_mutex); long long freq = (long long)frequency; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); iio_channel_attr_write_longlong(iio_device_find_channel(dev, "altvoltage0", true),"frequency", freq); } - if(direction==SOAPY_SDR_TX){ - + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); iio_channel_attr_write_longlong(iio_device_find_channel(dev, "altvoltage1", true),"frequency", freq); } @@ -314,17 +313,20 @@ void SoapyPlutoSDR::setFrequency( const int direction, const size_t channel, con double SoapyPlutoSDR::getFrequency( const int direction, const size_t channel, const std::string &name ) const { - std::lock_guard lock(device_mutex); - long long freq; + long long freq; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); + if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "altvoltage0", true),"frequency",&freq )!=0) return 0; } - if(direction==SOAPY_SDR_TX){ + else if(direction==SOAPY_SDR_TX){ + + std::lock_guard lock(tx_device_mutex); if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "altvoltage1", true),"frequency",&freq )!=0) return 0; @@ -361,10 +363,10 @@ SoapySDR::RangeList SoapyPlutoSDR::getFrequencyRange( const int direction, const ******************************************************************/ void SoapyPlutoSDR::setSampleRate( const int direction, const size_t channel, const double rate ) { - std::lock_guard lock(device_mutex); long long samplerate =(long long) rate; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); decimation = false; if (samplerate<(25e6 / 48)) { if (samplerate * 8 < (25e6 / 48)) { @@ -383,7 +385,8 @@ void SoapyPlutoSDR::setSampleRate( const int direction, const size_t channel, co rx_stream->set_buffer_size_by_samplerate(decimation ? samplerate / 8 : samplerate); } - if(direction==SOAPY_SDR_TX){ + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); interpolation = false; if (samplerate<(25e6 / 48)) { if (samplerate * 8 < (25e6 / 48)) { @@ -401,7 +404,7 @@ void SoapyPlutoSDR::setSampleRate( const int direction, const size_t channel, co } #ifdef HAS_AD9361_IIO - if(ad9361_set_bb_rate(dev,samplerate)) + if(ad9361_set_bb_rate(dev,(unsigned long)samplerate)) SoapySDR_logf(SOAPY_SDR_ERROR, "Unable to set BB rate."); #endif @@ -409,16 +412,19 @@ void SoapyPlutoSDR::setSampleRate( const int direction, const size_t channel, co double SoapyPlutoSDR::getSampleRate( const int direction, const size_t channel ) const { - std::lock_guard lock(device_mutex); long long samplerate; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); + if(iio_channel_attr_read_longlong(iio_device_find_channel(rx_dev, "voltage0", false),"sampling_frequency",&samplerate )!=0) return 0; } - if(direction==SOAPY_SDR_TX){ + else if(direction==SOAPY_SDR_TX){ + + std::lock_guard lock(tx_device_mutex); if(iio_channel_attr_read_longlong(iio_device_find_channel(tx_dev, "voltage0", true),"sampling_frequency",&samplerate)!=0) return 0; @@ -450,34 +456,33 @@ std::vector SoapyPlutoSDR::listSampleRates( const int direction, const s void SoapyPlutoSDR::setBandwidth( const int direction, const size_t channel, const double bw ) { - std::lock_guard lock(device_mutex); long long bandwidth = (long long) bw; if(direction==SOAPY_SDR_RX){ - + std::lock_guard lock(rx_device_mutex); iio_channel_attr_write_longlong(iio_device_find_channel(dev, "voltage0", false),"rf_bandwidth", bandwidth); } - if(direction==SOAPY_SDR_TX){ - + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); iio_channel_attr_write_longlong(iio_device_find_channel(dev, "voltage0", true),"rf_bandwidth", bandwidth); - } } double SoapyPlutoSDR::getBandwidth( const int direction, const size_t channel ) const { - std::lock_guard lock(device_mutex); - long long bandwidth; + long long bandwidth; if(direction==SOAPY_SDR_RX){ + std::lock_guard lock(rx_device_mutex); if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "voltage0", false),"rf_bandwidth",&bandwidth )!=0) return 0; } - if(direction==SOAPY_SDR_TX){ + else if(direction==SOAPY_SDR_TX){ + std::lock_guard lock(tx_device_mutex); if(iio_channel_attr_read_longlong(iio_device_find_channel(dev, "voltage0", true),"rf_bandwidth",&bandwidth )!=0) return 0; diff --git a/PlutoSDR_Streaming.cpp b/PlutoSDR_Streaming.cpp index e2833af..cca7afd 100644 --- a/PlutoSDR_Streaming.cpp +++ b/PlutoSDR_Streaming.cpp @@ -5,16 +5,13 @@ #include #include #include +#include + +# define RX_STREAM_MTU (65536) -struct PlutoSDRStream -{ - std::shared_ptr rx; - std::shared_ptr tx; -}; std::vector SoapyPlutoSDR::getStreamFormats(const int direction, const size_t channel) const { - std::vector formats; formats.push_back(SOAPY_SDR_CS8); @@ -23,7 +20,6 @@ std::vector SoapyPlutoSDR::getStreamFormats(const int direction, co formats.push_back(SOAPY_SDR_CF32); return formats; - } std::string SoapyPlutoSDR::getNativeStreamFormat(const int direction, const size_t channel, double &fullScale) const @@ -44,16 +40,49 @@ SoapySDR::ArgInfoList SoapyPlutoSDR::getStreamArgsInfo(const int direction, cons return streamArgs; } + +bool SoapyPlutoSDR::IsValidRxStreamHandle(SoapySDR::Stream* handle) +{ + if (handle == nullptr) { + return false; + } + + //handle is an opaque pointer hiding either rx_stream or tx_streamer: + //check that the handle matches one of them, onsistently with direction: + if (rx_stream) { + //test if these handles really belong to us: + if (reinterpret_cast(handle) == rx_stream.get()) { + return true; + } + } + + return false; +} + +bool SoapyPlutoSDR::IsValidTxStreamHandle(SoapySDR::Stream* handle) +{ + if (handle == nullptr) { + return false; + } + + //handle is an opaque pointer hiding either rx_stream or tx_streamer: + //check that the handle matches one of them, onsistently with direction: + if (tx_stream) { + //test if these handles really belong to us: + if (reinterpret_cast(handle) == tx_stream.get()) { + return true; + } + } + + return false; +} + SoapySDR::Stream *SoapyPlutoSDR::setupStream( const int direction, const std::string &format, const std::vector &channels, const SoapySDR::Kwargs &args ) { - std::lock_guard lock(device_mutex); - - PlutoSDRStream *stream = new PlutoSDRStream(); - //check the format plutosdrStreamFormat streamFormat; if (format == SOAPY_SDR_CF32) { @@ -77,36 +106,53 @@ SoapySDR::Stream *SoapyPlutoSDR::setupStream( "setupStream invalid format '" + format + "' -- Only CS8, CS12, CS16 and CF32 are supported by SoapyPlutoSDR module."); } - if(direction ==SOAPY_SDR_RX){ + + if(direction == SOAPY_SDR_RX){ + + std::lock_guard lock(rx_device_mutex); + + this->rx_stream = std::unique_ptr(new rx_streamer (rx_dev, streamFormat, channels, args)); - stream->rx = std::shared_ptr(new rx_streamer(rx_dev, streamFormat, channels, args)); - rx_stream = stream->rx; + return reinterpret_cast(this->rx_stream.get()); } - if (direction == SOAPY_SDR_TX) { + else if (direction == SOAPY_SDR_TX) { + + std::lock_guard lock(tx_device_mutex); + + this->tx_stream = std::unique_ptr(new tx_streamer (tx_dev, streamFormat, channels, args)); - stream->tx = std::shared_ptr(new tx_streamer(tx_dev, streamFormat, channels, args)); + return reinterpret_cast(this->tx_stream.get()); } - return reinterpret_cast(stream); + return nullptr; } void SoapyPlutoSDR::closeStream( SoapySDR::Stream *handle) { - std::lock_guard lock(device_mutex); - PlutoSDRStream *stream = reinterpret_cast(handle); - if (stream->rx) - rx_stream.reset(); - if (stream->tx) - stream->tx->flush(); - delete stream; - + //scope lock: + { + std::lock_guard lock(rx_device_mutex); + + if (IsValidRxStreamHandle(handle)) { + this->rx_stream.reset(); + } + } + + //scope lock : + { + std::lock_guard lock(tx_device_mutex); + + if (IsValidTxStreamHandle(handle)) { + this->tx_stream.reset(); + } + } } size_t SoapyPlutoSDR::getStreamMTU( SoapySDR::Stream *handle) const { - return 8196; + return RX_STREAM_MTU; } int SoapyPlutoSDR::activateStream( @@ -114,17 +160,17 @@ int SoapyPlutoSDR::activateStream( const int flags, const long long timeNs, const size_t numElems ) -{ - std::lock_guard lock(device_mutex); - PlutoSDRStream *stream = reinterpret_cast(handle); - +{ if (flags & ~SOAPY_SDR_END_BURST) return SOAPY_SDR_NOT_SUPPORTED; - if (not stream->rx) - return 0; + std::lock_guard lock(rx_device_mutex); + + if (IsValidRxStreamHandle(handle)) { + return this->rx_stream->start(flags, timeNs, numElems); + } - return stream->rx->start(flags,timeNs,numElems); + return 0; } int SoapyPlutoSDR::deactivateStream( @@ -132,16 +178,26 @@ int SoapyPlutoSDR::deactivateStream( const int flags, const long long timeNs ) { - std::lock_guard lock(device_mutex); - PlutoSDRStream *stream = reinterpret_cast(handle); + //scope lock: + { + std::lock_guard lock(rx_device_mutex); + + if (IsValidRxStreamHandle(handle)) { + return this->rx_stream->stop(flags, timeNs); + } + } - if (stream->tx) - stream->tx->flush(); + //scope lock : + { + std::lock_guard lock(tx_device_mutex); - if (not stream->rx) - return 0; + if (IsValidTxStreamHandle(handle)) { + this->tx_stream->flush(); + return 0; + } + } - return stream->rx->stop(flags,timeNs); + return 0; } int SoapyPlutoSDR::readStream( @@ -152,12 +208,14 @@ int SoapyPlutoSDR::readStream( long long &timeNs, const long timeoutUs ) { - PlutoSDRStream *stream = reinterpret_cast(handle); - - if (not stream->rx) { - return SOAPY_SDR_NOT_SUPPORTED; - } - return int(stream->rx->recv(buffs, numElems, flags, timeNs, timeoutUs)); + //the spin_mutex is especially very useful here for minimum overhead ! + std::lock_guard lock(rx_device_mutex); + + if (IsValidRxStreamHandle(handle)) { + return int(this->rx_stream->recv(buffs, numElems, flags, timeNs, timeoutUs)); + } else { + return SOAPY_SDR_NOT_SUPPORTED; + } } int SoapyPlutoSDR::writeStream( @@ -168,13 +226,14 @@ int SoapyPlutoSDR::writeStream( const long long timeNs, const long timeoutUs ) { - PlutoSDRStream *stream = reinterpret_cast(handle); - - if (not stream->tx) { - return SOAPY_SDR_NOT_SUPPORTED; - } - return stream->tx->send(buffs,numElems,flags,timeNs,timeoutUs);; - + std::lock_guard lock(tx_device_mutex); + + if (IsValidTxStreamHandle(handle)) { + return this->tx_stream->send(buffs, numElems, flags, timeNs, timeoutUs);; + } else { + return SOAPY_SDR_NOT_SUPPORTED; + } + } int SoapyPlutoSDR::readStreamStatus( @@ -196,7 +255,9 @@ void rx_streamer::set_buffer_size_by_samplerate(const size_t _samplerate) { if ((x >> 30) == 0) { n = n + 2; x = x << 2; } n = n - (x >> 31); - this->set_buffer_size(std::max(1 << (31 - n - 2), 16384)); + //this->set_buffer_size(std::max(1 << (31 - n - 2), 16384)); // too large for CubicSDR + //TODO: find smarter way w.r.t MTU and sample rate ? + this->set_buffer_size((size_t)(RX_STREAM_MTU)); SoapySDR_logf(SOAPY_SDR_INFO, "Auto setting Buffer Size: %lu", (unsigned long)buffer_size); } @@ -241,17 +302,20 @@ rx_streamer::rx_streamer(const iio_device *_dev, const plutosdrStreamFormat _for this->set_buffer_size_by_samplerate(samplerate); } - - thread_stopped = true; - } rx_streamer::~rx_streamer() { - if (buf) { iio_buffer_destroy(buf); } + if (buf) { + iio_buffer_cancel(buf); + iio_buffer_destroy(buf); + } + + for (unsigned int i = 0; i < channel_list.size(); ++i) { + iio_channel_disable(channel_list[i]); + } + - for(unsigned int i=0;i lock(mutex); + // auto before = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); - if (!buf) { - return 0; - } + if (!buf) { + return 0; + } - if (thread_stopped){ + ssize_t ret = iio_buffer_refill(buf); - return SOAPY_SDR_TIMEOUT; - } + // auto after = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); - if (!please_refill_buffer && !items_in_buffer) { - please_refill_buffer = true; - cond.notify_all(); - } + if (ret < 0) + return SOAPY_SDR_TIMEOUT; - while (please_refill_buffer) { - cond2.wait_for(lock,std::chrono::milliseconds(timeoutUs)); + items_in_buffer = (unsigned long)ret / iio_buffer_step(buf); - if (thread_stopped) - return SOAPY_SDR_TIMEOUT; + // SoapySDR_logf(SOAPY_SDR_INFO, "iio_buffer_refill took %d ms to refill %d items", (int)(after - before), items_in_buffer); + byte_offset = 0; } size_t items = std::min(items_in_buffer,numElems); @@ -392,23 +454,13 @@ int rx_streamer::start(const int flags, const long long timeNs, const size_t numElems) { - std::unique_lock lock(mutex); - - if (!thread_stopped) { - return SOAPY_SDR_NOT_SUPPORTED; - } + //force proper stop before + stop(flags, timeNs); - items_in_buffer = 0; - please_refill_buffer = false; - thread_stopped = false; + // re-create buffer + buf = iio_device_create_buffer(dev, buffer_size, false); if (!buf) { - buf = iio_device_create_buffer(dev, buffer_size, false); - } - - if (buf) { - refill_thd = std::thread(&rx_streamer::refill_thread, this); - } else { SoapySDR_logf(SOAPY_SDR_ERROR, "Unable to create buffer!"); throw std::runtime_error("Unable to create buffer!\n"); } @@ -424,35 +476,25 @@ int rx_streamer::start(const int flags, int rx_streamer::stop(const int flags, const long long timeNs) { - if (buf) - iio_buffer_cancel(buf); - - std::unique_lock lock(mutex); - - if (!thread_stopped) { - - please_refill_buffer = true; - cond.notify_all(); - lock.unlock(); - - refill_thd.join(); - lock.lock(); - - } - + //cancel first + if (buf) { + iio_buffer_cancel(buf); + } + //then destroy if (buf) { iio_buffer_destroy(buf); buf = nullptr; } + items_in_buffer = 0; + byte_offset = 0; + return 0; } void rx_streamer::set_buffer_size(const size_t _buffer_size){ - std::unique_lock lock(mutex); - if (!buf || this->buffer_size != _buffer_size) { if (buf) { iio_buffer_destroy(buf); @@ -471,32 +513,6 @@ void rx_streamer::set_buffer_size(const size_t _buffer_size){ } -void rx_streamer::refill_thread(){ - - std::unique_lock lock(mutex); - ssize_t ret; - - for (;;) { - - while (!please_refill_buffer) - cond.wait(lock); - - lock.unlock(); - ret = iio_buffer_refill(buf); - lock.lock(); - please_refill_buffer = false; - if (ret < 0) - break; - - items_in_buffer = (unsigned long)ret / iio_buffer_step(buf); - byte_offset = 0; - cond2.notify_one(); - - } - thread_stopped = true; - cond2.notify_all(); -} - // return wether can we optimize for single RX, 2 channel (I/Q), same endianess direct copy bool rx_streamer::has_direct_copy() { @@ -571,7 +587,10 @@ int tx_streamer::send( const void * const *buffs, const long timeoutUs ) { - std::lock_guard lock(mutex); + if (!buf) { + return 0; + } + size_t items = std::min(buf_size - items_in_buf, numElems); int16_t src = 0; @@ -668,14 +687,14 @@ int tx_streamer::send( const void * const *buffs, int tx_streamer::flush() { - std::lock_guard lock(mutex); - return send_buf(); - } int tx_streamer::send_buf() { + if (!buf) { + return 0; + } if (items_in_buf > 0) { if (items_in_buf < buf_size) { diff --git a/SoapyPlutoSDR.hpp b/SoapyPlutoSDR.hpp index 83ce8fa..3e562dd 100644 --- a/SoapyPlutoSDR.hpp +++ b/SoapyPlutoSDR.hpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -37,17 +37,9 @@ class rx_streamer { void set_buffer_size(const size_t _buffer_size); - void channel_read(const struct iio_channel *chn, void *dst, size_t len); - - void refill_thread(); - bool has_direct_copy(); - std::thread refill_thd; - std::mutex mutex; - std::condition_variable cond, cond2; std::vector channel_list; - volatile bool thread_stopped, please_refill_buffer; const iio_device *dev; size_t buffer_size; @@ -74,7 +66,7 @@ class tx_streamer { std::vector channel_list; const iio_device *dev; const plutosdrStreamFormat format; - std::mutex mutex; + iio_buffer *buf; size_t buf_size; size_t items_in_buf; @@ -82,6 +74,27 @@ class tx_streamer { }; +// A local spin_mutex usable with std::lock_guard + //for lightweight locking for short periods. +class pluto_spin_mutex { + +public: + pluto_spin_mutex() = default; + + pluto_spin_mutex(const pluto_spin_mutex&) = delete; + + pluto_spin_mutex& operator=(const pluto_spin_mutex&) = delete; + + ~pluto_spin_mutex() { lock_state.clear(std::memory_order_release); } + + void lock() { while (lock_state.test_and_set(std::memory_order_acquire)); } + + void unlock() { lock_state.clear(std::memory_order_release); } + +private: + std::atomic_flag lock_state = ATOMIC_FLAG_INIT; +}; + class SoapyPlutoSDR : public SoapySDR::Device{ @@ -268,16 +281,25 @@ class SoapyPlutoSDR : public SoapySDR::Device{ std::vector listBandwidths( const int direction, const size_t channel ) const; - + private: + bool IsValidRxStreamHandle(SoapySDR::Stream* handle); + bool IsValidTxStreamHandle(SoapySDR::Stream* handle); + iio_device *dev; iio_device *rx_dev; iio_device *tx_dev; bool gainMode; - mutable std::mutex device_mutex; + + mutable pluto_spin_mutex rx_device_mutex; + mutable pluto_spin_mutex tx_device_mutex; + bool decimation, interpolation; - std::shared_ptr rx_stream; + std::unique_ptr rx_stream; + std::unique_ptr tx_stream; + + };