From 49345402413ea39e4d99fb5a6bfd2845564031e7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 17 Mar 2026 15:20:02 -0700 Subject: [PATCH 1/4] kafka.consumer -> new FetchRequest/Response --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 250d20ea6..d8a14236c 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -8,7 +8,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.fetch import FetchRequest, AbortedTransaction +from kafka.protocol.new.consumer import FetchRequest from kafka.protocol.list_offsets import ( ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET ) From 946298b74006c8b93254d56a0266c431916e8b4b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 17 Mar 2026 15:21:08 -0700 Subject: [PATCH 2/4] kafka.consumer -> new ListOffsetsRequest/Response --- kafka/consumer/fetcher.py | 2 +- kafka/consumer/group.py | 2 +- kafka/consumer/subscription_state.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d8a14236c..06841fbae 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -9,7 +9,7 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.new.consumer import FetchRequest -from kafka.protocol.list_offsets import ( +from kafka.protocol.new.consumer import ( ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET ) from kafka.record import MemoryRecords diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index e3c2672c3..a04b3e997 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -13,7 +13,7 @@ from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.metrics import MetricConfig, Metrics -from kafka.protocol.list_offsets import OffsetResetStrategy +from kafka.protocol.new.consumer import OffsetResetStrategy from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import Timer from kafka.version import __version__ diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 32a2ae4ce..f71a06ce2 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -9,7 +9,7 @@ import time import kafka.errors as Errors -from kafka.protocol.list_offsets import OffsetResetStrategy +from kafka.protocol.new.consumer import OffsetResetStrategy from kafka.structs import OffsetAndMetadata from kafka.util import ensure_valid_topic_name, synchronized From 9843ca100222a47b40e5f59aca3931424bc7101c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 18:43:59 -0700 Subject: [PATCH 3/4] Reuse AbortedTransaction struct from FetchResponse --- kafka/consumer/fetcher.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 06841fbae..3afebb5e1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -910,7 +910,7 @@ class PartitionRecords(object): def __init__(self, fetch_offset, tp, records, key_deserializer=None, value_deserializer=None, check_crcs=True, isolation_level=READ_UNCOMMITTED, - aborted_transactions=None, # raw data from response / list of (producer_id, first_offset) tuples + aborted_transactions=None, # AbortedTransaction data from FetchResponse metric_aggregator=None, on_drain=lambda x: None): self.fetch_offset = fetch_offset self.topic_partition = tp @@ -921,8 +921,7 @@ def __init__(self, fetch_offset, tp, records, self.isolation_level = isolation_level self.aborted_producer_ids = set() self.aborted_transactions = collections.deque( - sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [], - key=lambda txn: txn.first_offset) + sorted(aborted_transactions or [], key=lambda txn: txn.first_offset) ) self.metric_aggregator = metric_aggregator self.check_crcs = check_crcs From cf4b88785dcd206a3f0305019d860fcd102a63fb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:49:20 -0700 Subject: [PATCH 4/4] test_fetcher --- test/test_fetcher.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 9e4927c9c..539c1c410 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -14,8 +14,10 @@ import kafka.errors as Errors from kafka.future import Future from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.fetch import FetchRequest, FetchResponse -from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy +from kafka.protocol.new.consumer import ( + FetchRequest, FetchResponse, + ListOffsetsResponse, OffsetResetStrategy, +) from kafka.errors import ( StaleMetadata, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError