Skip to content

Commit 6d5b6ca

Browse files
committed
feat: configured kafka and implemented producer and consumer
1 parent 83db2c2 commit 6d5b6ca

8 files changed

Lines changed: 108 additions & 8 deletions

File tree

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@
5050
<groupId>org.springframework.boot</groupId>
5151
<artifactId>spring-boot-starter-data-redis</artifactId>
5252
</dependency>
53+
<dependency>
54+
<groupId>org.springframework.kafka</groupId>
55+
<artifactId>spring-kafka</artifactId>
56+
</dependency>
5357

5458
<dependency>
5559
<groupId>org.springframework.boot</groupId>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.connect.config;
2+
3+
import org.apache.kafka.clients.admin.NewTopic;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
7+
@Configuration
8+
public class KafkaProducerConfig {
9+
10+
@Bean
11+
public NewTopic newTopic() {
12+
return new NewTopic("chat", 1, (short) 1);
13+
}
14+
15+
}

src/main/java/com/connect/controller/ChatUserController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.connect.dto.MessageDTO;
44
import com.connect.dto.UserDTO;
5+
import com.connect.kafka.KafkaPublisherService;
56
import com.connect.model.Message;
67
import com.connect.model.User;
78
import com.connect.service.ChatUserService;
@@ -44,9 +45,8 @@ public void handleMessage(@Payload MessageDTO message, SimpMessageHeaderAccessor
4445
// Fetching the RoomID from the header.
4546
String roomID = headerAccessor.getFirstNativeHeader("roomId");
4647
log.info("Sending the message to the room: {}", roomID);
47-
Optional<Message> dbMessage = chatUserService.addMessagetoRoom(message, roomID);
48-
dbMessage.ifPresent(msg ->
49-
messagingTemplate.convertAndSend("/topic/chat/" + roomID, msg));
48+
chatUserService.publishMessageToKafka(message, roomID);
49+
messagingTemplate.convertAndSend("/topic/chat/" + roomID, message);
5050
}
5151

5252
// Route for handling the history.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.connect.kafka;
2+
3+
import com.connect.model.Message;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.stereotype.Service;
7+
8+
@Service
9+
@Slf4j
10+
public class KafkaListenerService {
11+
12+
@KafkaListener(topics = "chat", groupId = "chat-group")
13+
public void consumeEvent(Message message) {
14+
// logic message is being consumed.
15+
log.info("Consumed message: {}", message.toString());
16+
}
17+
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.connect.kafka;
2+
3+
import com.connect.model.Message;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.kafka.core.KafkaTemplate;
7+
import org.springframework.kafka.support.SendResult;
8+
import org.springframework.stereotype.Service;
9+
10+
import java.util.concurrent.CompletableFuture;
11+
12+
@Service
13+
@Slf4j
14+
public class KafkaPublisherService {
15+
16+
@Autowired
17+
private KafkaTemplate<String, Object> kafkaTemplate;
18+
19+
public void sendEvent(Message message) {
20+
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("chat", message);
21+
future.whenComplete((result, exception) -> {
22+
if (exception == null) {
23+
log.info("Message sent successfully to {}", result.getRecordMetadata().offset());
24+
} else {
25+
log.error("Unable to send message: {}", message.toString());
26+
}
27+
});
28+
}
29+
30+
}

src/main/java/com/connect/model/Message.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package com.connect.model;
22

3-
import lombok.AllArgsConstructor;
4-
import lombok.Getter;
5-
import lombok.NoArgsConstructor;
6-
import lombok.Setter;
3+
import lombok.*;
74
import org.bson.types.ObjectId;
85
import org.springframework.data.annotation.Id;
96
import org.springframework.data.mongodb.core.mapping.Document;
@@ -14,6 +11,8 @@
1411
@Setter
1512
@NoArgsConstructor
1613
@AllArgsConstructor
14+
@Builder
15+
@ToString
1716
@Document(collection = "messages")
1817
public class Message {
1918
@Id

src/main/java/com/connect/service/ChatUserService.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.connect.dto.MessageDTO;
44
import com.connect.enums.UserStatus;
5+
import com.connect.kafka.KafkaPublisherService;
56
import com.connect.model.Message;
67
import com.connect.model.Room;
78
import com.connect.model.User;
@@ -33,6 +34,9 @@ public class ChatUserService {
3334
@Autowired
3435
private UserRepository userRepository;
3536

37+
@Autowired
38+
private KafkaPublisherService kafkaPublisherService;
39+
3640
private Map<String, User> users = new ConcurrentHashMap<>();
3741

3842
public void greetingHandler(String token) {
@@ -69,6 +73,25 @@ public void joiningRequestHandler(String username, String roomId) {
6973
roomService.addUserToRoom(requiredUser, roomID);
7074
}
7175

76+
public void publishMessageToKafka(MessageDTO messageDTO, String roomId) {
77+
if (messageDTO == null) {
78+
log.error("Message DTO is null");
79+
return;
80+
} else if (messageDTO.getMessage().isEmpty() || roomId.isEmpty()) {
81+
log.error("Empty Data occurred");
82+
return;
83+
}
84+
ObjectId roomID = new ObjectId(roomId);
85+
// We are assuming that the roomID is not null so.
86+
Message message = Message.builder()
87+
.roomId(roomID)
88+
.sender(messageDTO.getSender())
89+
.message(messageDTO.getMessage())
90+
.build();
91+
92+
kafkaPublisherService.sendEvent(message);
93+
}
94+
7295
public Optional<Message> addMessagetoRoom(MessageDTO messageDTO, String roomId) {
7396
if (messageDTO == null) {
7497
log.error("Message DTO is null");

src/main/resources/application.properties

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,15 @@ frontend.origin = ${FRONTEND_ORIGIN}
2323
spring.data.redis.host = ${REDIS_HOST}
2424
spring.data.redis.port = ${REDIS_PORT}
2525
spring.data.redis.password = ${REDIS_PASSWORD}
26-
spring.data.redis.ssl.enabled = false
26+
spring.data.redis.ssl.enabled = false
27+
28+
# Kafka Producer configuration
29+
spring.kafka.producer.bootstrap-servers = ${KAFKA_SERVER}
30+
spring.kafka.producer.keySerializer = org.apache.kafka.common.serialization.StringSerializer
31+
spring.kafka.producer.valueSerializer = org.springframework.kafka.support.serializer.JsonSerializer
32+
33+
# Kafka Consumer configuration
34+
spring.kafka.consumer.bootstrap-servers = ${KAFKA_SERVER}
35+
spring.kafka.consumer.keyDeserializer = org.apache.kafka.common.serialization.StringDeserializer
36+
spring.kafka.consumer.valueDeserializer = org.springframework.kafka.support.serializer.JsonDeserializer
37+
spring.kafka.consumer.properties.spring.json.trusted.packages = com.connect.model

0 commit comments

Comments
 (0)