Skip to content

Commit fdc6356

Browse files
committed
feat&bug-fix: implemented message buffer fixed bugs, change id type to String and improve the logic for history handling
1 parent 3694301 commit fdc6356

9 files changed

Lines changed: 50 additions & 42 deletions

File tree

.github/workflows/java-ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030
REDIS_HOST: ${{ secrets.REDIS_HOST }}
3131
REDIS_PORT: ${{ secrets.REDIS_PORT }}
3232
REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD }}
33+
KAFKA_SERVER: ${{ secrets.KAFKA_SERVER }}
3334

3435
steps:
3536
- name: Checkout Repository

src/main/java/com/connect/controller/ChatUserController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.connect.dto.MessageDTO;
44
import com.connect.dto.UserDTO;
5-
import com.connect.kafka.KafkaPublisherService;
65
import com.connect.model.Message;
76
import com.connect.model.User;
87
import com.connect.service.ChatUserService;
@@ -56,6 +55,7 @@ public void handleHistory(SimpMessageHeaderAccessor headerAccessor) {
5655
log.info("Handling the History of chats for the Room: {}", roomId);
5756
Optional<List<Message>> messages = chatUserService.chatHistoryHandler(roomId);
5857
messages.ifPresent(msgs -> {
58+
System.out.println(messages.get().size());
5959
List<MessageDTO> dtoList = msgs.stream()
6060
.map(MessageDTO::new)
6161
.collect(Collectors.toList());
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
package com.connect.kafka;
22

3+
import com.connect.buffer.MessageBuffer;
34
import com.connect.model.Message;
45
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.annotation.Autowired;
57
import org.springframework.kafka.annotation.KafkaListener;
68
import org.springframework.stereotype.Service;
79

810
@Service
911
@Slf4j
1012
public class KafkaListenerService {
1113

14+
@Autowired
15+
private MessageBuffer messageBuffer;
16+
1217
@KafkaListener(topics = "chat", groupId = "chat-group")
1318
public void consumeEvent(Message message) {
1419
// logic message is being consumed.
1520
log.info("Consumed message: {}", message.toString());
21+
// Here we have to save the message to a shared buffer ok, and we need to save the data to the database.
22+
messageBuffer.addMessage(message);
1623
}
1724

1825
}

src/main/java/com/connect/kafka/KafkaPublisherService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class KafkaPublisherService {
1717
private KafkaTemplate<String, Object> kafkaTemplate;
1818

1919
public void sendEvent(Message message) {
20+
System.out.println(message.toString());
2021
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("chat", message);
2122
future.whenComplete((result, exception) -> {
2223
if (exception == null) {

src/main/java/com/connect/model/Message.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ public class Message {
1818
@Id
1919
private ObjectId id;
2020

21-
private ObjectId roomId;
21+
private String roomId;
2222
private String sender;
2323
private String message;
2424
private LocalDateTime timeStamp;
2525

26-
public Message(ObjectId roomId, String sender, String message) {
26+
public Message(String roomId, String sender, String message) {
2727
this.roomId = roomId;
2828
this.sender = sender;
2929
this.message = message;

src/main/java/com/connect/model/Room.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
@Document(collection = "rooms")
2020
public class Room {
2121
@Id
22-
private ObjectId roomId;
22+
private String roomId;
2323
private String roomName;
2424
private String roomDescription;
2525
private LocalDateTime timeStamp;

src/main/java/com/connect/repository/RoomRepository.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public Optional<List<Room>> getRooms() {
3333
return Optional.of(mongoTemplate.find(query, Room.class));
3434
}
3535

36-
public Optional<Room> findRoomByID(ObjectId roomId) {
36+
public Optional<Room> findRoomByID(String roomId) {
3737
Query query = new Query();
3838
query.addCriteria(Criteria.where("roomId").is(roomId));
3939
return Optional.ofNullable(mongoTemplate.findOne(query, Room.class));
@@ -49,10 +49,16 @@ public Optional<Message> addMessageToRoom(Message message) {
4949
return Optional.ofNullable(mongoTemplate.insert(message));
5050
}
5151

52-
public Optional<List<Message>> getRoomSpecificMessages(ObjectId roomId) {
52+
public Optional<List<Message>> addMessages(List<Message> messageList) {
53+
return Optional.of((List<Message>) mongoTemplate.insert(messageList, Message.class));
54+
}
55+
56+
public Optional<List<Message>> getRoomSpecificMessages(String roomId) {
5357
Query query = new Query();
5458
query.addCriteria(Criteria.where("roomId").is(roomId));
55-
return Optional.of(mongoTemplate.find(query, Message.class));
59+
List<Message> messageList = mongoTemplate.find(query, Message.class);
60+
System.out.println(messageList.size());
61+
return Optional.of(messageList);
5662
}
5763

5864
}

src/main/java/com/connect/service/ChatUserService.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.connect.service;
22

3+
import com.connect.buffer.MessageBuffer;
34
import com.connect.dto.MessageDTO;
45
import com.connect.enums.UserStatus;
56
import com.connect.kafka.KafkaPublisherService;
@@ -37,6 +38,9 @@ public class ChatUserService {
3738
@Autowired
3839
private KafkaPublisherService kafkaPublisherService;
3940

41+
@Autowired
42+
private MessageBuffer messageBuffer;
43+
4044
private Map<String, User> users = new ConcurrentHashMap<>();
4145

4246
public void greetingHandler(String token) {
@@ -61,16 +65,14 @@ public void joiningRequestHandler(String username, String roomId) {
6165
return;
6266
}
6367

64-
ObjectId roomID = new ObjectId(roomId);
65-
6668
// Checking the Room exists from the RoomId
67-
Optional<Room> requestedRoom = roomRepository.findRoomByID(roomID);
69+
Optional<Room> requestedRoom = roomRepository.findRoomByID(roomId);
6870
if (requestedRoom.isEmpty()) {
6971
log.error("No room exists");
7072
return;
7173
}
7274
// If Room exists and the User is valid we have to update the room data.
73-
roomService.addUserToRoom(requiredUser, roomID);
75+
roomService.addUserToRoom(requiredUser, roomId);
7476
}
7577

7678
public void publishMessageToKafka(MessageDTO messageDTO, String roomId) {
@@ -80,39 +82,40 @@ public void publishMessageToKafka(MessageDTO messageDTO, String roomId) {
8082
} else if (messageDTO.getMessage().isEmpty() || roomId.isEmpty()) {
8183
log.error("Empty Data occurred");
8284
return;
85+
} else if (!ObjectId.isValid(roomId)) {
86+
log.error("Room ID is not valid");
87+
return;
8388
}
84-
ObjectId roomID = new ObjectId(roomId);
85-
// We are assuming that the roomID is not null so.
89+
// Room ID is valid and not null.
8690
Message message = Message.builder()
87-
.roomId(roomID)
91+
.roomId(roomId)
8892
.sender(messageDTO.getSender())
8993
.message(messageDTO.getMessage())
9094
.build();
9195

92-
kafkaPublisherService.sendEvent(message);
93-
}
94-
95-
public Optional<Message> addMessagetoRoom(MessageDTO messageDTO, String roomId) {
96-
if (messageDTO == null) {
97-
log.error("Message DTO is null");
98-
return Optional.empty();
99-
} else if (messageDTO.getMessage().isEmpty() || roomId.isEmpty()) {
100-
log.error("Empty Data occurred");
101-
return Optional.empty();
102-
}
103-
104-
ObjectId roomID = new ObjectId(roomId);
96+
System.out.println(message.toString());
10597

106-
return roomService.addMessage(messageDTO, roomID);
98+
kafkaPublisherService.sendEvent(message);
10799
}
108100

109101
public Optional<List<Message>> chatHistoryHandler(String roomId) {
110102
if (roomId.isEmpty()) {
111103
log.error("Room ID is null in the chat history handler");
112104
return Optional.empty();
113105
}
114-
ObjectId roomID = new ObjectId(roomId);
115-
return roomRepository.getRoomSpecificMessages(roomID);
106+
107+
// Here we have to check the roomSpecific message is in the buffer or not.
108+
if (messageBuffer.size() != 0) {
109+
// Here we need to filter out the room specific messages if present in the buffer.
110+
List<Message> messageList = messageBuffer.getMessages().stream()
111+
.filter(message -> message.getRoomId().equals(roomId))
112+
.toList();
113+
if (!messageList.isEmpty()) {
114+
return Optional.of(messageList);
115+
}
116+
}
117+
// If the buffer is empty we have to fetch the messages from the db.
118+
return roomRepository.getRoomSpecificMessages(roomId);
116119
}
117120

118121
public Optional<List<User>> fetchUser() {

src/main/java/com/connect/service/RoomService.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package com.connect.service;
22

3-
import com.connect.dto.MessageDTO;
4-
import com.connect.enums.UserRole;
5-
import com.connect.model.Message;
63
import com.connect.model.Room;
74
import com.connect.model.User;
85
import com.connect.repository.RoomRepository;
96
import com.connect.repository.UserRepository;
107
import jakarta.annotation.PostConstruct;
118
import lombok.extern.slf4j.Slf4j;
12-
import org.bson.types.ObjectId;
139
import org.springframework.beans.factory.annotation.Autowired;
1410
import org.springframework.stereotype.Service;
1511

16-
import java.security.Principal;
1712
import java.util.List;
1813
import java.util.Optional;
1914

@@ -48,21 +43,16 @@ public void init() {
4843
}
4944
}
5045

51-
public void addUserToRoom(User user, ObjectId roomId) {
46+
public void addUserToRoom(User user, String roomId) {
5247
Optional<Room> room = roomRepository.findRoomByID(roomId);
5348
if (room.isEmpty()) {
5449
log.error("No room exists failed to add user to the room");
5550
return;
5651
}
5752

5853
}
59-
60-
public Optional<Message> addMessage(MessageDTO messageDTO, ObjectId roomId) {
61-
Message message = new Message(roomId, messageDTO.getSender(), messageDTO.getMessage());
62-
return roomRepository.addMessageToRoom(message);
63-
}
6454

65-
public boolean isRoomExists(ObjectId roomId) {
55+
public boolean isRoomExists(String roomId) {
6656
return roomRepository.findRoomByID(roomId).isPresent();
6757
}
6858

0 commit comments

Comments
 (0)