From c3cabea11a8a668d2f5cfa563ceb215f2abb1078 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 20 Mar 2026 11:59:12 -0700 Subject: [PATCH 1/5] kafka.producer -> new protocol --- kafka/producer/sender.py | 3 +-- kafka/producer/transaction_manager.py | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index f1484b0ce..4f6909d94 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -9,8 +9,7 @@ from kafka.metrics.measurable import AnonMeasurable from kafka.metrics.stats import Avg, Max, Rate from kafka.producer.transaction_manager import ProducerIdAndEpoch -from kafka.protocol.init_producer_id import InitProducerIdRequest -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.producer import InitProducerIdRequest, ProduceRequest from kafka.structs import TopicPartition from kafka.version import __version__ diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 6ebf748e9..656966893 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -6,12 +6,11 @@ import threading import kafka.errors as Errors -from kafka.protocol.add_offsets_to_txn import AddOffsetsToTxnRequest -from kafka.protocol.add_partitions_to_txn import AddPartitionsToTxnRequest -from kafka.protocol.end_txn import EndTxnRequest -from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.init_producer_id import InitProducerIdRequest -from kafka.protocol.txn_offset_commit import TxnOffsetCommitRequest +from kafka.protocol.new.metadata import FindCoordinatorRequest +from kafka.protocol.new.producer import ( + AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, + EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest, +) from kafka.structs import TopicPartition From aab01d360f9a276e326c7e0cc30f534798e20fb5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 17:41:58 -0700 Subject: [PATCH 2/5] use new ProduceRequest in test_sender --- test/test_sender.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/test_sender.py b/test/test_sender.py index 08012adb3..a7c0020ae 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -13,7 +13,7 @@ import kafka.errors as Errors from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.producer.kafka import KafkaProducer -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.producer import ProduceRequest from kafka.producer.future import FutureRecordMetadata from kafka.producer.producer_batch import ProducerBatch from kafka.producer.record_accumulator import RecordAccumulator @@ -64,7 +64,8 @@ def test_produce_request(sender, api_version, produce_version): magic = KafkaProducer.max_usable_produce_magic(api_version) batch = producer_batch(magic=magic) produce_request = sender._produce_request(0, 0, 0, [batch]) - assert isinstance(produce_request, ProduceRequest[produce_version]) + assert isinstance(produce_request, ProduceRequest) + assert produce_request.version == produce_version @pytest.mark.parametrize(("api_version", "produce_version"), [ @@ -81,7 +82,8 @@ def test_create_produce_requests(sender, api_version, produce_version): produce_requests_by_node = sender._create_produce_requests(batches_by_node) assert len(produce_requests_by_node) == 3 for node in range(3): - assert isinstance(produce_requests_by_node[node], ProduceRequest[produce_version]) + assert isinstance(produce_requests_by_node[node], ProduceRequest) + assert produce_requests_by_node[node].version == produce_version def test_complete_batch_success(sender): From 84c9483021743c6221b2ccc107207af36eb140f5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 17:42:19 -0700 Subject: [PATCH 3/5] new FindCoordinatorRequest attrs in transaction manager --- kafka/producer/transaction_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 656966893..7a81d6431 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -755,8 +755,8 @@ def __init__(self, transaction_manager, coord_type, coord_key): else: raise ValueError("Unrecognized coordinator type: %s" % (coord_type,)) self.request = FindCoordinatorRequest[version]( - coordinator_key=coord_key, - coordinator_type=coord_type_int8, + key=coord_key, + key_type=coord_type_int8, ) @property From a17ae4f324a3319a060c0674764861d90f35dfe7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 17:45:39 -0700 Subject: [PATCH 4/5] fix transactional manager bug --- kafka/producer/transaction_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 7a81d6431..dd20cd9b4 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -941,7 +941,7 @@ def handle_response(self, response): log.debug("Successfully added offsets for %s from consumer group %s to transaction.", tp, self.consumer_group_id) del self.transaction_manager._pending_txn_offset_commits[tp] - elif error in (errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError): + elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError): retriable_failure = True lookup_coordinator = True elif error is Errors.UnknownTopicOrPartitionError: From 8e0484770511f3dc2751db528b6d1582bde9e063 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 18:08:59 -0700 Subject: [PATCH 5/5] send message to topic for auto-creation --- test/integration/test_producer_integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index c7e6ad4c9..94b3668ff 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -194,6 +194,7 @@ def test_transactional_producer_offsets(kafka_broker): with producer_factory(bootstrap_servers=connect_str, transactional_id='testing') as producer: producer.init_transactions() producer.begin_transaction() + producer.send('transactional_test_topic', partition=0, value=b'msg1').get() producer.send_offsets_to_transaction(offsets, 'txn-test-group') producer.commit_transaction()