diff --git a/lib/action/client.js b/lib/action/client.js index 083ee1edb..28c5fd7ad 100644 --- a/lib/action/client.js +++ b/lib/action/client.js @@ -51,6 +51,12 @@ class ActionClient extends Entity { * @param {QoS} options.qos.feedbackSubQosProfile - Quality of service option for the feedback subscription, * default: new QoS(QoS.HistoryPolicy.RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT, 10). * @param {QoS} options.qos.statusSubQosProfile - Quality of service option for the status subscription, default: QoS.profileActionStatusDefault. + * @param {boolean} options.enableFeedbackMsgOptimization - Enable feedback subscription content filter to + * optimize the handling of feedback messages. When enabled, the content filter is used to configure + * the goal ID for the subscription, which helps avoid the reception of irrelevant feedback messages. + * An action client can handle up to 6 goals simultaneously with this optimization. If the number + * of goals exceeds the limit or the RMW doesn't support content filter, optimization is automatically + * disabled. Default: false. */ constructor(node, typeClass, actionName, options) { super(null, null, options); @@ -87,6 +93,15 @@ class ActionClient extends Entity { checkTypes: true, }; + // Enable feedback subscription content filter optimization. + // Only supported on ROS2 Rolling and only effective when the native + // binding provides the required functions AND the RMW implementation + // actually supports content filtering on the feedback subscription. + this._enableFeedbackMsgOptimization = + this._options.enableFeedbackMsgOptimization === true && + DistroUtils.getDistroId() >= DistroUtils.DistroId.ROLLING && + typeof rclnodejs.actionConfigureFeedbackSubFilterAddGoalId === 'function'; + let type = this.typeClass.type(); this._handle = rclnodejs.actionCreateClient( @@ -126,6 +141,7 @@ class ActionClient extends Entity { } this._goalHandles.set(uuid, goalHandle); + this._feedbackSubFilterAddGoalId(goalHandle.goalId); } else { // Clean up feedback callback for rejected goals let uuid = ActionUuid.fromMessage( @@ -205,6 +221,9 @@ class ActionClient extends Entity { status === ActionInterfaces.GoalStatus.STATUS_ABORTED ) { this._goalHandles.delete(uuid); + this._feedbackSubFilterRemoveGoalId( + statusMessage.goal_info.goal_id + ); } } } else { @@ -393,6 +412,8 @@ class ActionClient extends Entity { this._removePendingCancelRequest(sequenceNumber) ); + this._feedbackSubFilterRemoveGoalId(goalHandle.goalId); + return deferred.promise; } @@ -442,9 +463,10 @@ class ActionClient extends Entity { goalHandle.status = result.status; return result.result; }); - deferred.setDoneCallback(() => - this._removePendingResultRequest(sequenceNumber) - ); + deferred.setDoneCallback(() => { + this._removePendingResultRequest(sequenceNumber); + this._feedbackSubFilterRemoveGoalId(goalHandle.goalId); + }); this._pendingResultRequests.set(sequenceNumber, deferred); @@ -464,6 +486,42 @@ class ActionClient extends Entity { this._pendingCancelRequests.delete(sequenceNumber); } + /** + * Add a goal ID to the feedback subscription content filter. + * @ignore + * @param {object} goalId - The goal UUID message. + */ + _feedbackSubFilterAddGoalId(goalId) { + if (!this._enableFeedbackMsgOptimization) return; + try { + rclnodejs.actionConfigureFeedbackSubFilterAddGoalId( + this.handle, + Buffer.from(goalId.uuid) + ); + } catch (e) { + this._enableFeedbackMsgOptimization = false; + this._node.getLogger().warn(`${e.message}`); + } + } + + /** + * Remove a goal ID from the feedback subscription content filter. + * @ignore + * @param {object} goalId - The goal UUID message. + */ + _feedbackSubFilterRemoveGoalId(goalId) { + if (!this._enableFeedbackMsgOptimization) return; + try { + rclnodejs.actionConfigureFeedbackSubFilterRemoveGoalId( + this.handle, + Buffer.from(goalId.uuid) + ); + } catch (e) { + this._enableFeedbackMsgOptimization = false; + this._node.getLogger().warn(`${e.message}`); + } + } + /** * Destroy the underlying action client handle. * @return {undefined} diff --git a/src/executor.cpp b/src/executor.cpp index 38dbbc451..d9411c27f 100644 --- a/src/executor.cpp +++ b/src/executor.cpp @@ -48,7 +48,8 @@ Executor::Executor(Napi::Env env, HandleManager* handle_manager, handle_manager_(handle_manager), delegate_(delegate), context_(nullptr), - env_(env) { + env_(env), + work_pending_(false) { running_.store(false); } @@ -105,6 +106,8 @@ void Executor::Stop() { // Stop thread first, and then uv_close // Make sure async_ is not used anymore running_.store(false); + // Wake the background thread in case it is waiting on the condvar. + work_done_cv_.notify_all(); handle_manager_->StopWaitingHandles(); uv_thread_join(&background_thread_); @@ -133,6 +136,21 @@ bool Executor::IsMainThread() { void Executor::DoWork(uv_async_t* handle) { Executor* executor = reinterpret_cast(handle->data); + + // RAII guard: always clear work_pending_ and notify the background thread, + // even if ExecuteReadyHandles() throws (e.g. from N-API callbacks). + // Without this, the background thread would block forever on work_done_cv_. + struct WorkDoneGuard { + Executor* exec; + ~WorkDoneGuard() { + { + std::lock_guard lock(exec->work_done_mutex_); + exec->work_pending_ = false; + } + exec->work_done_cv_.notify_one(); + } + } guard{executor}; + executor->ExecuteReadyHandles(); } @@ -159,7 +177,23 @@ void Executor::Run(void* arg) { if (!uv_is_closing(reinterpret_cast(executor->async_)) && handle_manager->ready_handles_count() > 0) { + // Tell the main thread there is work to do, then wait for it to + // finish before re-entering rcl_wait. This prevents a data race + // where the background thread holds subscriptions in the wait set + // while the main thread modifies their state (e.g. content filter). + { + std::lock_guard lock(executor->work_done_mutex_); + executor->work_pending_ = true; + } uv_async_send(executor->async_); + + // Wait until DoWork() signals completion. + { + std::unique_lock lock(executor->work_done_mutex_); + executor->work_done_cv_.wait(lock, [executor] { + return !executor->work_pending_ || !executor->running_.load(); + }); + } } } diff --git a/src/executor.h b/src/executor.h index 5b5b2a67a..601ec8023 100644 --- a/src/executor.h +++ b/src/executor.h @@ -20,7 +20,9 @@ #include #include +#include #include +#include #include #include "rcl_handle.h" @@ -72,6 +74,15 @@ class Executor { Napi::Env env_; std::atomic_bool running_; + + // Synchronization: the background thread waits after uv_async_send until + // the main thread finishes ExecuteReadyHandles. This prevents the + // background thread from re-entering rcl_wait (which holds a reference to + // subscriptions) while the main thread modifies subscription state (e.g. + // content filter changes). + std::mutex work_done_mutex_; + std::condition_variable work_done_cv_; + bool work_pending_; // true while the main thread is processing handles }; } // namespace rclnodejs diff --git a/src/rcl_action_client_bindings.cpp b/src/rcl_action_client_bindings.cpp index de548e34d..560b33f4b 100644 --- a/src/rcl_action_client_bindings.cpp +++ b/src/rcl_action_client_bindings.cpp @@ -250,6 +250,67 @@ Napi::Value ActionSendCancelRequest(const Napi::CallbackInfo& info) { return Napi::Number::New(env, static_cast(sequence_number)); } +#if ROS_VERSION >= 5000 // ROS2 Rolling +Napi::Value ActionConfigureFeedbackSubFilterAddGoalId( + const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + + RclHandle* action_client_handle = + RclHandle::Unwrap(info[0].As()); + rcl_action_client_t* action_client = + reinterpret_cast(action_client_handle->ptr()); + + auto goal_id_buffer = info[1].As>(); + const uint8_t* goal_id_array = goal_id_buffer.Data(); + size_t goal_id_size = goal_id_buffer.Length(); + + rcl_ret_t ret = + rcl_action_client_configure_feedback_subscription_filter_add_goal_id( + action_client, goal_id_array, goal_id_size); + + if (RCL_RET_OK != ret) { + std::string error_text{ + "Failed to add goal id to feedback subscription content filter: "}; + error_text += rcl_get_error_string().str; + rcl_reset_error(); + Napi::Error::New(env, error_text).ThrowAsJavaScriptException(); + return Napi::Boolean::New(env, false); + } + + return Napi::Boolean::New(env, true); +} + +Napi::Value ActionConfigureFeedbackSubFilterRemoveGoalId( + const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + + RclHandle* action_client_handle = + RclHandle::Unwrap(info[0].As()); + rcl_action_client_t* action_client = + reinterpret_cast(action_client_handle->ptr()); + + auto goal_id_buffer = info[1].As>(); + const uint8_t* goal_id_array = goal_id_buffer.Data(); + size_t goal_id_size = goal_id_buffer.Length(); + + rcl_ret_t ret = + rcl_action_client_configure_feedback_subscription_filter_remove_goal_id( + action_client, goal_id_array, goal_id_size); + + if (RCL_RET_OK != ret) { + std::string error_text{ + "Failed to remove goal id from feedback subscription content " + "filter: "}; + error_text += rcl_get_error_string().str; + rcl_reset_error(); + Napi::Error::New(env, error_text).ThrowAsJavaScriptException(); + return Napi::Boolean::New(env, false); + } + + return Napi::Boolean::New(env, true); +} +#endif // ROS_VERSION >= 5000 + #if ROS_VERSION >= 2505 // ROS2 >= Kilted Napi::Value ConfigureActionClientIntrospection(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); @@ -307,7 +368,15 @@ Napi::Object InitActionClientBindings(Napi::Env env, Napi::Object exports) { #if ROS_VERSION >= 2505 // ROS2 >= Kilted exports.Set("configureActionClientIntrospection", Napi::Function::New(env, ConfigureActionClientIntrospection)); -#endif // ROS_VERSION >= 2505 +#endif // ROS_VERSION >= 2505 +#if ROS_VERSION >= 5000 // ROS2 Rolling + exports.Set( + "actionConfigureFeedbackSubFilterAddGoalId", + Napi::Function::New(env, ActionConfigureFeedbackSubFilterAddGoalId)); + exports.Set( + "actionConfigureFeedbackSubFilterRemoveGoalId", + Napi::Function::New(env, ActionConfigureFeedbackSubFilterRemoveGoalId)); +#endif // ROS_VERSION >= 5000 return exports; } diff --git a/test/test-action-client.js b/test/test-action-client.js index 92226bb77..1a37e7d75 100644 --- a/test/test-action-client.js +++ b/test/test-action-client.js @@ -31,6 +31,10 @@ describe('rclnodejs action client', function () { let publishFeedback = null; async function executeCallback(goalHandle) { + // Delay before publishing feedback to allow the client time to process + // the goal response and set up the content filter (if enabled). + await assertUtils.createDelay(50); + if ( publishFeedback && ActionUuid.fromMessage(publishFeedback).toString() === @@ -310,4 +314,186 @@ describe('rclnodejs action client', function () { ServiceIntrospectionStates.CONTENTS ); }); + + describe('enableFeedbackMsgOptimization', function () { + const nativeLoader = require('../lib/native_loader.js'); + const isFeedbackFilterSupported = () => + typeof nativeLoader.actionConfigureFeedbackSubFilterAddGoalId === + 'function'; + + it('Test option defaults to false', function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci'); + assert.strictEqual(client._enableFeedbackMsgOptimization, false); + client.destroy(); + }); + + it('Test option can be set to true', function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + // Only enabled when native API exists + if (isFeedbackFilterSupported()) { + assert.strictEqual(client._enableFeedbackMsgOptimization, true); + } else { + assert.strictEqual(client._enableFeedbackMsgOptimization, false); + } + client.destroy(); + }); + + it('Test does not affect normal feedback reception', async function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + + let feedbackCallback = sinon.spy(function (feedback) { + assert.ok(feedback); + }); + + let goalUuid = ActionUuid.randomMessage(); + publishFeedback = goalUuid; + + let result = await client.waitForServer(2000); + assert.ok(result); + + let goalHandle = await client.sendGoal( + new Fibonacci.Goal(), + feedbackCallback, + goalUuid + ); + assert.ok(goalHandle.isAccepted()); + + await goalHandle.getResult(); + assert.ok(goalHandle.isSucceeded()); + assert.ok(feedbackCallback.calledOnce); + + client.destroy(); + }); + + // Verify that enabling the content filter optimization does not break + // feedback delivery when multiple goals are active concurrently. + it('Test multiple goals with optimization enabled still receive feedback correctly', async function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + + let goal1Uuid = ActionUuid.randomMessage(); + let goal2Uuid = ActionUuid.randomMessage(); + + let feedback1Callback = sinon.spy(); + let feedback2Callback = sinon.spy(); + + // Only publish feedback for the first goal + publishFeedback = goal1Uuid; + + let result = await client.waitForServer(2000); + assert.ok(result); + + const [goal1Handle, goal2Handle] = await Promise.all([ + client.sendGoal(new Fibonacci.Goal(), feedback1Callback, goal1Uuid), + client.sendGoal(new Fibonacci.Goal(), feedback2Callback, goal2Uuid), + ]); + + await goal1Handle.getResult(); + await goal2Handle.getResult(); + + // Only first goal should have received feedback + assert.ok(feedback1Callback.calledOnce); + assert.ok(feedback2Callback.notCalled); + + client.destroy(); + }); + + it('Test cancel goal then send new goal', async function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + + let result = await client.waitForServer(2000); + assert.ok(result); + + let goalHandle = await client.sendGoal(new Fibonacci.Goal()); + assert.ok(goalHandle.isAccepted()); + + result = await goalHandle.cancelGoal(); + assert.ok(result); + + assert.strictEqual( + ActionUuid.fromMessage(result.goals_canceling[0].goal_id).toString(), + ActionUuid.fromMessage(goalHandle.goalId).toString() + ); + + // Send another goal after cancel - should still work + let goalHandle2 = await client.sendGoal(new Fibonacci.Goal()); + assert.ok(goalHandle2.isAccepted()); + + let result2 = await goalHandle2.getResult(); + assert.ok(result2); + + client.destroy(); + }); + + it('Test send multiple goals (3)', async function () { + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + + let result = await client.waitForServer(2000); + assert.ok(result); + + const [goal1Handle, goal2Handle, goal3Handle] = await Promise.all([ + client.sendGoal(new Fibonacci.Goal()), + client.sendGoal(new Fibonacci.Goal()), + client.sendGoal(new Fibonacci.Goal()), + ]); + + assert.ok(goal1Handle.accepted); + assert.ok(goal2Handle.accepted); + assert.ok(goal3Handle.accepted); + + const [result1, result2, result3] = await Promise.all([ + goal1Handle.getResult(), + goal2Handle.getResult(), + goal3Handle.getResult(), + ]); + + assert.ok(result1); + assert.ok(result2); + assert.ok(result3); + + client.destroy(); + }); + + it('Test handles more than 6 goals gracefully', async function () { + // The DDS content filter limit is 6 concurrent goals (100 params / 16 per goal). + // When exceeded, optimization auto-disables but goals should still work. + let client = new rclnodejs.ActionClient(node, fibonacci, 'fibonacci', { + enableFeedbackMsgOptimization: true, + }); + + let feedbackCallback = sinon.spy(); + let goalUuid = ActionUuid.randomMessage(); + publishFeedback = goalUuid; + + let result = await client.waitForServer(2000); + assert.ok(result); + + // Send 7 goals sequentially - exceeds the 6 goal content filter limit + let handles = []; + for (let i = 0; i < 7; i++) { + let uuid = i === 0 ? goalUuid : undefined; + let cb = i === 0 ? feedbackCallback : undefined; + let h = await client.sendGoal(new Fibonacci.Goal(), cb, uuid); + assert.ok(h.isAccepted()); + handles.push(h); + } + + // Wait for all results + for (const h of handles) { + let r = await h.getResult(); + assert.ok(r); + } + + client.destroy(); + }); + }); }); diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index a908bbf6a..43d8eced5 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -473,6 +473,13 @@ const actionClient = new rclnodejs.ActionClient( 'example_interfaces/action/Fibonacci', 'fibonnaci' ); + +const actionClientWithOptimization = new rclnodejs.ActionClient( + node, + 'example_interfaces/action/Fibonacci', + 'fibonnaci', + { enableFeedbackMsgOptimization: true } +); expectType>( actionClient ); diff --git a/types/action_client.d.ts b/types/action_client.d.ts index 253e49777..1845546e1 100644 --- a/types/action_client.d.ts +++ b/types/action_client.d.ts @@ -142,6 +142,14 @@ declare module 'rclnodejs' { options?: Options & { validateGoals?: boolean; validationOptions?: MessageValidationOptions; + /** + * Enable feedback subscription content filter to optimize the handling + * of feedback messages. When enabled, the content filter is used to + * configure the goal ID for the subscription, avoiding reception of + * irrelevant feedback messages. An action client can handle up to 6 + * goals simultaneously with this optimization. Default: false. + */ + enableFeedbackMsgOptimization?: boolean; } );