Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/main/java/com/coinflow/event/domain/DomainEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.coinflow.event.domain;

import jakarta.persistence.*;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Getter
@Entity
@NoArgsConstructor
@Table(name = "domain_events")
public class DomainEvent {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Enumerated(EnumType.STRING)
private DomainEventType eventType;

private String aggregateType;
private Long aggregateId;

private Long marketId;
private String marketSymbol;

private String payload;

private boolean published;
private LocalDateTime publishedAt;
private int publishAttempts;

private LocalDateTime createdAt;

public static DomainEvent create(
DomainEventType eventType,
String aggregateType,
Long aggregateId,
Long marketId,
String marketSymbol,
String payload
) {
DomainEvent event = new DomainEvent();
event.eventType = eventType;
event.aggregateType = aggregateType;
event.aggregateId = aggregateId;
event.marketId = marketId;
event.marketSymbol = marketSymbol;
event.payload = payload;
event.published = false;
event.publishAttempts = 0;
event.createdAt = LocalDateTime.now();
return event;
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/coinflow/event/domain/DomainEventType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.coinflow.event.domain;

public enum DomainEventType {
ORDER_ACCEPTED,
ORDER_PARTIALLY_FILLED,
ORDER_FILLED,
ORDER_CANCELED,
TRADE_CREATED,
SETTLEMENT_COMPLETED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.coinflow.event.repository;

import com.coinflow.event.domain.DomainEvent;
import com.coinflow.event.domain.DomainEventType;
import org.springframework.data.jpa.repository.JpaRepository;

import java.util.List;

public interface DomainEventRepository extends JpaRepository<DomainEvent, Long> {
List<DomainEvent> findAllByAggregateTypeAndAggregateId(String aggregateType, Long aggregateId);
List<DomainEvent> findAllByEventType(DomainEventType eventType);
}
102 changes: 102 additions & 0 deletions src/main/java/com/coinflow/event/service/DomainEventRecorder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.coinflow.event.service;

import com.coinflow.event.domain.DomainEvent;
import com.coinflow.event.domain.DomainEventType;
import com.coinflow.event.repository.DomainEventRepository;
import com.coinflow.order.domain.Order;
import com.coinflow.trade.domain.Trade;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@RequiredArgsConstructor
public class DomainEventRecorder {

private final DomainEventRepository domainEventRepository;
private final ObjectMapper objectMapper;

public void recordOrderAccepted(Order order) {
save(DomainEventType.ORDER_ACCEPTED, "ORDER", order.getId(),
order.getMarketId(), order.getMarketSymbol(),
Map.of(
"orderId", order.getId(),
"userId", order.getUserId(),
"market", order.getMarketSymbol(),
"side", order.getSide().name(),
"price", order.getPrice(),
"quantity", order.getOriginalQuantity()
));
}

public void recordOrderFillEvent(Order order, Long tradeId) {
DomainEventType type = (order.getRemainingQuantity().signum() == 0)
? DomainEventType.ORDER_FILLED
: DomainEventType.ORDER_PARTIALLY_FILLED;

save(type, "ORDER", order.getId(),
order.getMarketId(), order.getMarketSymbol(),
Map.of(
"orderId", order.getId(),
"tradeId", tradeId,
"executedQuantity", order.getExecutedQuantity(),
"remainingQuantity", order.getRemainingQuantity(),
"executedQuoteAmount", order.getExecutedQuoteAmount()
));
}

public void recordOrderCanceled(Order order, String releasedAsset, String releasedAmount) {
save(DomainEventType.ORDER_CANCELED, "ORDER", order.getId(),
order.getMarketId(), order.getMarketSymbol(),
Map.of(
"orderId", order.getId(),
"userId", order.getUserId(),
"releasedAsset", releasedAsset,
"releasedAmount", releasedAmount
));
}

public void recordTradeCreated(Trade trade) {
save(DomainEventType.TRADE_CREATED, "TRADE", trade.getId(),
trade.getMarketId(), trade.getMarketSymbol(),
Map.of(
"tradeId", trade.getId(),
"market", trade.getMarketSymbol(),
"buyOrderId", trade.getBuyOrderId(),
"sellOrderId", trade.getSellOrderId(),
"makerOrderId", trade.getMakerOrderId(),
"takerOrderId", trade.getTakerOrderId(),
"price", trade.getPrice(),
"quantity", trade.getQuantity(),
"quoteAmount", trade.getQuoteAmount()
));
}

public void recordSettlementCompleted(Trade trade) {
save(DomainEventType.SETTLEMENT_COMPLETED, "TRADE", trade.getId(),
trade.getMarketId(), trade.getMarketSymbol(),
Map.of(
"tradeId", trade.getId(),
"buyOrderId", trade.getBuyOrderId(),
"sellOrderId", trade.getSellOrderId(),
"price", trade.getPrice(),
"quantity", trade.getQuantity(),
"quoteAmount", trade.getQuoteAmount()
));
}

private void save(DomainEventType type, String aggregateType, Long aggregateId,
Long marketId, String marketSymbol, Map<String, Object> payload) {
try {
String json = objectMapper.writeValueAsString(payload);
domainEventRepository.save(
DomainEvent.create(type, aggregateType, aggregateId, marketId, marketSymbol, json)
);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize domain event payload: " + type, e);
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/coinflow/order/service/OrderService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.coinflow.order.dto.CreateOrderResponse;
import com.coinflow.order.dto.OrderDetailResponse;
import com.coinflow.order.dto.OrderSummaryResponse;
import com.coinflow.event.service.DomainEventRecorder;
import com.coinflow.order.matching.MatchResult;
import com.coinflow.order.matching.MatchingEngine;
import com.coinflow.order.repository.OrderRepository;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class OrderService {
private final TradeRepository tradeRepository;
private final WalletLedgerRepository walletLedgerRepository;
private final MatchingEngine matchingEngine;
private final DomainEventRecorder eventRecorder;
private final TransactionTemplate transactionTemplate;

private final Map<Long, ReentrantLock> marketLocks = new ConcurrentHashMap<>();
Expand All @@ -62,6 +64,7 @@ public OrderService(
TradeRepository tradeRepository,
WalletLedgerRepository walletLedgerRepository,
MatchingEngine matchingEngine,
DomainEventRecorder eventRecorder,
PlatformTransactionManager transactionManager
) {
this.marketRepository = marketRepository;
Expand All @@ -71,6 +74,7 @@ public OrderService(
this.tradeRepository = tradeRepository;
this.walletLedgerRepository = walletLedgerRepository;
this.matchingEngine = matchingEngine;
this.eventRecorder = eventRecorder;
this.transactionTemplate = new TransactionTemplate(transactionManager);
}

Expand Down Expand Up @@ -147,6 +151,7 @@ public CreateOrderResponse createOrder(Long currentUserId, CreateOrderRequest re
sequence, request.clientOrderId()
);
orderRepository.save(order);
eventRecorder.recordOrderAccepted(order);

// ORDER_LOCK ledger
walletLedgerRepository.save(WalletLedger.create(
Expand Down Expand Up @@ -195,6 +200,7 @@ public CancelOrderResponse cancelOrder(Long currentUserId, Long orderId) {

lockedOrder.cancel();
matchingEngine.cancelOrder(lockedOrder.getMarketSymbol(), lockedOrder);
eventRecorder.recordOrderCanceled(lockedOrder, lockedOrder.getLockedAsset(), releaseAmount.toPlainString());

return CancelOrderResponse.of(lockedOrder, lockedOrder.getLockedAsset(), releaseAmount.toPlainString());
});
Expand Down Expand Up @@ -265,6 +271,10 @@ private List<Trade> settle(Market market, Order taker, List<MatchResult> matchRe
Long sellOrderId = result.sellOrderId();
Long tradeId = trade.getId();

eventRecorder.recordOrderFillEvent(maker, tradeId);
eventRecorder.recordOrderFillEvent(taker, tradeId);
eventRecorder.recordTradeCreated(trade);

walletLedgerRepository.save(WalletLedger.create(
buyerQuoteWallet, LedgerType.TRADE_BUY_QUOTE_SETTLE,
buyerRefund, buyerReleased.negate(),
Expand All @@ -286,6 +296,7 @@ private List<Trade> settle(Market market, Order taker, List<MatchResult> matchRe
sellOrderId, tradeId
));

eventRecorder.recordSettlementCompleted(trade);
trades.add(trade);
}

Expand Down
Loading
Loading