Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from kafka.metrics.stats import Avg, Count, Rate
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.new.metadata import MetadataRequest
from kafka.util import Dict, Timer, WeakMethod, ensure_valid_topic_name
from kafka.version import __version__

Expand Down
5 changes: 2 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api_versions import ApiVersionsRequest
from kafka.protocol.new.metadata import ApiVersionsRequest
from kafka.protocol.broker_api_versions import (
BROKER_API_VERSIONS, VERSION_CHECKS,
infer_broker_version_from_api_versions,
)
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
from kafka.protocol.new.sasl import SaslAuthenticateRequest, SaslHandshakeRequest
from kafka.protocol.types import Int32
from kafka.sasl import get_sasl_mechanism
from kafka.socks5_wrapper import Socks5Wrapper
Expand Down
4 changes: 2 additions & 2 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.new.metadata import MetadataRequest
from kafka.protocol.new.producer import ProduceRequest
from kafka.structs import BrokerMetadata


Expand Down
2 changes: 1 addition & 1 deletion test/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket

from kafka.cluster import ClusterMetadata, collect_hosts
from kafka.protocol.metadata import MetadataResponse
from kafka.protocol.new.metadata import MetadataResponse


def test_empty_broker_list():
Expand Down
27 changes: 13 additions & 14 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS
from kafka.metrics.metrics import Metrics
from kafka.metrics.stats.sensor import Sensor
from kafka.protocol.api import RequestHeader
from kafka.protocol.group import HeartbeatResponse
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.new.consumer import HeartbeatResponse
from kafka.protocol.new.metadata import MetadataRequest
from kafka.protocol.new.producer import ProduceRequest

import kafka.errors as Errors

Expand Down Expand Up @@ -201,8 +200,8 @@ def test_send_no_response(_socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = ProduceRequest[0](acks=0, timeout_ms=0, topic_data=())
header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
req.with_header(correlation_id=0, client_id=conn.config['client_id'])
payload_bytes = len(req.encode(header=True, framed=False))
third = payload_bytes // 3
remainder = payload_bytes % 3
_socket.send.side_effect = [4, third, third, third, remainder]
Expand All @@ -218,8 +217,8 @@ def test_send_response(_socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = MetadataRequest[0]([])
header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
req.with_header(correlation_id=0, client_id=conn.config['client_id'])
payload_bytes = len(req.encode(header=True, framed=False))
third = payload_bytes // 3
remainder = payload_bytes % 3
_socket.send.side_effect = [4, third, third, third, remainder]
Expand All @@ -237,11 +236,11 @@ def test_send_async_request_while_other_request_is_already_in_buffer(_socket, co
bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent']

req1 = MetadataRequest[0](topics='foo')
header1 = RequestHeader(req1.API_KEY, req1.API_VERSION, 0, conn.config['client_id'])
payload_bytes1 = len(header1.encode()) + len(req1.encode())
req1.with_header(correlation_id=0, client_id=conn.config['client_id'])
payload_bytes1 = len(req1.encode(header=True, framed=False))
req2 = MetadataRequest[0]([])
header2 = RequestHeader(req2.API_KEY, req2.API_VERSION, 0, conn.config['client_id'])
payload_bytes2 = len(header2.encode()) + len(req2.encode())
req2.with_header(correlation_id=0, client_id=conn.config['client_id'])
payload_bytes2 = len(req2.encode(header=True, framed=False))

# The first call to the socket will raise a transient SSL exception. This will make the first
# request to be kept in the internal buffer to be sent in the next call of
Expand Down Expand Up @@ -299,8 +298,8 @@ def test_recv_disconnected(_socket, conn):
assert conn.connected()

req = MetadataRequest[0]([])
header = RequestHeader(req.API_KEY, req.API_VERSION, 0, conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
req.with_header(correlation_id=0, client_id=conn.config['client_id'])
payload_bytes = len(req.encode(header=True, framed=False))
_socket.send.side_effect = [4, payload_bytes]
conn.send(req)

Expand Down
Loading