Skip to content

Commit b3055c6

Browse files
committed
test(be): 파티션 개수 증가 및 ack 설정 변경
1 parent ec6d54c commit b3055c6

4 files changed

Lines changed: 12 additions & 8 deletions

File tree

src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaProducer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ public class KafkaProducer {
2424

2525
public void sendChatMessage(ChatMessageToKafka chatMessageToKafka, Long channelId) {
2626
try {
27+
String randomKey = String.valueOf(ThreadLocalRandom.current().nextInt(1, 6));
28+
2729
String jsonChatMessage = objectMapper.writeValueAsString(chatMessageToKafka);
2830

29-
kafkaTemplate.send(topicChat, String.valueOf(channelId), jsonChatMessage)
31+
kafkaTemplate.send(topicChat, randomKey, jsonChatMessage)
3032
.whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분
3133
if (ex == null) {
3234
log.info("Kafka message sent: {}", result.toString());

src/backend/state_server/src/main/java/com/jootalkpia/state_server/service/KafkaConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public class KafkaConsumer {
2121

2222
@KafkaListener(
2323
topics = "${topic.chat}",
24-
groupId = "${group.status}"
24+
groupId = "${group.status}",
25+
concurrency = "5"
2526
)
2627
public void processState(String kafkaMessage) {
2728
ObjectMapper mapper = new ObjectMapper();

src/backend/state_server/src/main/java/com/jootalkpia/state_server/service/StateService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@ public PushMessageToKafka createPushMessage(JsonNode commonNode, JsonNode messag
3232
}
3333

3434
public Set<String> findNotificationTargets(String channelId, String userId) {
35-
Set<String> testSessions = stringOperRedisTemplate.opsForSet().members("user:sessions");
36-
37-
log.info(testSessions.toString());
38-
39-
return testSessions;
35+
return stringOperRedisTemplate.opsForSet().members("user:sessions");
4036

4137
// Set<String> subscriber = findSubscribers(channelId);
4238
// Set<String> onlineSessions = findOnlineSessions(userId, subscriber);

src/backend/state_server/src/main/resources/application.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@ spring:
55
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
66
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
77
producer:
8-
acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가
8+
acks: 0
99
key-serializer: org.apache.kafka.common.serialization.StringSerializer
1010
value-serializer: org.apache.kafka.common.serialization.StringSerializer
11+
batch-size: 65536 # 64KB
12+
properties:
13+
linger.ms: 20 # 20ms
14+
buffer.memory: 67108864 # 64MB로 버퍼 메모리도 증가
15+
compression.type: snappy
1116
data:
1217
redis:
1318
host: ${REDIS_HOST}

0 commit comments

Comments
 (0)