@@ -57,8 +57,8 @@ public enum OffsetMode {
5757
5858 // Total messages consumed from the broker, including control messages.
5959 private long readSoFar = 0 ;
60- // Control messages encountered and intentionally skipped.
61- private long controlMessagesSkipped = 0 ;
60+ // Control and Global RT DIV messages encountered and intentionally skipped.
61+ private long messagesSkipped = 0 ;
6262 // Data records actually delivered to the caller via {@link #next()}.
6363 private long recordsDelivered = 0 ;
6464
@@ -164,9 +164,9 @@ public PubSubInputRecord next() throws IOException {
164164 readSoFar += 1 ;
165165
166166 KafkaKey messageKey = pubSubMessage .getKey ();
167- if (messageKey .isControlMessage ()) {
168- // Skip control messages and record the skip.
169- controlMessagesSkipped += 1 ;
167+ if (messageKey .isControlMessage () || messageKey . isGlobalRtDiv () ) {
168+ // Skip control / Global RT DIV messages and record the skip.
169+ messagesSkipped += 1 ;
170170 continue ;
171171 }
172172
@@ -193,7 +193,7 @@ public float getProgress() {
193193 readSoFar ,
194194 targetCount ,
195195 recordsDelivered ,
196- controlMessagesSkipped ,
196+ messagesSkipped ,
197197 String .format ("%.2f" , progress * 100 ));
198198 lastLoggedProgress = progress ;
199199 }
@@ -222,7 +222,7 @@ public void close() {
222222 topicPartition ,
223223 readSoFar ,
224224 recordsDelivered ,
225- controlMessagesSkipped );
225+ messagesSkipped );
226226 }
227227 }
228228
0 commit comments