diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 250d20ea6..3afebb5e1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -8,8 +8,8 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.protocol.fetch import FetchRequest, AbortedTransaction
-from kafka.protocol.list_offsets import (
+from kafka.protocol.new.consumer import FetchRequest
+from kafka.protocol.new.consumer import (
     ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
 from kafka.record import MemoryRecords
@@ -910,7 +910,7 @@ class PartitionRecords(object):
     def __init__(self, fetch_offset, tp, records,
                  key_deserializer=None, value_deserializer=None,
                  check_crcs=True, isolation_level=READ_UNCOMMITTED,
-                 aborted_transactions=None,  # raw data from response / list of (producer_id, first_offset) tuples
+                 aborted_transactions=None,  # AbortedTransaction data from FetchResponse
                  metric_aggregator=None, on_drain=lambda x: None):
         self.fetch_offset = fetch_offset
         self.topic_partition = tp
@@ -921,8 +921,7 @@ def __init__(self, fetch_offset, tp, records,
         self.isolation_level = isolation_level
         self.aborted_producer_ids = set()
         self.aborted_transactions = collections.deque(
-            sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
-                   key=lambda txn: txn.first_offset)
+            sorted(aborted_transactions or [], key=lambda txn: txn.first_offset)
         )
         self.metric_aggregator = metric_aggregator
         self.check_crcs = check_crcs
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index e3c2672c3..a04b3e997 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -13,7 +13,7 @@
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.metrics import MetricConfig, Metrics
-from kafka.protocol.list_offsets import OffsetResetStrategy
+from kafka.protocol.new.consumer import OffsetResetStrategy
 from kafka.structs import OffsetAndMetadata, TopicPartition
 from kafka.util import Timer
 from kafka.version import __version__
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 32a2ae4ce..f71a06ce2 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -9,7 +9,7 @@
 import time

 import kafka.errors as Errors
-from kafka.protocol.list_offsets import OffsetResetStrategy
+from kafka.protocol.new.consumer import OffsetResetStrategy
 from kafka.structs import OffsetAndMetadata
 from kafka.util import ensure_valid_topic_name, synchronized

diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 9e4927c9c..539c1c410 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -14,8 +14,10 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
-from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy
+from kafka.protocol.new.consumer import (
+    FetchRequest, FetchResponse,
+    ListOffsetsResponse, OffsetResetStrategy,
+)
 from kafka.errors import (
     StaleMetadata, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError