diff --git a/CMakeLists.txt b/CMakeLists.txt index 54fba91..45f28d9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.8.7) +cmake_minimum_required(VERSION 3.10) project(SoapyLoopback CXX) find_package(SoapySDR "0.8.0" NO_MODULE REQUIRED) diff --git a/Settings.cpp b/Settings.cpp index 84d0b0f..8432580 100644 --- a/Settings.cpp +++ b/Settings.cpp @@ -48,7 +48,18 @@ SoapyLoopback::SoapyLoopback(const SoapySDR::Kwargs &args): bufferedElems(0), resetBuffer(false), gainMin(0.0), - gainMax(0.0) + gainMax(0.0), + // TX/loopback state + _txActive(false), + _txBufferLength(DEFAULT_BUFFER_LENGTH), + _txNumBuffers(DEFAULT_NUM_BUFFERS), + _loopbackFormat(""), + _loopbackBytesPerSample(8), // Default to CF32, will be set in setupStream + _loopback_head(0), + _loopback_tail(0), + _loopback_count(0), + _loopback_overflow(false), + _loopbackEnabled(false) { } @@ -96,7 +107,7 @@ size_t SoapyLoopback::getNumChannels(const int dir) const bool SoapyLoopback::getFullDuplex(const int direction, const size_t channel) const { - return false; + return true; // Support simultaneous TX and RX for loopback testing } /******************************************************************* @@ -113,10 +124,7 @@ std::vector SoapyLoopback::listAntennas(const int direction, const void SoapyLoopback::setAntenna(const int direction, const size_t channel, const std::string &name) { - if (direction != SOAPY_SDR_RX) - { - throw std::runtime_error("setAntena failed: RTL-SDR only supports RX"); - } + // Accept any antenna setting for both RX and TX } std::string SoapyLoopback::getAntenna(const int direction, const size_t channel) const diff --git a/SoapyLoopback.hpp b/SoapyLoopback.hpp index 8d9fe0b..48120ef 100644 --- a/SoapyLoopback.hpp +++ b/SoapyLoopback.hpp @@ -95,6 +95,14 @@ class SoapyLoopback: public SoapySDR::Device long long &timeNs, const long timeoutUs = 100000); + int writeStream( + SoapySDR::Stream *stream, + const void * const *buffs, + const size_t numElems, + int &flags, + const long long timeNs = 0, + const long timeoutUs = 100000); + /******************************************************************* * Direct buffer access API ******************************************************************/ @@ -306,4 +314,23 @@ class SoapyLoopback: public SoapySDR::Device std::atomic resetBuffer; double gainMin, gainMax; + + // TX state + bool _txActive; + size_t _txBufferLength; + size_t _txNumBuffers; + + // Loopback format tracking + std::string _loopbackFormat; // Store the format for loopback operations + size_t _loopbackBytesPerSample; // Bytes per sample for the loopback format + + // Loopback ring buffer (TX writes here, RX reads from here) + std::mutex _loopback_mutex; + std::condition_variable _loopback_cond; + std::vector _loopback_buffs; + size_t _loopback_head; + size_t _loopback_tail; + std::atomic _loopback_count; + std::atomic _loopback_overflow; + bool _loopbackEnabled; // when true, RX reads from loopback buffer instead of synthetic data }; diff --git a/Streaming.cpp b/Streaming.cpp index b0c0623..c081ec4 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -147,34 +147,49 @@ SoapySDR::Stream *SoapyLoopback::setupStream( throw std::runtime_error("setupStream invalid channel selection"); } - //check the format + //check the format and determine bytes per sample + size_t bytesPerSample = 0; if (format == SOAPY_SDR_CF32) { SoapySDR_log(SOAPY_SDR_INFO, "Using format CF32."); - //rxFormat = RTL_RX_FORMAT_FLOAT32; - } - else if (format == SOAPY_SDR_CS12) - { - SoapySDR_log(SOAPY_SDR_INFO, "Using format CS12."); - //rxFormat = RTL_RX_FORMAT_INT16; + bytesPerSample = 8; // 2 * sizeof(float) } else if (format == SOAPY_SDR_CS16) { SoapySDR_log(SOAPY_SDR_INFO, "Using format CS16."); - //rxFormat = RTL_RX_FORMAT_INT16; + bytesPerSample = 4; // 2 * sizeof(int16_t) + } + else if (format == SOAPY_SDR_CS12) + { + SoapySDR_log(SOAPY_SDR_INFO, "Using format CS12."); + bytesPerSample = 3; // packed 12-bit I/Q } else if (format == SOAPY_SDR_CS8) { SoapySDR_log(SOAPY_SDR_INFO, "Using format CS8."); - //rxFormat = RTL_RX_FORMAT_INT8; + bytesPerSample = 2; // 2 * sizeof(int8_t) } else { throw std::runtime_error( "setupStream invalid format '" + format - + "' -- Only CS8, CS16 and CF32 are supported by SoapyLoopback module."); + + "' -- Only CS8, CS12, CS16 and CF32 are supported by SoapyLoopback module."); } - bufferLength = DEFAULT_BUFFER_LENGTH; + // Store format for loopback operations (first stream setup wins) + if (_loopbackFormat.empty()) + { + _loopbackFormat = format; + _loopbackBytesPerSample = bytesPerSample; + SoapySDR_logf(SOAPY_SDR_DEBUG, "Loopback format set to %s (%zu bytes/sample)", format.c_str(), bytesPerSample); + } + else if (_loopbackFormat != format) + { + SoapySDR_logf(SOAPY_SDR_WARNING, "Loopback: TX and RX using different formats (%s vs %s). Using %s for loopback.", + _loopbackFormat.c_str(), format.c_str(), _loopbackFormat.c_str()); + } + + // Parse buffer configuration + size_t localBufferLength = DEFAULT_BUFFER_LENGTH; if (args.count("bufflen") != 0) { try @@ -182,14 +197,13 @@ SoapySDR::Stream *SoapyLoopback::setupStream( int bufferLength_in = std::stoi(args.at("bufflen")); if (bufferLength_in > 0) { - bufferLength = bufferLength_in; + localBufferLength = bufferLength_in; } } catch (const std::invalid_argument &){} } - SoapySDR_logf(SOAPY_SDR_DEBUG, "RTL-SDR Using buffer length %d", bufferLength); - numBuffers = DEFAULT_NUM_BUFFERS; + size_t localNumBuffers = DEFAULT_NUM_BUFFERS; if (args.count("buffers") != 0) { try @@ -197,49 +211,74 @@ SoapySDR::Stream *SoapyLoopback::setupStream( int numBuffers_in = std::stoi(args.at("buffers")); if (numBuffers_in > 0) { - numBuffers = numBuffers_in; + localNumBuffers = numBuffers_in; } } catch (const std::invalid_argument &){} } - SoapySDR_logf(SOAPY_SDR_DEBUG, "RTL-SDR Using %d buffers", numBuffers); - asyncBuffs = 0; - if (args.count("asyncBuffs") != 0) + SoapySDR_logf(SOAPY_SDR_DEBUG, "Loopback setupStream: direction=%s, buffer length %zu, %zu buffers", + direction == SOAPY_SDR_RX ? "RX" : "TX", localBufferLength, localNumBuffers); + + // Only initialize buffers if they haven't been set up yet + // (multiple setupStream calls should not reset state) + if (_buffs.empty()) { - try + bufferLength = localBufferLength; + numBuffers = localNumBuffers; + + asyncBuffs = 0; + if (args.count("asyncBuffs") != 0) { - int asyncBuffs_in = std::stoi(args.at("asyncBuffs")); - if (asyncBuffs_in > 0) + try { - asyncBuffs = asyncBuffs_in; + int asyncBuffs_in = std::stoi(args.at("asyncBuffs")); + if (asyncBuffs_in > 0) + { + asyncBuffs = asyncBuffs_in; + } } + catch (const std::invalid_argument &){} } - catch (const std::invalid_argument &){} + + // Initialize RX fifo + _buf_tail = 0; + _buf_count = 0; + _buf_head = 0; + + // Allocate RX buffers + _buffs.resize(numBuffers); + for (auto &buff : _buffs) buff.data.reserve(bufferLength); + for (auto &buff : _buffs) buff.data.resize(bufferLength); + + SoapySDR_logf(SOAPY_SDR_DEBUG, "Loopback: RX buffers allocated (%zu buffers, %zu bytes each)", numBuffers, bufferLength); } - //if (tunerType == RTLSDR_TUNER_E4000) { - // IFGain[0] = 6; - // IFGain[1] = 9; - // IFGain[2] = 3; - // IFGain[3] = 2; - // IFGain[4] = 3; - // IFGain[5] = 3; - //} else { - // for (int i = 0; i < 6; i++) { - // IFGain[i] = 0; - // } - //} - //tunerGain = rtlsdr_get_tuner_gain(dev) / 10.0; - - //clear async fifo counts - _buf_tail = 0; - _buf_count = 0; - _buf_head = 0; - - //allocate buffers - _buffs.resize(numBuffers); - for (auto &buff : _buffs) buff.data.reserve(bufferLength); - for (auto &buff : _buffs) buff.data.resize(bufferLength); + + if (_loopback_buffs.empty()) + { + // For CF32 format, each sample is 8 bytes (2 * sizeof(float)) + // The MTU is in samples, so we need bufferLength/2 * 8 = bufferLength * 4 bytes + // Use 4x the configured buffer length to be safe for any format + _txBufferLength = localBufferLength * 4; + _txNumBuffers = localNumBuffers; + + // Initialize loopback ring buffer (TX -> RX path) + _loopback_head = 0; + _loopback_tail = 0; + _loopback_count = 0; + _loopback_overflow = false; + + // Allocate loopback buffers + _loopback_buffs.resize(_txNumBuffers); + for (auto &buff : _loopback_buffs) buff.data.reserve(_txBufferLength); + for (auto &buff : _loopback_buffs) buff.data.resize(_txBufferLength); + + SoapySDR_logf(SOAPY_SDR_DEBUG, "Loopback: TX loopback buffers allocated (%zu buffers, %zu bytes each)", _txNumBuffers, _txBufferLength); + } + + // Enable loopback mode and TX immediately + _loopbackEnabled = true; + _txActive = true; return (SoapySDR::Stream *) this; } @@ -262,21 +301,35 @@ int SoapyLoopback::activateStream( const size_t numElems) { if (flags != 0) return SOAPY_SDR_NOT_SUPPORTED; + + // Note: We can't easily distinguish RX vs TX stream here since we return + // the same pointer. We'll handle both cases. + + // RX activation resetBuffer = true; bufferedElems = 0; - //start the async thread + //start the async thread (for synthetic data when loopback not enabled) if (not _rx_async_thread.joinable()) { _rx_async_thread = std::thread(&SoapyLoopback::rx_async_operation, this); } + // TX activation - enable loopback mode so RX reads from loopback buffer + _txActive = true; + _loopbackEnabled = true; + return 0; } int SoapyLoopback::deactivateStream(SoapySDR::Stream *stream, const int flags, const long long timeNs) { if (flags != 0) return SOAPY_SDR_NOT_SUPPORTED; + + // Deactivate TX + _txActive = false; + + // Deactivate RX if (_rx_async_thread.joinable()) { _rx_async_thread.join(); @@ -319,9 +372,16 @@ int SoapyLoopback::readStream( size_t returnedElems = std::min(bufferedElems, numElems); + // Determine bytes per sample based on loopback mode + // In loopback mode use the stored format, otherwise CS8 (2 bytes) for synthetic data + const size_t bytesPerSample = _loopbackEnabled ? _loopbackBytesPerSample : BYTES_PER_SAMPLE; + + // Copy data to user's buffer + std::memcpy(buff0, _currentBuff, returnedElems * bytesPerSample); + //bump variables for next call into readStream bufferedElems -= returnedElems; - _currentBuff += returnedElems*BYTES_PER_SAMPLE; + _currentBuff += returnedElems * bytesPerSample; bufTicks += returnedElems; //for the next call to readStream if there is a remainder //return number of elements written to buff0 @@ -353,6 +413,44 @@ int SoapyLoopback::acquireReadBuffer( long long &timeNs, const long timeoutUs) { + // When loopback is enabled, read from the loopback buffer (TX -> RX) + if (_loopbackEnabled) + { + // Handle loopback overflow + if (_loopback_overflow) + { + _loopback_head = (_loopback_head + _loopback_count.exchange(0)) % _txNumBuffers; + _loopback_overflow = false; + SoapySDR::log(SOAPY_SDR_SSI, "O"); + return SOAPY_SDR_OVERFLOW; + } + + // Wait for loopback data if none available + if (_loopback_count == 0) + { + std::unique_lock lock(_loopback_mutex); + _loopback_cond.wait_for(lock, std::chrono::microseconds(timeoutUs), + [this]{ return _loopback_count != 0; }); + if (_loopback_count == 0) return SOAPY_SDR_TIMEOUT; + } + + // Extract from loopback buffer + handle = _loopback_head; + _loopback_head = (_loopback_head + 1) % _txNumBuffers; + + auto &lbuff = _loopback_buffs[handle]; + bufTicks = lbuff.tick; + timeNs = SoapySDR::ticksToTimeNs(lbuff.tick, sampleRate); + buffs[0] = (void *)lbuff.data.data(); + flags = SOAPY_SDR_HAS_TIME; + + // Use stored bytes per sample for the loopback format + size_t numElems = lbuff.data.size() / _loopbackBytesPerSample; + _loopback_count--; + return numElems; + } + + // Original behavior: read from synthetic RX buffer //reset is issued by various settings //to drain old data out of the queue if (resetBuffer) @@ -397,6 +495,73 @@ void SoapyLoopback::releaseReadBuffer( SoapySDR::Stream *stream, const size_t handle) { + // In loopback mode, _loopback_count is already decremented in acquireReadBuffer + // so we don't need to do anything here + if (_loopbackEnabled) + { + return; + } + //TODO this wont handle out of order releases _buf_count--; } + +/******************************************************************* + * TX Stream API - writeStream + ******************************************************************/ + +int SoapyLoopback::writeStream( + SoapySDR::Stream *stream, + const void * const *buffs, + const size_t numElems, + int &flags, + const long long timeNs, + const long timeoutUs) +{ + if (!_txActive) + { + SoapySDR_log(SOAPY_SDR_ERROR, "writeStream: TX not active"); + return SOAPY_SDR_STREAM_ERROR; + } + + if (buffs == nullptr || buffs[0] == nullptr) + { + SoapySDR_log(SOAPY_SDR_ERROR, "writeStream: null buffer pointer"); + return SOAPY_SDR_STREAM_ERROR; + } + + // Use the stored bytes per sample for the loopback format + const size_t numBytes = numElems * _loopbackBytesPerSample; + + // Get input buffer + const char *input = (const char *)buffs[0]; + + // Check for overflow condition + if (_loopback_count >= _txNumBuffers) + { + _loopback_overflow = true; + SoapySDR::log(SOAPY_SDR_SSI, "U"); // Underflow on TX side means overflow on loopback + return SOAPY_SDR_OVERFLOW; + } + + // Get the current tick for timestamps + unsigned long long tick = ticks.fetch_add(numElems); + + // Copy data into loopback ring buffer + { + std::lock_guard lock(_loopback_mutex); + + auto &buff = _loopback_buffs[_loopback_tail]; + buff.tick = tick; + buff.data.resize(numBytes); + std::memcpy(buff.data.data(), input, numBytes); + + // Increment tail pointer + _loopback_tail = (_loopback_tail + 1) % _txNumBuffers; + _loopback_count++; + } + + // Notify any waiting readers + _loopback_cond.notify_one(); + return numElems; +}