Skip to content
3 changes: 3 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ def _poll(self, timeout):
self._sensors.select_time.record((end_select - start_select) * 1000000000)

for key, events in ready:
if key.fileobj.fileno() < 0:
self._selector.unregister(key.fileobj)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be more robust: we want to close the conn here if it is a BrokerConnection, which would then trigger an unregister. But it could also be the _wake_r socketpair, in which case we need to reset/rebuild the wake socketpair.


if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
Expand Down