Skip to content

Commit b9a0540

Browse files
committed
Fix executor thread-safety for feedback subscription content filter
The background executor thread re-enters rcl_wait immediately after signaling the main thread, creating a data race when the main thread modifies the feedback subscription's content filter during callback processing. Add a condition variable so the background thread waits until the main thread finishes ExecuteReadyHandles before re-entering rcl_wait. Also fix the RMW content filter probe in the ActionClient constructor: - Use node.destroySubscription() instead of probeSub.destroy() (which does not exist on Subscription), preventing silent probe failure that was disabling the optimization unconditionally. - Use interface_loader in tests to access FeedbackMessage type correctly. - Change test probe from before() to beforeEach() so the outer beforeEach-created node is available. - Add delay before feedback publish in test executeCallback to allow the client time to set up the content filter after goal acceptance.
1 parent 084bd4f commit b9a0540

4 files changed

Lines changed: 91 additions & 3 deletions

File tree

lib/action/client.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,29 @@ class ActionClient extends Entity {
118118
this.qos.statusSubQosProfile
119119
);
120120

121+
// Probe that the RMW actually supports content filtering before enabling
122+
// the optimization. If the RMW does not support content filtering, the
123+
// rcl layer returns RCL_RET_UNSUPPORTED which disables the optimization
124+
// gracefully. However, probing upfront avoids the warning log on first use.
125+
if (this._enableFeedbackMsgOptimization) {
126+
try {
127+
const probeSub = node.createSubscription(
128+
this._typeClass.impl.FeedbackMessage,
129+
actionName + '/_action/feedback',
130+
() => {}
131+
);
132+
const supported =
133+
typeof probeSub.isContentFilterSupported === 'function' &&
134+
probeSub.isContentFilterSupported();
135+
node.destroySubscription(probeSub);
136+
if (!supported) {
137+
this._enableFeedbackMsgOptimization = false;
138+
}
139+
} catch {
140+
this._enableFeedbackMsgOptimization = false;
141+
}
142+
}
143+
121144
node._addActionClient(this);
122145
}
123146

src/executor.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ Executor::Executor(Napi::Env env, HandleManager* handle_manager,
4848
handle_manager_(handle_manager),
4949
delegate_(delegate),
5050
context_(nullptr),
51-
env_(env) {
51+
env_(env),
52+
work_pending_(false) {
5253
running_.store(false);
5354
}
5455

@@ -105,6 +106,8 @@ void Executor::Stop() {
105106
// Stop thread first, and then uv_close
106107
// Make sure async_ is not used anymore
107108
running_.store(false);
109+
// Wake the background thread in case it is waiting on the condvar.
110+
work_done_cv_.notify_all();
108111
handle_manager_->StopWaitingHandles();
109112
uv_thread_join(&background_thread_);
110113

@@ -134,6 +137,13 @@ bool Executor::IsMainThread() {
134137
void Executor::DoWork(uv_async_t* handle) {
135138
Executor* executor = reinterpret_cast<Executor*>(handle->data);
136139
executor->ExecuteReadyHandles();
140+
141+
// Signal the background thread that it is safe to re-enter rcl_wait.
142+
{
143+
std::lock_guard<std::mutex> lock(executor->work_done_mutex_);
144+
executor->work_pending_ = false;
145+
}
146+
executor->work_done_cv_.notify_one();
137147
}
138148

139149
void Executor::Run(void* arg) {
@@ -159,7 +169,23 @@ void Executor::Run(void* arg) {
159169

160170
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(executor->async_)) &&
161171
handle_manager->ready_handles_count() > 0) {
172+
// Tell the main thread there is work to do, then wait for it to
173+
// finish before re-entering rcl_wait. This prevents a data race
174+
// where the background thread holds subscriptions in the wait set
175+
// while the main thread modifies their state (e.g. content filter).
176+
{
177+
std::lock_guard<std::mutex> lock(executor->work_done_mutex_);
178+
executor->work_pending_ = true;
179+
}
162180
uv_async_send(executor->async_);
181+
182+
// Wait until DoWork() signals completion.
183+
{
184+
std::unique_lock<std::mutex> lock(executor->work_done_mutex_);
185+
executor->work_done_cv_.wait(lock, [executor] {
186+
return !executor->work_pending_ || !executor->running_.load();
187+
});
188+
}
163189
}
164190
}
165191

src/executor.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <uv.h>
2121

2222
#include <atomic>
23+
#include <condition_variable>
2324
#include <exception>
25+
#include <mutex>
2426
#include <vector>
2527

2628
#include "rcl_handle.h"
@@ -72,6 +74,15 @@ class Executor {
7274
Napi::Env env_;
7375

7476
std::atomic_bool running_;
77+
78+
// Synchronization: the background thread waits after uv_async_send until
79+
// the main thread finishes ExecuteReadyHandles. This prevents the
80+
// background thread from re-entering rcl_wait (which holds a reference to
81+
// subscriptions) while the main thread modifies subscription state (e.g.
82+
// content filter changes).
83+
std::mutex work_done_mutex_;
84+
std::condition_variable work_done_cv_;
85+
bool work_pending_; // true while the main thread is processing handles
7586
};
7687

7788
} // namespace rclnodejs

test/test-action-client.js

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ describe('rclnodejs action client', function () {
3131
let publishFeedback = null;
3232

3333
async function executeCallback(goalHandle) {
34+
// Delay before publishing feedback to allow the client time to process
35+
// the goal response and set up the content filter (if enabled).
36+
await assertUtils.createDelay(50);
37+
3438
if (
3539
publishFeedback &&
3640
ActionUuid.fromMessage(publishFeedback).toString() ===
@@ -317,6 +321,30 @@ describe('rclnodejs action client', function () {
317321
typeof nativeLoader.actionConfigureFeedbackSubFilterAddGoalId ===
318322
'function';
319323

324+
// Probe whether the RMW supports content filtering, matching the
325+
// same check the ActionClient constructor performs.
326+
let isContentFilterSupported = false;
327+
beforeEach(function () {
328+
if (isContentFilterSupported) return; // only probe once
329+
if (isFeedbackFilterSupported()) {
330+
try {
331+
const loader = require('../lib/interface_loader.js');
332+
const typeClass = loader.loadInterface(fibonacci);
333+
const probeSub = node.createSubscription(
334+
typeClass.impl.FeedbackMessage,
335+
'fibonacci/_action/feedback',
336+
() => {}
337+
);
338+
isContentFilterSupported =
339+
typeof probeSub.isContentFilterSupported === 'function' &&
340+
probeSub.isContentFilterSupported();
341+
node.destroySubscription(probeSub);
342+
} catch {
343+
isContentFilterSupported = false;
344+
}
345+
}
346+
});
347+
320348
it('Test option defaults to false', function () {
321349
let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci');
322350
assert.strictEqual(client._enableFeedbackMsgOptimization, false);
@@ -327,8 +355,8 @@ describe('rclnodejs action client', function () {
327355
let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', {
328356
enableFeedbackMsgOptimization: true,
329357
});
330-
// Only enabled when native API exists
331-
if (isFeedbackFilterSupported()) {
358+
// Only enabled when native API exists AND the RMW supports content filtering
359+
if (isFeedbackFilterSupported() && isContentFilterSupported) {
332360
assert.strictEqual(client._enableFeedbackMsgOptimization, true);
333361
} else {
334362
assert.strictEqual(client._enableFeedbackMsgOptimization, false);

0 commit comments

Comments
 (0)