From a7835176c6f5267885b7811952408d2a73bd4226 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:46:59 -0700 Subject: [PATCH 1/8] Fix HTTP client torn reads and response memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - HttpClient_Apple: scope Cancel() to m_dataTask only instead of blanket-cancelling every task on the shared session. Fix torn read on m_requests.empty() in CancelAllRequests spin loop. - HttpClientManager: fix torn read on m_httpCallbacks.empty() in cancelAllRequests spin loop — read under lock. - HttpResponseDecoder: add missing delete ctx->httpResponse before nullptr in Abort and RetryNetwork paths (memory leak). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/http/HttpClientManager.cpp | 9 ++++++++- lib/http/HttpClient_Apple.mm | 24 ++++++------------------ lib/http/HttpResponseDecoder.cpp | 5 +++-- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/lib/http/HttpClientManager.cpp b/lib/http/HttpClientManager.cpp index 58fa5fb4a..a1c228556 100644 --- a/lib/http/HttpClientManager.cpp +++ b/lib/http/HttpClientManager.cpp @@ -149,8 +149,15 @@ namespace MAT_NS_BEGIN { void HttpClientManager::cancelAllRequests() { cancelAllRequestsAsync(); - while (!m_httpCallbacks.empty()) + while (true) + { + { + LOCKGUARD(m_httpCallbacksMtx); + if (m_httpCallbacks.empty()) + break; + } std::this_thread::yield(); + } } // start async cancellation diff --git a/lib/http/HttpClient_Apple.mm b/lib/http/HttpClient_Apple.mm index 05817087a..579b05313 100644 --- a/lib/http/HttpClient_Apple.mm +++ b/lib/http/HttpClient_Apple.mm @@ -132,23 +132,6 @@ void HandleResponse(NSData* data, NSURLResponse* response, NSError* error) void Cancel() { [m_dataTask cancel]; - [session getTasksWithCompletionHandler:^(NSArray* dataTasks, NSArray* uploadTasks, NSArray* downloadTasks) - { - for (NSURLSessionTask* _task in dataTasks) - { - [_task cancel]; - } - - for 
(NSURLSessionTask* _task in downloadTasks) - { - [_task cancel]; - } - - for (NSURLSessionTask* _task in uploadTasks) - { - [_task cancel]; - } - }]; } private: @@ -214,8 +197,13 @@ void Cancel() for (const auto &id : ids) CancelRequestAsync(id); - while (!m_requests.empty()) + while (true) { + { + std::lock_guard lock(m_requestsMtx); + if (m_requests.empty()) + break; + } PAL::sleep(100); std::this_thread::yield(); } diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp index 11e9d4096..2bb652fdf 100644 --- a/lib/http/HttpResponseDecoder.cpp +++ b/lib/http/HttpResponseDecoder.cpp @@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN { break; case HttpResult_Aborted: - ctx->httpResponse = nullptr; outcome = Abort; break; case HttpResult_LocalFailure: case HttpResult_NetworkFailure: - ctx->httpResponse = nullptr; outcome = RetryNetwork; break; } @@ -129,6 +127,7 @@ namespace MAT_NS_BEGIN { evt.param1 = 0; // response.GetStatusCode(); DispatchEvent(evt); } + delete ctx->httpResponse; ctx->httpResponse = nullptr; // eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected requestAborted(ctx); @@ -159,6 +158,8 @@ namespace MAT_NS_BEGIN { evt.param1 = response.GetStatusCode(); DispatchEvent(evt); } + delete ctx->httpResponse; + ctx->httpResponse = nullptr; temporaryNetworkFailure(ctx); break; } From 28cf17d40082f4771f96d803a2463fa2b9f3dbd9 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:10 -0700 Subject: [PATCH 2/8] Fix WorkerThread shutdown: safe cleanup and diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Only delete queued tasks after successful join (not after detach, where the thread may still access them — undefined behavior) - Replace catch(...) 
with std::system_error and std::exception handlers that log error code and message - Log pending queue sizes in both join and detach paths Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 2bdbf6c67..5e843790d 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -6,6 +6,8 @@ #include "pal/WorkerThread.hpp" #include "pal/PAL.hpp" +#include + #if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32) /* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */ @@ -56,22 +58,40 @@ namespace PAL_NS_BEGIN { auto item = new WorkerThreadShutdownItem(); Queue(item); std::thread::id this_id = std::this_thread::get_id(); + bool joined = false; try { - if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) + if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) { m_hThread.join(); - else + joined = true; + } else { m_hThread.detach(); + } + } + catch (const std::system_error& e) { + LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what()); + } + catch (const std::exception& e) { + LOG_ERROR("Thread join/detach failed: %s", e.what()); } - catch (...) {}; - // TODO: [MG] - investigate if we ever drop work items on shutdown. - if (!m_queue.empty()) - { - LOG_WARN("m_queue is not empty!"); + // Log pending work in both paths so operators can see if + // shutdown is dropping tasks. + if (!m_queue.empty()) { + LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size()); } - if (!m_timerQueue.empty()) - { - LOG_WARN("m_timerQueue is not empty!"); + if (!m_timerQueue.empty()) { + LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size()); + } + + // Clean up any tasks remaining in the queues after shutdown. 
+ // Only safe after join() — the thread has fully exited. + // After detach(), the thread still needs the shutdown item + // and may still be accessing the queues. + if (joined) { + for (auto task : m_queue) { delete task; } + m_queue.clear(); + for (auto task : m_timerQueue) { delete task; } + m_timerQueue.clear(); } } From a355ec5cd6b773349437c9a5691035c4f2ec588f Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:24 -0700 Subject: [PATCH 3/8] Make m_runningLatency and m_scheduledUploadTime atomic Both variables are read and written from different threads during normal upload scheduling. Declare as std::atomic to eliminate data races per the C++ memory model. Add .load() for variadic LOG_TRACE calls. Add comment explaining why unlocked stores in uploadAsync are safe. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 10 ++++++---- lib/tpm/TransmissionPolicyManager.hpp | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 83b82cf2a..7f24344e3 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -147,14 +147,14 @@ namespace MAT_NS_BEGIN { m_runningLatency = latency; } auto now = PAL::getMonotonicTimeMs(); - auto delta = Abs64(m_scheduledUploadTime, now); + auto delta = Abs64(m_scheduledUploadTime.load(), now); if (delta <= static_cast(delay.count())) { // Don't need to cancel and reschedule if it's about to happen now anyways. // m_isUploadScheduled check does not have to be strictly atomic because // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. 
- LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); + LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency.load()); return; } } @@ -173,7 +173,7 @@ namespace MAT_NS_BEGIN { { m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; - LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); + LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency.load()); m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency); } } @@ -184,9 +184,11 @@ namespace MAT_NS_BEGIN { if (guard.isPaused()) { return; } + // These stores happen outside the lock but are safe: scheduleUpload + // only reads them when m_isUploadScheduled is true, and we don't + // clear that flag until inside the LOCKGUARD below. m_runningLatency = latency; m_scheduledUploadTime = std::numeric_limits::max(); - { LOCKGUARD(m_scheduledUploadMutex); m_isUploadScheduled = false; // Allow to schedule another uploadAsync diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index e1a91ad10..dc7f91cf9 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -91,7 +91,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; std::atomic m_isPaused { true }; std::atomic m_isUploadScheduled { false }; - uint64_t m_scheduledUploadTime { std::numeric_limits::max() }; + std::atomic m_scheduledUploadTime { std::numeric_limits::max() }; std::mutex m_scheduledUploadMutex; PAL::DeferredCallbackHandle m_scheduledUpload; bool m_scheduledUploadAborted { false }; @@ -131,7 +131,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; size_t uploadCount() const noexcept; std::chrono::milliseconds m_timerdelay { std::chrono::seconds { 2 } }; - EventLatency m_runningLatency { EventLatency_RealTime }; + std::atomic m_runningLatency 
{ EventLatency_RealTime }; TimerArray m_timers; public: From de46cb27cc44800d22bf957fbfcc257ab3ce3edc Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:34 -0700 Subject: [PATCH 4/8] Fix static-destruction-order crash in Logger destructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove LOG_TRACE from Logger destructor — it triggers a crash on iOS simulator when the recursive_mutex used by logging has already been destroyed during static destruction. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/api/Logger.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/api/Logger.cpp b/lib/api/Logger.cpp index 54d883664..f76f85734 100644 --- a/lib/api/Logger.cpp +++ b/lib/api/Logger.cpp @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN Logger::~Logger() noexcept { - LOG_TRACE("%p: Destroyed", this); + // Intentionally empty — logging here triggers a static-destruction-order + // crash on iOS simulator (recursive_mutex used after teardown). } ISemanticContext* Logger::GetSemanticContext() const From 706a01ff8baa2710b460c78e9bbbb896ea1b8b9e Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Wed, 29 Apr 2026 18:04:57 -0700 Subject: [PATCH 5/8] Use cleaner shutdown and scheduler synchronization fixes Reject new worker-thread tasks once shutdown starts so queue cleanup cannot race with late producers, and move the TPM scheduled-upload state back under a single mutex so latency/next-upload decisions stay consistent without mixed atomic and mutex access. 
Files changed: - lib/pal/WorkerThread.cpp - lib/tpm/TransmissionPolicyManager.cpp - lib/tpm/TransmissionPolicyManager.hpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 23 +++++++++-- lib/tpm/TransmissionPolicyManager.cpp | 55 ++++++++++++++++++--------- lib/tpm/TransmissionPolicyManager.hpp | 9 +++-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 5e843790d..5eccbb5f2 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -37,6 +37,7 @@ namespace PAL_NS_BEGIN { std::list m_timerQueue; Event m_event; MAT::Task* m_itemInProgress; + bool m_shuttingDown = false; int count = 0; public: @@ -55,12 +56,22 @@ namespace PAL_NS_BEGIN { void Join() final { - auto item = new WorkerThreadShutdownItem(); - Queue(item); std::thread::id this_id = std::this_thread::get_id(); bool joined = false; + { + LOCKGUARD(m_lock); + if (!m_shuttingDown) { + m_shuttingDown = true; + m_queue.push_back(new WorkerThreadShutdownItem()); + count++; + m_event.post(); + } + } try { - if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) { + if (!m_hThread.joinable()) { + return; + } + if (m_hThread.get_id() != this_id) { m_hThread.join(); joined = true; } else { @@ -76,6 +87,7 @@ namespace PAL_NS_BEGIN { // Log pending work in both paths so operators can see if // shutdown is dropping tasks. 
+ LOCKGUARD(m_lock); if (!m_queue.empty()) { LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size()); } @@ -99,6 +111,11 @@ namespace PAL_NS_BEGIN { { LOG_INFO("queue item=%p", &item); LOCKGUARD(m_lock); + if (m_shuttingDown) { + LOG_WARN("Dropping queued task %p during shutdown", item); + delete item; + return; + } if (item->Type == MAT::Task::TimedCall) { auto it = m_timerQueue.begin(); while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) { diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 7f24344e3..e7421bc7f 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -147,14 +147,13 @@ namespace MAT_NS_BEGIN { m_runningLatency = latency; } auto now = PAL::getMonotonicTimeMs(); - auto delta = Abs64(m_scheduledUploadTime.load(), now); + auto delta = Abs64(m_scheduledUploadTime, now); if (delta <= static_cast(delay.count())) { // Don't need to cancel and reschedule if it's about to happen now anyways. - // m_isUploadScheduled check does not have to be strictly atomic because // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. - LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency.load()); + LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); return; } } @@ -162,18 +161,19 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. 
if (force || delay.count() == 0) { - if (!cancelUploadTask()) + if (!cancelUploadTaskLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } } // Schedule new upload - if (!m_isUploadScheduled.exchange(true)) + if (!m_isUploadScheduled) { + m_isUploadScheduled = true; m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; - LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency.load()); + LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency); } } @@ -184,18 +184,16 @@ namespace MAT_NS_BEGIN { if (guard.isPaused()) { return; } - // These stores happen outside the lock but are safe: scheduleUpload - // only reads them when m_isUploadScheduled is true, and we don't - // clear that flag until inside the LOCKGUARD below. - m_runningLatency = latency; - m_scheduledUploadTime = std::numeric_limits::max(); + EventLatency requestedLatency = latency; { LOCKGUARD(m_scheduledUploadMutex); + requestedLatency = m_runningLatency; + m_scheduledUploadTime = std::numeric_limits::max(); m_isUploadScheduled = false; // Allow to schedule another uploadAsync if ((m_isPaused) || (m_scheduledUploadAborted)) { LOG_TRACE("Paused or upload aborted: cancel pending upload task."); - cancelUploadTask(); // If there is a pending upload task, kill it + cancelUploadTaskLocked(); // If there is a pending upload task, kill it return; } } @@ -212,14 +210,14 @@ namespace MAT_NS_BEGIN { unsigned delayMs = 1000; LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later", proposedBandwidthBps, minimumBandwidthBps, delayMs); - scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later + scheduleUpload(delayMs, requestedLatency); // reschedule uploadAsync to run again 1000 ms 
later return; } } #endif auto ctx = m_system.createEventsUploadContext(); - ctx->requestedMinLatency = m_runningLatency; + ctx->requestedMinLatency = requestedLatency; addUpload(ctx); initiateUpload(ctx); } @@ -286,9 +284,9 @@ namespace MAT_NS_BEGIN { LOCKGUARD(m_scheduledUploadMutex); // Prevent execution of all upload tasks m_scheduledUploadAborted = true; - // Make sure we wait for completion of the upload scheduling task that may be running - cancelUploadTask(); } + // Make sure we wait for completion of the upload scheduling task that may be running + cancelUploadTask(); // Make sure we wait for all active upload callbacks to finish while (uploadCount() > 0) @@ -344,7 +342,12 @@ namespace MAT_NS_BEGIN { } // Schedule async upload if not scheduled yet - if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) + bool isUploadScheduled = false; + { + LOCKGUARD(m_scheduledUploadMutex); + isUploadScheduled = m_isUploadScheduled; + } + if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) { if (updateTimersIfNecessary()) { @@ -376,7 +379,13 @@ namespace MAT_NS_BEGIN { return EventLatency_RealTime; } - if (m_runningLatency == EventLatency_RealTime) + EventLatency runningLatency = EventLatency_RealTime; + { + LOCKGUARD(m_scheduledUploadMutex); + runningLatency = m_runningLatency; + } + + if (runningLatency == EventLatency_RealTime) { return EventLatency_Normal; } @@ -456,6 +465,12 @@ namespace MAT_NS_BEGIN { } bool TransmissionPolicyManager::cancelUploadTask() + { + LOCKGUARD(m_scheduledUploadMutex); + return cancelUploadTaskLocked(); + } + + bool TransmissionPolicyManager::cancelUploadTaskLocked() { bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count()); @@ -464,7 +479,8 @@ namespace MAT_NS_BEGIN { // ensure those tasks are canceled when the log manager is destroyed. 
Issue 388 if (result) { - m_isUploadScheduled.exchange(false); + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); } return result; } @@ -478,6 +494,7 @@ namespace MAT_NS_BEGIN { bool TransmissionPolicyManager::isUploadInProgress() const noexcept { // unfinished uploads that haven't processed callbacks or pending upload task + LOCKGUARD(m_scheduledUploadMutex); return (uploadCount() > 0) || m_isUploadScheduled; } diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index dc7f91cf9..029b6623f 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -90,9 +90,9 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; DeviceStateHandler m_deviceStateHandler; std::atomic m_isPaused { true }; - std::atomic m_isUploadScheduled { false }; - std::atomic m_scheduledUploadTime { std::numeric_limits::max() }; - std::mutex m_scheduledUploadMutex; + bool m_isUploadScheduled { false }; + uint64_t m_scheduledUploadTime { std::numeric_limits::max() }; + mutable std::mutex m_scheduledUploadMutex; PAL::DeferredCallbackHandle m_scheduledUpload; bool m_scheduledUploadAborted { false }; @@ -123,6 +123,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; /// Cancels pending upload task. /// bool cancelUploadTask(); + bool cancelUploadTaskLocked(); /// /// Calculate the number of pending upload contexts. 
@@ -131,7 +132,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; size_t uploadCount() const noexcept; std::chrono::milliseconds m_timerdelay { std::chrono::seconds { 2 } }; - std::atomic m_runningLatency { EventLatency_RealTime }; + EventLatency m_runningLatency { EventLatency_RealTime }; TimerArray m_timers; public: From 0b277171a9e2481fa54f4d5150d780ea69916bf6 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Thu, 30 Apr 2026 06:40:21 -0700 Subject: [PATCH 6/8] Avoid holding TPM scheduler mutex during cancel Keep the scheduled-upload state mutex-based, but stop holding m_scheduledUploadMutex across DeferredCallbackHandle::Cancel so shutdown and pause paths do not block uploadAsync behind the same lock. While touching the path, use std::chrono::milliseconds for the bandwidth-controller reschedule call so ENABLE_BW_CONTROLLER builds cleanly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 70 ++++++++++++++++----------- lib/tpm/TransmissionPolicyManager.hpp | 1 - 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index e7421bc7f..c52ccfc61 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -111,26 +111,35 @@ namespace MAT_NS_BEGIN { LOG_TRACE("Collector URL is not set, no upload."); return; } - LOCKGUARD(m_scheduledUploadMutex); - if (delay.count() < 0 || m_timerdelay.count() < 0) - { - LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); - return; - } - if (m_scheduledUploadAborted) + auto shouldSkipScheduling = [&delay, this]() -> bool { - LOG_TRACE("Scheduled upload aborted, no upload."); - return; - } - if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ]) ) - { - LOG_TRACE("Maximum number of HTTP requests reached"); - return; - } + if (delay.count() < 0 || 
m_timerdelay.count() < 0) + { + LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); + return true; + } + if (m_scheduledUploadAborted) + { + LOG_TRACE("Scheduled upload aborted, no upload."); + return true; + } + if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ])) + { + LOG_TRACE("Maximum number of HTTP requests reached"); + return true; + } + if (m_isPaused) + { + LOG_TRACE("Paused, not uploading anything until resumed"); + return true; + } - if (m_isPaused) + return false; + }; + + std::unique_lock scheduledUploadLock(m_scheduledUploadMutex); + if (shouldSkipScheduling()) { - LOG_TRACE("Paused, not uploading anything until resumed"); return; } @@ -161,10 +170,16 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. if (force || delay.count() == 0) { - if (!cancelUploadTaskLocked()) + scheduledUploadLock.unlock(); + if (!cancelUploadTask()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } + scheduledUploadLock.lock(); + if (shouldSkipScheduling()) + { + return; + } } // Schedule new upload @@ -192,8 +207,7 @@ namespace MAT_NS_BEGIN { m_isUploadScheduled = false; // Allow to schedule another uploadAsync if ((m_isPaused) || (m_scheduledUploadAborted)) { - LOG_TRACE("Paused or upload aborted: cancel pending upload task."); - cancelUploadTaskLocked(); // If there is a pending upload task, kill it + LOG_TRACE("Paused or upload aborted: skip upload."); return; } } @@ -210,7 +224,7 @@ namespace MAT_NS_BEGIN { unsigned delayMs = 1000; LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later", proposedBandwidthBps, minimumBandwidthBps, delayMs); - scheduleUpload(delayMs, requestedLatency); // reschedule uploadAsync to run again 1000 ms later + scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later return; } } @@ -466,19 +480,19 @@ namespace 
MAT_NS_BEGIN { bool TransmissionPolicyManager::cancelUploadTask() { - LOCKGUARD(m_scheduledUploadMutex); - return cancelUploadTaskLocked(); - } - - bool TransmissionPolicyManager::cancelUploadTaskLocked() - { - bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count()); + auto waitTime = std::chrono::milliseconds{}; + { + LOCKGUARD(m_scheduledUploadMutex); + waitTime = getCancelWaitTime(); + } + bool result = m_scheduledUpload.Cancel(waitTime.count()); // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to // ensure those tasks are canceled when the log manager is destroyed. Issue 388 if (result) { + LOCKGUARD(m_scheduledUploadMutex); m_isUploadScheduled = false; m_scheduledUploadTime = std::numeric_limits::max(); } diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index 029b6623f..a9cf39a23 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -123,7 +123,6 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; /// Cancels pending upload task. /// bool cancelUploadTask(); - bool cancelUploadTaskLocked(); /// /// Calculate the number of pending upload contexts. From 2cdf8177f4c43ac2b8d9b4b1aa8d9344f7514439 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 4 May 2026 07:26:26 -0500 Subject: [PATCH 7/8] Address runtime review comments Keep forced upload scheduling atomic around no-wait cancellation and preserve HTTP responses until downstream abort/network-failure handlers finish. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/http/HttpResponseDecoder.cpp | 5 - lib/tpm/TransmissionPolicyManager.cpp | 23 +++- lib/tpm/TransmissionPolicyManager.hpp | 7 +- tests/unittests/HttpResponseDecoderTests.cpp | 21 ++- .../TransmissionPolicyManagerTests.cpp | 122 +++++++++++++++++- 5 files changed, 162 insertions(+), 16 deletions(-) diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp index 2bb652fdf..941931c1e 100644 --- a/lib/http/HttpResponseDecoder.cpp +++ b/lib/http/HttpResponseDecoder.cpp @@ -127,8 +127,6 @@ namespace MAT_NS_BEGIN { evt.param1 = 0; // response.GetStatusCode(); DispatchEvent(evt); } - delete ctx->httpResponse; - ctx->httpResponse = nullptr; // eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected requestAborted(ctx); break; @@ -158,8 +156,6 @@ namespace MAT_NS_BEGIN { evt.param1 = response.GetStatusCode(); DispatchEvent(evt); } - delete ctx->httpResponse; - ctx->httpResponse = nullptr; temporaryNetworkFailure(ctx); break; } @@ -254,4 +250,3 @@ namespace MAT_NS_BEGIN { } } MAT_NS_END - diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index c52ccfc61..100d2339a 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -170,12 +170,10 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. if (force || delay.count() == 0) { - scheduledUploadLock.unlock(); - if (!cancelUploadTask()) + if (!cancelUploadTaskNoWaitLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } - scheduledUploadLock.lock(); if (shouldSkipScheduling()) { return; @@ -478,12 +476,31 @@ namespace MAT_NS_BEGIN { return (m_scheduledUploadAborted) ? 
DefaultTaskCancelTime : std::chrono::milliseconds {}; } + bool TransmissionPolicyManager::cancelUploadTaskNoWaitLocked() + { + bool result = m_scheduledUpload.Cancel(std::chrono::milliseconds {}.count()); + + // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. + // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to + // ensure those tasks are canceled when the log manager is destroyed. Issue 388 + if (result) + { + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); + } + return result; + } + bool TransmissionPolicyManager::cancelUploadTask() { auto waitTime = std::chrono::milliseconds{}; { LOCKGUARD(m_scheduledUploadMutex); waitTime = getCancelWaitTime(); + if (waitTime.count() == 0) + { + return cancelUploadTaskNoWaitLocked(); + } } bool result = m_scheduledUpload.Cancel(waitTime.count()); diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index a9cf39a23..d6c97beb0 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -119,6 +119,12 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; std::chrono::milliseconds getCancelWaitTime() const noexcept; + /// + /// Cancels a pending upload task without waiting for a running task to finish. + /// The caller must already hold m_scheduledUploadMutex. + /// + bool cancelUploadTaskNoWaitLocked(); + /// /// Cancels pending upload task. 
/// @@ -160,4 +166,3 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; } MAT_NS_END #endif // TRANSMISSIONPOLICYMANAGER_HPP - diff --git a/tests/unittests/HttpResponseDecoderTests.cpp b/tests/unittests/HttpResponseDecoderTests.cpp index 314cdb513..7d11ae4b8 100644 --- a/tests/unittests/HttpResponseDecoderTests.cpp +++ b/tests/unittests/HttpResponseDecoderTests.cpp @@ -88,20 +88,29 @@ TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryServerFailures) TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryNetworkFailures) { auto ctx = createContextWith(HttpResult_LocalFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_LocalFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); ctx = createContextWith(HttpResult_NetworkFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_NetworkFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } TEST_F(HttpResponseDecoderTests, SkipsAbortedRequests) { auto ctx = createContextWith(HttpResult_Aborted, -1, ""); - EXPECT_CALL(*this, resultRequestAborted(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultRequestAborted(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_Aborted); + 
EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index 6cbdb99f5..b961df15f 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -11,14 +11,24 @@ #include "tpm/TransmissionPolicyManager.hpp" #include "TransmitProfiles.hpp" +#include +#include +#include +#include + using namespace testing; using namespace MAT; class TransmissionPolicyManager4Test : public TransmissionPolicyManager { public: + TransmissionPolicyManager4Test(ITelemetrySystem& system, ITaskDispatcher& taskDispatcher, IBandwidthController* bandwidthController) + : TransmissionPolicyManager(system, taskDispatcher, bandwidthController) + { + } + TransmissionPolicyManager4Test(ITelemetrySystem& system, IBandwidthController* bandwidthController) - : TransmissionPolicyManager(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) + : TransmissionPolicyManager4Test(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) { } @@ -69,6 +79,82 @@ class TransmissionPolicyManager4Test : public TransmissionPolicyManager { } }; +class BlockingCancelTaskDispatcher : public ITaskDispatcher +{ +public: + ~BlockingCancelTaskDispatcher() override + { + Join(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(waitTime); + + { + std::lock_guard lock(m_tasksMutex); + auto it = std::find(m_tasks.begin(), m_tasks.end(), task); + if (it == m_tasks.end()) + { + return false; + } + delete *it; + m_tasks.erase(it); + } + + { + std::lock_guard lock(m_cancelMutex); + m_cancelEntered = true; + 
} + m_cancelEnteredCv.notify_all(); + + std::unique_lock lock(m_cancelMutex); + m_cancelReleasedCv.wait(lock, [this]() { return m_cancelReleased; }); + return true; + } + + bool WaitForCancel(const std::chrono::milliseconds timeout) + { + std::unique_lock lock(m_cancelMutex); + return m_cancelEnteredCv.wait_for(lock, timeout, [this]() { return m_cancelEntered; }); + } + + void ReleaseCancel() + { + { + std::lock_guard lock(m_cancelMutex); + m_cancelReleased = true; + } + m_cancelReleasedCv.notify_all(); + } + +private: + std::mutex m_tasksMutex; + std::vector<Task*> m_tasks; + + std::mutex m_cancelMutex; + std::condition_variable m_cancelEnteredCv; + std::condition_variable m_cancelReleasedCv; + bool m_cancelEntered = false; + bool m_cancelReleased = false; +}; + class TransmissionPolicyManagerTests : public StrictMock<Test> { protected: StrictMock<MockIRuntimeConfig> runtimeConfigMock; @@ -608,6 +694,40 @@ TEST_F(TransmissionPolicyManagerTests, cancelUploadTask_ScheduledUpload_IsUpload ASSERT_FALSE(tpm.m_isUploadScheduled); } +TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCancelBlocks) +{ + BlockingCancelTaskDispatcher dispatcher; + TransmissionPolicyManager4Test blockingTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + blockingTpm.paused(false); + + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + + auto forceSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + }); + + ASSERT_TRUE(dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 })); + + auto delayedSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + }); + + EXPECT_EQ(delayedSchedule.wait_for(std::chrono::milliseconds{ 100 }), std::future_status::timeout); + + dispatcher.ReleaseCancel(); + + forceSchedule.get(); + 
delayedSchedule.get(); + + ASSERT_TRUE(blockingTpm.m_isUploadScheduled); + + auto remainingDelayMs = + static_cast(blockingTpm.m_scheduledUploadTime) - static_cast(PAL::getMonotonicTimeMs()); + + EXPECT_GT(remainingDelayMs, -100); + EXPECT_LT(remainingDelayMs, 250); +} + TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero) { tpm.m_backoff = nullptr; From 95519efd3239812498d9fad5475586b7a363f880 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 4 May 2026 10:34:15 -0500 Subject: [PATCH 8/8] Apply force-scheduled latency when running cancel fails When scheduleUpload is called with force=true (or zero delay) and the previously scheduled upload task is currently executing on the worker, the no-wait cancel returns false and m_isUploadScheduled stays set. The existing m_isUploadScheduled check then skipped scheduling a new task, silently dropping the requested latency for force-scheduled profile changes. Propagate the requested latency to m_runningLatency under the same mutex when this race occurs. uploadAsync re-reads m_runningLatency inside its own LOCKGUARD, so a task that hasn't yet entered that critical section will pick up the new latency. If uploadAsync has already cleared m_isUploadScheduled (past its LOCKGUARD), the existing fallthrough at line 184 schedules a fresh task with the new latency. Add a regression test using a fake dispatcher whose Cancel always returns false, asserting that a force-scheduled call updates m_runningLatency without enqueueing a duplicate task. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 12 +++ .../TransmissionPolicyManagerTests.cpp | 91 +++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 100d2339a..f4c1a800c 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -173,6 +173,18 @@ namespace MAT_NS_BEGIN { if (!cancelUploadTaskNoWaitLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); + // Cancel can return false when the previous upload task is + // currently executing on the worker. If uploadAsync hasn't + // yet entered its own LOCKGUARD (m_isUploadScheduled is + // still set under the mutex we hold), propagate the + // requested latency so the running task picks it up when + // it acquires m_scheduledUploadMutex. Otherwise the + // running task has already cleared the flag and the + // schedule below will queue a fresh task. 
+ if (m_isUploadScheduled) + { + m_runningLatency = latency; + } } if (shouldSkipScheduling()) { diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index b961df15f..23e72eeb7 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -155,6 +155,65 @@ class BlockingCancelTaskDispatcher : public ITaskDispatcher bool m_cancelReleased = false; }; +class RunningTaskDispatcher : public ITaskDispatcher +{ +public: + ~RunningTaskDispatcher() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(task); + UNREFERENCED_PARAMETER(waitTime); + // Simulate a task that is currently executing on the worker: + // cancellation can never proceed without waiting for the run + // to complete, so a no-wait cancel must return false. 
+ std::lock_guard lock(m_tasksMutex); + m_cancelCount++; + return false; + } + + size_t QueuedCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_tasks.size(); + } + + size_t CancelCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_cancelCount; + } + +private: + mutable std::mutex m_tasksMutex; + std::vector<Task*> m_tasks; + size_t m_cancelCount = 0; +}; + class TransmissionPolicyManagerTests : public StrictMock<Test> { protected: StrictMock<MockIRuntimeConfig> runtimeConfigMock; @@ -728,6 +787,38 @@ TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCa EXPECT_LT(remainingDelayMs, 250); } +TEST_F(TransmissionPolicyManagerTests, ForceScheduleAppliesLatencyWhenRunningCancelFails) +{ + RunningTaskDispatcher dispatcher; + TransmissionPolicyManager4Test runningTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + runningTpm.paused(false); + + // Queue an initial upload so m_scheduledUpload has a non-null task and + // m_isUploadScheduled is set; the dispatcher's Cancel will fail later + // (simulating the "task currently executing on worker" race). + runningTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + ASSERT_TRUE(runningTpm.m_isUploadScheduled); + ASSERT_EQ(dispatcher.QueuedCount(), 1u); + + auto scheduledTimeBefore = runningTpm.m_scheduledUploadTime; + // Reset m_runningLatency so we can observe the force path updating it + // (the initial schedule may have bumped it depending on the active + // profile's timers). + runningTpm.runningLatency(EventLatency_Normal); + + // Force a higher-priority schedule. The dispatcher's no-wait cancel + // returns false, so the previous task remains in flight. The fix in + // scheduleUpload must propagate the new latency to m_runningLatency + // so the running task picks it up under the same mutex.
+ runningTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + + EXPECT_GE(dispatcher.CancelCount(), 1u); + EXPECT_EQ(dispatcher.QueuedCount(), 1u); + EXPECT_TRUE(runningTpm.m_isUploadScheduled); + EXPECT_EQ(runningTpm.m_runningLatency, EventLatency_RealTime); + EXPECT_EQ(runningTpm.m_scheduledUploadTime, scheduledTimeBefore); +} + TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero) { tpm.m_backoff = nullptr;