-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMessageHandler.java
More file actions
83 lines (71 loc) · 3.47 KB
/
MessageHandler.java
File metadata and controls
83 lines (71 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package fi.hsl.transitdata.stopestimates;
import fi.hsl.common.pulsar.IMessageHandler;
import fi.hsl.common.pulsar.PulsarApplicationContext;
import fi.hsl.common.transitdata.TransitdataProperties;
import fi.hsl.common.transitdata.TransitdataProperties.*;
import fi.hsl.common.transitdata.proto.InternalMessages;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
 * Handles incoming Pulsar messages by converting each one into a list of
 * {@link InternalMessages.StopEstimate} messages via the supplied factory and
 * publishing every estimate with the producer.
 *
 * <p>Acknowledgement policy:
 * <ul>
 *   <li>If the factory yields no result (empty {@code Optional}), the source message is
 *       acknowledged immediately so it is not received again.</li>
 *   <li>Otherwise the source message is acknowledged exactly once, and only after every
 *       estimate produced from it has been sent successfully. A message that yields an
 *       empty list is acknowledged right away.</li>
 *   <li>If any send fails, or an exception is thrown while handling, the source message
 *       is left unacknowledged.</li>
 * </ul>
 */
public class MessageHandler implements IMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(MessageHandler.class);

    private final Consumer<byte[]> consumer;
    private final Producer<byte[]> producer;
    private final IStopEstimatesFactory factory;

    /**
     * @param context application context providing the Pulsar consumer and producer
     * @param factory converts a received message into stop estimates
     */
    public MessageHandler(PulsarApplicationContext context, final IStopEstimatesFactory factory) {
        this.consumer = context.getConsumer();
        this.producer = context.getProducer();
        this.factory = factory;
    }

    /**
     * Converts the received message into stop estimates, publishes them and acknowledges
     * the source message according to the policy described on the class.
     *
     * <p>Exceptions raised while handling are logged and not propagated.
     *
     * @param received the message taken from the consumer
     */
    public void handleMessage(Message received) throws Exception {
        try {
            final Optional<List<InternalMessages.StopEstimate>> maybeStopEstimates = factory.toStopEstimates(received);
            if (!maybeStopEstimates.isPresent()) {
                ack(received.getMessageId()); //Ack so we don't receive it again
                return;
            }

            final MessageId messageId = received.getMessageId();
            final long timestamp = received.getEventTime();
            final String key = received.getKey();

            final CompletableFuture<?>[] pendingSends = maybeStopEstimates.get().stream()
                    .map(stopEstimate -> sendPulsarMessage(stopEstimate, timestamp, key))
                    .toArray(CompletableFuture[]::new);

            // Ack once, after all sends have completed. allOf() over zero futures is already
            // complete, so a message that produced no estimates is acked as well.
            CompletableFuture.allOf(pendingSends).whenComplete((ignored, failure) -> {
                if (failure == null) {
                    ack(messageId);
                } else {
                    // Individual failures are logged where they occur; the source message
                    // stays unacknowledged because not all estimates were published.
                    log.warn("Not acking message {} because at least one stop estimate failed to send", messageId);
                }
            });
        } catch (Exception e) {
            log.error("Exception while handling message", e);
        }
    }

    /**
     * Acknowledges the given message asynchronously; a failed acknowledgement is only logged.
     *
     * @param received id of the message to acknowledge
     */
    private void ack(MessageId received) {
        consumer.acknowledgeAsync(received)
                .exceptionally(throwable -> {
                    log.error("Failed to ack Pulsar message", throwable);
                    return null;
                });
    }

    /**
     * Publishes one stop estimate asynchronously.
     *
     * @param estimate  the estimate to serialize and send
     * @param timestamp event time to set on the outgoing message
     * @param key       key to set on the outgoing message
     * @return a future that completes when the send has finished, exceptionally if it failed
     */
    private CompletableFuture<MessageId> sendPulsarMessage(InternalMessages.StopEstimate estimate, long timestamp, String key) {
        final String routeId = estimate.getTripInfo().getRouteId();
        if (routeId.contains("31M")) {
            log.debug("Found metro trip. RouteId: {}", routeId);
        }

        return producer.newMessage()
                .key(key)
                .eventTime(timestamp)
                .property(TransitdataProperties.KEY_PROTOBUF_SCHEMA, ProtobufSchema.InternalMessagesStopEstimate.toString())
                .property(TransitdataProperties.KEY_SCHEMA_VERSION, Integer.toString(estimate.getSchemaVersion()))
                .property(TransitdataProperties.KEY_DVJ_ID, estimate.getTripInfo().getTripId()) // TODO remove once TripUpdateProcessor won't need it anymore
                .value(estimate.toByteArray())
                .sendAsync()
                .whenComplete((MessageId id, Throwable t) -> {
                    if (t != null) {
                        log.error("Failed to send Pulsar message", t);
                    }
                });
    }
}