diff --git a/lib/api/Logger.cpp b/lib/api/Logger.cpp index 54d883664..f76f85734 100644 --- a/lib/api/Logger.cpp +++ b/lib/api/Logger.cpp @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN Logger::~Logger() noexcept { - LOG_TRACE("%p: Destroyed", this); + // Intentionally empty — logging here triggers a static-destruction-order + // crash on iOS simulator (recursive_mutex used after teardown). } ISemanticContext* Logger::GetSemanticContext() const diff --git a/lib/http/HttpClientManager.cpp b/lib/http/HttpClientManager.cpp index 58fa5fb4a..a1c228556 100644 --- a/lib/http/HttpClientManager.cpp +++ b/lib/http/HttpClientManager.cpp @@ -149,8 +149,15 @@ namespace MAT_NS_BEGIN { void HttpClientManager::cancelAllRequests() { cancelAllRequestsAsync(); - while (!m_httpCallbacks.empty()) + while (true) + { + { + LOCKGUARD(m_httpCallbacksMtx); + if (m_httpCallbacks.empty()) + break; + } std::this_thread::yield(); + } } // start async cancellation diff --git a/lib/http/HttpClient_Apple.mm b/lib/http/HttpClient_Apple.mm index 05817087a..579b05313 100644 --- a/lib/http/HttpClient_Apple.mm +++ b/lib/http/HttpClient_Apple.mm @@ -132,23 +132,6 @@ void HandleResponse(NSData* data, NSURLResponse* response, NSError* error) void Cancel() { [m_dataTask cancel]; - [session getTasksWithCompletionHandler:^(NSArray* dataTasks, NSArray* uploadTasks, NSArray* downloadTasks) - { - for (NSURLSessionTask* _task in dataTasks) - { - [_task cancel]; - } - - for (NSURLSessionTask* _task in downloadTasks) - { - [_task cancel]; - } - - for (NSURLSessionTask* _task in uploadTasks) - { - [_task cancel]; - } - }]; } private: @@ -214,8 +197,13 @@ void Cancel() for (const auto &id : ids) CancelRequestAsync(id); - while (!m_requests.empty()) + while (true) { + { + std::lock_guard lock(m_requestsMtx); + if (m_requests.empty()) + break; + } PAL::sleep(100); std::this_thread::yield(); } diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp index 11e9d4096..941931c1e 100644 --- a/lib/http/HttpResponseDecoder.cpp +++ b/lib/http/HttpResponseDecoder.cpp @@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN { break; case HttpResult_Aborted: - ctx->httpResponse = nullptr; outcome = Abort; break; case HttpResult_LocalFailure: case HttpResult_NetworkFailure: - ctx->httpResponse = nullptr; outcome = RetryNetwork; break; } @@ -129,7 +127,6 @@ namespace MAT_NS_BEGIN { evt.param1 = 0; // response.GetStatusCode(); DispatchEvent(evt); } - ctx->httpResponse = nullptr; // eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected requestAborted(ctx); break; @@ -253,4 +250,3 @@ namespace MAT_NS_BEGIN { } } MAT_NS_END - diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 2bdbf6c67..5eccbb5f2 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -6,6 +6,8 @@ #include "pal/WorkerThread.hpp" #include "pal/PAL.hpp" +#include + #if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32) /* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */ @@ -35,6 +37,7 @@ namespace PAL_NS_BEGIN { std::list m_timerQueue; Event m_event; MAT::Task* m_itemInProgress; + bool m_shuttingDown = false; int count = 0; public: @@ -53,25 +56,54 @@ namespace PAL_NS_BEGIN { void Join() final { - auto item = new WorkerThreadShutdownItem(); - Queue(item); std::thread::id this_id = std::this_thread::get_id(); + bool joined = false; + { + LOCKGUARD(m_lock); + if (!m_shuttingDown) { + m_shuttingDown = true; + m_queue.push_back(new WorkerThreadShutdownItem()); + count++; + m_event.post(); + } + } try { - if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) + if (!m_hThread.joinable()) { + return; + } + if (m_hThread.get_id() != this_id) { m_hThread.join(); - else + joined = true; + } else { m_hThread.detach(); + } + } + catch (const std::system_error& e) { + LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what()); + } + catch (const std::exception& e) { + LOG_ERROR("Thread join/detach failed: %s", e.what()); } - catch (...) {}; - // TODO: [MG] - investigate if we ever drop work items on shutdown. - if (!m_queue.empty()) - { - LOG_WARN("m_queue is not empty!"); + // Log pending work in both paths so operators can see if + // shutdown is dropping tasks. + LOCKGUARD(m_lock); + if (!m_queue.empty()) { + LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size()); } - if (!m_timerQueue.empty()) - { - LOG_WARN("m_timerQueue is not empty!"); + if (!m_timerQueue.empty()) { + LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size()); + } + + // Clean up any tasks remaining in the queues after shutdown. + // Only safe after join() — the thread has fully exited. + // After detach(), the thread still needs the shutdown item + // and may still be accessing the queues. + if (joined) { + for (auto task : m_queue) { delete task; } + m_queue.clear(); + for (auto task : m_timerQueue) { delete task; } + m_timerQueue.clear(); } } @@ -79,6 +111,11 @@ namespace PAL_NS_BEGIN { { LOG_INFO("queue item=%p", &item); LOCKGUARD(m_lock); + if (m_shuttingDown) { + LOG_WARN("Dropping queued task %p during shutdown", item); + delete item; + return; + } if (item->Type == MAT::Task::TimedCall) { auto it = m_timerQueue.begin(); while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) { diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 83b82cf2a..f4c1a800c 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -111,26 +111,35 @@ namespace MAT_NS_BEGIN { LOG_TRACE("Collector URL is not set, no upload."); return; } - LOCKGUARD(m_scheduledUploadMutex); - if (delay.count() < 0 || m_timerdelay.count() < 0) - { - LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); - return; - } - if (m_scheduledUploadAborted) - { - LOG_TRACE("Scheduled upload aborted, no upload."); - return; - } - if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ]) ) + auto shouldSkipScheduling = [&delay, this]() -> bool { - LOG_TRACE("Maximum number of HTTP requests reached"); - return; - } + if (delay.count() < 0 || m_timerdelay.count() < 0) + { + LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); + return true; + } + if (m_scheduledUploadAborted) + { + LOG_TRACE("Scheduled upload aborted, no upload."); + return true; + } + if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ])) + { + LOG_TRACE("Maximum number of HTTP requests reached"); + return true; + } + if (m_isPaused) + { + LOG_TRACE("Paused, not uploading anything until resumed"); + return true; + } + + return false; + }; - if (m_isPaused) + std::unique_lock scheduledUploadLock(m_scheduledUploadMutex); + if (shouldSkipScheduling()) { - LOG_TRACE("Paused, not uploading anything until resumed"); return; } @@ -151,7 +160,6 @@ namespace MAT_NS_BEGIN { if (delta <= static_cast(delay.count())) { // Don't need to cancel and reschedule if it's about to happen now anyways. - // m_isUploadScheduled check does not have to be strictly atomic because // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); @@ -162,15 +170,32 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. if (force || delay.count() == 0) { - if (!cancelUploadTask()) + if (!cancelUploadTaskNoWaitLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); + // Cancel can return false when the previous upload task is + // currently executing on the worker. If uploadAsync hasn't + // yet entered its own LOCKGUARD (m_isUploadScheduled is + // still set under the mutex we hold), propagate the + // requested latency so the running task picks it up when + // it acquires m_scheduledUploadMutex. Otherwise the + // running task has already cleared the flag and the + // schedule below will queue a fresh task. + if (m_isUploadScheduled) + { + m_runningLatency = latency; + } + } + if (shouldSkipScheduling()) + { + return; } } // Schedule new upload - if (!m_isUploadScheduled.exchange(true)) + if (!m_isUploadScheduled) { + m_isUploadScheduled = true; m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); @@ -184,16 +209,15 @@ namespace MAT_NS_BEGIN { if (guard.isPaused()) { return; } - m_runningLatency = latency; - m_scheduledUploadTime = std::numeric_limits::max(); - + EventLatency requestedLatency = latency; { LOCKGUARD(m_scheduledUploadMutex); + requestedLatency = m_runningLatency; + m_scheduledUploadTime = std::numeric_limits::max(); m_isUploadScheduled = false; // Allow to schedule another uploadAsync if ((m_isPaused) || (m_scheduledUploadAborted)) { - LOG_TRACE("Paused or upload aborted: cancel pending upload task."); - cancelUploadTask(); // If there is a pending upload task, kill it + LOG_TRACE("Paused or upload aborted: skip upload."); return; } } @@ -210,14 +234,14 @@ namespace MAT_NS_BEGIN { unsigned delayMs = 1000; LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later", proposedBandwidthBps, minimumBandwidthBps, delayMs); - scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later + scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later return; } } #endif auto ctx = m_system.createEventsUploadContext(); - ctx->requestedMinLatency = m_runningLatency; + ctx->requestedMinLatency = requestedLatency; addUpload(ctx); initiateUpload(ctx); } @@ -284,9 +308,9 @@ namespace MAT_NS_BEGIN { LOCKGUARD(m_scheduledUploadMutex); // Prevent execution of all upload tasks m_scheduledUploadAborted = true; - // Make sure we wait for completion of the upload scheduling task that may be running - cancelUploadTask(); } + // Make sure we wait for completion of the upload scheduling task that may be running + cancelUploadTask(); // Make sure we wait for all active upload callbacks to finish while (uploadCount() > 0) @@ -342,7 +366,12 @@ namespace MAT_NS_BEGIN { } // Schedule async upload if not scheduled yet - if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) + bool isUploadScheduled = false; + { + LOCKGUARD(m_scheduledUploadMutex); + isUploadScheduled = m_isUploadScheduled; + } + if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) { if (updateTimersIfNecessary()) { @@ -374,7 +403,13 @@ namespace MAT_NS_BEGIN { return EventLatency_RealTime; } - if (m_runningLatency == EventLatency_RealTime) + EventLatency runningLatency = EventLatency_RealTime; + { + LOCKGUARD(m_scheduledUploadMutex); + runningLatency = m_runningLatency; + } + + if (runningLatency == EventLatency_RealTime) { return EventLatency_Normal; } @@ -453,16 +488,42 @@ namespace MAT_NS_BEGIN { return (m_scheduledUploadAborted) ? DefaultTaskCancelTime : std::chrono::milliseconds {}; } + bool TransmissionPolicyManager::cancelUploadTaskNoWaitLocked() + { + bool result = m_scheduledUpload.Cancel(std::chrono::milliseconds {}.count()); + + // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. + // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to + // ensure those tasks are canceled when the log manager is destroyed. Issue 388 + if (result) + { + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); + } + return result; + } + bool TransmissionPolicyManager::cancelUploadTask() { - bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count()); + auto waitTime = std::chrono::milliseconds{}; + { + LOCKGUARD(m_scheduledUploadMutex); + waitTime = getCancelWaitTime(); + if (waitTime.count() == 0) + { + return cancelUploadTaskNoWaitLocked(); + } + } + bool result = m_scheduledUpload.Cancel(waitTime.count()); // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to // ensure those tasks are canceled when the log manager is destroyed. Issue 388 if (result) { - m_isUploadScheduled.exchange(false); + LOCKGUARD(m_scheduledUploadMutex); + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); } return result; } @@ -476,6 +537,7 @@ namespace MAT_NS_BEGIN { bool TransmissionPolicyManager::isUploadInProgress() const noexcept { // unfinished uploads that haven't processed callbacks or pending upload task + LOCKGUARD(m_scheduledUploadMutex); return (uploadCount() > 0) || m_isUploadScheduled; } diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index e1a91ad10..d6c97beb0 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -90,9 +90,9 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; DeviceStateHandler m_deviceStateHandler; std::atomic m_isPaused { true }; - std::atomic m_isUploadScheduled { false }; + bool m_isUploadScheduled { false }; uint64_t m_scheduledUploadTime { std::numeric_limits::max() }; - std::mutex m_scheduledUploadMutex; + mutable std::mutex m_scheduledUploadMutex; PAL::DeferredCallbackHandle m_scheduledUpload; bool m_scheduledUploadAborted { false }; @@ -119,6 +119,12 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; std::chrono::milliseconds getCancelWaitTime() const noexcept; + /// + /// Cancels a pending upload task without waiting for a running task to finish. + /// The caller must already hold m_scheduledUploadMutex. + /// + bool cancelUploadTaskNoWaitLocked(); + /// /// Cancels pending upload task. /// @@ -160,4 +166,3 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; } MAT_NS_END #endif // TRANSMISSIONPOLICYMANAGER_HPP - diff --git a/tests/unittests/HttpResponseDecoderTests.cpp b/tests/unittests/HttpResponseDecoderTests.cpp index 314cdb513..7d11ae4b8 100644 --- a/tests/unittests/HttpResponseDecoderTests.cpp +++ b/tests/unittests/HttpResponseDecoderTests.cpp @@ -88,20 +88,29 @@ TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryServerFailures) TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryNetworkFailures) { auto ctx = createContextWith(HttpResult_LocalFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_LocalFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); ctx = createContextWith(HttpResult_NetworkFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_NetworkFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } TEST_F(HttpResponseDecoderTests, SkipsAbortedRequests) { auto ctx = createContextWith(HttpResult_Aborted, -1, ""); - EXPECT_CALL(*this, resultRequestAborted(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultRequestAborted(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_Aborted); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index 6cbdb99f5..23e72eeb7 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -11,14 +11,24 @@ #include "tpm/TransmissionPolicyManager.hpp" #include "TransmitProfiles.hpp" +#include +#include +#include +#include + using namespace testing; using namespace MAT; class TransmissionPolicyManager4Test : public TransmissionPolicyManager { public: + TransmissionPolicyManager4Test(ITelemetrySystem& system, ITaskDispatcher& taskDispatcher, IBandwidthController* bandwidthController) + : TransmissionPolicyManager(system, taskDispatcher, bandwidthController) + { + } + TransmissionPolicyManager4Test(ITelemetrySystem& system, IBandwidthController* bandwidthController) - : TransmissionPolicyManager(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) + : TransmissionPolicyManager4Test(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) { } @@ -69,6 +79,141 @@ class TransmissionPolicyManager4Test : public TransmissionPolicyManager { } }; +class BlockingCancelTaskDispatcher : public ITaskDispatcher +{ +public: + ~BlockingCancelTaskDispatcher() override + { + Join(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(waitTime); + + { + std::lock_guard lock(m_tasksMutex); + auto it = std::find(m_tasks.begin(), m_tasks.end(), task); + if (it == m_tasks.end()) + { + return false; + } + delete *it; + m_tasks.erase(it); + } + + { + std::lock_guard lock(m_cancelMutex); + m_cancelEntered = true; + } + m_cancelEnteredCv.notify_all(); + + std::unique_lock lock(m_cancelMutex); + m_cancelReleasedCv.wait(lock, [this]() { return m_cancelReleased; }); + return true; + } + + bool WaitForCancel(const std::chrono::milliseconds timeout) + { + std::unique_lock lock(m_cancelMutex); + return m_cancelEnteredCv.wait_for(lock, timeout, [this]() { return m_cancelEntered; }); + } + + void ReleaseCancel() + { + { + std::lock_guard lock(m_cancelMutex); + m_cancelReleased = true; + } + m_cancelReleasedCv.notify_all(); + } + +private: + std::mutex m_tasksMutex; + std::vector m_tasks; + + std::mutex m_cancelMutex; + std::condition_variable m_cancelEnteredCv; + std::condition_variable m_cancelReleasedCv; + bool m_cancelEntered = false; + bool m_cancelReleased = false; +}; + +class RunningTaskDispatcher : public ITaskDispatcher +{ +public: + ~RunningTaskDispatcher() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(task); + UNREFERENCED_PARAMETER(waitTime); + // Simulate a task that is currently executing on the worker: + // cancellation can never proceed without waiting for the run + // to complete, so a no-wait cancel must return false. + std::lock_guard lock(m_tasksMutex); + m_cancelCount++; + return false; + } + + size_t QueuedCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_tasks.size(); + } + + size_t CancelCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_cancelCount; + } + +private: + mutable std::mutex m_tasksMutex; + std::vector m_tasks; + size_t m_cancelCount = 0; +}; + class TransmissionPolicyManagerTests : public StrictMock { protected: StrictMock runtimeConfigMock; @@ -608,6 +753,72 @@ TEST_F(TransmissionPolicyManagerTests, cancelUploadTask_ScheduledUpload_IsUpload ASSERT_FALSE(tpm.m_isUploadScheduled); } +TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCancelBlocks) +{ + BlockingCancelTaskDispatcher dispatcher; + TransmissionPolicyManager4Test blockingTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + blockingTpm.paused(false); + + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + + auto forceSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + }); + + ASSERT_TRUE(dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 })); + + auto delayedSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + }); + + EXPECT_EQ(delayedSchedule.wait_for(std::chrono::milliseconds{ 100 }), std::future_status::timeout); + + dispatcher.ReleaseCancel(); + + forceSchedule.get(); + delayedSchedule.get(); + + ASSERT_TRUE(blockingTpm.m_isUploadScheduled); + + auto remainingDelayMs = + static_cast(blockingTpm.m_scheduledUploadTime) - static_cast(PAL::getMonotonicTimeMs()); + + EXPECT_GT(remainingDelayMs, -100); + EXPECT_LT(remainingDelayMs, 250); +} + +TEST_F(TransmissionPolicyManagerTests, ForceScheduleAppliesLatencyWhenRunningCancelFails) +{ + RunningTaskDispatcher dispatcher; + TransmissionPolicyManager4Test runningTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + runningTpm.paused(false); + + // Queue an initial upload so m_scheduledUpload has a non-null task and + // m_isUploadScheduled is set; the dispatcher's Cancel will fail later + // (simulating the "task currently executing on worker" race). + runningTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + ASSERT_TRUE(runningTpm.m_isUploadScheduled); + ASSERT_EQ(dispatcher.QueuedCount(), 1u); + + auto scheduledTimeBefore = runningTpm.m_scheduledUploadTime; + // Reset m_runningLatency so we can observe the force path updating it + // (the initial schedule may have bumped it depending on the active + // profile's timers). + runningTpm.runningLatency(EventLatency_Normal); + + // Force a higher-priority schedule. The dispatcher's no-wait cancel + // returns false, so the previous task remains in flight. The fix in + // scheduleUpload must propagate the new latency to m_runningLatency + // so the running task picks it up under the same mutex. + runningTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + + EXPECT_GE(dispatcher.CancelCount(), 1u); + EXPECT_EQ(dispatcher.QueuedCount(), 1u); + EXPECT_TRUE(runningTpm.m_isUploadScheduled); + EXPECT_EQ(runningTpm.m_runningLatency, EventLatency_RealTime); + EXPECT_EQ(runningTpm.m_scheduledUploadTime, scheduledTimeBefore); +} + TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero) { tpm.m_backoff = nullptr;