@@ -516,13 +516,20 @@ async def _encode_loop(self):
516516 messages .extend (self ._messages_for_encode .get_nowait ())
517517
518518 logger .debug (
519- "writer reconnector %s encode %s messages" ,
519+ "writer reconnector %s start encoding %s messages" ,
520520 self ._id ,
521521 len (messages ),
522522 )
523523
524524 batch_codec = await self ._codec_selector (messages )
525525 await self ._encode_data_inplace (batch_codec , messages )
526+
527+ logger .debug (
528+ "writer reconnector %s encoded %s messages" ,
529+ self ._id ,
530+ len (messages ),
531+ )
532+
526533 self ._add_messages_to_send_queue (messages )
527534 except BaseException as err :
528535 self ._stop (err )
@@ -631,6 +638,8 @@ async def _read_loop(self, writer: "WriterAsyncIOStream"):
631638 for ack in resp .acks :
632639 self ._handle_receive_ack (ack )
633640
641+ logger .debug ("writer reconnector %s handled %s acks" , self ._id , len (resp .acks ))
642+
634643 def _handle_receive_ack (self , ack ):
635644 current_message = self ._messages .popleft ()
636645 message_future = self ._messages_future .popleft ()
@@ -650,12 +659,6 @@ def _handle_receive_ack(self, ack):
650659 else :
651660 raise TopicWriterError ("internal error - receive unexpected ack message." )
652661 message_future .set_result (result )
653- logger .debug (
654- "writer reconnector %s ack seqno=%s result=%s" ,
655- self ._id ,
656- ack .seq_no ,
657- type (result ).__name__ ,
658- )
659662
660663 async def _send_loop (self , writer : "WriterAsyncIOStream" ):
661664 try :
0 commit comments