From 4b35c22b50fd4d4ac7824b29a8698b623dde5113 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 13:42:37 +0900 Subject: [PATCH 1/7] =?UTF-8?q?feat(wallet):=20=EC=9E=85=EA=B8=88=20API=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80=20(WAL-003)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coinflow/common/exception/ErrorCode.java | 2 ++ .../coinflow/wallet/api/WalletController.java | 19 +++++++++--- .../coinflow/wallet/dto/DepositRequest.java | 6 ++++ .../wallet/service/WalletService.java | 30 +++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/coinflow/wallet/dto/DepositRequest.java diff --git a/src/main/java/com/coinflow/common/exception/ErrorCode.java b/src/main/java/com/coinflow/common/exception/ErrorCode.java index 2a14cf5..2252537 100644 --- a/src/main/java/com/coinflow/common/exception/ErrorCode.java +++ b/src/main/java/com/coinflow/common/exception/ErrorCode.java @@ -22,6 +22,8 @@ public enum ErrorCode { INVALID_STEP_SIZE(HttpStatus.BAD_REQUEST, "INVALID_STEP_SIZE", "Quantity does not match step size"), MIN_ORDER_QUANTITY_NOT_MET(HttpStatus.BAD_REQUEST, "MIN_ORDER_QUANTITY_NOT_MET", "Quantity is below minimum"), MIN_ORDER_AMOUNT_NOT_MET(HttpStatus.BAD_REQUEST, "MIN_ORDER_AMOUNT_NOT_MET", "Order amount is below minimum"), + WALLET_NOT_FOUND(HttpStatus.NOT_FOUND, "WALLET_NOT_FOUND", "Wallet not found"), + INVALID_AMOUNT(HttpStatus.BAD_REQUEST, "INVALID_AMOUNT", "Amount must be greater than zero"), INSUFFICIENT_BALANCE(HttpStatus.BAD_REQUEST, "INSUFFICIENT_BALANCE", "Insufficient balance"), DUPLICATE_CLIENT_ORDER_ID(HttpStatus.CONFLICT, "DUPLICATE_CLIENT_ORDER_ID", "Duplicate client order id"), diff --git a/src/main/java/com/coinflow/wallet/api/WalletController.java b/src/main/java/com/coinflow/wallet/api/WalletController.java index b8a2c83..b3e3ca4 100644 --- a/src/main/java/com/coinflow/wallet/api/WalletController.java +++ b/src/main/java/com/coinflow/wallet/api/WalletController.java @@ -1,15 +1,16 @@ package com.coinflow.wallet.api; +import com.coinflow.wallet.dto.DepositRequest; import com.coinflow.wallet.dto.WalletLedgerResponse; import com.coinflow.wallet.dto.WalletResponse; import com.coinflow.wallet.service.WalletService; import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.oauth2.jwt.Jwt; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; + +import java.util.List; import java.util.List; @@ -20,6 +21,16 @@ public class WalletController { private final WalletService walletService; + @PostMapping("/deposit") + @ResponseStatus(HttpStatus.OK) + public WalletResponse deposit( + @AuthenticationPrincipal Jwt jwt, + @RequestBody DepositRequest request + ) { + Long userId = Long.parseLong(jwt.getSubject()); + return walletService.deposit(userId, request); + } + @GetMapping public List getWallets(@AuthenticationPrincipal Jwt jwt) { Long userId = Long.parseLong(jwt.getSubject()); diff --git a/src/main/java/com/coinflow/wallet/dto/DepositRequest.java b/src/main/java/com/coinflow/wallet/dto/DepositRequest.java new file mode 100644 index 0000000..c8caadc --- /dev/null +++ b/src/main/java/com/coinflow/wallet/dto/DepositRequest.java @@ -0,0 +1,6 @@ +package com.coinflow.wallet.dto; + +public record DepositRequest( + String asset, + String amount +) {} diff --git a/src/main/java/com/coinflow/wallet/service/WalletService.java b/src/main/java/com/coinflow/wallet/service/WalletService.java index 1866302..df9a11f 100644 --- a/src/main/java/com/coinflow/wallet/service/WalletService.java +++ b/src/main/java/com/coinflow/wallet/service/WalletService.java @@ -1,5 +1,10 @@ package com.coinflow.wallet.service; +import com.coinflow.common.exception.ApiException; +import com.coinflow.common.exception.ErrorCode; +import com.coinflow.wallet.domain.LedgerType; +import com.coinflow.wallet.domain.WalletLedger; +import com.coinflow.wallet.dto.DepositRequest; import com.coinflow.wallet.dto.WalletLedgerResponse; import com.coinflow.wallet.dto.WalletResponse; import com.coinflow.wallet.repository.WalletLedgerRepository; @@ -8,6 +13,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.math.BigDecimal; import java.util.List; @Service @@ -25,6 +31,30 @@ public List getWallets(Long userId) { .toList(); } + @Transactional + public WalletResponse deposit(Long userId, DepositRequest request) { + BigDecimal amount; + try { + amount = new BigDecimal(request.amount()); + } catch (NumberFormatException e) { + throw new ApiException(ErrorCode.INVALID_AMOUNT); + } + if (amount.compareTo(BigDecimal.ZERO) <= 0) throw new ApiException(ErrorCode.INVALID_AMOUNT); + + var wallet = walletRepository.findByUserIdAndAssetWithLock(userId, request.asset()) + .orElseThrow(() -> new ApiException(ErrorCode.WALLET_NOT_FOUND)); + + wallet.deposit(amount); + + walletLedgerRepository.save(WalletLedger.create( + wallet, LedgerType.SEED_DEPOSIT, + amount, BigDecimal.ZERO, + null, null + )); + + return WalletResponse.from(wallet); + } + @Transactional(readOnly = true) public List getLedgers(Long userId, String asset) { var ledgers = (asset != null) From 49bb76c0f0430090e69585cc3610a1b4093045b0 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 13:42:58 +0900 Subject: [PATCH 2/7] =?UTF-8?q?feat(docs):=20Swagger=20UI=20=EB=B0=8F=20JW?= =?UTF-8?q?T=20Bearer=20=EC=9D=B8=EC=A6=9D=20=EC=84=A4=EC=A0=95=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 1 + .../com/coinflow/config/OpenApiConfig.java | 29 +++++++++++++++++++ .../com/coinflow/config/SecurityConfig.java | 1 + 3 files changed, 31 insertions(+) create mode 100644 src/main/java/com/coinflow/config/OpenApiConfig.java diff --git a/build.gradle b/build.gradle index 8cb7f64..338e1f7 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,7 @@ dependencies { implementation 'org.flywaydb:flyway-mysql' implementation 'org.springframework.boot:spring-boot-starter-security' implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server' + implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.8' compileOnly 'org.projectlombok:lombok' runtimeOnly 'com.mysql:mysql-connector-j' annotationProcessor 'org.projectlombok:lombok' diff --git a/src/main/java/com/coinflow/config/OpenApiConfig.java b/src/main/java/com/coinflow/config/OpenApiConfig.java new file mode 100644 index 0000000..942e7b9 --- /dev/null +++ b/src/main/java/com/coinflow/config/OpenApiConfig.java @@ -0,0 +1,29 @@ +package com.coinflow.config; + +import io.swagger.v3.oas.models.Components; +import io.swagger.v3.oas.models.OpenAPI; +import io.swagger.v3.oas.models.info.Info; +import io.swagger.v3.oas.models.security.SecurityRequirement; +import io.swagger.v3.oas.models.security.SecurityScheme; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class OpenApiConfig { + + @Bean + public OpenAPI openAPI() { + return new OpenAPI() + .info(new Info() + .title("CoinFlow API") + .description("암호화폐 거래소 백엔드 API") + .version("v1")) + .addSecurityItem(new SecurityRequirement().addList("Bearer")) + .components(new Components() + .addSecuritySchemes("Bearer", new SecurityScheme() + .name("Bearer") + .type(SecurityScheme.Type.HTTP) + .scheme("bearer") + .bearerFormat("JWT"))); + } +} diff --git a/src/main/java/com/coinflow/config/SecurityConfig.java b/src/main/java/com/coinflow/config/SecurityConfig.java index e9d51aa..1631781 100644 --- a/src/main/java/com/coinflow/config/SecurityConfig.java +++ b/src/main/java/com/coinflow/config/SecurityConfig.java @@ -48,6 +48,7 @@ public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Excepti .requestMatchers("/api/v1/auth/signup", "/api/v1/auth/login").permitAll() .requestMatchers("/api/v1/markets", "/api/v1/markets/*/orderbook", "/api/v1/markets/*/trades").permitAll() .requestMatchers("/actuator/health", "/actuator/info").permitAll() + .requestMatchers("/swagger-ui/**", "/swagger-ui.html", "/v3/api-docs/**").permitAll() .anyRequest().authenticated() ) .oauth2ResourceServer(oauth2 -> oauth2.jwt(Customizer.withDefaults())) From 6e29d2a402ca7876ffc962b742b071c9748de175 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 13:44:34 +0900 Subject: [PATCH 3/7] =?UTF-8?q?docs:=20Phase=202=20PRD=20=EB=B0=8F=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84=20=EA=B3=84=ED=9A=8D=20=EB=AC=B8=EC=84=9C=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .docs/v2/PRD.md | 137 ++++++++++++++++++++++++++++++++++ .docs/v2/Plan.md | 188 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 325 insertions(+) create mode 100644 .docs/v2/PRD.md create mode 100644 .docs/v2/Plan.md diff --git a/.docs/v2/PRD.md b/.docs/v2/PRD.md new file mode 100644 index 0000000..3f98751 --- /dev/null +++ b/.docs/v2/PRD.md @@ -0,0 +1,137 @@ +# CoinFlow Phase 2 PRD + +이 문서는 CoinFlow Phase 1 MVP 완료 이후 Phase 2의 제품 범위와 기술 목표를 정의한다. + +## 1. 배경 + +Phase 1에서는 단일 인스턴스 환경에서 지정가 주문 생성/취소/매칭/정산/원장/이벤트 로그까지 이어지는 핵심 파이프라인을 완성했다. + +Phase 2에서는 **분산 시스템 확장성**을 목표로 한다. +도메인 이벤트를 Kafka로 안정적으로 발행하고, WebSocket으로 클라이언트에 실시간 push한다. + +--- + +## 2. 목표 + +- 도메인 이벤트를 Outbox Pattern으로 Kafka에 안정적으로 발행한다. +- Kafka Consumer가 이벤트를 수신해 WebSocket으로 클라이언트에 실시간 broadcast한다. +- 이벤트 유실 없는 at-least-once 전달을 보장한다. +- 발행 실패 시 재시도 및 dead-letter 처리 경계를 확보한다. + +--- + +## 3. 핵심 설계 결정 + +### 3-1. Outbox Pattern + +**문제** +OrderService에서 Kafka를 직접 발행하면 DB commit 성공 후 Kafka 발행 실패 시 이벤트가 유실된다. +두 작업은 단일 트랜잭션으로 묶을 수 없다. + +**해결** +이벤트를 DB(domain_events)에 같은 트랜잭션 안에 먼저 저장(Phase 1에서 완료). +별도 프로세스(OutboxPublisher)가 주기적으로 미발행 이벤트를 조회해 Kafka에 발행한다. + +``` +[OrderService] + │ (동일 트랜잭션) + ▼ +[domain_events] published=false + ▲ polling (1초) +[OutboxPublisher] + │ KafkaTemplate + ▼ +[Kafka] +``` + +**보장** +- Kafka 장애 시에도 이벤트는 DB에 안전하게 보관 +- 서버 재시작 후 미발행 이벤트 자동 재발행 +- `publish_attempts`로 재시도 횟수 추적, 임계치 초과 시 dead-letter 처리 가능 + +### 3-2. at-least-once 전달 + +Outbox polling 특성상 중복 발행이 발생할 수 있다. +Consumer는 `event.id` 기준으로 idempotent하게 처리한다. + +### 3-3. 확장 방향 (Phase 3) + +Scheduler polling 방식은 최대 1초의 딜레이가 있다. +MySQL binlog를 실시간으로 감지하는 **CDC(Change Data Capture) / Debezium**으로 전환하면 +딜레이를 수십 ms 수준으로 줄일 수 있다. + +--- + +## 4. 아키텍처 + +``` +[OrderService] ──(동일 트랜잭션)──▶ [domain_events] (published=false) + │ + @Scheduled (1초) + │ + [OutboxPublisher] + │ KafkaTemplate + ▼ + ┌──────────────────────────┐ + │ Kafka │ + │ coinflow.order.events │ + │ coinflow.trade.events │ + └──────────────────────────┘ + │ @KafkaListener + ▼ + [WebSocketBroadcaster] + │ SimpMessagingTemplate (STOMP) + ▼ + /topic/orderbook/{market} + /topic/trades/{market} +``` + +--- + +## 5. Kafka Topic 설계 + +| Topic | 포함 이벤트 | 용도 | +|-------|------------|------| +| `coinflow.order.events` | ORDER_ACCEPTED, ORDER_PARTIALLY_FILLED, ORDER_FILLED, ORDER_CANCELED | 주문 상태 실시간 알림 | +| `coinflow.trade.events` | TRADE_CREATED, SETTLEMENT_COMPLETED | 체결/정산 실시간 broadcast | + +--- + +## 6. WebSocket 엔드포인트 + +| 구독 채널 | 발행 시점 | 페이로드 | +|-----------|-----------|----------| +| `/topic/trades/{market}` | TRADE_CREATED | price, quantity, side, tradedAt | +| `/topic/orderbook/{market}` | ORDER_ACCEPTED / FILLED / CANCELED | 변경된 호가창 스냅샷 | + +--- + +## 7. 구현 이슈 + +| 이슈 | 브랜치 | 내용 | +|------|--------|------| +| #19 | feat/19/kafka-setup | Docker Compose (Kafka KRaft), Topic 설정, KafkaTemplate 빈 | +| #20 | feat/20/outbox-publisher | @Scheduled OutboxPublisher, 재시도/dead-letter 처리 | +| #21 | feat/21/websocket | STOMP WebSocket 설정, Kafka Consumer → broadcast | +| #22 | feat/22/e2e | 주문 → Kafka 발행 → WebSocket 수신 end-to-end 검증 | + +--- + +## 8. 기술 스택 추가 + +| 기술 | 용도 | +|------|------| +| Apache Kafka (KRaft) | 메시지 브로커 | +| spring-kafka | KafkaTemplate, @KafkaListener | +| spring-websocket | STOMP WebSocket 서버 | +| spring-messaging | SimpMessagingTemplate | +| Docker Compose | 로컬 Kafka 환경 | + +--- + +## 9. 성공 기준 + +- 주문 생성 → 최대 2초 내 `/topic/trades/{market}` WebSocket 메시지 수신 +- Kafka 일시 중단 후 재시작 시 미발행 이벤트 자동 재발행 +- publish_attempts 3회 초과 이벤트 별도 식별 가능 +- end-to-end 통합 테스트 통과 diff --git a/.docs/v2/Plan.md b/.docs/v2/Plan.md new file mode 100644 index 0000000..902a82e --- /dev/null +++ b/.docs/v2/Plan.md @@ -0,0 +1,188 @@ +# CoinFlow Phase 2 구현 플랜 + +## 이슈 #19 — Kafka 환경 구성 (feat/19/kafka-setup) + +### 목표 +로컬 Kafka 환경 구성 및 Spring Boot 연동 + +### 작업 내용 + +**1. Docker Compose** +```yaml +# docker-compose.yml +kafka (KRaft 모드, Zookeeper 없음) + - KAFKA_PROCESS_ROLES: broker,controller + - port: 9092 +``` + +**2. build.gradle 의존성 추가** +``` +spring-kafka +``` + +**3. application.yml Kafka 설정** +```yaml +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: StringSerializer + value-serializer: StringSerializer + consumer: + group-id: coinflow-websocket + auto-offset-reset: latest +``` + +**4. Topic 생성 빈** +``` +coinflow.order.events (partition: 4, replication: 1) +coinflow.trade.events (partition: 4, replication: 1) +``` + +**검증**: Spring Boot 실행 시 Topic 자동 생성 확인 + +--- + +## 이슈 #20 — Outbox Publisher (feat/20/outbox-publisher) + +### 목표 +domain_events 테이블의 미발행 이벤트를 Kafka로 안정적으로 발행 + +### 작업 내용 + +**1. OutboxPublisher** +```java +@Scheduled(fixedDelay = 1000) +public void publish() { + // published=false, publish_attempts < 5 인 이벤트 최대 100건 조회 + // KafkaTemplate으로 발행 + // 성공: published=true, published_at=now() + // 실패: publish_attempts++ +} +``` + +**2. 발행 Topic 라우팅** +``` +ORDER_* 이벤트 → coinflow.order.events +TRADE_* 이벤트 → coinflow.trade.events +``` + +**3. Kafka 메시지 구조** +```json +{ + "eventId": 1, + "eventType": "TRADE_CREATED", + "aggregateId": 42, + "marketSymbol": "BTC-KRW", + "payload": "{...}" +} +``` + +**4. DomainEventRepository 쿼리 추가** +```java +findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(int maxAttempts) +updatePublishedTrue(Long id) +incrementPublishAttempts(Long id) +``` + +**검증**: 주문 생성 후 1~2초 내 Kafka 메시지 발행 확인 (kafka-console-consumer) + +--- + +## 이슈 #21 — WebSocket 실시간 broadcast (feat/21/websocket) + +### 목표 +Kafka Consumer가 이벤트를 수신해 WebSocket 클라이언트에 실시간 push + +### 작업 내용 + +**1. WebSocket 설정** +```java +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { + // STOMP endpoint: /ws + // topic prefix: /topic + // app prefix: /app +} +``` + +**2. Kafka Consumer** +```java +@KafkaListener(topics = "coinflow.trade.events", groupId = "coinflow-websocket") +public void onTradeEvent(String message) { + // TRADE_CREATED → /topic/trades/{market} broadcast + // SETTLEMENT_COMPLETED → 무시 (or 추가 활용) +} + +@KafkaListener(topics = "coinflow.order.events", groupId = "coinflow-websocket") +public void onOrderEvent(String message) { + // ORDER_FILLED / CANCELED → 오더북 변경 → /topic/orderbook/{market} broadcast +} +``` + +**3. WebSocket 메시지 DTO** +``` +TradeMessage { market, price, quantity, side, tradedAt } +OrderBookMessage { market, buySide[], sellSide[] } +``` + +**검증**: WebSocket 클라이언트(Postman or wscat) 구독 후 주문 체결 시 메시지 수신 확인 + +--- + +## 이슈 #22 — end-to-end 통합 테스트 (feat/22/e2e) + +### 목표 +주문 → Kafka 발행 → WebSocket 수신 전 과정 자동 검증 + +### 작업 내용 + +**1. EmbeddedKafka 테스트 환경** +```java +@EmbeddedKafka(partitions = 1, topics = {"coinflow.order.events", "coinflow.trade.events"}) +``` + +**2. 테스트 시나리오** +``` +EVT-E2E-001: 주문 체결 시 coinflow.trade.events에 TRADE_CREATED 메시지 발행 검증 +EVT-E2E-002: 주문 취소 시 coinflow.order.events에 ORDER_CANCELED 메시지 발행 검증 +EVT-E2E-003: Outbox 재시도 검증 (Kafka 발행 실패 시 publish_attempts 증가) +``` + +**3. WebSocket 수신 테스트** +```java +StompSession session = stompClient.connect("/ws", ...); +session.subscribe("/topic/trades/BTC-KRW", handler); +// 주문 체결 → handler가 메시지 수신할 때까지 대기 (CompletableFuture) +``` + +--- + +## 전체 구현 순서 + +``` +#19 kafka-setup + └─ Docker Compose + Spring 연동 확인 + ↓ +#20 outbox-publisher + └─ DB → Kafka 발행 동작 확인 + ↓ +#21 websocket + └─ Kafka → WebSocket broadcast 동작 확인 + ↓ +#22 e2e + └─ 전 구간 자동 테스트 +``` + +--- + +## 면접 어필 포인트 정리 + +| 주제 | 설명 | +|------|------| +| Outbox Pattern | "DB 트랜잭션과 메시지 발행의 원자성 문제를 Outbox로 해결" | +| at-least-once | "중복 발행 가능성 인지, consumer idempotent 처리" | +| 재시도/dead-letter | "publish_attempts로 실패 추적, 임계치 초과 시 별도 처리" | +| CDC 확장 | "현재 polling 방식, Debezium CDC로 전환 시 딜레이 수십 ms로 감소 가능" | +| 실시간 push | "STOMP WebSocket으로 클라이언트에 체결/오더북 실시간 broadcast" | From 1977a809d6792b7b6f2a7a275cce21067687a2f5 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 13:44:53 +0900 Subject: [PATCH 4/7] =?UTF-8?q?feat(wallet):=20=EC=9E=85=EA=B8=88=20API=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20(WAL-003)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/coinflow/wallet/WalletApiTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/test/java/com/coinflow/wallet/WalletApiTest.java b/src/test/java/com/coinflow/wallet/WalletApiTest.java index 713fdc3..3c08048 100644 --- a/src/test/java/com/coinflow/wallet/WalletApiTest.java +++ b/src/test/java/com/coinflow/wallet/WalletApiTest.java @@ -182,6 +182,74 @@ void setUp() { assertThat(sellerKrwLedgers.get(0).getDeltaAvailable()).isEqualByComparingTo("10000"); } + // ── WAL-003 입금 API ────────────────────────────────────────────── + + @Test + void KRW_입금_성공() { + String token = signupAndLogin("wallet007@example.com"); + + var response = depositViaApi(token, "KRW", "1000000"); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody().get("asset")).isEqualTo("KRW"); + assertThat(new BigDecimal((String) response.getBody().get("availableBalance"))) + .isEqualByComparingTo("1000000"); + } + + @Test + void BTC_입금_성공() { + String token = signupAndLogin("wallet008@example.com"); + + var response = depositViaApi(token, "BTC", "0.5"); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody().get("asset")).isEqualTo("BTC"); + assertThat(new BigDecimal((String) response.getBody().get("availableBalance"))) + .isEqualByComparingTo("0.5"); + } + + @Test + void 입금_후_SEED_DEPOSIT_원장_기록() { + String token = signupAndLogin("wallet009@example.com"); + depositViaApi(token, "KRW", "500000"); + + var user = userRepository.findByEmail("wallet009@example.com").orElseThrow(); + List ledgers = walletLedgerRepository + .findAllByUserIdAndAssetOrderByCreatedAtDesc(user.getId(), "KRW"); + + assertThat(ledgers).hasSize(1); + assertThat(ledgers.get(0).getType()).isEqualTo(LedgerType.SEED_DEPOSIT); + assertThat(ledgers.get(0).getDeltaAvailable()).isEqualByComparingTo("500000"); + assertThat(ledgers.get(0).getAvailableBalanceAfter()).isEqualByComparingTo("500000"); + } + + @Test + void 존재하지_않는_자산_입금_실패() { + String token = signupAndLogin("wallet010@example.com"); + + var response = depositViaApi(token, "ETH", "1.0"); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + assertThat(response.getBody().get("code")).isEqualTo("WALLET_NOT_FOUND"); + } + + @Test + void 입금액_0이하_실패() { + String token = signupAndLogin("wallet011@example.com"); + + var response = depositViaApi(token, "KRW", "0"); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST); + assertThat(response.getBody().get("code")).isEqualTo("INVALID_AMOUNT"); + } + + @Test + void 입금_토큰_없음() { + var response = restTemplate.postForEntity("/api/v1/wallets/deposit", + Map.of("asset", "KRW", "amount", "1000"), Map.class); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED); + } + // ── helpers ─────────────────────────────────────────────────────── private String signupAndLogin(String email) { @@ -237,6 +305,13 @@ private ResponseEntity cancelOrder(String token, Long orderId) { return restTemplate.exchange("/api/v1/orders/" + orderId + "/cancel", HttpMethod.POST, new HttpEntity<>(headers), Map.class); } + private ResponseEntity depositViaApi(String token, String asset, String amount) { + HttpHeaders headers = new HttpHeaders(); + headers.setBearerAuth(token); + return restTemplate.exchange("/api/v1/wallets/deposit", HttpMethod.POST, + new HttpEntity<>(Map.of("asset", asset, "amount", amount), headers), Map.class); + } + private ResponseEntity getWallets(String token) { HttpHeaders headers = new HttpHeaders(); headers.setBearerAuth(token); From 2b5bc0d50c2d2625ab8bafc98b741e9273863406 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 15:03:49 +0900 Subject: [PATCH 5/7] =?UTF-8?q?docs(v2):=20Phase=202=20PRD=20=EB=B0=8F=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84=20=ED=94=8C=EB=9E=9C=20=EA=B5=AC=EC=B2=B4?= =?UTF-8?q?=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 거래소 도메인 관점에서 Kafka/WebSocket 도입 배경 명시. 설계 결정 근거, 면접 Q&A, 이슈별 파일 목록 추가. --- .docs/v2/PRD.md | 226 +++++++++++++------- .docs/v2/Plan.md | 534 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 580 insertions(+), 180 deletions(-) diff --git a/.docs/v2/PRD.md b/.docs/v2/PRD.md index 3f98751..303e072 100644 --- a/.docs/v2/PRD.md +++ b/.docs/v2/PRD.md @@ -1,108 +1,166 @@ # CoinFlow Phase 2 PRD -이 문서는 CoinFlow Phase 1 MVP 완료 이후 Phase 2의 제품 범위와 기술 목표를 정의한다. - ## 1. 배경 -Phase 1에서는 단일 인스턴스 환경에서 지정가 주문 생성/취소/매칭/정산/원장/이벤트 로그까지 이어지는 핵심 파이프라인을 완성했다. +### Phase 1에서 완성한 것 -Phase 2에서는 **분산 시스템 확장성**을 목표로 한다. -도메인 이벤트를 Kafka로 안정적으로 발행하고, WebSocket으로 클라이언트에 실시간 push한다. +단일 인스턴스 환경에서 지정가 주문 생성 → 매칭 → 정산 → 원장 기록까지 이어지는 거래소 핵심 파이프라인을 완성했다. ---- +### Phase 2가 필요한 이유 -## 2. 목표 +거래소는 체결 이벤트 하나가 발생했을 때 여러 곳에 동시에 알려야 한다. + +``` +체결 발생 + ├── 매수자/매도자 지갑 정산 (Phase 1 완료) + ├── 호가창 갱신 (Phase 1 완료, in-memory) + ├── 체결 피드 클라이언트 broadcast ← Phase 2 + └── 외부 시스템 (통계, 리스크 등) ← Phase 2 이후 +``` -- 도메인 이벤트를 Outbox Pattern으로 Kafka에 안정적으로 발행한다. -- Kafka Consumer가 이벤트를 수신해 WebSocket으로 클라이언트에 실시간 broadcast한다. -- 이벤트 유실 없는 at-least-once 전달을 보장한다. -- 발행 실패 시 재시도 및 dead-letter 처리 경계를 확보한다. +이 모든 처리를 하나의 트랜잭션에 넣으면 결합도가 폭발하고 장애 전파가 발생한다. +Phase 2의 목표는 **이벤트 기반으로 이 결합을 끊는 것**이다. --- -## 3. 핵심 설계 결정 +## 2. 해결하려는 문제 + +### 문제 1 — 이벤트 유실 가능성 -### 3-1. Outbox Pattern +Phase 1에서 `domain_events` 테이블에 이벤트를 DB 트랜잭션 안에 저장해뒀다. +하지만 이 이벤트를 아무도 소비하지 않는다. Kafka로 발행하는 코드가 없다. -**문제** -OrderService에서 Kafka를 직접 발행하면 DB commit 성공 후 Kafka 발행 실패 시 이벤트가 유실된다. -두 작업은 단일 트랜잭션으로 묶을 수 없다. +OrderService에서 Kafka를 직접 호출하면: +``` +DB commit 성공 → Kafka 발행 실패 → 이벤트 유실 +``` +DB와 Kafka는 단일 트랜잭션으로 묶을 수 없기 때문에 이 갭이 생긴다. -**해결** -이벤트를 DB(domain_events)에 같은 트랜잭션 안에 먼저 저장(Phase 1에서 완료). -별도 프로세스(OutboxPublisher)가 주기적으로 미발행 이벤트를 조회해 Kafka에 발행한다. +### 문제 2 — 클라이언트는 실시간을 원한다 + +거래소에서 가격이 REST polling으로 갱신되면 제품이 성립하지 않는다. +호가창과 체결 피드는 WebSocket push가 없으면 사용자가 쓸 수 없다. + +--- + +## 3. 해결 방법 + +### 3-1. Outbox Pattern — 이벤트 유실 제거 + +Phase 1에서 `domain_events` 테이블에 이미 이벤트를 저장하고 있다. +이것이 Outbox다. ``` [OrderService] - │ (동일 트랜잭션) + │ DB 트랜잭션 안에서 이벤트 저장 (이미 완료) ▼ [domain_events] published=false - ▲ polling (1초) -[OutboxPublisher] - │ KafkaTemplate + ▲ + │ @Scheduled polling (1초) +[OutboxPublisher] ← Phase 2에서 추가 + │ KafkaTemplate.send().get() (동기, ACK 확인) ▼ [Kafka] + │ published=true로 갱신 ``` -**보장** -- Kafka 장애 시에도 이벤트는 DB에 안전하게 보관 -- 서버 재시작 후 미발행 이벤트 자동 재발행 -- `publish_attempts`로 재시도 횟수 추적, 임계치 초과 시 dead-letter 처리 가능 +**보장하는 것:** +- Kafka 장애 중에도 이벤트는 DB에 안전하게 보관된다 +- 서버 재시작 후 미발행 이벤트가 자동으로 재발행된다 +- `publish_attempts`로 실패 횟수를 추적하고 임계치 초과 시 dead-letter로 분류한다 -### 3-2. at-least-once 전달 +**감수하는 것:** +- at-least-once → 중복 발행이 발생할 수 있다 +- Consumer는 `event.id` 기준으로 idempotent하게 처리해야 한다 +- polling 방식이므로 최대 1초의 발행 딜레이가 있다 -Outbox polling 특성상 중복 발행이 발생할 수 있다. -Consumer는 `event.id` 기준으로 idempotent하게 처리한다. +### 3-2. Kafka — 서비스 간 결합 제거 -### 3-3. 확장 방향 (Phase 3) +Kafka를 이벤트 버스로 두면 OrderService는 체결 사실만 기록한다. +누가 그 이벤트를 소비할지 OrderService가 알 필요가 없다. -Scheduler polling 방식은 최대 1초의 딜레이가 있다. -MySQL binlog를 실시간으로 감지하는 **CDC(Change Data Capture) / Debezium**으로 전환하면 -딜레이를 수십 ms 수준으로 줄일 수 있다. +``` +OrderService → domain_events → Kafka + ├── WebSocketBroadcaster (실시간 push) + └── (미래) 통계 서비스, 리스크 엔진, ... +``` + +소비자가 늘어도 OrderService 코드를 건드리지 않는다. + +### 3-3. WebSocket — 실시간 push + +Kafka Consumer가 이벤트를 수신하면 STOMP WebSocket으로 클라이언트에 broadcast한다. + +``` +Kafka → WebSocketBroadcaster → /topic/trades/{market} + → /topic/orderbook/{market} +``` + +클라이언트는 관심 있는 시장을 구독하고, 체결/오더북 변경을 실시간으로 받는다. --- -## 4. 아키텍처 +## 4. 전체 아키텍처 ``` -[OrderService] ──(동일 트랜잭션)──▶ [domain_events] (published=false) - │ - @Scheduled (1초) - │ - [OutboxPublisher] - │ KafkaTemplate - ▼ - ┌──────────────────────────┐ - │ Kafka │ - │ coinflow.order.events │ - │ coinflow.trade.events │ - └──────────────────────────┘ - │ @KafkaListener - ▼ - [WebSocketBroadcaster] - │ SimpMessagingTemplate (STOMP) - ▼ - /topic/orderbook/{market} - /topic/trades/{market} +┌─────────────────────────────────────────────────────────┐ +│ Spring Boot App │ +│ │ +│ [OrderService] │ +│ │ 동일 트랜잭션 │ +│ ▼ │ +│ [domain_events] published=false ◄──────────────────┐ │ +│ │ │ +│ [OutboxPublisher] @Scheduled(fixedDelay=1000) │ │ +│ │ KafkaTemplate.send().get() │ │ +│ │ 성공: published=true │ │ +│ │ 실패: publish_attempts++ │ │ +└──────┼───────────────────────────────────────────────┘ │ + │ │ + ▼ │ + ┌─────────────────────────┐ │ + │ Kafka │ │ + │ coinflow.order.events │ │ + │ coinflow.trade.events │ │ + └──────────┬──────────────┘ │ + │ @KafkaListener │ + ▼ │ + [WebSocketBroadcaster] │ + │ SimpMessagingTemplate │ + ▼ │ + /topic/trades/{market} │ + /topic/orderbook/{market} │ + │ │ + ▼ │ + [Browser / Client] │ ``` --- ## 5. Kafka Topic 설계 -| Topic | 포함 이벤트 | 용도 | -|-------|------------|------| -| `coinflow.order.events` | ORDER_ACCEPTED, ORDER_PARTIALLY_FILLED, ORDER_FILLED, ORDER_CANCELED | 주문 상태 실시간 알림 | -| `coinflow.trade.events` | TRADE_CREATED, SETTLEMENT_COMPLETED | 체결/정산 실시간 broadcast | +| Topic | 이벤트 | 파티션 키 | 파티션 수 | +|-------|--------|-----------|-----------| +| `coinflow.order.events` | ORDER_ACCEPTED, ORDER_PARTIALLY_FILLED, ORDER_FILLED, ORDER_CANCELED | marketSymbol | 4 | +| `coinflow.trade.events` | TRADE_CREATED, SETTLEMENT_COMPLETED | marketSymbol | 4 | + +**파티션 키를 marketSymbol로 설정하는 이유:** +같은 시장의 이벤트가 같은 파티션으로 들어가 Consumer가 시장 단위 순서를 보장받는다. +BTC-KRW 이벤트와 ETH-KRW 이벤트는 서로 다른 파티션에서 병렬 처리된다. --- -## 6. WebSocket 엔드포인트 +## 6. WebSocket 채널 설계 -| 구독 채널 | 발행 시점 | 페이로드 | -|-----------|-----------|----------| -| `/topic/trades/{market}` | TRADE_CREATED | price, quantity, side, tradedAt | -| `/topic/orderbook/{market}` | ORDER_ACCEPTED / FILLED / CANCELED | 변경된 호가창 스냅샷 | +| 채널 | 발행 트리거 | 페이로드 | 용도 | +|------|------------|----------|------| +| `/topic/trades/{market}` | TRADE_CREATED | price, quantity, side, tradedAt | 체결 피드 | +| `/topic/orderbook/{market}` | ORDER_ACCEPTED / FILLED / PARTIALLY_FILLED / CANCELED | buySide[], sellSide[] 전체 스냅샷 | 호가창 갱신 | + +**오더북을 delta가 아닌 스냅샷으로 broadcast하는 이유:** +- delta 방식은 클라이언트가 로컬 상태를 유지하고 순서를 보장해야 한다 +- 패킷 유실 시 클라이언트 상태가 서버와 불일치한다 +- Phase 2에서는 스냅샷으로 단순화하고, Phase 3에서 delta + 체크섬으로 최적화한다 --- @@ -110,28 +168,40 @@ MySQL binlog를 실시간으로 감지하는 **CDC(Change Data Capture) / Debezi | 이슈 | 브랜치 | 내용 | |------|--------|------| -| #19 | feat/19/kafka-setup | Docker Compose (Kafka KRaft), Topic 설정, KafkaTemplate 빈 | -| #20 | feat/20/outbox-publisher | @Scheduled OutboxPublisher, 재시도/dead-letter 처리 | -| #21 | feat/21/websocket | STOMP WebSocket 설정, Kafka Consumer → broadcast | -| #22 | feat/22/e2e | 주문 → Kafka 발행 → WebSocket 수신 end-to-end 검증 | +| #19 | feat/19/kafka-setup | Docker Compose (MySQL + Kafka KRaft), Topic 설정, KafkaTemplate 빈 | +| #20 | feat/20/outbox-publisher | OutboxPublisher, 재시도/dead-letter 처리 | +| #21 | feat/21/websocket | STOMP WebSocket, Kafka Consumer → broadcast | +| #22 | feat/22/e2e | 주문 체결 → Kafka 발행 → WebSocket 수신 end-to-end 자동 검증 | --- ## 8. 기술 스택 추가 -| 기술 | 용도 | -|------|------| -| Apache Kafka (KRaft) | 메시지 브로커 | -| spring-kafka | KafkaTemplate, @KafkaListener | -| spring-websocket | STOMP WebSocket 서버 | -| spring-messaging | SimpMessagingTemplate | -| Docker Compose | 로컬 Kafka 환경 | +| 기술 | 선택 이유 | +|------|-----------| +| Apache Kafka (KRaft) | Zookeeper 없는 단순한 로컬 환경. 거래소 도메인의 표준 이벤트 브로커 | +| spring-kafka | `KafkaTemplate`, `@KafkaListener` Spring 네이티브 통합 | +| spring-websocket (STOMP) | pub/sub 구조가 명확하고 Spring 내장 지원. SockJS 폴백 포함 | +| Docker Compose | 단일 명령으로 개발 환경 재현 | --- ## 9. 성공 기준 -- 주문 생성 → 최대 2초 내 `/topic/trades/{market}` WebSocket 메시지 수신 -- Kafka 일시 중단 후 재시작 시 미발행 이벤트 자동 재발행 -- publish_attempts 3회 초과 이벤트 별도 식별 가능 -- end-to-end 통합 테스트 통과 +| 기준 | 측정 방법 | +|------|-----------| +| 주문 체결 → 2초 내 WebSocket 메시지 수신 | E2E 테스트 timeout 5s | +| Kafka 재시작 후 미발행 이벤트 자동 재발행 | `published=false` 이벤트 재발행 확인 | +| `publish_attempts >= 5` 이벤트 dead-letter 분류 | SQL 조회로 확인 | +| E2E 통합 테스트 통과 | CI 기준 | + +--- + +## 10. Phase 3 확장 방향 + +| 항목 | Phase 2 | Phase 3 | +|------|---------|---------| +| 이벤트 발행 방식 | Scheduler polling (1초 딜레이) | Debezium CDC (binlog 기반, 수십 ms) | +| 오더북 broadcast | 전체 스냅샷 | delta + 체크섬 | +| Consumer 확장 | 단일 앱 내 | 별도 서비스로 분리 | +| 부하 분산 | 단일 인스턴스 | Consumer group 수평 확장 | diff --git a/.docs/v2/Plan.md b/.docs/v2/Plan.md index 902a82e..6af18bb 100644 --- a/.docs/v2/Plan.md +++ b/.docs/v2/Plan.md @@ -1,188 +1,518 @@ # CoinFlow Phase 2 구현 플랜 +## Phase 1 완료 상태 — 이미 있는 것 + +| 항목 | 파일 | 비고 | +|------|------|------| +| `domain_events` 테이블 | `V3__create_orders_...sql` | `published`, `published_at`, `publish_attempts` 컬럼 포함 | +| `DomainEvent` 엔티티 | `event/domain/DomainEvent.java` | Outbox 필드 완비 | +| `DomainEventRecorder` | `event/service/DomainEventRecorder.java` | 6종 이벤트 DB 저장 | +| `Dockerfile` | 루트 | 멀티스테이지 빌드 | + +없는 것: `docker-compose.yml`, Kafka 의존성, WebSocket 의존성, OutboxPublisher, WebSocketBroadcaster + +--- + ## 이슈 #19 — Kafka 환경 구성 (feat/19/kafka-setup) ### 목표 -로컬 Kafka 환경 구성 및 Spring Boot 연동 -### 작업 내용 +`docker compose up -d` 한 번으로 MySQL + Kafka가 올라오고, +Spring Boot 실행 시 Topic이 자동 생성되는 것까지 확인한다. -**1. Docker Compose** -```yaml -# docker-compose.yml -kafka (KRaft 모드, Zookeeper 없음) - - KAFKA_PROCESS_ROLES: broker,controller - - port: 9092 -``` +### 왜 KRaft인가 -**2. build.gradle 의존성 추가** -``` -spring-kafka -``` +기존 Kafka는 메타데이터 관리를 위해 Zookeeper가 필요했다. +KRaft(Kafka Raft)는 Kafka 자체에 메타데이터를 내장해 Zookeeper 없이 동작한다. +로컬 개발 환경에서 컨테이너 하나로 끝낼 수 있다. + +### 작업 목록 + +**1. docker-compose.yml** (신규) -**3. application.yml Kafka 설정** ```yaml -spring: +services: + mysql: + image: mysql:8.0 + ports: ["3306:3306"] + environment: + MYSQL_DATABASE: coinflow + MYSQL_USER: coinflow + MYSQL_PASSWORD: coinflow + MYSQL_ROOT_PASSWORD: root + volumes: + - mysql-data:/var/lib/mysql + kafka: - bootstrap-servers: localhost:9092 - producer: - key-serializer: StringSerializer - value-serializer: StringSerializer - consumer: - group-id: coinflow-websocket - auto-offset-reset: latest + image: bitnami/kafka:3.7 + ports: ["9092:9092"] + environment: + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" + +volumes: + mysql-data: ``` -**4. Topic 생성 빈** +`AUTO_CREATE_TOPICS_ENABLE=false`로 설정하는 이유: +오타로 잘못된 Topic 이름이 자동 생성되는 것을 막기 위해 명시적 Topic 생성만 허용한다. + +**2. build.gradle** + +```groovy +implementation 'org.springframework.kafka:spring-kafka' +testImplementation 'org.springframework.kafka:spring-kafka-test' +``` + +**3. application.properties 추가** + +```properties +# Kafka +spring.kafka.bootstrap-servers=${KAFKA_SERVERS:localhost:9092} +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.consumer.group-id=coinflow-websocket +spring.kafka.consumer.auto-offset-reset=latest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer + +# Scheduling +spring.task.scheduling.pool.size=2 ``` -coinflow.order.events (partition: 4, replication: 1) -coinflow.trade.events (partition: 4, replication: 1) + +**4. KafkaTopicConfig.java** (신규, `com.coinflow.config`) + +```java +@Configuration +public class KafkaTopicConfig { + @Bean + public NewTopic orderEventsTopic() { + return TopicBuilder.name("coinflow.order.events").partitions(4).replicas(1).build(); + } + @Bean + public NewTopic tradeEventsTopic() { + return TopicBuilder.name("coinflow.trade.events").partitions(4).replicas(1).build(); + } +} ``` -**검증**: Spring Boot 실행 시 Topic 자동 생성 확인 +파티션을 4개로 설정하는 이유: +현재는 단일 Consumer지만, 향후 Consumer를 수평 확장할 때 파티션 수가 상한이 된다. +Topic 생성 후 파티션 수를 줄이는 것은 불가능하므로 여유 있게 설정한다. + +### 변경 파일 요약 + +| 파일 | 작업 | +|------|------| +| `docker-compose.yml` | 신규 | +| `src/main/java/com/coinflow/config/KafkaTopicConfig.java` | 신규 | +| `build.gradle` | `spring-kafka`, `spring-kafka-test` 추가 | +| `src/main/resources/application.properties` | Kafka 연결 설정 추가 | + +### 완료 기준 + +```bash +docker compose up -d +./gradlew bootRun +# Topic 생성 확인 +docker exec coinflow-kafka-1 kafka-topics.sh \ + --list --bootstrap-server localhost:9092 +# 출력: coinflow.order.events, coinflow.trade.events +``` --- ## 이슈 #20 — Outbox Publisher (feat/20/outbox-publisher) ### 목표 -domain_events 테이블의 미발행 이벤트를 Kafka로 안정적으로 발행 -### 작업 내용 +`domain_events.published=false` 이벤트를 1초마다 폴링해 Kafka로 발행한다. +발행 성공 시 `published=true`, 실패 시 `publish_attempts++`. +5회 실패한 이벤트는 dead-letter로 분류해 폴링에서 제외한다. + +### 핵심 설계 결정 + +| 항목 | 결정 | 이유 | +|------|------|------| +| send 방식 | `kafkaTemplate.send(...).get()` (동기) | 비동기 send는 ACK 전에 메서드가 끝날 수 있어 실패를 감지 못함 | +| 스케줄링 방식 | `fixedDelay` | `fixedRate`는 이전 실행이 끝나지 않아도 다음 실행이 시작 → 중복 처리 위험 | +| 배치 크기 | 최대 100건 | 단일 폴링 주기 내 처리량 상한. DB 부하 통제 | +| MAX_ATTEMPTS | 5 | 일시적 Kafka 장애는 5회 안에 해소된다고 가정 | +| 파티션 키 | marketSymbol | 같은 시장 이벤트 → 같은 파티션 → Consumer 순서 보장 | +| 트랜잭션 범위 | 배치 단위 `@Transactional` | 이벤트 하나 실패 시 해당 이벤트만 attempts 증가, 나머지는 published 처리 | + +### 작업 목록 + +**1. DomainEvent.java — 상태 변경 메서드 추가** -**1. OutboxPublisher** ```java -@Scheduled(fixedDelay = 1000) -public void publish() { - // published=false, publish_attempts < 5 인 이벤트 최대 100건 조회 - // KafkaTemplate으로 발행 - // 성공: published=true, published_at=now() - // 실패: publish_attempts++ +public void markPublished() { + this.published = true; + this.publishedAt = LocalDateTime.now(); +} + +public void incrementAttempts() { + this.publishAttempts++; } ``` -**2. 발행 Topic 라우팅** +엔티티가 자신의 상태 전환을 메서드로 캡슐화한다. +외부에서 필드를 직접 수정하지 않는다. + +**2. DomainEventRepository.java — Outbox 쿼리 추가** + +```java +// published=false이고 attempts < MAX_ATTEMPTS인 이벤트를 id 오름차순으로 최대 100건 조회 +// → 오래된 이벤트부터 발행 (FIFO 보장) +List findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(int maxAttempts); ``` -ORDER_* 이벤트 → coinflow.order.events -TRADE_* 이벤트 → coinflow.trade.events + +**3. OutboxPublisher.java** (신규, `com.coinflow.event.service`) + +```java +@Component +@RequiredArgsConstructor +public class OutboxPublisher { + + static final int MAX_ATTEMPTS = 5; + static final String ORDER_TOPIC = "coinflow.order.events"; + static final String TRADE_TOPIC = "coinflow.trade.events"; + + private final DomainEventRepository domainEventRepository; + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + @Scheduled(fixedDelay = 1000) + @Transactional + public void publish() { + List pending = domainEventRepository + .findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(MAX_ATTEMPTS); + + for (DomainEvent event : pending) { + try { + String topic = resolveTopic(event.getEventType()); + String message = buildMessage(event); + kafkaTemplate.send(topic, event.getMarketSymbol(), message).get(); + event.markPublished(); + } catch (Exception e) { + event.incrementAttempts(); + } + } + } + + private String resolveTopic(DomainEventType type) { + return switch (type) { + case ORDER_ACCEPTED, ORDER_PARTIALLY_FILLED, + ORDER_FILLED, ORDER_CANCELED -> ORDER_TOPIC; + case TRADE_CREATED, SETTLEMENT_COMPLETED -> TRADE_TOPIC; + }; + } + + private String buildMessage(DomainEvent event) throws JsonProcessingException { + return objectMapper.writeValueAsString(Map.of( + "eventId", event.getId(), + "eventType", event.getEventType().name(), + "marketSymbol", event.getMarketSymbol(), + "payload", event.getPayload() + )); + } +} ``` -**3. Kafka 메시지 구조** +**4. CoinflowApplication.java** + +```java +@EnableScheduling // 추가 +``` + +### Kafka 메시지 포맷 + ```json { - "eventId": 1, - "eventType": "TRADE_CREATED", - "aggregateId": 42, + "eventId": 1, + "eventType": "TRADE_CREATED", "marketSymbol": "BTC-KRW", - "payload": "{...}" + "payload": "{\"tradeId\":1,\"price\":\"100000000\",\"quantity\":\"0.0001\",\"quoteAmount\":\"10000\"}" } ``` -**4. DomainEventRepository 쿼리 추가** -```java -findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(int maxAttempts) -updatePublishedTrue(Long id) -incrementPublishAttempts(Long id) +`payload`는 이미 JSON 문자열로 직렬화되어 있으므로 그대로 포함한다. +Consumer는 `payload`를 다시 역직렬화해서 사용한다. + +### Dead-letter 모니터링 + +```sql +-- publish_attempts >= 5인 이벤트 목록 +SELECT id, event_type, market_symbol, payload, created_at, publish_attempts +FROM domain_events +WHERE published = false AND publish_attempts >= 5 +ORDER BY created_at; ``` -**검증**: 주문 생성 후 1~2초 내 Kafka 메시지 발행 확인 (kafka-console-consumer) +### 변경 파일 요약 + +| 파일 | 작업 | +|------|------| +| `event/domain/DomainEvent.java` | `markPublished()`, `incrementAttempts()` 추가 | +| `event/repository/DomainEventRepository.java` | Outbox 폴링 쿼리 추가 | +| `event/service/OutboxPublisher.java` | 신규 | +| `CoinflowApplication.java` | `@EnableScheduling` 추가 | + +### 완료 기준 + +```bash +# 주문 체결 후 1~2초 내 메시지 확인 +docker exec coinflow-kafka-1 kafka-console-consumer.sh \ + --bootstrap-server localhost:9092 \ + --topic coinflow.trade.events \ + --from-beginning + +# 출력 예시 +{"eventId":1,"eventType":"TRADE_CREATED","marketSymbol":"BTC-KRW","payload":"{...}"} +``` --- ## 이슈 #21 — WebSocket 실시간 broadcast (feat/21/websocket) ### 목표 -Kafka Consumer가 이벤트를 수신해 WebSocket 클라이언트에 실시간 push -### 작업 내용 +Kafka Consumer가 이벤트를 수신하면 STOMP WebSocket으로 클라이언트에 push한다. +체결 발생 시 체결 피드, 주문 변경 시 오더북 스냅샷을 broadcast한다. + +### 핵심 설계 결정 + +| 항목 | 결정 | 이유 | +|------|------|------| +| 프로토콜 | STOMP over WebSocket | pub/sub 구조가 명확. 채널 단위 구독 가능 | +| SockJS | 포함 | WebSocket 미지원 환경 폴백 자동 처리 | +| 오더북 broadcast | 전체 스냅샷 | delta 방식은 클라이언트가 로컬 상태를 유지해야 해서 패킷 유실 시 불일치 발생 | +| Consumer groupId | `coinflow-websocket` | 향후 다른 Consumer group 추가 시 독립적으로 오프셋 관리 | + +### 작업 목록 + +**1. build.gradle** + +```groovy +implementation 'org.springframework.boot:spring-boot-starter-websocket' +``` + +**2. WebSocketConfig.java** (신규, `com.coinflow.config`) -**1. WebSocket 설정** ```java @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - // STOMP endpoint: /ws - // topic prefix: /topic - // app prefix: /app + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableSimpleBroker("/topic"); + registry.setApplicationDestinationPrefixes("/app"); + } + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/ws") + .setAllowedOriginPatterns("*") + .withSockJS(); + registry.addEndpoint("/ws") // wscat 등 raw WebSocket 클라이언트용 + .setAllowedOriginPatterns("*"); + } } ``` -**2. Kafka Consumer** +**3. WebSocket 메시지 DTO** (`com.coinflow.websocket.dto`) + ```java -@KafkaListener(topics = "coinflow.trade.events", groupId = "coinflow-websocket") -public void onTradeEvent(String message) { - // TRADE_CREATED → /topic/trades/{market} broadcast - // SETTLEMENT_COMPLETED → 무시 (or 추가 활용) -} +// 체결 피드: /topic/trades/{market} +record TradeMessage( + String market, + String price, + String quantity, + String side, // BUY or SELL (taker 기준) + String tradedAt +) {} + +// 오더북 스냅샷: /topic/orderbook/{market} +record OrderBookMessage( + String market, + List buySide, // 높은 가격 우선 + List sellSide // 낮은 가격 우선 +) {} + +record PriceLevel(String price, String quantity) {} +``` + +**4. WebSocketBroadcaster.java** (신규, `com.coinflow.websocket`) -@KafkaListener(topics = "coinflow.order.events", groupId = "coinflow-websocket") -public void onOrderEvent(String message) { - // ORDER_FILLED / CANCELED → 오더북 변경 → /topic/orderbook/{market} broadcast +```java +@Component +@RequiredArgsConstructor +public class WebSocketBroadcaster { + + private final SimpMessagingTemplate messaging; + private final MatchingEngine matchingEngine; + private final ObjectMapper objectMapper; + + @KafkaListener(topics = "coinflow.trade.events", groupId = "coinflow-websocket") + public void onTradeEvent(String raw) { + KafkaMessage msg = parse(raw); + if ("TRADE_CREATED".equals(msg.eventType())) { + TradeMessage trade = buildTradeMessage(msg); + messaging.convertAndSend("/topic/trades/" + msg.marketSymbol(), trade); + } + } + + @KafkaListener(topics = "coinflow.order.events", groupId = "coinflow-websocket") + public void onOrderEvent(String raw) { + KafkaMessage msg = parse(raw); + Set triggers = Set.of( + "ORDER_ACCEPTED", "ORDER_FILLED", "ORDER_PARTIALLY_FILLED", "ORDER_CANCELED" + ); + if (triggers.contains(msg.eventType())) { + OrderBookMessage snapshot = buildSnapshot(msg.marketSymbol()); + messaging.convertAndSend("/topic/orderbook/" + msg.marketSymbol(), snapshot); + } + } } ``` -**3. WebSocket 메시지 DTO** -``` -TradeMessage { market, price, quantity, side, tradedAt } -OrderBookMessage { market, buySide[], sellSide[] } +`MatchingEngine`에서 현재 오더북 상태를 읽어 스냅샷을 만든다. +메모리 오더북은 이미 최신 상태를 유지하고 있으므로 별도 DB 조회 없이 broadcast 가능하다. + +**5. SecurityConfig.java 수정** + +```java +.requestMatchers("/ws/**").permitAll() ``` -**검증**: WebSocket 클라이언트(Postman or wscat) 구독 후 주문 체결 시 메시지 수신 확인 +### 변경 파일 요약 + +| 파일 | 작업 | +|------|------| +| `config/WebSocketConfig.java` | 신규 | +| `websocket/WebSocketBroadcaster.java` | 신규 | +| `websocket/dto/TradeMessage.java` | 신규 | +| `websocket/dto/OrderBookMessage.java` | 신규 | +| `websocket/dto/PriceLevel.java` | 신규 | +| `build.gradle` | `spring-boot-starter-websocket` 추가 | +| `config/SecurityConfig.java` | `/ws/**` permitAll 추가 | + +### 완료 기준 + +```bash +# wscat 설치: npm install -g wscat +wscat -c ws://localhost:8080/ws + +# STOMP CONNECT 프레임 전송 후 구독 +SUBSCRIBE /topic/trades/BTC-KRW + +# 별도 터미널에서 매수/매도 주문 체결 → 아래 메시지 수신 확인 +{"market":"BTC-KRW","price":"100000000","quantity":"0.0001","side":"BUY","tradedAt":"..."} +``` --- -## 이슈 #22 — end-to-end 통합 테스트 (feat/22/e2e) +## 이슈 #22 — E2E 통합 테스트 (feat/22/e2e) ### 목표 -주문 → Kafka 발행 → WebSocket 수신 전 과정 자동 검증 -### 작업 내용 +주문 체결 → Kafka 발행 → WebSocket 수신 전 구간을 자동으로 검증한다. +CI에서 Docker 없이 실행되어야 한다 → `@EmbeddedKafka` 사용. + +### 테스트 시나리오 + +| ID | 시나리오 | 검증 항목 | +|----|----------|-----------| +| EVT-E2E-001 | 주문 체결 → `coinflow.trade.events`에 `TRADE_CREATED` 발행 | eventType, marketSymbol, payload.price | +| EVT-E2E-002 | 주문 취소 → `coinflow.order.events`에 `ORDER_CANCELED` 발행 | eventType, payload.orderId | +| EVT-E2E-003 | Kafka 발행 실패 → `publish_attempts` 1 증가 | attempts == 1, published == false | +| EVT-E2E-004 | `publish_attempts >= 5` → 폴링 제외 | 6번째 poll에서 해당 이벤트 미발행 | +| WS-E2E-001 | 주문 체결 → `/topic/trades/BTC-KRW` 수신 | price, quantity 일치, 5초 내 수신 | + +### 작업 목록 + +**1. KafkaPublishingTest.java** (신규, `com.coinflow.e2e`) -**1. EmbeddedKafka 테스트 환경** ```java +@SpringBootTest(webEnvironment = RANDOM_PORT) +@Import(TestcontainersConfig.class) @EmbeddedKafka(partitions = 1, topics = {"coinflow.order.events", "coinflow.trade.events"}) -``` +class KafkaPublishingTest { -**2. 테스트 시나리오** + @Autowired OutboxPublisher outboxPublisher; + @Autowired DomainEventRepository domainEventRepository; + + @Test + void 주문_체결_시_TRADE_CREATED_Kafka_발행() throws Exception { + // given: 매수자/매도자 회원가입 + 입금 + 매수/매도 주문 생성 (REST API) + // when: outboxPublisher.publish() 직접 호출 + // then: KafkaTestUtils.getRecords()로 메시지 수신 후 eventType assert + } +} ``` -EVT-E2E-001: 주문 체결 시 coinflow.trade.events에 TRADE_CREATED 메시지 발행 검증 -EVT-E2E-002: 주문 취소 시 coinflow.order.events에 ORDER_CANCELED 메시지 발행 검증 -EVT-E2E-003: Outbox 재시도 검증 (Kafka 발행 실패 시 publish_attempts 증가) + +**2. OutboxRetryTest.java** (신규) + +```java +// KafkaTemplate을 Mock으로 교체 → send() 예외 발생하도록 설정 +// outboxPublisher.publish() 호출 +// then: publish_attempts == 1, published == false +// 5회 반복 후 6번째 publish() 호출 → 해당 이벤트 미처리 확인 ``` -**3. WebSocket 수신 테스트** +**3. WebSocketBroadcastTest.java** (신규) + ```java -StompSession session = stompClient.connect("/ws", ...); -session.subscribe("/topic/trades/BTC-KRW", handler); -// 주문 체결 → handler가 메시지 수신할 때까지 대기 (CompletableFuture) +// WebSocketStompClient 생성 +// /ws 연결 → /topic/trades/BTC-KRW 구독 +// 주문 체결 API 호출 +// CompletableFuture.get(5, SECONDS) 대기 +// price, quantity assert ``` +### 변경 파일 요약 + +| 파일 | 작업 | +|------|------| +| `e2e/KafkaPublishingTest.java` | 신규 | +| `e2e/OutboxRetryTest.java` | 신규 | +| `e2e/WebSocketBroadcastTest.java` | 신규 | + --- ## 전체 구현 순서 ``` -#19 kafka-setup - └─ Docker Compose + Spring 연동 확인 - ↓ -#20 outbox-publisher - └─ DB → Kafka 발행 동작 확인 - ↓ -#21 websocket - └─ Kafka → WebSocket broadcast 동작 확인 - ↓ -#22 e2e - └─ 전 구간 자동 테스트 +feat/20/wallet PR 완료 + ↓ +feat/19/kafka-setup ── docker-compose + build.gradle + KafkaTopicConfig + 검증: docker compose up → Topic 생성 확인 + ↓ +feat/20/outbox-publisher ── OutboxPublisher + Repository 쿼리 + @EnableScheduling + 검증: kafka-console-consumer에서 메시지 1~2초 내 수신 + ↓ +feat/21/websocket ── WebSocketConfig + WebSocketBroadcaster + DTO + 검증: wscat 구독 후 주문 체결 → 메시지 수신 + ↓ +feat/22/e2e ── EmbeddedKafka + STOMP 클라이언트 테스트 + 검증: ./gradlew test 전체 통과 ``` --- -## 면접 어필 포인트 정리 - -| 주제 | 설명 | -|------|------| -| Outbox Pattern | "DB 트랜잭션과 메시지 발행의 원자성 문제를 Outbox로 해결" | -| at-least-once | "중복 발행 가능성 인지, consumer idempotent 처리" | -| 재시도/dead-letter | "publish_attempts로 실패 추적, 임계치 초과 시 별도 처리" | -| CDC 확장 | "현재 polling 방식, Debezium CDC로 전환 시 딜레이 수십 ms로 감소 가능" | -| 실시간 push | "STOMP WebSocket으로 클라이언트에 체결/오더북 실시간 broadcast" | +## 면접 Q&A 준비 + +| 예상 질문 | 답변 포인트 | +|-----------|------------| +| 왜 Kafka를 썼나? | 체결 이벤트 하나를 여러 소비자(WebSocket, 통계, 리스크)가 독립적으로 처리해야 해서. OrderService가 소비자를 알 필요 없음 | +| Outbox Pattern이 뭔가? | DB 트랜잭션과 Kafka 발행은 하나로 묶을 수 없음. 이벤트를 DB에 먼저 저장하고 별도 프로세스가 폴링해서 발행 → 이벤트 유실 0 | +| 왜 동기 send인가? | `.get()`으로 ACK 확인 후 `published=true`. 비동기면 Kafka 실패를 감지 못하고 이벤트가 유실될 수 있음 | +| 중복 발행은 어떻게 처리하나? | at-least-once를 감수. WebSocket broadcast는 중복이 UX에만 영향. DB 소비자 추가 시 `eventId` 기준 dedup | +| 파티션 키를 왜 marketSymbol로? | BTC-KRW 이벤트는 항상 같은 파티션 → Consumer가 시장 단위 순서를 보장받음 | +| 오더북을 왜 스냅샷으로? | delta는 클라이언트가 로컬 상태를 유지해야 하고, 패킷 유실 시 불일치. 스냅샷은 항상 최신 상태 보장 | +| polling의 한계는? | 최대 1초 딜레이. Debezium CDC로 전환하면 binlog 기반 실시간 감지, 수십 ms로 줄어듦 | From a12949b5b62e4741f3219845aac3d1ccaa773b85 Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 15:13:30 +0900 Subject: [PATCH 6/7] =?UTF-8?q?docs(v2):=20Phase=202=20=EC=84=A4=EA=B3=84?= =?UTF-8?q?=20=EB=AC=B8=EC=84=9C=20=EA=B5=AC=EC=B2=B4=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PRD: 정상/장애 시퀀스 다이어그램, 장애 시나리오 표, idempotency 명세 추가 Plan: KafkaMessage DTO, WebSocketBroadcaster 전체 구현, 테스트 코드 완성 --- .docs/v2/PRD.md | 186 +++++++++++--- .docs/v2/Plan.md | 630 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 674 insertions(+), 142 deletions(-) diff --git a/.docs/v2/PRD.md b/.docs/v2/PRD.md index 303e072..72cf7bd 100644 --- a/.docs/v2/PRD.md +++ b/.docs/v2/PRD.md @@ -109,35 +109,141 @@ Kafka → WebSocketBroadcaster → /topic/trades/{market} │ [OrderService] │ │ │ 동일 트랜잭션 │ │ ▼ │ -│ [domain_events] published=false ◄──────────────────┐ │ -│ │ │ -│ [OutboxPublisher] @Scheduled(fixedDelay=1000) │ │ -│ │ KafkaTemplate.send().get() │ │ -│ │ 성공: published=true │ │ -│ │ 실패: publish_attempts++ │ │ -└──────┼───────────────────────────────────────────────┘ │ - │ │ - ▼ │ - ┌─────────────────────────┐ │ - │ Kafka │ │ - │ coinflow.order.events │ │ - │ coinflow.trade.events │ │ - └──────────┬──────────────┘ │ - │ @KafkaListener │ - ▼ │ - [WebSocketBroadcaster] │ - │ SimpMessagingTemplate │ - ▼ │ - /topic/trades/{market} │ - /topic/orderbook/{market} │ - │ │ - ▼ │ - [Browser / Client] │ +│ [domain_events] published=false │ +│ │ +│ [OutboxPublisher] @Scheduled(fixedDelay=1000) │ +│ │ KafkaTemplate.send().get() │ +│ │ 성공 → published=true │ +│ │ 실패 → publish_attempts++ │ +└──────┼───────────────────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────┐ + │ Kafka │ + │ coinflow.order.events │ + │ coinflow.trade.events │ + └──────────┬──────────────┘ + │ @KafkaListener + ▼ + [WebSocketBroadcaster] + │ SimpMessagingTemplate + ▼ + /topic/trades/{market} ← 체결 피드 + /topic/orderbook/{market} ← 호가창 스냅샷 + │ + ▼ + [Browser / Client] +``` + +--- + +## 5. 정상 흐름 시퀀스 + +### 5-1. 주문 체결 → WebSocket 수신 전체 흐름 + +``` +Client A OrderService domain_events OutboxPublisher Kafka WebSocketBroadcaster Client B + │ │ │ │ │ │ │ + │─ POST /orders ▶│ │ │ │ │ │ + │ │─ match() ──────▶ │ │ │ │ │ + │ │─ settle() ─────▶ │ │ │ │ │ + │ │─ save(event) ──▶ │ published=false │ │ │ │ + │◀─ 200 OK ──────│ │ │ │ │ │ + │ │ │ │ │ │ │ + │ │ │ @Scheduled(1s) │ │ │ │ + │ │ │◀─ poll ─────────│ │ │ │ + │ │ │─ events ───────▶│ │ │ │ + │ │ │ │─ send() ───▶ │ │ │ + │ │ │ │◀─ ACK ───────│ │ │ + │ │ │◀─ published=true─│ │ │ │ + │ │ │ │ │─ @KafkaListener▶ │ + │ │ │ │ │ │─ convertAndSend▶│ + │ │ │ │ │ │ /topic/trades │ + │ │ │ │ │ │ │◀ STOMP PUSH +``` + +### 5-2. Kafka 장애 시 흐름 + +``` +OutboxPublisher Kafka domain_events + │ │ │ + │─ send() ─────────▶│ │ + │◀─ TimeoutException─│ │ + │─ incrementAttempts()────────────────▶│ publish_attempts=1 + │ │ │ + │ (1초 후 재시도) │ │ + │─ send() ─────────▶│ │ + │◀─ TimeoutException─│ │ + │─ incrementAttempts()────────────────▶│ publish_attempts=2 + │ │ │ + │ (Kafka 복구) │ │ + │─ send() ─────────▶│ │ + │◀─ ACK ─────────────│ │ + │─ markPublished() ──────────────────▶│ published=true +``` + +### 5-3. 서버 재시작 시 흐름 + +``` +서버 재시작 + │ + ▼ +OutboxPublisher @Scheduled 시작 + │ + │ published=false AND attempts < 5 인 이벤트 조회 + ▼ +[이전에 발행 못한 이벤트들] + │ + ▼ +Kafka로 재발행 → published=true +``` + +--- + +## 6. 장애 시나리오 정의 + +| 시나리오 | 발생 시점 | 결과 | 복구 방법 | +|---------|-----------|------|-----------| +| Kafka 일시 장애 | send() 호출 시 | `publish_attempts++`, 다음 poll에서 재시도 | Kafka 복구 후 자동 재발행 | +| 서버 재시작 (정상) | publish 완료 전 | `published=false` 이벤트 남아있음 | 재시작 후 @Scheduled가 자동 재발행 | +| 서버 재시작 (send 후 ACK 전) | Kafka 수신, DB 미갱신 | `published=false`로 남아 재발행 | Consumer가 중복 수신 → idempotent 처리 | +| Kafka 영구 장애 | send() 5회 실패 | `publish_attempts=5`, 폴링 제외 | 수동 조회 후 재처리 또는 dead-letter 큐 | +| WebSocket 연결 끊김 | broadcast 시 | 해당 클라이언트만 미수신 | 클라이언트가 재연결 후 최신 스냅샷 수신 | + +--- + +## 7. at-least-once와 idempotency + +### 중복 발행이 발생하는 경우 + +OutboxPublisher가 `send().get()`으로 ACK를 받은 뒤 `markPublished()`를 DB에 커밋하기 전에 서버가 죽으면, +다음 재시작 시 같은 이벤트를 다시 발행한다. + +``` +정상: send → ACK → markPublished (commit) → 끝 +장애: send → ACK → [서버 크래시] → 재시작 → 같은 이벤트 재발행 +``` + +### Consumer의 idempotent 처리 + +Phase 2의 WebSocket broadcast는 중복 수신이 발생해도 클라이언트 UX에만 영향을 준다. +같은 체결 메시지가 두 번 오면 화면에 같은 row가 두 번 표시될 수 있다. + +DB에 결과를 저장하는 Consumer(향후 통계, 정산 등)를 추가할 때는 `event.id` 기준 dedup이 필요하다. + +```sql +-- Consumer가 이미 처리한 이벤트를 기록하는 테이블 (Phase 3 이후) +CREATE TABLE processed_events ( + event_id BIGINT NOT NULL, + consumer VARCHAR(60) NOT NULL, + processed_at DATETIME(6), + PRIMARY KEY (event_id, consumer) +); ``` --- -## 5. Kafka Topic 설계 +## 8. Kafka Topic 설계 | Topic | 이벤트 | 파티션 키 | 파티션 수 | |-------|--------|-----------|-----------| @@ -148,9 +254,13 @@ Kafka → WebSocketBroadcaster → /topic/trades/{market} 같은 시장의 이벤트가 같은 파티션으로 들어가 Consumer가 시장 단위 순서를 보장받는다. BTC-KRW 이벤트와 ETH-KRW 이벤트는 서로 다른 파티션에서 병렬 처리된다. +**파티션을 4개로 설정하는 이유:** +Consumer group의 병렬 처리 가능한 Consumer 수의 상한이 파티션 수다. +생성 후 파티션 수를 줄이는 것은 불가능하므로 여유 있게 설정한다. + --- -## 6. WebSocket 채널 설계 +## 9. WebSocket 채널 설계 | 채널 | 발행 트리거 | 페이로드 | 용도 | |------|------------|----------|------| @@ -162,9 +272,15 @@ BTC-KRW 이벤트와 ETH-KRW 이벤트는 서로 다른 파티션에서 병렬 - 패킷 유실 시 클라이언트 상태가 서버와 불일치한다 - Phase 2에서는 스냅샷으로 단순화하고, Phase 3에서 delta + 체크섬으로 최적화한다 +**side 결정 방식 (TRADE_CREATED 기준):** +``` +takerOrderId == buyOrderId → side = "BUY" +takerOrderId == sellOrderId → side = "SELL" +``` + --- -## 7. 구현 이슈 +## 10. 구현 이슈 | 이슈 | 브랜치 | 내용 | |------|--------|------| @@ -175,7 +291,7 @@ BTC-KRW 이벤트와 ETH-KRW 이벤트는 서로 다른 파티션에서 병렬 --- -## 8. 기술 스택 추가 +## 11. 기술 스택 추가 | 기술 | 선택 이유 | |------|-----------| @@ -186,22 +302,22 @@ BTC-KRW 이벤트와 ETH-KRW 이벤트는 서로 다른 파티션에서 병렬 --- -## 9. 성공 기준 +## 12. 성공 기준 | 기준 | 측정 방법 | |------|-----------| | 주문 체결 → 2초 내 WebSocket 메시지 수신 | E2E 테스트 timeout 5s | | Kafka 재시작 후 미발행 이벤트 자동 재발행 | `published=false` 이벤트 재발행 확인 | | `publish_attempts >= 5` 이벤트 dead-letter 분류 | SQL 조회로 확인 | -| E2E 통합 테스트 통과 | CI 기준 | +| E2E 통합 테스트 통과 | `./gradlew test` CI 기준 | --- -## 10. Phase 3 확장 방향 +## 13. Phase 3 확장 방향 | 항목 | Phase 2 | Phase 3 | |------|---------|---------| -| 이벤트 발행 방식 | Scheduler polling (1초 딜레이) | Debezium CDC (binlog 기반, 수십 ms) | -| 오더북 broadcast | 전체 스냅샷 | delta + 체크섬 | -| Consumer 확장 | 단일 앱 내 | 별도 서비스로 분리 | -| 부하 분산 | 단일 인스턴스 | Consumer group 수평 확장 | +| 이벤트 발행 방식 | Scheduler polling (최대 1초 딜레이) | Debezium CDC (binlog 기반, 수십 ms) | +| 오더북 broadcast | 전체 스냅샷 | delta + sequence number + 클라이언트 체크섬 | +| Consumer 구조 | 단일 앱 내 WebSocketBroadcaster | 별도 서비스로 분리, Consumer group 수평 확장 | +| idempotency | WebSocket broadcast만 (UX 허용) | DB Consumer 추가 시 processed_events 테이블로 dedup | diff --git a/.docs/v2/Plan.md b/.docs/v2/Plan.md index 6af18bb..0ffa9a4 100644 --- a/.docs/v2/Plan.md +++ b/.docs/v2/Plan.md @@ -17,13 +17,13 @@ ### 목표 -`docker compose up -d` 한 번으로 MySQL + Kafka가 올라오고, +`docker compose up -d` 한 번으로 MySQL + Kafka가 올라오고, Spring Boot 실행 시 Topic이 자동 생성되는 것까지 확인한다. ### 왜 KRaft인가 -기존 Kafka는 메타데이터 관리를 위해 Zookeeper가 필요했다. -KRaft(Kafka Raft)는 Kafka 자체에 메타데이터를 내장해 Zookeeper 없이 동작한다. +기존 Kafka는 메타데이터 관리를 위해 Zookeeper가 필요했다. +KRaft(Kafka Raft)는 Kafka 자체에 메타데이터를 내장해 Zookeeper 없이 동작한다. 로컬 개발 환경에서 컨테이너 하나로 끝낼 수 있다. ### 작업 목록 @@ -42,6 +42,11 @@ services: MYSQL_ROOT_PASSWORD: root volumes: - mysql-data:/var/lib/mysql + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] + interval: 10s + timeout: 5s + retries: 5 kafka: image: bitnami/kafka:3.7 @@ -53,12 +58,17 @@ services: KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"] + interval: 10s + timeout: 5s + retries: 5 volumes: mysql-data: ``` -`AUTO_CREATE_TOPICS_ENABLE=false`로 설정하는 이유: +`AUTO_CREATE_TOPICS_ENABLE=false`로 설정하는 이유: 오타로 잘못된 Topic 이름이 자동 생성되는 것을 막기 위해 명시적 Topic 생성만 허용한다. **2. build.gradle** @@ -89,21 +99,25 @@ spring.task.scheduling.pool.size=2 ```java @Configuration public class KafkaTopicConfig { + @Bean public NewTopic orderEventsTopic() { - return TopicBuilder.name("coinflow.order.events").partitions(4).replicas(1).build(); + return TopicBuilder.name("coinflow.order.events") + .partitions(4) + .replicas(1) + .build(); } + @Bean public NewTopic tradeEventsTopic() { - return TopicBuilder.name("coinflow.trade.events").partitions(4).replicas(1).build(); + return TopicBuilder.name("coinflow.trade.events") + .partitions(4) + .replicas(1) + .build(); } } ``` -파티션을 4개로 설정하는 이유: -현재는 단일 Consumer지만, 향후 Consumer를 수평 확장할 때 파티션 수가 상한이 된다. -Topic 생성 후 파티션 수를 줄이는 것은 불가능하므로 여유 있게 설정한다. - ### 변경 파일 요약 | 파일 | 작업 | @@ -118,6 +132,7 @@ Topic 생성 후 파티션 수를 줄이는 것은 불가능하므로 여유 있 ```bash docker compose up -d ./gradlew bootRun + # Topic 생성 확인 docker exec coinflow-kafka-1 kafka-topics.sh \ --list --bootstrap-server localhost:9092 @@ -130,20 +145,20 @@ docker exec coinflow-kafka-1 kafka-topics.sh \ ### 목표 -`domain_events.published=false` 이벤트를 1초마다 폴링해 Kafka로 발행한다. -발행 성공 시 `published=true`, 실패 시 `publish_attempts++`. +`domain_events.published=false` 이벤트를 1초마다 폴링해 Kafka로 발행한다. +발행 성공 시 `published=true`, 실패 시 `publish_attempts++`. 5회 실패한 이벤트는 dead-letter로 분류해 폴링에서 제외한다. ### 핵심 설계 결정 | 항목 | 결정 | 이유 | |------|------|------| -| send 방식 | `kafkaTemplate.send(...).get()` (동기) | 비동기 send는 ACK 전에 메서드가 끝날 수 있어 실패를 감지 못함 | +| send 방식 | `kafkaTemplate.send(...).get()` (동기) | 비동기 send는 ACK 전에 메서드가 끝나 실패를 감지 못할 수 있음 | | 스케줄링 방식 | `fixedDelay` | `fixedRate`는 이전 실행이 끝나지 않아도 다음 실행이 시작 → 중복 처리 위험 | | 배치 크기 | 최대 100건 | 단일 폴링 주기 내 처리량 상한. DB 부하 통제 | | MAX_ATTEMPTS | 5 | 일시적 Kafka 장애는 5회 안에 해소된다고 가정 | | 파티션 키 | marketSymbol | 같은 시장 이벤트 → 같은 파티션 → Consumer 순서 보장 | -| 트랜잭션 범위 | 배치 단위 `@Transactional` | 이벤트 하나 실패 시 해당 이벤트만 attempts 증가, 나머지는 published 처리 | +| 트랜잭션 범위 | 배치 단위 `@Transactional` | 이벤트 하나 실패해도 나머지는 published 처리 | ### 작업 목록 @@ -160,20 +175,18 @@ public void incrementAttempts() { } ``` -엔티티가 자신의 상태 전환을 메서드로 캡슐화한다. -외부에서 필드를 직접 수정하지 않는다. - **2. DomainEventRepository.java — Outbox 쿼리 추가** ```java // published=false이고 attempts < MAX_ATTEMPTS인 이벤트를 id 오름차순으로 최대 100건 조회 -// → 오래된 이벤트부터 발행 (FIFO 보장) +// id 오름차순 = 오래된 이벤트부터 발행 (FIFO 보장) List findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(int maxAttempts); ``` **3. OutboxPublisher.java** (신규, `com.coinflow.event.service`) ```java +@Slf4j @Component @RequiredArgsConstructor public class OutboxPublisher { @@ -190,7 +203,7 @@ public class OutboxPublisher { @Transactional public void publish() { List pending = domainEventRepository - .findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(MAX_ATTEMPTS); + .findTop100ByPublishedFalseAndPublishAttemptsLessThanOrderByIdAsc(MAX_ATTEMPTS); for (DomainEvent event : pending) { try { @@ -199,6 +212,8 @@ public class OutboxPublisher { kafkaTemplate.send(topic, event.getMarketSymbol(), message).get(); event.markPublished(); } catch (Exception e) { + log.warn("Kafka publish failed. eventId={}, attempts={}, error={}", + event.getId(), event.getPublishAttempts(), e.getMessage()); event.incrementAttempts(); } } @@ -213,12 +228,12 @@ public class OutboxPublisher { } private String buildMessage(DomainEvent event) throws JsonProcessingException { - return objectMapper.writeValueAsString(Map.of( - "eventId", event.getId(), - "eventType", event.getEventType().name(), - "marketSymbol", event.getMarketSymbol(), - "payload", event.getPayload() - )); + Map msg = new LinkedHashMap<>(); + msg.put("eventId", event.getId()); + msg.put("eventType", event.getEventType().name()); + msg.put("marketSymbol", event.getMarketSymbol()); + msg.put("payload", event.getPayload()); + return objectMapper.writeValueAsString(msg); } } ``` @@ -227,6 +242,8 @@ public class OutboxPublisher { ```java @EnableScheduling // 추가 +@SpringBootApplication +public class CoinflowApplication { ... } ``` ### Kafka 메시지 포맷 @@ -236,18 +253,17 @@ public class OutboxPublisher { "eventId": 1, "eventType": "TRADE_CREATED", "marketSymbol": "BTC-KRW", - "payload": "{\"tradeId\":1,\"price\":\"100000000\",\"quantity\":\"0.0001\",\"quoteAmount\":\"10000\"}" + "payload": "{\"tradeId\":1,\"price\":100000000,\"quantity\":0.0001,\"quoteAmount\":10000}" } ``` -`payload`는 이미 JSON 문자열로 직렬화되어 있으므로 그대로 포함한다. -Consumer는 `payload`를 다시 역직렬화해서 사용한다. +`payload`는 `DomainEventRecorder`가 직렬화한 JSON 문자열을 그대로 포함한다. +Consumer는 `payload`를 역직렬화해서 개별 필드를 사용한다. ### Dead-letter 모니터링 ```sql --- publish_attempts >= 5인 이벤트 목록 -SELECT id, event_type, market_symbol, payload, created_at, publish_attempts +SELECT id, event_type, market_symbol, created_at, publish_attempts FROM domain_events WHERE published = false AND publish_attempts >= 5 ORDER BY created_at; @@ -265,13 +281,12 @@ ORDER BY created_at; ### 완료 기준 ```bash -# 주문 체결 후 1~2초 내 메시지 확인 docker exec coinflow-kafka-1 kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic coinflow.trade.events \ --from-beginning -# 출력 예시 +# 주문 체결 후 1~2초 내 아래 메시지 출력 확인 {"eventId":1,"eventType":"TRADE_CREATED","marketSymbol":"BTC-KRW","payload":"{...}"} ``` @@ -281,7 +296,7 @@ docker exec coinflow-kafka-1 kafka-console-consumer.sh \ ### 목표 -Kafka Consumer가 이벤트를 수신하면 STOMP WebSocket으로 클라이언트에 push한다. +Kafka Consumer가 이벤트를 수신하면 STOMP WebSocket으로 클라이언트에 push한다. 체결 발생 시 체결 피드, 주문 변경 시 오더북 스냅샷을 broadcast한다. ### 핵심 설계 결정 @@ -292,6 +307,7 @@ Kafka Consumer가 이벤트를 수신하면 STOMP WebSocket으로 클라이언 | SockJS | 포함 | WebSocket 미지원 환경 폴백 자동 처리 | | 오더북 broadcast | 전체 스냅샷 | delta 방식은 클라이언트가 로컬 상태를 유지해야 해서 패킷 유실 시 불일치 발생 | | Consumer groupId | `coinflow-websocket` | 향후 다른 Consumer group 추가 시 독립적으로 오프셋 관리 | +| 오더북 스냅샷 소스 | MatchingEngine (in-memory) | 이미 최신 상태 유지. DB 조회 불필요 | ### 작업 목록 @@ -319,72 +335,158 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { registry.addEndpoint("/ws") .setAllowedOriginPatterns("*") .withSockJS(); - registry.addEndpoint("/ws") // wscat 등 raw WebSocket 클라이언트용 + registry.addEndpoint("/ws") // raw WebSocket (wscat 등 테스트 클라이언트용) .setAllowedOriginPatterns("*"); } } ``` -**3. WebSocket 메시지 DTO** (`com.coinflow.websocket.dto`) +**3. KafkaMessage.java** (신규, `com.coinflow.websocket.dto`) + +Kafka에서 수신한 메시지를 역직렬화하는 내부 DTO. + +```java +public record KafkaMessage( + Long eventId, + String eventType, + String marketSymbol, + String payload // TRADE_CREATED 등 이벤트별 JSON 문자열 +) {} +``` + +**4. TradeMessage.java** (신규, `/topic/trades/{market}` 발행용) ```java -// 체결 피드: /topic/trades/{market} -record TradeMessage( - String market, - String price, - String quantity, - String side, // BUY or SELL (taker 기준) - String tradedAt +public record TradeMessage( + String market, + String price, + String quantity, + String side, // "BUY" or "SELL" — taker 기준 + String tradedAt ) {} +``` + +**5. OrderBookMessage.java / PriceLevel.java** (신규, `/topic/orderbook/{market}` 발행용) -// 오더북 스냅샷: /topic/orderbook/{market} -record OrderBookMessage( - String market, - List buySide, // 높은 가격 우선 - List sellSide // 낮은 가격 우선 +```java +public record OrderBookMessage( + String market, + List buySide, // 높은 가격 우선 + List sellSide // 낮은 가격 우선 ) {} -record PriceLevel(String price, String quantity) {} +public record PriceLevel(String price, String quantity) {} ``` -**4. WebSocketBroadcaster.java** (신규, `com.coinflow.websocket`) +**6. MatchingEngine.java — 오더북 조회 메서드 추가** + +WebSocketBroadcaster가 현재 오더북 상태를 읽기 위해 필요. + +```java +// MatchingEngine에 추가 +public List getBuySide(String marketSymbol) { + MemoryOrderBook book = books.get(marketSymbol); + return book == null ? List.of() : book.getBuySide(); +} + +public List getSellSide(String marketSymbol) { + MemoryOrderBook book = books.get(marketSymbol); + return book == null ? List.of() : book.getSellSide(); +} +``` + +**7. WebSocketBroadcaster.java** (신규, `com.coinflow.websocket`) ```java +@Slf4j @Component @RequiredArgsConstructor public class WebSocketBroadcaster { private final SimpMessagingTemplate messaging; - private final MatchingEngine matchingEngine; - private final ObjectMapper objectMapper; + private final MatchingEngine matchingEngine; + private final ObjectMapper objectMapper; + + // ── 체결 이벤트 수신 ──────────────────────────────────────────────── @KafkaListener(topics = "coinflow.trade.events", groupId = "coinflow-websocket") public void onTradeEvent(String raw) { - KafkaMessage msg = parse(raw); - if ("TRADE_CREATED".equals(msg.eventType())) { - TradeMessage trade = buildTradeMessage(msg); - messaging.convertAndSend("/topic/trades/" + msg.marketSymbol(), trade); + try { + KafkaMessage msg = parse(raw); + if ("TRADE_CREATED".equals(msg.eventType())) { + TradeMessage trade = buildTradeMessage(msg); + messaging.convertAndSend("/topic/trades/" + msg.marketSymbol(), trade); + } + } catch (Exception e) { + log.error("Failed to broadcast trade event. raw={}", raw, e); } } + // ── 주문 이벤트 수신 ──────────────────────────────────────────────── + + private static final Set ORDER_BOOK_TRIGGERS = Set.of( + "ORDER_ACCEPTED", "ORDER_FILLED", "ORDER_PARTIALLY_FILLED", "ORDER_CANCELED" + ); + @KafkaListener(topics = "coinflow.order.events", groupId = "coinflow-websocket") public void onOrderEvent(String raw) { - KafkaMessage msg = parse(raw); - Set triggers = Set.of( - "ORDER_ACCEPTED", "ORDER_FILLED", "ORDER_PARTIALLY_FILLED", "ORDER_CANCELED" + try { + KafkaMessage msg = parse(raw); + if (ORDER_BOOK_TRIGGERS.contains(msg.eventType())) { + OrderBookMessage snapshot = buildSnapshot(msg.marketSymbol()); + messaging.convertAndSend("/topic/orderbook/" + msg.marketSymbol(), snapshot); + } + } catch (Exception e) { + log.error("Failed to broadcast order event. raw={}", raw, e); + } + } + + // ── 내부 메서드 ───────────────────────────────────────────────────── + + private KafkaMessage parse(String raw) throws JsonProcessingException { + return objectMapper.readValue(raw, KafkaMessage.class); + } + + private TradeMessage buildTradeMessage(KafkaMessage msg) throws JsonProcessingException { + JsonNode p = objectMapper.readTree(msg.payload()); + + // taker가 buyOrderId와 같으면 BUY, sellOrderId와 같으면 SELL + long takerOrderId = p.get("takerOrderId").asLong(); + long buyOrderId = p.get("buyOrderId").asLong(); + String side = (takerOrderId == buyOrderId) ? "BUY" : "SELL"; + + return new TradeMessage( + msg.marketSymbol(), + p.get("price").decimalValue().toPlainString(), + p.get("quantity").decimalValue().toPlainString(), + side, + LocalDateTime.now().toString() ); - if (triggers.contains(msg.eventType())) { - OrderBookMessage snapshot = buildSnapshot(msg.marketSymbol()); - messaging.convertAndSend("/topic/orderbook/" + msg.marketSymbol(), snapshot); + } + + private OrderBookMessage buildSnapshot(String marketSymbol) { + List buySide = aggregateLevels(matchingEngine.getBuySide(marketSymbol)); + List sellSide = aggregateLevels(matchingEngine.getSellSide(marketSymbol)); + return new OrderBookMessage(marketSymbol, buySide, sellSide); + } + + // 같은 가격의 여러 주문을 하나의 호가 레벨로 합산한다 + private List aggregateLevels(List entries) { + Map byPrice = new LinkedHashMap<>(); + for (OrderBookEntry e : entries) { + byPrice.merge(e.price(), e.remainingQuantity(), BigDecimal::add); } + return byPrice.entrySet().stream() + .map(en -> new PriceLevel( + en.getKey().toPlainString(), + en.getValue().stripTrailingZeros().toPlainString() + )) + .toList(); } } ``` -`MatchingEngine`에서 현재 오더북 상태를 읽어 스냅샷을 만든다. -메모리 오더북은 이미 최신 상태를 유지하고 있으므로 별도 DB 조회 없이 broadcast 가능하다. - -**5. SecurityConfig.java 수정** +**8. SecurityConfig.java 수정** ```java .requestMatchers("/ws/**").permitAll() @@ -396,9 +498,11 @@ public class WebSocketBroadcaster { |------|------| | `config/WebSocketConfig.java` | 신규 | | `websocket/WebSocketBroadcaster.java` | 신규 | +| `websocket/dto/KafkaMessage.java` | 신규 | | `websocket/dto/TradeMessage.java` | 신규 | | `websocket/dto/OrderBookMessage.java` | 신규 | | `websocket/dto/PriceLevel.java` | 신규 | +| `order/matching/MatchingEngine.java` | `getBuySide()`, `getSellSide()` 추가 | | `build.gradle` | `spring-boot-starter-websocket` 추가 | | `config/SecurityConfig.java` | `/ws/**` permitAll 추가 | @@ -408,10 +512,22 @@ public class WebSocketBroadcaster { # wscat 설치: npm install -g wscat wscat -c ws://localhost:8080/ws -# STOMP CONNECT 프레임 전송 후 구독 -SUBSCRIBE /topic/trades/BTC-KRW +# STOMP CONNECT 후 구독 (STOMP 프레임 직접 입력) +CONNECT +accept-version:1.2 + +^@ + +SUBSCRIBE +id:sub-0 +destination:/topic/trades/BTC-KRW + +^@ + +# 별도 터미널에서 매수/매도 주문 체결 후 아래 메시지 수신 확인 +MESSAGE +destination:/topic/trades/BTC-KRW -# 별도 터미널에서 매수/매도 주문 체결 → 아래 메시지 수신 확인 {"market":"BTC-KRW","price":"100000000","quantity":"0.0001","side":"BUY","tradedAt":"..."} ``` @@ -421,58 +537,351 @@ SUBSCRIBE /topic/trades/BTC-KRW ### 목표 -주문 체결 → Kafka 발행 → WebSocket 수신 전 구간을 자동으로 검증한다. -CI에서 Docker 없이 실행되어야 한다 → `@EmbeddedKafka` 사용. +주문 체결 → Kafka 발행 → WebSocket 수신 전 구간을 자동으로 검증한다. +CI에서 Docker 없이 실행되어야 한다 → MySQL은 Testcontainer, Kafka는 `@EmbeddedKafka`. + +### EmbeddedKafka + Testcontainer 조합 + +``` +MySQL → Testcontainer (TestcontainersConfig, 이미 구현) +Kafka → @EmbeddedKafka (in-process, Docker 불필요) +``` + +`@EmbeddedKafka`의 `bootstrapServersProperty` 옵션을 사용하면 +`spring.kafka.bootstrap-servers`를 임베디드 브로커 주소로 자동 오버라이드한다. + +```java +@EmbeddedKafka( + partitions = 1, + topics = {"coinflow.order.events", "coinflow.trade.events"}, + bootstrapServersProperty = "spring.kafka.bootstrap-servers" // 자동 오버라이드 +) +``` ### 테스트 시나리오 | ID | 시나리오 | 검증 항목 | |----|----------|-----------| -| EVT-E2E-001 | 주문 체결 → `coinflow.trade.events`에 `TRADE_CREATED` 발행 | eventType, marketSymbol, payload.price | -| EVT-E2E-002 | 주문 취소 → `coinflow.order.events`에 `ORDER_CANCELED` 발행 | eventType, payload.orderId | +| EVT-E2E-001 | 주문 체결 → `TRADE_CREATED` Kafka 발행 | eventType, marketSymbol 포함 여부 | +| EVT-E2E-002 | 주문 취소 → `ORDER_CANCELED` Kafka 발행 | eventType 포함 여부 | | EVT-E2E-003 | Kafka 발행 실패 → `publish_attempts` 1 증가 | attempts == 1, published == false | | EVT-E2E-004 | `publish_attempts >= 5` → 폴링 제외 | 6번째 poll에서 해당 이벤트 미발행 | -| WS-E2E-001 | 주문 체결 → `/topic/trades/BTC-KRW` 수신 | price, quantity 일치, 5초 내 수신 | +| WS-E2E-001 | 주문 체결 → `/topic/trades/BTC-KRW` 수신 | market, price, quantity 일치, 5초 내 수신 | ### 작업 목록 **1. KafkaPublishingTest.java** (신규, `com.coinflow.e2e`) ```java -@SpringBootTest(webEnvironment = RANDOM_PORT) +@SuppressWarnings({"rawtypes", "unchecked"}) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @Import(TestcontainersConfig.class) -@EmbeddedKafka(partitions = 1, topics = {"coinflow.order.events", "coinflow.trade.events"}) +@EmbeddedKafka( + partitions = 1, + topics = {"coinflow.order.events", "coinflow.trade.events"}, + bootstrapServersProperty = "spring.kafka.bootstrap-servers" +) class KafkaPublishingTest { - @Autowired OutboxPublisher outboxPublisher; - @Autowired DomainEventRepository domainEventRepository; + @Autowired private TestRestTemplate restTemplate; + @Autowired private OutboxPublisher outboxPublisher; + @Autowired private UserRepository userRepository; + @Autowired private WalletRepository walletRepository; + @Autowired private MatchingEngine matchingEngine; + + @Value("${spring.embedded.kafka.brokers}") + private String brokers; + + private Consumer consumer; + + @BeforeEach + void setUp() { + matchingEngine.clearAll(); + + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of("coinflow.trade.events", "coinflow.order.events")); + } + + @AfterEach + void tearDown() { + consumer.close(); + } + // EVT-E2E-001 @Test - void 주문_체결_시_TRADE_CREATED_Kafka_발행() throws Exception { - // given: 매수자/매도자 회원가입 + 입금 + 매수/매도 주문 생성 (REST API) - // when: outboxPublisher.publish() 직접 호출 - // then: KafkaTestUtils.getRecords()로 메시지 수신 후 eventType assert + void 주문_체결_시_TRADE_CREATED_Kafka_발행() { + String buyerToken = signupAndLogin("kafka001a@example.com"); + String sellerToken = signupAndLogin("kafka001b@example.com"); + depositKrw("kafka001a@example.com", new BigDecimal("10000000")); + depositBtc("kafka001b@example.com", new BigDecimal("0.001")); + createOrder(buyerToken, "BTC-KRW", "BUY", "100000000", "0.0001"); + createOrder(sellerToken, "BTC-KRW", "SELL", "100000000", "0.0001"); + + outboxPublisher.publish(); + + ConsumerRecords records = + KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(5)); + + List tradeMessages = StreamSupport.stream(records.spliterator(), false) + .filter(r -> "coinflow.trade.events".equals(r.topic())) + .map(ConsumerRecord::value) + .filter(v -> v.contains("TRADE_CREATED")) + .toList(); + + assertThat(tradeMessages).isNotEmpty(); + assertThat(tradeMessages.get(0)).contains("\"marketSymbol\":\"BTC-KRW\""); + } + + // EVT-E2E-002 + @Test + void 주문_취소_시_ORDER_CANCELED_Kafka_발행() { + String token = signupAndLogin("kafka002@example.com"); + depositKrw("kafka002@example.com", new BigDecimal("10000000")); + var resp = createOrder(token, "BTC-KRW", "BUY", "100000000", "0.0001"); + Long orderId = ((Number) resp.getBody().get("orderId")).longValue(); + cancelOrder(token, orderId); + + outboxPublisher.publish(); + + ConsumerRecords records = + KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(5)); + + boolean hasCanceled = StreamSupport.stream(records.spliterator(), false) + .filter(r -> "coinflow.order.events".equals(r.topic())) + .map(ConsumerRecord::value) + .anyMatch(v -> v.contains("ORDER_CANCELED")); + + assertThat(hasCanceled).isTrue(); + } + + // ── helpers ───────────────────────────────────────────────────────── + + private String signupAndLogin(String email) { + restTemplate.postForEntity("/api/v1/auth/signup", + Map.of("email", email, "password", "password1234", "nickname", "tester"), Map.class); + var login = restTemplate.postForEntity("/api/v1/auth/login", + Map.of("email", email, "password", "password1234"), Map.class); + return (String) login.getBody().get("accessToken"); + } + + private void depositKrw(String email, BigDecimal amount) { deposit(email, "KRW", amount); } + private void depositBtc(String email, BigDecimal amount) { deposit(email, "BTC", amount); } + + private void deposit(String email, String asset, BigDecimal amount) { + var user = userRepository.findByEmail(email).orElseThrow(); + var wallet = walletRepository.findAllByUserId(user.getId()).stream() + .filter(w -> w.getAsset().equals(asset)).findFirst().orElseThrow(); + wallet.deposit(amount); + walletRepository.save(wallet); + } + + private ResponseEntity createOrder(String token, String market, String side, + String price, String quantity) { + HttpHeaders headers = new HttpHeaders(); + headers.setBearerAuth(token); + var body = Map.of("market", market, "side", side, "type", "LIMIT", + "timeInForce", "GTC", "price", price, "quantity", quantity); + return restTemplate.exchange("/api/v1/orders", HttpMethod.POST, + new HttpEntity<>(body, headers), Map.class); + } + + private void cancelOrder(String token, Long orderId) { + HttpHeaders headers = new HttpHeaders(); + headers.setBearerAuth(token); + restTemplate.exchange("/api/v1/orders/" + orderId + "/cancel", + HttpMethod.POST, new HttpEntity<>(headers), Map.class); } } ``` -**2. OutboxRetryTest.java** (신규) +**2. OutboxRetryTest.java** (신규, `com.coinflow.e2e`) ```java -// KafkaTemplate을 Mock으로 교체 → send() 예외 발생하도록 설정 -// outboxPublisher.publish() 호출 -// then: publish_attempts == 1, published == false -// 5회 반복 후 6번째 publish() 호출 → 해당 이벤트 미처리 확인 +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@Import(TestcontainersConfig.class) +@EmbeddedKafka( + partitions = 1, + topics = {"coinflow.order.events", "coinflow.trade.events"}, + bootstrapServersProperty = "spring.kafka.bootstrap-servers" +) +class OutboxRetryTest { + + @Autowired private TestRestTemplate restTemplate; + @Autowired private OutboxPublisher outboxPublisher; + @Autowired private DomainEventRepository domainEventRepository; + @Autowired private UserRepository userRepository; + @Autowired private WalletRepository walletRepository; + @Autowired private MatchingEngine matchingEngine; + + @MockBean + private KafkaTemplate kafkaTemplate; // Kafka send 실패 시뮬레이션 + + @BeforeEach + void setUp() { + matchingEngine.clearAll(); + when(kafkaTemplate.send(any(), any(), any())) + .thenThrow(new RuntimeException("Kafka unavailable")); + } + + // EVT-E2E-003 + @Test + void Kafka_발행_실패_시_publish_attempts_증가() { + String token = signupAndLogin("retry001@example.com"); + depositKrw("retry001@example.com", new BigDecimal("10000000")); + createOrder(token, "BTC-KRW", "BUY", "100000000", "0.0001"); + + DomainEvent before = domainEventRepository.findAll().stream() + .filter(e -> !e.isPublished()).findFirst().orElseThrow(); + + outboxPublisher.publish(); + + DomainEvent after = domainEventRepository.findById(before.getId()).orElseThrow(); + assertThat(after.isPublished()).isFalse(); + assertThat(after.getPublishAttempts()).isEqualTo(1); + } + + // EVT-E2E-004 + @Test + void publish_attempts_MAX_초과_시_폴링_제외() { + String token = signupAndLogin("retry002@example.com"); + depositKrw("retry002@example.com", new BigDecimal("10000000")); + createOrder(token, "BTC-KRW", "BUY", "100000000", "0.0001"); + + // MAX_ATTEMPTS번 실패시키기 + for (int i = 0; i < OutboxPublisher.MAX_ATTEMPTS; i++) { + outboxPublisher.publish(); + } + + long countBefore = domainEventRepository.findAll().stream() + .filter(e -> !e.isPublished()).count(); + + // MAX_ATTEMPTS+1번째 publish — 해당 이벤트는 폴링 대상에서 제외되어야 함 + reset(kafkaTemplate); // mock 초기화 (이제 send가 성공해도 호출 자체가 안 돼야 함) + outboxPublisher.publish(); + + long countAfter = domainEventRepository.findAll().stream() + .filter(e -> !e.isPublished()).count(); + + // 폴링 제외 → published가 변하지 않음 + assertThat(countAfter).isEqualTo(countBefore); + verify(kafkaTemplate, never()).send(any(), any(), any()); + } + + // ── helpers (KafkaPublishingTest와 동일) ──────────────────────────── + private String signupAndLogin(String email) { /* ... */ return null; } + private void depositKrw(String email, BigDecimal amount) { /* ... */ } + private ResponseEntity createOrder(String token, String market, String side, + String price, String quantity) { return null; } +} ``` -**3. WebSocketBroadcastTest.java** (신규) +**3. WebSocketBroadcastTest.java** (신규, `com.coinflow.e2e`) ```java -// WebSocketStompClient 생성 -// /ws 연결 → /topic/trades/BTC-KRW 구독 -// 주문 체결 API 호출 -// CompletableFuture.get(5, SECONDS) 대기 -// price, quantity assert +@SuppressWarnings({"rawtypes", "unchecked"}) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@Import(TestcontainersConfig.class) +@EmbeddedKafka( + partitions = 1, + topics = {"coinflow.order.events", "coinflow.trade.events"}, + bootstrapServersProperty = "spring.kafka.bootstrap-servers" +) +class WebSocketBroadcastTest { + + @Autowired private TestRestTemplate restTemplate; + @Autowired private OutboxPublisher outboxPublisher; + @Autowired private UserRepository userRepository; + @Autowired private WalletRepository walletRepository; + @Autowired private MatchingEngine matchingEngine; + + @LocalServerPort + private int port; + + @BeforeEach + void setUp() { + matchingEngine.clearAll(); + } + + // WS-E2E-001 + @Test + void 주문_체결_시_WebSocket_TradeMessage_수신() throws Exception { + // STOMP 클라이언트 연결 및 구독 + CompletableFuture received = new CompletableFuture<>(); + + WebSocketStompClient stompClient = new WebSocketStompClient( + new SockJsClient(List.of(new WebSocketTransport(new StandardWebSocketClient()))) + ); + stompClient.setMessageConverter(new MappingJackson2MessageConverter()); + + StompSession session = stompClient + .connect("ws://localhost:" + port + "/ws", new StompSessionHandlerAdapter() {}) + .get(5, TimeUnit.SECONDS); + + session.subscribe("/topic/trades/BTC-KRW", new StompFrameHandler() { + @Override + public Type getPayloadType(StompHeaders headers) { return Map.class; } + + @Override + public void handleFrame(StompHeaders headers, Object payload) { + received.complete((Map) payload); + } + }); + + // 주문 체결 후 OutboxPublisher 트리거 + String buyerToken = signupAndLogin("ws001a@example.com"); + String sellerToken = signupAndLogin("ws001b@example.com"); + depositKrw("ws001a@example.com", new BigDecimal("10000000")); + depositBtc("ws001b@example.com", new BigDecimal("0.001")); + createOrder(buyerToken, "BTC-KRW", "BUY", "100000000", "0.0001"); + createOrder(sellerToken, "BTC-KRW", "SELL", "100000000", "0.0001"); + outboxPublisher.publish(); + + // 5초 내 TradeMessage 수신 확인 + Map msg = received.get(5, TimeUnit.SECONDS); + assertThat(msg.get("market")).isEqualTo("BTC-KRW"); + assertThat(msg.get("price")).isEqualTo("100000000"); + assertThat(msg.get("quantity")).isEqualTo("0.0001"); + assertThat(msg.get("side")).isIn("BUY", "SELL"); + + session.disconnect(); + } + + // ── helpers ───────────────────────────────────────────────────────── + private String signupAndLogin(String email) { + restTemplate.postForEntity("/api/v1/auth/signup", + Map.of("email", email, "password", "password1234", "nickname", "tester"), Map.class); + var login = restTemplate.postForEntity("/api/v1/auth/login", + Map.of("email", email, "password", "password1234"), Map.class); + return (String) login.getBody().get("accessToken"); + } + + private void depositKrw(String email, BigDecimal amount) { deposit(email, "KRW", amount); } + private void depositBtc(String email, BigDecimal amount) { deposit(email, "BTC", amount); } + + private void deposit(String email, String asset, BigDecimal amount) { + var user = userRepository.findByEmail(email).orElseThrow(); + var wallet = walletRepository.findAllByUserId(user.getId()).stream() + .filter(w -> w.getAsset().equals(asset)).findFirst().orElseThrow(); + wallet.deposit(amount); + walletRepository.save(wallet); + } + + private void createOrder(String token, String market, String side, + String price, String quantity) { + HttpHeaders headers = new HttpHeaders(); + headers.setBearerAuth(token); + var body = Map.of("market", market, "side", side, "type", "LIMIT", + "timeInForce", "GTC", "price", price, "quantity", quantity); + restTemplate.exchange("/api/v1/orders", HttpMethod.POST, + new HttpEntity<>(body, headers), Map.class); + } +} ``` ### 변경 파일 요약 @@ -490,17 +899,23 @@ class KafkaPublishingTest { ``` feat/20/wallet PR 완료 ↓ -feat/19/kafka-setup ── docker-compose + build.gradle + KafkaTopicConfig - 검증: docker compose up → Topic 생성 확인 +feat/19/kafka-setup + └─ docker-compose + KafkaTopicConfig + application.properties + └─ 검증: docker compose up → Topic 목록 확인 ↓ -feat/20/outbox-publisher ── OutboxPublisher + Repository 쿼리 + @EnableScheduling - 검증: kafka-console-consumer에서 메시지 1~2초 내 수신 +feat/20/outbox-publisher + └─ DomainEvent 메서드 + Repository 쿼리 + OutboxPublisher + @EnableScheduling + └─ 검증: kafka-console-consumer에서 메시지 1~2초 내 수신 ↓ -feat/21/websocket ── WebSocketConfig + WebSocketBroadcaster + DTO - 검증: wscat 구독 후 주문 체결 → 메시지 수신 +feat/21/websocket + └─ WebSocketConfig + KafkaMessage + TradeMessage + OrderBookMessage + └─ WebSocketBroadcaster (buildTradeMessage, buildSnapshot 포함) + └─ MatchingEngine에 getBuySide/getSellSide 추가 + └─ 검증: wscat 구독 후 주문 체결 → TradeMessage 수신 ↓ -feat/22/e2e ── EmbeddedKafka + STOMP 클라이언트 테스트 - 검증: ./gradlew test 전체 통과 +feat/22/e2e + └─ KafkaPublishingTest + OutboxRetryTest + WebSocketBroadcastTest + └─ 검증: ./gradlew test 전체 통과 ``` --- @@ -509,10 +924,11 @@ feat/22/e2e ── EmbeddedKafka + STOMP 클라이언트 테스트 | 예상 질문 | 답변 포인트 | |-----------|------------| -| 왜 Kafka를 썼나? | 체결 이벤트 하나를 여러 소비자(WebSocket, 통계, 리스크)가 독립적으로 처리해야 해서. OrderService가 소비자를 알 필요 없음 | -| Outbox Pattern이 뭔가? | DB 트랜잭션과 Kafka 발행은 하나로 묶을 수 없음. 이벤트를 DB에 먼저 저장하고 별도 프로세스가 폴링해서 발행 → 이벤트 유실 0 | -| 왜 동기 send인가? | `.get()`으로 ACK 확인 후 `published=true`. 비동기면 Kafka 실패를 감지 못하고 이벤트가 유실될 수 있음 | -| 중복 발행은 어떻게 처리하나? | at-least-once를 감수. WebSocket broadcast는 중복이 UX에만 영향. DB 소비자 추가 시 `eventId` 기준 dedup | -| 파티션 키를 왜 marketSymbol로? | BTC-KRW 이벤트는 항상 같은 파티션 → Consumer가 시장 단위 순서를 보장받음 | -| 오더북을 왜 스냅샷으로? | delta는 클라이언트가 로컬 상태를 유지해야 하고, 패킷 유실 시 불일치. 스냅샷은 항상 최신 상태 보장 | -| polling의 한계는? | 최대 1초 딜레이. Debezium CDC로 전환하면 binlog 기반 실시간 감지, 수십 ms로 줄어듦 | +| 왜 Kafka를 썼나? | 체결 이벤트 하나를 WebSocket, 통계, 리스크 등 여러 소비자가 독립적으로 처리해야 해서. OrderService가 소비자를 알 필요 없음 | +| Outbox Pattern이 뭔가? | DB 트랜잭션과 Kafka 발행은 하나로 묶을 수 없음. 이벤트를 DB에 먼저 저장(Phase 1)하고 별도 스케줄러가 폴링해서 발행 → 이벤트 유실 0 | +| 왜 동기 send인가? | `.get()`으로 Kafka ACK 확인 후 `published=true`. 비동기면 Kafka 실패를 감지 못하고 이벤트가 유실될 수 있음 | +| at-least-once 중복은 어떻게? | WebSocket broadcast는 중복 수신이 UX에만 영향. DB Consumer 추가 시 `processed_events` 테이블로 `event.id` 기준 dedup | +| 파티션 키를 왜 marketSymbol로? | 같은 시장 이벤트가 항상 같은 파티션 → Consumer가 시장 단위 순서 보장. BTC-KRW와 ETH-KRW는 다른 파티션에서 병렬 처리 | +| 오더북을 왜 스냅샷으로? | delta는 클라이언트가 로컬 상태를 유지해야 하고 패킷 유실 시 불일치. 스냅샷은 항상 서버 현재 상태를 보장. Phase 3에서 delta + sequence로 최적화 예정 | +| polling의 한계는? | 최대 1초 딜레이. Debezium CDC로 전환하면 MySQL binlog 기반 실시간 감지, 수십 ms로 감소 | +| 테스트는 어떻게 했나? | MySQL은 Testcontainer, Kafka는 EmbeddedKafka. Docker 없이 CI에서 전 구간 자동 검증 | From e3f2419086f9564e7e0a7f060413f4c7cd91d53a Mon Sep 17 00:00:00 2001 From: ohhalim Date: Thu, 14 May 2026 15:55:48 +0900 Subject: [PATCH 7/7] =?UTF-8?q?docs(v2):=20=20issues=20=EB=AA=A9=EB=A1=9D?= =?UTF-8?q?=20=EC=84=A4=EC=A0=95=20=20=ED=96=A5=ED=9B=84=20=EB=A6=AC?= =?UTF-8?q?=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .docs/v2/ISSUES.md | 385 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 .docs/v2/ISSUES.md diff --git a/.docs/v2/ISSUES.md b/.docs/v2/ISSUES.md new file mode 100644 index 0000000..613fe79 --- /dev/null +++ b/.docs/v2/ISSUES.md @@ -0,0 +1,385 @@ +# CoinFlow Phase 1 Known Issues & Phase 2 개선 사항 + +Phase 1 코드 리뷰에서 발견된 이슈를 심각도 순으로 정리한다. +Phase 2 구현 전에 반드시 수정해야 하는 항목은 **[BLOCKER]**, 개선 권장은 **[IMPROVE]** 로 표시한다. + +--- + +## Priority 1 — 오더북 Rollback 불일치 [BLOCKER] + +### 현상 + +`OrderService.createOrder()`는 `transactionTemplate.execute()` 블록 안에서 +`matchingEngine.match()`를 호출해 in-memory 오더북을 즉시 변경한다. + +``` +// OrderService.java:120 +transactionTemplate.execute(status -> { + ... + List matchResults = matchingEngine.match(market, order); // 오더북 변경 + List trades = settle(market, order, matchResults); // DB 저장 + ... +}); +``` + +`settle()` 도중 예외가 발생하면 DB는 롤백되지만 +`MemoryOrderBook`(PriorityQueue)은 이미 변경된 상태로 남는다. + +### 원인 + +`MemoryOrderBook.match()` ([MemoryOrderBook.java:33](../../../src/main/java/com/coinflow/order/matching/MemoryOrderBook.java))는 +결과를 계산하는 동시에 `makerQueue.poll()` / `makerQueue.add()` 로 큐를 직접 변경한다. +DB 트랜잭션 롤백이 in-memory 상태를 되돌리지 못한다. + +### 해결 방향 — 계획/반영 2단계 분리 + +`match()`를 두 단계로 쪼개야 한다. + +**1단계 — `planMatch()`: 큐 변경 없이 결과만 계산** + +```java +// MemoryOrderBook에 추가 +public List planMatch(Order taker) { + // makerQueue를 peek만 하고 poll/add하지 않는다 + // 결과 MatchResult 리스트만 반환 +} +``` + +**2단계 — `applyMatchPlan()`: commit 이후 큐 반영** + +```java +// OrderService.createOrder() 안에서 +List plan = matchingEngine.planMatch(market, order); + +transactionTemplate.execute(status -> { + List trades = settle(market, order, plan); // DB 저장 + ... +}); + +// 트랜잭션 커밋 성공 후 +matchingEngine.applyMatchPlan(market, order, plan); // 오더북 반영 +``` + +`TransactionSynchronizationManager.registerSynchronization()` 의 `afterCommit()` 훅으로 +`applyMatchPlan()` 호출을 트랜잭션 커밋 이후로 보장한다. + +> **주의**: `afterCommit()` 안에서 예외가 발생해도 트랜잭션은 이미 커밋 완료 상태다. +> 오더북 반영 실패 시 서버를 재시작하거나 DB 체결 내역에서 오더북을 재구성해야 한다. +> 로그/알림으로 즉시 감지할 수 있도록 처리해야 한다. + +--- + +## Priority 2 — PriorityQueue 동시 읽기 위험 [BLOCKER] + +### 현상 + +`MemoryOrderBook`의 `getBuySide()` / `getSellSide()` ([MemoryOrderBook.java:113-125](../../../src/main/java/com/coinflow/order/matching/MemoryOrderBook.java))는 +`buyQueue` / `sellQueue`를 `stream()`으로 순회한다. + +```java +public List getBuySide() { + return buyQueue.stream() // PriorityQueue iteration — not thread-safe + .sorted(...) + .toList(); +} +``` + +`createOrder()` → `match()` 가 큐를 `poll/add` 하는 동안 +REST 오더북 조회(`GET /api/v1/markets/{symbol}/orderbook`)가 `getBuySide()`를 호출하면 +`ConcurrentModificationException` 이 발생한다. + +### 원인 — lock 소유자는 이미 OrderService + +`OrderService`에는 이미 시장 단위 `ReentrantLock` ([OrderService.java:57](../../../src/main/java/com/coinflow/order/service/OrderService.java))이 있다. +`match()` 는 이 lock 안에서 호출되므로 동시 체결 사이의 충돌은 없다. + +문제는 스냅샷 조회 경로다. +오더북 REST 엔드포인트는 `marketLock`을 잡지 않은 채 `getBuySide()`를 호출한다. + +### 해결 방향 + +lock 소유자를 `OrderService` 하나로 유지하고, +`getBuySide()` / `getSellSide()` 호출도 같은 `marketLock` 안에서 실행한다. + +```java +// MarketController 또는 MarketService에서 오더북 조회 시 +ReentrantLock lock = orderService.getMarketLock(marketId); +lock.lock(); +try { + List buySide = matchingEngine.getBuySide(symbol); + List sellSide = matchingEngine.getSellSide(symbol); + ... +} finally { + lock.unlock(); +} +``` + +`MatchingEngine` 에 lock을 또 추가하면 lock 소유자가 두 군데가 되어 +책임과 순서가 흩어진다. 기존 `marketLock` 을 재사용하는 방향이 맞다. + +--- + +## Priority 3 — Maker Order Row Lock 누락 [BLOCKER] + +### 현상 + +`settle()` ([OrderService.java:227](../../../src/main/java/com/coinflow/order/service/OrderService.java))은 maker 주문을 아래와 같이 조회한다. + +```java +Order maker = orderRepository.findById(result.makerOrderId()).orElseThrow(); +``` + +`findById()`는 `SELECT ... FOR UPDATE` 없는 일반 조회다. + +### 실제 위험 범위 + +현재 `createOrder()` / `cancelOrder()` 는 모두 시장 단위 `marketLock` 안에서 실행되므로 +같은 시장 내 동시 접근은 이미 직렬화된다. 즉 현재 구현에서 race가 터지진 않는다. + +그러나 **방어 심층(defense in depth)** 관점에서 문제가 있다. + +- 향후 "사용자 전체 주문 일괄 취소" 같은 API가 시장 lock을 우회하면 즉시 위험해진다. +- maker 주문의 상태(`OPEN` → `FILLED`) 전환 전에 취소 요청이 끼어들 경우, `isCancelable()` 재검증이 없으면 이미 체결된 주문이 취소될 수 있다. + +### 해결 방향 + +```java +// findById → SELECT FOR UPDATE 로 교체 +Order maker = orderRepository.findByIdWithLock(result.makerOrderId()).orElseThrow(); + +// fill 이전에 상태 재검증 +if (!maker.isOpen() && !maker.isPartiallyFilled()) { + throw new ApiException(ErrorCode.ORDER_NOT_FOUND); +} +``` + +wallet의 `findByUserIdAndAssetWithLock()` 와 동일한 패턴으로 통일한다. + +--- + +## Priority 4 — DepositRequest 검증 없음 [BLOCKER] + +### 현상 + +```java +// DepositRequest.java +public record DepositRequest( + String asset, // @NotNull 없음 + String amount // @NotNull 없음, String 타입 +) {} +``` + +`asset` 또는 `amount` 가 null / 빈 문자열로 들어오면 +서비스 레이어에서 `new BigDecimal(amount)` 호출 시 `NumberFormatException` 또는 NPE 발생. + +### 해결 + +```java +public record DepositRequest( + @NotBlank String asset, + @NotBlank + @Pattern(regexp = "^\\d+(\\.\\d+)?$", message = "amount must be a positive number") + String amount +) {} +``` + +서비스에서 `BigDecimal` 변환 시 이미 검증된 문자열이 들어오도록 보장한다. + +### 관련 — 입금 API 범위 + +`WalletController.deposit()`이 지갑 잔고를 직접 증가시킨다. +실제 거래소에서 입금은 외부 블록체인 확인 이벤트로 트리거되어야 한다. +Phase 1 MVP 범위로는 허용하지만, 이후 외부 연동 시 `DepositConfirmationService` 로 분리한다. + +--- + +## Priority 5 — Outbox 보강 [IMPROVE] + +### 5-1. send().get() Timeout 없음 + +```java +// 현재 — Kafka 장애 시 무한 대기 +kafkaTemplate.send(topic, key, value).get(); + +// 수정 +kafkaTemplate.send(topic, key, value).get(5, TimeUnit.SECONDS); +``` + +`TimeoutException` / `ExecutionException` 을 잡아 `publish_attempts` 를 증가시킨다. + +### 5-2. Domain Event 페이로드에 스키마/버전 없음 + +```json +// 현재 +{"orderId": 1, "status": "FILLED"} + +// 개선 — envelope 구조 +{ + "schemaVersion": "1.0", + "occurredAt": "2024-01-15T10:00:00Z", + "payload": {"orderId": 1, "status": "FILLED"} +} +``` + +`occurredAt` 을 DB 저장 시각과 분리하면 감사 추적이 정확해진다. +`schemaVersion` 으로 Consumer가 역직렬화 전략을 선택할 수 있다. + +### 5-3. Outbox 발행 순서 + +`@Scheduled` polling 쿼리에 `ORDER BY id ASC` 가 없으면 +DB insert 순서와 다른 순서로 이벤트가 발행될 수 있다. + +**Kafka partition key(marketSymbol)** 는 producer 발행 이후 파티션 내 순서만 보장한다. +DB polling 조회 순서와는 무관하다. `ORDER BY id ASC` 는 polling 쿼리에 반드시 추가한다. + +멀티 publisher 환경까지 고려하면 `SELECT ... SKIP LOCKED` 로 row claim을 구현해야 한다. + +### 5-4. 중복 Consumer 처리 (dedup) + +ACK 수신 후 `markPublished()` 커밋 전에 서버가 죽으면 같은 이벤트가 재발행된다. +WebSocket broadcast는 UX 허용 범위지만 +DB에 결과를 저장하는 Consumer를 추가할 때는 `processed_events` 로 dedup을 구현한다. + +```sql +CREATE TABLE processed_events ( + event_id BIGINT NOT NULL, + consumer VARCHAR(60) NOT NULL, + processed_at DATETIME(6), + PRIMARY KEY (event_id, consumer) +); +``` + +### 5-5. WebSocket broadcast payload에 dedup key 없음 + +체결 피드 메시지에 `tradeId` 또는 `eventId` 가 없으면 +클라이언트가 중복 수신 여부를 판별할 수 없다. + +```json +{ + "tradeId": 42, + "eventId": "evt-123", + "price": "50000", + "quantity": "0.01", + "side": "BUY", + "tradedAt": "2024-01-15T10:00:00Z" +} +``` + +--- + +## Priority 6 — 조회 API Pagination / Limit [IMPROVE] + +### 6-1. 주문 목록 조회 — limit 없음 + +```java +// OrderService.java:220 +orderRepository.findAllByUserIdOrderByCreatedAtDesc(currentUserId); +``` + +전체 반환이므로 주문이 많아지면 OOM 또는 느린 응답이 발생한다. + +**해결**: + +``` +GET /api/v1/orders?status=OPEN&limit=50&offset=0 +``` + +기본값 `limit=50`, 최대값 `limit=200` 을 Bean Validation으로 강제한다. + +### 6-2. 체결 내역 조회 — cursor 없음 + +``` +GET /api/v1/orders/{orderId}/fills +``` + +실시간 거래가 활발한 환경에서 페이지 이동 중 데이터가 밀린다. +`lastFillId` 를 커서로 사용하는 keyset pagination으로 전환한다. + +``` +GET /api/v1/orders/{orderId}/fills?lastFillId=0&limit=50 +``` + +### 6-3. Swagger 문서와 구현 불일치 + +`limit`, `offset`, `lastFillId` 파라미터가 OpenAPI 명세에 없다. +구현과 동시에 `@Parameter` 애노테이션으로 문서를 업데이트한다. + +--- + +## Priority 7 — 테스트 격리 [IMPROVE] + +### 현상 + +통합 테스트에서 `@BeforeEach` 로 `matchingEngine.clearAll()` 을 호출하면 +**in-memory 오더북만 초기화**되고 DB 체결 데이터는 남는다. + +```java +// MatchingEngine.java:63 +public void clearAll() { + orderBooks.clear(); // DB를 비우는 게 아니다 +} +``` + +결과적으로 DB에 이전 테스트 체결 내역이 누적된 채로 오더북만 빈 상태가 되어 +"재시작 후 복구" 시나리오와 다른 비현실적인 상태에서 테스트가 진행된다. + +### 해결 + +**매칭/정산 E2E 테스트**: DB cleanup 순서를 외래 키 의존 순으로 명시한다. + +```java +@BeforeEach +void setUp() { + tradeRepository.deleteAll(); + walletLedgerRepository.deleteAll(); + orderRepository.deleteAll(); + walletRepository.deleteAll(); + userRepository.deleteAll(); + matchingEngine.clearAll(); // DB 정리 후 오더북 초기화 +} +``` + +**단위 테스트**: `@Transactional` 으로 감싸 자동 롤백한다. + +--- + +## Priority 8 — docker-compose MySQL 패스워드 불일치 [IMPROVE] + +### 현상 + +`docker-compose.yml` 과 `application.properties` 의 MySQL 패스워드가 다르면 +`docker compose up` 후 Spring Boot 기동 시 DB 연결 실패. + +### 해결 + +두 파일 모두 동일한 환경변수 참조로 통일한다. + +```yaml +# docker-compose.yml +environment: + MYSQL_PASSWORD: coinflow +``` + +```properties +# application.properties +spring.datasource.password=${DB_PASSWORD:coinflow} +``` + +또는 docker-compose.yml 에서 `.env` 파일로 값을 관리한다. + +--- + +## 요약 표 + +| # | 항목 | 심각도 | 브랜치 | +|---|------|--------|--------| +| 1 | 오더북 rollback 불일치 (planMatch/applyMatchPlan) | BLOCKER | feat/phase1-fix | +| 2 | getBuySide/getSellSide 동시 읽기 (marketLock 재사용) | BLOCKER | feat/phase1-fix | +| 3 | Maker order row lock 누락 (findByIdWithLock) | BLOCKER | feat/phase1-fix | +| 4 | DepositRequest 검증 없음 (@NotBlank) | BLOCKER | feat/phase1-fix | +| 5 | Outbox timeout / schema / 순서 / dedup / broadcast key | IMPROVE | feat/20/outbox-publisher | +| 6 | 조회 API pagination / Swagger 불일치 | IMPROVE | feat/phase1-fix | +| 7 | 테스트 격리 (clearAll = in-memory만 초기화) | IMPROVE | feat/phase1-fix | +| 8 | docker-compose MySQL 패스워드 불일치 | IMPROVE | feat/19/kafka-setup | + +BLOCKER 항목은 Phase 2 시작 전에 수정을 완료하고, IMPROVE 항목은 해당 이슈 브랜치에서 병행 처리한다.