Skip to content

Commit c28e025

Browse files
committed
initial pubish only mqtt
1 parent f02c8bc commit c28e025

File tree

2 files changed

+125
-25
lines changed

2 files changed

+125
-25
lines changed

NetX/inc/u_nx_ethernet.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
#define ETH_MESSAGE_SIZE 128 /* Maximum ethernet message size in bytes. */
1818
#define ETH_MAX_PACKETS 10 /* Maximum number of packets we wanna handle simultaneously */
1919
#define ETH_NUMBER_OF_NODES 8 /* Number of nodes in the network. */
20+
#define ETH_ENABLE_MANUAL_UDP_MULTICAST 0 // whether to enable UDP multicast
21+
#define ETH_ENABLE_IGMP 0 // whether to enable IGMP
22+
#define ETH_ENABLE_MQTT 1 // whether to use a MQTT connection
23+
#define ETH_MQTT_SERVER_IP IP_ADDRESS(10,0,0,1) // the server address of the broker (TPU usually)
24+
#define ETH_MQTT_SERVER_PORT 1883
2025

2126
typedef enum {
2227
TPU = (1 << 0), // 0b00000001
@@ -72,6 +77,7 @@ typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function
7277
*/
7378
uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve);
7479

80+
#if ETH_ENABLE_MANUAL_UDP_MULTICAST
7581
/**
7682
* @brief Creates an ethernet message. Can be send with ethernet_send_message(), or added to a queue.
7783
* @param recipient_id The ID of the recipient node.
@@ -88,6 +94,29 @@ ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t r
8894
* @return Status.
8995
*/
9096
uint8_t ethernet_send_message(ethernet_message_t *message);
97+
#endif
98+
99+
#if ETH_ENABLE_MQTT
100+
/**
101+
* @brief Sends a MQTT message to outgoing queue
102+
* @param topic_name The topic name
103+
* @param topic_size The topic size in bytes
104+
* @param message The data to send
105+
* @param message_size The message size in bytes
106+
* @return The error code.
107+
*/
108+
uint8_t ethernet_mqtt_publish(char* topic_name, UINT topic_size, char* message, UINT message_size);
109+
110+
/**
111+
*
112+
*/
113+
//ethernet_mqtt_subscribe();
114+
115+
/**
116+
*
117+
*/
118+
119+
#endif
91120

92121
/**
93122
* @brief Retrieves the time from PTP stack.

NetX/src/u_nx_ethernet.c

Lines changed: 96 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
#include "u_tx_debug.h"
77
#include "c_utils.h"
88
#include "nx_api.h"
9+
#include "u_tx_general.h"
910
#include <string.h>
1011
#include <stdio.h>
12+
#if ETH_ENABLE_MQTT
13+
#include "nxd_mqtt_client.h"
14+
#endif
1115

1216
/* PRIVATE MACROS */
1317
#define _IP_THREAD_STACK_SIZE 2048
@@ -16,15 +20,23 @@
1620
#define _IP_NETWORK_MASK IP_ADDRESS(255, 255, 255, 0)
1721
#define _UDP_QUEUE_MAXIMUM 12
1822
#define _PTP_THREAD_PRIORITY 2
23+
#define _MQTT_THREAD_PRIORITY 2
1924

2025
/* The DEFAULT_PAYLOAD_SIZE should match with RxBuffLen configured via MX_ETH_Init */
2126
#define DEFAULT_PAYLOAD_SIZE 1524
2227
#define NX_APP_PACKET_POOL_SIZE ((DEFAULT_PAYLOAD_SIZE + sizeof(NX_PACKET)) * 10)
28+
#define MQTT_CLIENT_STACK_SIZE 4096
2329

2430
/* DEVICE INFO */
2531
typedef struct {
2632
/* NetX Objects */
33+
#if ETH_ENABLE_MANUAL_UDP_MULTICAST
2734
NX_UDP_SOCKET socket;
35+
#endif
36+
#if ETH_ENABLE_MQTT
37+
NXD_MQTT_CLIENT mqtt_client;
38+
UCHAR mqtt_thread_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];
39+
#endif
2840
NX_PACKET_POOL packet_pool;
2941
NX_IP ip;
3042
NX_PTP_CLIENT ptp_client;
@@ -45,6 +57,21 @@ typedef struct {
4557
} _ethernet_device_t;
4658
static _ethernet_device_t device = { 0 };
4759

