From 3c8c566423327423fba91570bdd01d198f71cd93 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 17 Mar 2026 15:15:47 -0700 Subject: [PATCH 1/2] Add kafka.protocol.new.consumer.offsets; add UNKNOWN_OFFSET and OffsetResetStrategy --- kafka/protocol/new/consumer/__init__.py | 2 ++ kafka/protocol/new/consumer/fetch.py | 9 --------- kafka/protocol/new/consumer/offsets.py | 22 ++++++++++++++++++++++ 3 files changed, 24 insertions(+), 9 deletions(-) create mode 100644 kafka/protocol/new/consumer/offsets.py diff --git a/kafka/protocol/new/consumer/__init__.py b/kafka/protocol/new/consumer/__init__.py index e533ce1ad..235088794 100644 --- a/kafka/protocol/new/consumer/__init__.py +++ b/kafka/protocol/new/consumer/__init__.py @@ -1,9 +1,11 @@ from .fetch import * from .group import * +from .offsets import * __all__ = [ 'FetchRequest', 'FetchResponse', + 'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'ListOffsetsRequest', 'ListOffsetsResponse', 'JoinGroupRequest', 'JoinGroupResponse', 'SyncGroupRequest', 'SyncGroupResponse', diff --git a/kafka/protocol/new/consumer/fetch.py b/kafka/protocol/new/consumer/fetch.py index 975e3b404..173715664 100644 --- a/kafka/protocol/new/consumer/fetch.py +++ b/kafka/protocol/new/consumer/fetch.py @@ -4,15 +4,6 @@ class FetchRequest(ApiMessage): pass class FetchResponse(ApiMessage): pass -class ListOffsetsRequest(ApiMessage): pass -class ListOffsetsResponse(ApiMessage): pass - -class OffsetForLeaderEpochRequest(ApiMessage): pass -class OffsetForLeaderEpochResponse(ApiMessage): pass - - __all__ = [ 'FetchRequest', 'FetchResponse', - 'ListOffsetsRequest', 'ListOffsetsResponse', - 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse', ] diff --git a/kafka/protocol/new/consumer/offsets.py b/kafka/protocol/new/consumer/offsets.py new file mode 100644 index 000000000..709c0162f --- /dev/null +++ b/kafka/protocol/new/consumer/offsets.py @@ -0,0 +1,22 @@ +from ..api_message import ApiMessage + + +UNKNOWN_OFFSET = -1 + +class OffsetResetStrategy: + LATEST = -1 + EARLIEST = -2 + NONE = 0 + +class ListOffsetsRequest(ApiMessage): pass +class ListOffsetsResponse(ApiMessage): pass + +class OffsetForLeaderEpochRequest(ApiMessage): pass +class OffsetForLeaderEpochResponse(ApiMessage): pass + + +__all__ = [ + 'UNKNOWN_OFFSET', 'OffsetResetStrategy', + 'ListOffsetsRequest', 'ListOffsetsResponse', + 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse', +] From 17cb24c904e1e9ded2578632715b215cd6701c8a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 15:04:00 -0700 Subject: [PATCH 2/2] tests --- .../new/consumer/test_new_list_offsets.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/protocol/new/consumer/test_new_list_offsets.py b/test/protocol/new/consumer/test_new_list_offsets.py index 239a6de8f..0576007b4 100644 --- a/test/protocol/new/consumer/test_new_list_offsets.py +++ b/test/protocol/new/consumer/test_new_list_offsets.py @@ -1,6 +1,9 @@ import pytest -from kafka.protocol.new.consumer import ListOffsetsRequest, ListOffsetsResponse +from kafka.protocol.new.consumer import ( + ListOffsetsRequest, ListOffsetsResponse, + UNKNOWN_OFFSET, OffsetResetStrategy, +) @pytest.mark.parametrize("version", range(ListOffsetsRequest.min_version, ListOffsetsRequest.max_version + 1)) @@ -54,3 +57,14 @@ def test_list_offsets_response_roundtrip(version): encoded = ListOffsetsResponse.encode(data, version=version) decoded = ListOffsetsResponse.decode(encoded, version=version) assert decoded == data + + +def test_unknown_offset(): + assert UNKNOWN_OFFSET == -1 + + +def test_offset_reset_strategies(): + assert OffsetResetStrategy.LATEST == -1 + assert OffsetResetStrategy.EARLIEST == -2 + assert OffsetResetStrategy.NONE == 0 +