1111from kafka .conn import BrokerConnection , ConnectionStates , SSLWantWriteError , VERSION_CHECKS
1212from kafka .metrics .metrics import Metrics
1313from kafka .metrics .stats .sensor import Sensor
14- from kafka .protocol .api import RequestHeader
1514from kafka .protocol .new .consumer import HeartbeatResponse
1615from kafka .protocol .new .metadata import MetadataRequest
1716from kafka .protocol .new .producer import ProduceRequest
@@ -201,8 +200,8 @@ def test_send_no_response(_socket, conn):
201200 conn .connect ()
202201 assert conn .state is ConnectionStates .CONNECTED
203202 req = ProduceRequest [0 ](acks = 0 , timeout_ms = 0 , topic_data = ())
204- header = RequestHeader ( req .API_KEY , req . API_VERSION , 0 , conn .config ['client_id' ])
205- payload_bytes = len (header . encode ()) + len ( req .encode ())
203+ req .with_header ( correlation_id = 0 , client_id = conn .config ['client_id' ])
204+ payload_bytes = len (req .encode (header = True , framed = False ))
206205 third = payload_bytes // 3
207206 remainder = payload_bytes % 3
208207 _socket .send .side_effect = [4 , third , third , third , remainder ]
@@ -218,8 +217,8 @@ def test_send_response(_socket, conn):
218217 conn .connect ()
219218 assert conn .state is ConnectionStates .CONNECTED
220219 req = MetadataRequest [0 ]([])
221- header = RequestHeader ( req .API_KEY , req . API_VERSION , 0 , conn .config ['client_id' ])
222- payload_bytes = len (header . encode ()) + len ( req .encode ())
220+ req .with_header ( correlation_id = 0 , client_id = conn .config ['client_id' ])
221+ payload_bytes = len (req .encode (header = True , framed = False ))
223222 third = payload_bytes // 3
224223 remainder = payload_bytes % 3
225224 _socket .send .side_effect = [4 , third , third , third , remainder ]
@@ -237,11 +236,11 @@ def test_send_async_request_while_other_request_is_already_in_buffer(_socket, co
237236 bytes_sent_sensor = metrics .mocked_sensors ['node-0.bytes-sent' ]
238237
239238 req1 = MetadataRequest [0 ](topics = 'foo' )
240- header1 = RequestHeader ( req1 .API_KEY , req1 . API_VERSION , 0 , conn .config ['client_id' ])
241- payload_bytes1 = len (header1 . encode ()) + len ( req1 .encode ())
239+ req1 .with_header ( correlation_id = 0 , client_id = conn .config ['client_id' ])
240+ payload_bytes1 = len (req1 .encode (header = True , framed = False ))
242241 req2 = MetadataRequest [0 ]([])
243- header2 = RequestHeader ( req2 .API_KEY , req2 . API_VERSION , 0 , conn .config ['client_id' ])
244- payload_bytes2 = len (header2 . encode ()) + len ( req2 .encode ())
242+ req2 .with_header ( correlation_id = 0 , client_id = conn .config ['client_id' ])
243+ payload_bytes2 = len (req2 .encode (header = True , framed = False ))
245244
246245 # The first call to the socket will raise a transient SSL exception. This will make the first
247246 # request to be kept in the internal buffer to be sent in the next call of
@@ -299,8 +298,8 @@ def test_recv_disconnected(_socket, conn):
299298 assert conn .connected ()
300299
301300 req = MetadataRequest [0 ]([])
302- header = RequestHeader ( req .API_KEY , req . API_VERSION , 0 , conn .config ['client_id' ])
303- payload_bytes = len (header . encode ()) + len ( req .encode ())
301+ req .with_header ( correlation_id = 0 , client_id = conn .config ['client_id' ])
302+ payload_bytes = len (req .encode (header = True , framed = False ))
304303 _socket .send .side_effect = [4 , payload_bytes ]
305304 conn .send (req )
306305
0 commit comments