-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservice.rs
More file actions
304 lines (266 loc) · 8.74 KB
/
service.rs
File metadata and controls
304 lines (266 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
use std::{sync::Arc, time::Duration};
use futures_lite::StreamExt;
use lapin::{
options::{BasicConsumeOptions, QueueDeclareOptions},
types::FieldTable,
Channel,
};
use log::debug;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use super::connection::build_connection;
/// Kind of request carried by a [`Message`].
///
/// The enum uses serde's default representation, so each unit variant is
/// (de)serialized as its bare variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageType {
    /// NOTE(review): presumably a request to execute a flow — confirm with the consumer side.
    ExecuteFlow,
    /// NOTE(review): presumably a test run of a flow — confirm with the consumer side.
    TestExecuteFlow,
}
/// Description of the party that produced a [`Message`].
///
/// All three fields are free-form strings; this file does not validate them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Sender {
    /// Name of the sending component.
    pub name: String,
    /// Protocol identifier reported by the sender.
    pub protocol: String,
    /// Version string reported by the sender.
    pub version: String,
}
/// Envelope that is serialized to JSON and exchanged over the queues.
#[derive(Serialize, Deserialize)]
pub struct Message {
/// Kind of request this message represents.
pub message_type: MessageType,
/// Party that produced the message.
pub sender: Sender,
/// Time stamp of the message.
/// NOTE(review): unit and epoch are not established in this file — confirm with the producer.
pub timestamp: i64,
/// Identifier of the message; `RabbitmqClient::await_message_no_timeout`
/// compares it against the id it is waiting for.
pub message_id: String,
/// Payload of the message, carried as a string.
pub body: String,
}
/// Client that publishes to and consumes from RabbitMQ through one lapin channel.
pub struct RabbitmqClient {
/// The lapin channel, shared behind an `Arc` and guarded by a tokio `Mutex`;
/// every method of the client locks it before talking to the broker.
pub channel: Arc<Mutex<Channel>>,
}
/// Errors reported by the message-waiting methods of `RabbitmqClient`.
#[derive(Debug)]
pub enum RabbitMqError {
/// An error returned by the lapin library.
LapinError(lapin::Error),
/// A connection-level failure described by a message; `std::io::Error`
/// values are converted into this variant.
ConnectionError(String),
/// The awaited operation did not finish within the allowed time.
TimeoutError,
/// A received message could not be deserialized.
DeserializationError,
}
impl From<lapin::Error> for RabbitMqError {
fn from(error: lapin::Error) -> Self {
RabbitMqError::LapinError(error)
}
}
impl From<std::io::Error> for RabbitMqError {
fn from(error: std::io::Error) -> Self {
RabbitMqError::ConnectionError(error.to_string())
}
}
impl std::fmt::Display for RabbitMqError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RabbitMqError::LapinError(err) => write!(f, "RabbitMQ error: {}", err),
RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
}
}
}
impl RabbitmqClient {
// Create a new RabbitMQ client with channel
pub async fn new(rabbitmq_url: &str) -> Self {
let connection = build_connection(rabbitmq_url).await;
let channel = connection.create_channel().await.unwrap();
match channel
.queue_declare(
"send_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
{
Ok(_) => (),
Err(err) => log::error!("Failed to declare send_queue: {}", err),
}
match channel
.queue_declare(
"recieve_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
{
Ok(_) => (),
Err(err) => log::error!("Failed to declare recieve_queue: {}", err),
}
RabbitmqClient {
channel: Arc::new(Mutex::new(channel)),
}
}
/// Publishes `message_json` on the default exchange (`""`), using
/// `queue_name` as the routing key.
///
/// The channel lock is held for the duration of the publish call.
///
/// # Errors
///
/// Returns the lapin error if the publish call fails.
pub async fn send_message(
    &self,
    message_json: String,
    queue_name: &str,
) -> Result<(), lapin::Error> {
    let guard = self.channel.lock().await;
    let outcome = guard
        .basic_publish(
            "",         // exchange
            queue_name, // routing key (queue name)
            lapin::options::BasicPublishOptions::default(),
            message_json.as_bytes(),
            lapin::BasicProperties::default(),
        )
        .await;
    // Only success or failure is reported; the publish confirmation is discarded.
    outcome.map(|_confirm| ())
}
// Receive messages from a queue
// Receive messages from a queue with no timeout
pub async fn await_message_no_timeout(
&self,
queue_name: &str,
message_id: String,
ack_on_success: bool,
) -> Result<Message, RabbitMqError> {
let mut consumer = {
let channel = self.channel.lock().await;
let consumer_res = channel
.basic_consume(
queue_name,
"consumer",
lapin::options::BasicConsumeOptions::default(),
FieldTable::default(),
)
.await;
match consumer_res {
Ok(consumer) => consumer,
Err(err) => panic!("{}", err),
}
};
debug!("Starting to consume from {}", queue_name);
while let Some(delivery_result) = consumer.next().await {
let delivery = match delivery_result {
Ok(del) => del,
Err(_) => return Err(RabbitMqError::DeserializationError),
};
let data = &delivery.data;
let message_str = match std::str::from_utf8(&data) {
Ok(str) => str,
Err(_) => {
return Err(RabbitMqError::DeserializationError);
}
};
debug!("Received message: {}", message_str);
// Parse the message
let message = match serde_json::from_str::<Message>(message_str) {
Ok(m) => m,
Err(e) => {
log::error!("Failed to parse message: {}", e);
return Err(RabbitMqError::DeserializationError);
}
};
if message.message_id == message_id {
if ack_on_success {
delivery
.ack(lapin::options::BasicAckOptions::default())
.await
.expect("Failed to acknowledge message");
}
return Ok(message);
}
}
Err(RabbitMqError::DeserializationError)
}
// Function intended to get used by the runtime
pub async fn receive_messages(
&self,
queue_name: &str,
handle_message: fn(Message) -> Result<Message, lapin::Error>,
) -> Result<(), lapin::Error> {
let mut consumer = {
let channel = self.channel.lock().await;
let consumer_res = channel
.basic_consume(
queue_name,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await;
match consumer_res {
Ok(consumer) => consumer,
Err(err) => panic!("Cannot consume messages: {}", err),
}
};
debug!("Starting to consume from {}", queue_name);
while let Some(delivery) = consumer.next().await {
let delivery = match delivery {
Ok(del) => del,
Err(err) => {
log::error!("Error receiving message: {}", err);
return Err(err);
}
};
let data = &delivery.data;
let message_str = match std::str::from_utf8(&data) {
Ok(str) => {
log::info!("Received message: {}", str);
str
}
Err(err) => {
log::error!("Error decoding message: {}", err);
return Ok(());
}
};
// Parse the message
let inc_message = match serde_json::from_str::<Message>(message_str) {
Ok(mess) => mess,
Err(err) => {
log::error!("Error parsing message: {}", err);
return Ok(());
}
};
let message = match handle_message(inc_message) {
Ok(mess) => mess,
Err(err) => {
log::error!("Error handling message: {}", err);
return Ok(());
}
};
let message_json = match serde_json::to_string(&message) {
Ok(json) => json,
Err(err) => {
log::error!("Error serializing message: {}", err);
return Ok(());
}
};
{
let _ = self.send_message(message_json, "recieve_queue").await;
}
// Acknowledge the message
delivery
.ack(lapin::options::BasicAckOptions::default())
.await
.expect("Failed to acknowledge message");
}
Ok(())
}
/// Same as `await_message_no_timeout`, but gives up once `timeout` has
/// elapsed.
///
/// # Errors
///
/// Returns [`RabbitMqError::TimeoutError`] when the timeout elapses first;
/// otherwise returns whatever `await_message_no_timeout` returned.
pub async fn await_message(
    &self,
    queue_name: &str,
    message_id: String,
    timeout: Duration,
    ack_on_success: bool,
) -> Result<Message, RabbitMqError> {
    let waiting = self.await_message_no_timeout(queue_name, message_id, ack_on_success);
    tokio::time::timeout(timeout, waiting)
        .await
        .unwrap_or_else(|_elapsed| {
            debug!(
                "Timeout waiting for message after {} seconds",
                timeout.as_secs()
            );
            Err(RabbitMqError::TimeoutError)
        })
}
}