diff --git a/kafka/client_async.py b/kafka/client_async.py index 9a89ba20c..8553f2d8d 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -16,7 +16,7 @@ from kafka.metrics.stats import Avg, Count, Rate from kafka.metrics.stats.rate import TimeUnit from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.new.metadata import MetadataRequest from kafka.util import Dict, Timer, WeakMethod, ensure_valid_topic_name from kafka.version import __version__ diff --git a/kafka/conn.py b/kafka/conn.py index ed11e7099..6939488cd 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -11,14 +11,13 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.api_versions import ApiVersionsRequest +from kafka.protocol.new.api_versions import ApiVersionsRequest from kafka.protocol.broker_api_versions import ( BROKER_API_VERSIONS, VERSION_CHECKS, infer_broker_version_from_api_versions, ) from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest -from kafka.protocol.sasl_handshake import SaslHandshakeRequest +from kafka.protocol.new.sasl import SaslAuthenticateRequest, SaslHandshakeRequest from kafka.protocol.types import Int32 from kafka.sasl import get_sasl_mechanism from kafka.socks5_wrapper import Socks5Wrapper diff --git a/test/test_client_async.py b/test/test_client_async.py index f37906074..ec196eef3 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -9,8 +9,8 @@ from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future -from kafka.protocol.metadata import MetadataRequest -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.metadata import MetadataRequest +from kafka.protocol.new.producer import ProduceRequest from kafka.structs import BrokerMetadata diff --git 
a/test/test_cluster.py b/test/test_cluster.py index d1dfb9353..61dc7ee03 100644 --- a/test/test_cluster.py +++ b/test/test_cluster.py @@ -3,7 +3,7 @@ import socket from kafka.cluster import ClusterMetadata, collect_hosts -from kafka.protocol.metadata import MetadataResponse +from kafka.protocol.new.metadata import MetadataResponse def test_empty_broker_list(): diff --git a/test/test_conn.py b/test/test_conn.py index eb90ba5ed..27f8cbe5d 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -11,10 +11,9 @@ from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS from kafka.metrics.metrics import Metrics from kafka.metrics.stats.sensor import Sensor -from kafka.protocol.api import RequestHeader -from kafka.protocol.group import HeartbeatResponse -from kafka.protocol.metadata import MetadataRequest -from kafka.protocol.produce import ProduceRequest +from kafka.protocol.new.consumer import HeartbeatResponse +from kafka.protocol.new.metadata import MetadataRequest +from kafka.protocol.new.producer import ProduceRequest import kafka.errors as Errors @@ -201,8 +200,8 @@ def test_send_no_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = ProduceRequest[0](acks=0, timeout_ms=0, topic_data=()) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) third = payload_bytes // 3 remainder = payload_bytes % 3 _socket.send.side_effect = [4, third, third, third, remainder] @@ -218,8 +217,8 @@ def test_send_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = MetadataRequest[0]([]) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + 
req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) third = payload_bytes // 3 remainder = payload_bytes % 3 _socket.send.side_effect = [4, third, third, third, remainder] @@ -237,11 +236,11 @@ def test_send_async_request_while_other_request_is_already_in_buffer(_socket, co bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent'] req1 = MetadataRequest[0](topics='foo') - header1 = RequestHeader(req1.API_KEY, req1.API_VERSION, 0, conn.config['client_id']) - payload_bytes1 = len(header1.encode()) + len(req1.encode()) + req1.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes1 = len(req1.encode(header=True, framed=False)) req2 = MetadataRequest[0]([]) - header2 = RequestHeader(req2.API_KEY, req2.API_VERSION, 0, conn.config['client_id']) - payload_bytes2 = len(header2.encode()) + len(req2.encode()) + req2.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes2 = len(req2.encode(header=True, framed=False)) # The first call to the socket will raise a transient SSL exception. This will make the first # request to be kept in the internal buffer to be sent in the next call of @@ -299,8 +298,8 @@ def test_recv_disconnected(_socket, conn): assert conn.connected() req = MetadataRequest[0]([]) - header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id']) - payload_bytes = len(header.encode()) + len(req.encode()) + req.with_header(correlation_id=0, client_id=conn.config['client_id']) + payload_bytes = len(req.encode(header=True, framed=False)) _socket.send.side_effect = [4, payload_bytes] conn.send(req)