Apache Kafka 토픽 간에 메시지를 재생(replay)하는 도구입니다. 소스 토픽의 메시지를 읽어서 타겟 토픽으로 복사하는 기능을 제공하며, 다양한 재생 옵션을 지원합니다.
- 토픽 간 메시지 복사: 소스 토픽의 메시지를 타겟 토픽으로 재생
- 다양한 Consumer 모드: Subscribe 모드와 Assign 모드 지원
- Offset 기반 재생: 특정 오프셋부터 메시지 재생
- Timestamp 기반 재생: 특정 시간 이후의 메시지 재생
- 메시지 수 제한: 최대 재생할 메시지 수 제한
- 처리 지연 설정: 메시지 처리 간 지연 시간 설정
- 자동 타임아웃: 설정된 시간 동안 메시지가 없으면 자동 종료
- 클러스터 간 복제: 서로 다른 Kafka 클러스터 간 메시지 복제 지원
- Java 17 이상
- Apache Kafka 3.7.0
- Gradle 7.x 이상
./gradlew build./gradlew jarapplication.properties 파일에 다음과 같이 설정합니다:
# 필수 설정
kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=source-topic
kafka.target.topic=target-topic
# Consumer 설정
kafka.group.id=replay-consumer-group
# 선택적 설정
kafka.partition=0 # 특정 파티션 지정 (생략시 subscribe 모드)
kafka.offset=100 # 시작 오프셋 (partition과 함께 사용)
kafka.timestamp=2024-01-01T00:00:00Z # 시작 타임스탬프 (ISO-8601 형식)
kafka.max.messages=1000 # 최대 재생 메시지 수
kafka.processing.delay=100 # 메시지 처리 간 지연 시간 (ms)
kafka.consumer.timeout=10 # Consumer 타임아웃 (초)| 설정 키 | 필수 | 기본값 | 설명 |
|---|---|---|---|
kafka.source.bootstrap.servers |
✅ | - | 소스 Kafka 클러스터 주소 |
kafka.target.bootstrap.servers |
✅ | - | 타겟 Kafka 클러스터 주소 |
kafka.source.topic |
✅ | - | 소스 토픽명 |
kafka.target.topic |
✅ | - | 타겟 토픽명 |
kafka.group.id |
⚪ | replay-consumer-group |
Consumer 그룹 ID |
kafka.partition |
⚪ | - | 특정 파티션 번호 (생략시 전체 토픽 구독) |
kafka.offset |
⚪ | - | 시작 오프셋 (partition과 함께 사용) |
kafka.timestamp |
⚪ | - | 시작 타임스탬프 (ISO-8601 형식) |
kafka.max.messages |
⚪ | - | 최대 재생 메시지 수 |
kafka.processing.delay |
⚪ | 0 |
메시지 처리 간 지연 시간 (밀리초) |
kafka.consumer.timeout |
⚪ | 10 |
Consumer 타임아웃 (초) |
java -jar kafka-replayer.jar config.propertieskafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=orders
kafka.target.topic=orders-copy
kafka.group.id=replay-groupkafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=events
kafka.target.topic=events-backup
kafka.partition=0
kafka.offset=1000kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=logs
kafka.target.topic=logs-archive
kafka.timestamp=2024-01-01T00:00:00Z
kafka.max.messages=5000kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=transactions
kafka.target.topic=transactions-replay
kafka.processing.delay=500
kafka.max.messages=100- KafkaRePlayer: 메인 애플리케이션 클래스
- ConfigLoader: 설정 파일 로딩 및 검증
- ReplayConfig: 재생 설정을 담는 설정 객체 (Record 타입)
- KafkaConsumerManager: Kafka Consumer 생성 및 설정
- KafkaProducerManager: Kafka Producer 생성 및 설정
- MessageReplayService: 메시지 재생 핵심 로직
1. 설정 파일 로드 (ConfigLoader)
2. Kafka Consumer/Producer 생성 (Manager 클래스들)
3. Consumer 모드 설정 (Subscribe/Assign)
4. 메시지 폴링 및 재생 (MessageReplayService)
5. 통계 정보 출력 및 종료
kafka.partition이 설정되지 않은 경우- 전체 토픽을 구독하여 메시지 수신
- Consumer Group을 사용한 로드 밸런싱
kafka.partition이 설정된 경우- 특정 파티션만 할당하여 메시지 수신
- 오프셋 또는 타임스탬프 기반 시작 위치 설정 가능
kafka.partition=0
# kafka.group.id 설정하지 않음 (선택적)재생 완료 후 다음 통계 정보가 출력됩니다:
- 처리된 메시지 수
- 총 소요 시간
- 초당 처리량 (TPS)
src/
├── main/java/kr/geun/oss/replayer/
│ ├── KafkaRePlayer.java # 메인 애플리케이션
│ ├── MessageReplayService.java # 메시지 재생 로직
│ ├── config/
│ │ ├── ConfigLoader.java # 설정 로더
│ │ └── ReplayConfig.java # 설정 데이터 클래스
│ ├── consumer/
│ │ └── KafkaConsumerManager.java # Kafka Consumer 관리
│ └── producer/
│ └── KafkaProducerManager.java # Kafka Producer 관리
└── test/ # 단위 테스트
./gradlew test./gradlew test jacocoTestReport
open build/reports/jacoco/test/html/index.html- 단위 테스트: 각 컴포넌트의 개별 기능 테스트
- 설정 테스트: ConfigLoader, ReplayConfig 검증
- Manager 테스트: Consumer/Producer 관리자 기능 테스트
- 서비스 테스트: MessageReplayService 로직 테스트 (모킹 사용)
특정 토픽의 모든 메시지를 다른 토픽으로 복사:
kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=orders
kafka.target.topic=orders-backup
kafka.group.id=backup-group오프셋 100부터 1000개의 메시지만 재생:
kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=events
kafka.target.topic=events-replay
kafka.partition=0
kafka.offset=100
kafka.max.messages=10002024년 1월 1일 이후의 메시지를 재생:
kafka.source.bootstrap.servers=localhost:9092
kafka.target.bootstrap.servers=localhost:9092
kafka.source.topic=logs
kafka.target.topic=logs-filtered
kafka.group.id=filter-group
kafka.timestamp=2024-01-01T00:00:00Z서로 다른 Kafka 클러스터 간 메시지 복제:
kafka.source.bootstrap.servers=source-kafka:9092
kafka.target.bootstrap.servers=target-kafka:9092
kafka.source.topic=production-data
kafka.target.topic=staging-data
kafka.group.id=cross-cluster-group
kafka.processing.delay=50- Gradle: 빌드 도구
- Dependencies: Kafka Client 3.7.0, SLF4J
- Java 17+ Record 패턴 사용
- 함수형 프로그래밍 스타일 선호
- 상세한 로깅과 에러 처리
- 단위 테스트: Mockito를 사용한 격리된 테스트
- 통합 테스트 제외: 실제 Kafka 의존성 없는 테스트
- 설정 검증: 다양한 설정 시나리오 테스트
- 메시지 순서: 파티션 내에서는 순서가 보장되지만, 파티션 간에는 보장되지 않습니다.
- 중복 처리: 재시작 시 중복 메시지가 발생할 수 있습니다.
- 메모리 사용량: 대용량 메시지 처리 시 메모리 사용량을 모니터링하세요.
- 네트워크: 클러스터 간 복제 시 네트워크 지연을 고려하세요.
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls
- bootstrap.servers 설정을 확인하세요
- Kafka 브로커가 실행 중인지 확인하세요
org.apache.kafka.common.errors.UnknownTopicOrPartitionException
- 소스/타겟 토픽이 존재하는지 확인하세요
- auto.create.topics.enable 설정을 확인하세요
org.apache.kafka.common.errors.TopicAuthorizationException
- Consumer/Producer 권한을 확인하세요
- ACL 설정을 확인하세요