Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ jobs:

- name: Download dependencies
run: |
git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b ros2 https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b feature/run_session_until_one_stream https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
git clone -b feature/run_session_until_one_stream https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client

git clone -b galactic https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds
touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE
# Install coverage tools
Expand Down
52 changes: 52 additions & 0 deletions rmw_microxrcedds_c/include/rmw_microros/init_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,58 @@ rmw_ret_t rmw_uros_options_set_client_key(
uint32_t client_key,
rmw_init_options_t * rmw_options);

/**
* \brief Fills rmw implementation-specific options with the given parameters.
* In this case, the stream number of a publisher can be defined.
* This function may use static memory resources that should be freed.
*
* \param[in] stream Micro XRCE-DDS Client stream number.
* \param[in,out] rmw_options Updated options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_set_publisher_out_stream(
size_t stream,
rmw_publisher_options_t * rmw_options);

/**
* \brief Free rmw implementation-specific static resources allocated.
*
* \param[in,out] rmw_options RMW options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_free_publisher_init_options(
rmw_publisher_options_t * rmw_options);

/**
* \brief Fills rmw implementation-specific options with the given parameters.
* In this case, the stream number of a subscriber can be defined.
* This function may use static memory resources that should be freed.
*
* \param[in] stream Micro XRCE-DDS Client stream number.
* \param[in,out] rmw_options Updated options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_set_subscriber_input_stream(
size_t stream,
rmw_subscription_options_t * rmw_options);

/**
* \brief Free rmw implementation-specific static resources allocated.
*
* \param[in,out] rmw_options Options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_free_subscriber_init_options(
rmw_subscription_options_t * rmw_options);

/** @}*/

#if defined(__cplusplus)
Expand Down
1 change: 1 addition & 0 deletions rmw_microxrcedds_c/src/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#define RMW_UXRCE_MAX_TOPICS @RMW_UXRCE_MAX_TOPICS@
#define RMW_UXRCE_MAX_WAIT_SETS @RMW_UXRCE_MAX_WAIT_SETS@
#define RMW_UXRCE_MAX_GUARD_CONDITION @RMW_UXRCE_MAX_GUARD_CONDITION@
#define RMW_UXRCE_MAX_ENTITIES_INIT_OPTION RMW_UXRCE_MAX_PUBLISHERS + RMW_UXRCE_MAX_SUBSCRIPTIONS

#if RMW_UXRCE_MAX_TOPICS == -1
#define RMW_UXRCE_MAX_TOPICS_INTERNAL RMW_UXRCE_MAX_PUBLISHERS + RMW_UXRCE_MAX_SUBSCRIPTIONS
Expand Down
8 changes: 4 additions & 4 deletions rmw_microxrcedds_c/src/rmw_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ rmw_create_client(

custom_client->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[0] :
custom_node->context->reliable_output[0];

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[0] :
custom_node->context->reliable_input[0];

custom_client->client_data_request = uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
43 changes: 29 additions & 14 deletions rmw_microxrcedds_c/src/rmw_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ rmw_init(
rmw_uxrce_init_guard_condition_memory(
&guard_condition_memory, custom_guard_condition,
RMW_UXRCE_MAX_GUARD_CONDITION);
rmw_uxrce_init_entities_init_options_memory(
&entities_init_options_memory,
custom_entities_init_options,
RMW_UXRCE_MAX_ENTITIES_INIT_OPTION);

// Micro-XRCE-DDS Client transport initialization
rmw_ret_t transport_init_ret = rmw_uxrce_transport_init(
Expand All @@ -286,20 +290,31 @@ rmw_init(
uxr_set_request_callback(&context_impl->session, on_request, NULL);
uxr_set_reply_callback(&context_impl->session, on_reply, NULL);

context_impl->reliable_input = uxr_create_input_reliable_stream(
&context_impl->session, context_impl->input_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY_INPUT,
RMW_UXRCE_STREAM_HISTORY_INPUT);
context_impl->reliable_output =
uxr_create_output_reliable_stream(
&context_impl->session, context_impl->output_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY_OUTPUT,
RMW_UXRCE_STREAM_HISTORY_OUTPUT);

context_impl->best_effort_input = uxr_create_input_best_effort_stream(&context_impl->session);
context_impl->best_effort_output = uxr_create_output_best_effort_stream(
&context_impl->session,
context_impl->output_best_effort_stream_buffer, context_impl->transport.comm.mtu);
for (size_t i = 0; i < UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS; i++) {
context_impl->reliable_input[i] = uxr_create_input_reliable_stream(
&context_impl->session, context_impl->input_reliable_stream_buffer[i],
RMW_UXRCE_MAX_INPUT_BUFFER_SIZE,
RMW_UXRCE_STREAM_HISTORY_INPUT);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS; i++) {
context_impl->reliable_output[i] =
uxr_create_output_reliable_stream(
&context_impl->session, context_impl->output_reliable_stream_buffer[i],
RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE,
RMW_UXRCE_STREAM_HISTORY_OUTPUT);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS; i++) {
context_impl->best_effort_input[i] =
uxr_create_input_best_effort_stream(&context_impl->session);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS; i++) {
context_impl->best_effort_output[i] = uxr_create_output_best_effort_stream(
&context_impl->session,
context_impl->output_best_effort_stream_buffer[i], context_impl->transport.comm.mtu);
}

if (!uxr_create_session(&context_impl->session)) {
CLOSE_TRANSPORT(&context_impl->transport);
Expand Down
78 changes: 78 additions & 0 deletions rmw_microxrcedds_c/src/rmw_microros/init_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,81 @@ rmw_ret_t rmw_uros_options_set_client_key(

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_set_publisher_out_stream(
size_t stream,
rmw_publisher_options_t * rmw_options)
{
if (NULL == rmw_options) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

if (NULL == rmw_options->rmw_specific_publisher_payload) {
rmw_uxrce_mempool_item_t * memory_node = get_memory(&entities_init_options_memory);
if (NULL == memory_node) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher options");
return RMW_RET_ERROR;
}
rmw_options->rmw_specific_publisher_payload = memory_node->data;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_publisher_payload;
uxrce_init_options->stream_index.publisher_output_stream = stream;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_free_publisher_init_options(
rmw_publisher_options_t * rmw_options)
{
if (NULL == rmw_options || NULL == rmw_options->rmw_specific_subscription_payload) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

put_memory(&entities_init_options_memory, &rmw_options->mem);
rmw_options->rmw_specific_subscription_payload = NULL;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_set_subscriber_input_stream(
size_t stream,
rmw_subscription_options_t * rmw_options)
{
if (NULL == rmw_options) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

if (NULL == rmw_options->rmw_specific_subscription_payload) {
rmw_uxrce_mempool_item_t * memory_node = get_memory(&entities_init_options_memory);
if (NULL == memory_node) {
RMW_SET_ERROR_MSG("failed to allocate memory for subscriber options");
return RMW_RET_ERROR;
}
rmw_options->rmw_specific_subscription_payload = memory_node->data;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_subscription_payload;
uxrce_init_options->stream_index.subscriber_input_stream = stream;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_free_subscriber_init_options(
rmw_subscription_options_t * rmw_options)
{
if (NULL == rmw_options || NULL == rmw_options->rmw_specific_subscription_payload) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

put_memory(&entities_init_options_memory, &rmw_options->mem);
rmw_options->rmw_specific_subscription_payload = NULL;

return RMW_RET_OK;
}
32 changes: 25 additions & 7 deletions rmw_microxrcedds_c/src/rmw_microros_internal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,22 @@ typedef struct rmw_context_impl_t
#endif // if RMW_UXRCE_GRAPH
rmw_guard_condition_t graph_guard_condition;

uxrStreamId reliable_input;
uxrStreamId reliable_output;
uxrStreamId best_effort_output;
uxrStreamId best_effort_input;
uxrStreamId reliable_input[UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS];
uxrStreamId reliable_output[UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS];
uxrStreamId best_effort_output[UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS];
uxrStreamId best_effort_input[UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS];

uxrStreamId * creation_stream;
uxrStreamId * destroy_stream;
int creation_timeout;
int destroy_timeout;

uint8_t input_reliable_stream_buffer[RMW_UXRCE_MAX_INPUT_BUFFER_SIZE];
uint8_t output_reliable_stream_buffer[RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE];
uint8_t output_best_effort_stream_buffer[RMW_UXRCE_MAX_TRANSPORT_MTU];
uint8_t input_reliable_stream_buffer[UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS][
RMW_UXRCE_MAX_INPUT_BUFFER_SIZE];
uint8_t output_reliable_stream_buffer[UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS][
RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE];
uint8_t output_best_effort_stream_buffer[UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS][
RMW_UXRCE_MAX_TRANSPORT_MTU];

uint16_t id_participant;
uint16_t id_topic;
Expand All @@ -113,6 +116,16 @@ struct rmw_init_options_impl_t

typedef struct rmw_init_options_impl_t rmw_uxrce_init_options_impl_t;

typedef struct rmw_uxrce_entities_init_options_t
{
rmw_uxrce_mempool_item_t mem;

union {
size_t publisher_output_stream;
size_t subscriber_input_stream;
} stream_index;
} rmw_uxrce_entities_init_options_t;

// ROS2 entities definitions

typedef struct rmw_uxrce_topic_t
Expand Down Expand Up @@ -294,6 +307,10 @@ extern rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS];
extern rmw_uxrce_mempool_t guard_condition_memory;
extern rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION];

extern rmw_uxrce_mempool_t entities_init_options_memory;
extern rmw_uxrce_entities_init_options_t custom_entities_init_options[
RMW_UXRCE_MAX_ENTITIES_INIT_OPTION];

// Memory init functions

#define RMW_INIT_DEFINE_MEMORY(X) \
Expand All @@ -313,6 +330,7 @@ RMW_INIT_DEFINE_MEMORY(static_input_buffer)
RMW_INIT_DEFINE_MEMORY(init_options_impl)
RMW_INIT_DEFINE_MEMORY(wait_set)
RMW_INIT_DEFINE_MEMORY(guard_condition)
RMW_INIT_DEFINE_MEMORY(entities_init_options)

// Memory management functions

Expand Down
10 changes: 7 additions & 3 deletions rmw_microxrcedds_c/src/rmw_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ rmw_publish(
custom_publisher->stream_id);

if (UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
uxr_flash_output_streams(&custom_publisher->owner_node->context->session);
uxr_flash_one_output_stream(
&custom_publisher->owner_node->context->session,
custom_publisher->stream_id);
} else {
written &= uxr_run_session_until_confirm_delivery(
&custom_publisher->owner_node->context->session, custom_publisher->session_timeout);
written &= uxr_run_session_until_confirm_delivery_one_stream(
&custom_publisher->owner_node->context->session,
custom_publisher->stream_id,
custom_publisher->session_timeout);
}
}
if (!written) {
Expand Down
10 changes: 8 additions & 2 deletions rmw_microxrcedds_c/src/rmw_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,16 @@ rmw_create_publisher(
custom_publisher->session_timeout = RMW_UXRCE_PUBLISH_RELIABLE_TIMEOUT;
custom_publisher->qos = *qos_policies;

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) publisher_options->rmw_specific_publisher_payload;
size_t used_output_stream =
(NULL != uxrce_init_options) ?
uxrce_init_options->stream_index.publisher_output_stream : 0;

custom_publisher->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[used_output_stream] :
custom_node->context->reliable_output[used_output_stream];

custom_publisher->cs_cb_size = NULL;
custom_publisher->cs_cb_serialization = NULL;
Expand Down
8 changes: 4 additions & 4 deletions rmw_microxrcedds_c/src/rmw_service.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ rmw_create_service(

custom_service->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[0] :
custom_node->context->reliable_output[0];

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[0] :
custom_node->context->reliable_input[0];

custom_service->service_data_resquest = uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
10 changes: 8 additions & 2 deletions rmw_microxrcedds_c/src/rmw_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,16 @@ rmw_create_subscription(
delivery_control.max_elapsed_time = UXR_MAX_ELAPSED_TIME_UNLIMITED;
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) subscription_options->rmw_specific_subscription_payload;
size_t used_input_stream =
(NULL != uxrce_init_options) ?
uxrce_init_options->stream_index.subscriber_input_stream : 0;

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[used_input_stream] :
custom_node->context->reliable_input[used_input_stream];

uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
4 changes: 4 additions & 0 deletions rmw_microxrcedds_c/src/types.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS];
rmw_uxrce_mempool_t guard_condition_memory;
rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION];

rmw_uxrce_mempool_t entities_init_options_memory;
rmw_uxrce_entities_init_options_t custom_entities_init_options[RMW_UXRCE_MAX_ENTITIES_INIT_OPTION];

// Memory init functions

#define RMW_INIT_MEMORY(X) \
Expand Down Expand Up @@ -103,6 +106,7 @@ RMW_INIT_MEMORY(static_input_buffer)
RMW_INIT_MEMORY(init_options_impl)
RMW_INIT_MEMORY(wait_set)
RMW_INIT_MEMORY(guard_condition)
RMW_INIT_MEMORY(entities_init_options)

// Memory management functions

Expand Down
Loading