60+
/* CALLBACK FUNCTIONS */
61+
#if ETH_ENABLE_MQTT
62+
static VOID _mqtt_disconnect_callback(NXD_MQTT_CLIENT *client_ptr)
63+
{
64+
printf("client disconnected from server\n");
65+
}
66+
67+
static VOID _mqtt_recieve_callback(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
68+
{
69+
//tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
70+
return;
71+
}
72+
#endif
73+
74+
4875
/* Callback function. Called when a PTP event is processed. */
4976
// extern UINT ptp_clock_callback(NX_PTP_CLIENT *client_ptr, UINT operation,
5077
// NX_PTP_TIME *time_ptr, NX_PACKET *packet_ptr,
@@ -85,6 +112,7 @@ static UINT _ptp_event_callback(NX_PTP_CLIENT *ptp_client_ptr, UINT event, VOID
85112
}
86113

87114
/* Callback function. Called when an ethernet message is received. */
115+
#if ETH_ENABLE_MANUAL_UDP_MULTICAST
88116
static void _receive_message(NX_UDP_SOCKET *socket) {
89117
NX_PACKET *packet;
90118
ULONG bytes_copied;
@@ -130,6 +158,7 @@ static void _receive_message(NX_UDP_SOCKET *socket) {
130158
/* Release the packet */
131159
nx_packet_release(packet);
132160
}
161+
#endif
133162

134163
/* API FUNCTIONS */
135164

@@ -166,8 +195,8 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve
166195
status = nx_ip_create(
167196
&device.ip, // Pointer to the IP instance
168197
"Ethernet IP Instance", // Name
169-
IP_ADDRESS(5, 5, 5, device.node_id), // Dummy unicast IP (we shouldn't have to use this if we're just using multicast for everything)
170-
_IP_NETWORK_MASK, // Network mask
198+
IP_ADDRESS(10, 0, 0, device.node_id), // Unicast IP derived from node_id, 10.0.0.0/24
199+
_IP_NETWORK_MASK, // Network mask /24
171200
&device.packet_pool, // Pointer to the packet pool
172201
device.driver, // Pointer to the Ethernet driver function
173202
device.ip_memory, // Pointer to the memory for the IP instance
@@ -190,21 +219,23 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve
190219
return status;
191220
}
192221

193-
194-
/* Enable UDP */
195-
status = nx_udp_enable(&device.ip);
222+
/* Enable igmp */
223+
#if ETH_ENABLE_IGMP || ETH_ENABLE_MANUAL_UDP_MULTICAST
224+
status = nx_igmp_enable(&device.ip);
196225
if (status != NX_SUCCESS) {
197-
PRINTLN_ERROR("Failed to enable UDP (Status: %d/%s).", status, nx_status_toString(status));
226+
PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status));
198227
return status;
199228
}
229+
#endif
200230

201-
/* Enable igmp */
202-
status = nx_igmp_enable(&device.ip);
231+
/* Enable UDP */
232+
status = nx_udp_enable(&device.ip);
203233
if (status != NX_SUCCESS) {
204-
PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status));
234+
PRINTLN_ERROR("Failed to enable UDP (Status: %d/%s).", status, nx_status_toString(status));
205235
return status;
206236
}
207237

