Skip to content

Commit ebc1826

Browse files
committed
Reuse AbortedTransaction struct from FetchResponse
1 parent 0a7a7fc commit ebc1826

1 file changed

Lines changed: 2 additions & 3 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ class PartitionRecords(object):
910910
def __init__(self, fetch_offset, tp, records,
911911
key_deserializer=None, value_deserializer=None,
912912
check_crcs=True, isolation_level=READ_UNCOMMITTED,
913-
aborted_transactions=None, # raw data from response / list of (producer_id, first_offset) tuples
913+
aborted_transactions=None, # AbortedTransaction data from FetchResponse
914914
metric_aggregator=None, on_drain=lambda x: None):
915915
self.fetch_offset = fetch_offset
916916
self.topic_partition = tp
@@ -921,8 +921,7 @@ def __init__(self, fetch_offset, tp, records,
921921
self.isolation_level = isolation_level
922922
self.aborted_producer_ids = set()
923923
self.aborted_transactions = collections.deque(
924-
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
925-
key=lambda txn: txn.first_offset)
924+
sorted(aborted_transactions or [], key=lambda txn: txn.first_offset)
926925
)
927926
self.metric_aggregator = metric_aggregator
928927
self.check_crcs = check_crcs

0 commit comments

Comments
 (0)