From 3a7123ee0983e200550df6b6f1cb502d3410a094 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 16 Mar 2026 19:28:32 -0700 Subject: [PATCH 1/7] kafka.client_async -> kafka.protocol.new (MetadataRequest) --- kafka/client_async.py | 2 +- test/test_client_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 9a89ba20c..8553f2d8d 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -16,7 +16,7 @@ from kafka.metrics.stats import Avg, Count, Rate from kafka.metrics.stats.rate import TimeUnit from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.new.metadata import MetadataRequest from kafka.util import Dict, Timer, WeakMethod, ensure_valid_topic_name from kafka.version import __version__ diff --git a/test/test_client_async.py b/test/test_client_async.py index f37906074..4223dfc67 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -9,7 +9,7 @@ from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.new.metadata import MetadataRequest from kafka.protocol.produce import ProduceRequest from kafka.structs import BrokerMetadata From fcab43476ba9e17b5bef2a949ba4e3f4358fdc30 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 16 Mar 2026 20:36:05 -0700 Subject: [PATCH 2/7] kafka.conn -> new ApiVersionsRequest --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index ed11e7099..1d71d2e7c 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -11,7 +11,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.api_versions import ApiVersionsRequest +from kafka.protocol.new.metadata import ApiVersionsRequest from kafka.protocol.broker_api_versions import ( BROKER_API_VERSIONS, VERSION_CHECKS, infer_broker_version_from_api_versions, From ce318a9f35c609d7c6b48e3d77d56ad8e050d360 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 16 Mar 2026 20:43:29 -0700 Subject: [PATCH 3/7] kafka.conn -> new SaslAuthenticate/SaslHandshake --- kafka/conn.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1d71d2e7c..6939488cd 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -17,8 +17,7 @@ infer_broker_version_from_api_versions, ) from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest -from kafka.protocol.sasl_handshake import SaslHandshakeRequest +from kafka.protocol.new.sasl import SaslAuthenticateRequest, SaslHandshakeRequest from kafka.protocol.types import Int32 from kafka.sasl import get_sasl_mechanism from kafka.socks5_wrapper import Socks5Wrapper From 73657b425aa6da359ea2b2eaf4a1d7393eaef4f5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:35:33 -0700 Subject: [PATCH 4/7] test_conn --- test/test_conn.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_conn.py b/test/test_conn.py index eb90ba5ed..146e3de80 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -12,9 +12,9 @@ from kafka.metrics.metrics import Metrics from kafka.metrics.stats.sensor import Sensor from kafka.protocol.api import RequestHeader -from kafka.protocol.group import HeartbeatResponse -from kafka.protocol.metadata import MetadataRequest -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.consumer import HeartbeatResponse +from kafka.protocol.new.metadata import MetadataRequest +from kafka.protocol.new.producer import ProduceRequest import kafka.errors as Errors From 6bf9021a29745236945569f77bd289f6d3b2cb02 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:36:00 -0700 Subject: [PATCH 5/7] test_client_async --- test/test_client_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_client_async.py b/test/test_client_async.py index 4223dfc67..ec196eef3 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -10,7 +10,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.protocol.new.metadata import MetadataRequest -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.producer import ProduceRequest from kafka.structs import BrokerMetadata From 62da7fade868a60fa2e8d5d9f48aa869bbb143d6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:41:46 -0700 Subject: [PATCH 6/7] test/test_cluster --- test/test_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_cluster.py b/test/test_cluster.py index d1dfb9353..61dc7ee03 100644 --- a/test/test_cluster.py +++ b/test/test_cluster.py @@ -3,7 +3,7 @@ import socket from kafka.cluster import ClusterMetadata, collect_hosts -from kafka.protocol.metadata import MetadataResponse +from kafka.protocol.new.metadata import MetadataResponse def test_empty_broker_list(): From deae066d7983237d3a4d475b2eb5361bfe98faa9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:46:16 -0700 Subject: [PATCH 7/7] Drop RequestHeader from test_conn --- test/test_conn.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/test/test_conn.py b/test/test_conn.py index 146e3de80..27f8cbe5d 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -11,7 +11,6 @@ from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS from kafka.metrics.metrics import Metrics from kafka.metrics.stats.sensor import Sensor -from kafka.protocol.api import RequestHeader from kafka.protocol.new.consumer import HeartbeatResponse from kafka.protocol.new.metadata import MetadataRequest from kafka.protocol.new.producer import ProduceRequest @@ -201,8 +200,8 @@ def test_send_no_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = ProduceRequest[0](acks=0, timeout_ms=0, topic_data=()) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) third = payload_bytes // 3 remainder = payload_bytes % 3 _socket.send.side_effect = [4, third, third, third, remainder] @@ -218,8 +217,8 @@ def test_send_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = MetadataRequest[0]([]) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) third = payload_bytes // 3 remainder = payload_bytes % 3 _socket.send.side_effect = [4, third, third, third, remainder] @@ -237,11 +236,11 @@ def test_send_async_request_while_other_request_is_already_in_buffer(_socket, co bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent'] req1 = MetadataRequest[0](topics='foo') - header1 = RequestHeader(req1.API_KEY, req1.API_VERSION, 0, conn.config['client_id']) - payload_bytes1 = len(header1.encode()) + len(req1.encode()) + req1.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes1 = len(req1.encode(header=True, framed=False)) req2 = MetadataRequest[0]([]) - header2 = RequestHeader(req2.API_KEY, req2.API_VERSION, 0, conn.config['client_id']) - payload_bytes2 = len(header2.encode()) + len(req2.encode()) + req2.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes2 = len(req2.encode(header=True, framed=False)) # The first call to the socket will raise a transient SSL exception. This will make the first # request to be kept in the internal buffer to be sent in the next call of @@ -299,8 +298,8 @@ def test_recv_disconnected(_socket, conn): assert conn.connected() req = MetadataRequest[0]([]) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) _socket.send.side_effect = [4, payload_bytes] conn.send(req)