238+
#if ETH_ENABLE_MANUAL_UDP_MULTICAST
208239
/* Set up multicast groups.
209240
* (This iterates through every possible node combination between 0b00000001 and 0b11111111.
210241
* If any of the combinations include device.node_id, that combination gets added as a multicast group.
@@ -224,22 +255,6 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve
224255
}
225256
}
226257

227-
/* Create the PTP client instance */
228-
status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool,
229-
_PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack),
230-
_nx_ptp_client_soft_clock_callback, NX_NULL);
231-
if(status != NX_SUCCESS) {
232-
PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status));
233-
return status;
234-
}
235-
236-
/* start the PTP client */
237-
status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL);
238-
if(status != NX_SUCCESS) {
239-
PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status));
240-
return status;
241-
}
242-
243258
/* Create UDP socket for broadcasting */
244259
status = nx_udp_socket_create(
245260
&device.ip, // IP instance
@@ -278,7 +293,55 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve
278293
nx_udp_socket_delete(&device.socket);
279294
return status;
280295
}
296+
#endif
297+
298+
/* Create the PTP client instance */
299+
status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool,
300+
_PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack),
301+
_nx_ptp_client_soft_clock_callback, NX_NULL);
302+
if(status != NX_SUCCESS) {
303+
PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status));
304+
return status;
305+
}
306+
307+
/* start the PTP client */
308+
status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL);
309+
if(status != NX_SUCCESS) {
310+
PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status));
311+
return status;
312+
}
313+
314+
#if ETH_ENABLE_MQTT
315+
/* Create MQTT client instance. */
316+
char* client_id;
317+
UINT client_id_size = sprintf(client_id, "FW-%d", device.node_id);
318+
319+
status = nxd_mqtt_client_create(&device.mqtt_client, client_id,
320+
client_id, client_id_size, &device.ip, &device.packet_pool,
321+
(VOID*)device.mqtt_thread_stack, sizeof(device.mqtt_thread_stack),
322+
_MQTT_THREAD_PRIORITY, NX_NULL, 0);
323+
if(status != NX_SUCCESS) {
324+
PRINTLN_ERROR("Failed to create MQTT client (Status: %d/%s).", status, nx_status_toString(status));
325+
return status;
326+
}
327+
328+
/* Register the disconnect notification function. */
329+
status = nxd_mqtt_client_disconnect_notify_set(&device.mqtt_client, _mqtt_disconnect_callback);
330+
if(status != NX_SUCCESS) {
331+
PRINTLN_ERROR("Failed to create MQTT disconnect notification (Status: %d/%s).", status, nx_status_toString(status));
332+
return status;
333+
}
334+
335+
NXD_ADDRESS server_ip;
336+
server_ip.nxd_ip_version = 4;
337+
server_ip.nxd_ip_address.v4 = ETH_MQTT_SERVER_IP;
338+
/* Start the connection to the server. */
339+
nxd_mqtt_client_connect(&device.mqtt_client, &server_ip, ETH_MQTT_SERVER_PORT,
340+
300, NX_TRUE, NX_WAIT_FOREVER);
281341

342+
/* Set the receive notify function. */
343+
nxd_mqtt_client_receive_notify_set(&device.mqtt_client, _mqtt_recieve_callback);
344+
#endif
282345
/* Mark device as initialized. */
283346
device.is_initialized = true;
284347

@@ -287,6 +350,7 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve
287350
}
288351

289352
/* Creates an ethernet message (i.e. returns an ethernet_message_t instance). */
353+
#if ETH_ENABLE_MANUAL_UDP_MULTICAST
290354
ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t recipient_id, uint8_t *data, uint8_t data_length) {
291355
ethernet_message_t message = {0};
292356

@@ -391,6 +455,13 @@ uint8_t ethernet_send_message(ethernet_message_t *message) {
391455
PRINTLN_INFO("Sent ethernet message (Recipient ID: %d, Message ID: %d, Message Contents: %d).", message->recipient_id, message->message_id, message->data);
392456
return U_SUCCESS;
393457
}
458+
#endif
459+
460+
#if ETH_ENABLE_MQTT
461+
uint8_t ethernet_mqtt_publish(char *topic_name, UINT topic_size, char *message, UINT message_size) {
462+
return nxd_mqtt_client_publish(&device.mqtt_client, topic_name, topic_size, message, message_size, NX_FALSE, 0, MS_TO_TICKS(100));
463+
}
464+
#endif
394465

395466
NX_PTP_DATE_TIME ethernet_get_time(void) {
396467
NX_PTP_TIME tm;

0 commit comments

Comments
 (0)