Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 111 additions & 93 deletions test/protocol/new/consumer/test_new_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,118 @@
from kafka.protocol.new.consumer import FetchRequest, FetchResponse


@pytest.mark.parametrize("version", range(FetchRequest.min_version, FetchRequest.max_version + 1))
def test_fetch_request_roundtrip(version):
ReplicaState = FetchRequest.ReplicaState
Topic = FetchRequest.FetchTopic
Partition = Topic.FetchPartition
ForgottenTopic = FetchRequest.ForgottenTopic
request = FetchRequest(
cluster_id='cluster' if version >= 12 else None,
replica_id=12 if version <= 14 else -1,
replica_state=ReplicaState(
replica_id=12,
replica_epoch=345,
) if version >= 15 else None,
max_wait_ms=500,
min_bytes=1,
isolation_level=2 if version >= 4 else 0,
session_id=12 if version >= 7 else 0,
session_epoch=34 if version >= 7 else -1,
topics=[
Topic(
topic="test-topic" if version <= 12 else '',
topic_id=uuid.uuid4() if version >= 13 else None,
partitions=[
Partition(
partition=0,
current_leader_epoch=6 if version >= 9 else -1,
fetch_offset=100,
last_fetched_epoch=8 if version >= 12 else -1,
log_start_offset=3 if version >= 5 else -1,
partition_max_bytes=1024,
replica_directory_id=uuid.uuid4() if version >= 17 else None,
)
],
),
],
forgotten_topics_data=[
ForgottenTopic(
topic='foo' if version <= 12 else '',
topic_id=uuid.uuid4() if version >= 13 else None,
partitions=[1, 2, 3],
),
] if version >= 7 else [],
rack_id='Z' if version >= 11 else '',
)
encoded = request.encode(version=version)
decoded = FetchRequest.decode(encoded, version=version)
assert decoded == request


@pytest.mark.parametrize("version", range(FetchResponse.min_version, FetchResponse.max_version + 1))
def test_fetch_response_roundtrip(version):
Response = FetchResponse.FetchableTopicResponse
PartitionData = Response.PartitionData
EpochEndOffset = PartitionData.EpochEndOffset
LeaderIdAndEpoch = PartitionData.LeaderIdAndEpoch
SnapshotId = PartitionData.SnapshotId
AbortedTransaction = PartitionData.AbortedTransaction
NodeEndpoint = FetchResponse.NodeEndpoint
response = FetchResponse(
throttle_time_ms=100 if version >= 1 else 0,
error_code=13 if version >= 7 else 0,
session_id=12345 if version >= 7 else 0,
responses=[
Response(
topic="test-topic" if version <= 12 else '',
topic_id=uuid.uuid4() if version >= 13 else None,
partitions=[
PartitionData(
partition_index=0,
error_code=0,
high_watermark=1000,
last_stable_offset=3 if version >= 4 else -1,
log_start_offset=25 if version >= 5 else -1,
diverging_epoch=EpochEndOffset(
epoch=1000,
end_offset=3,
) if version >= 12 else None,
current_leader=LeaderIdAndEpoch(
leader_id=5,
leader_epoch=99,
) if version >= 12 else None,
snapshot_id=SnapshotId(
end_offset=44,
epoch=88,
) if version >= 12 else None,
aborted_transactions=[
AbortedTransaction(
producer_id=3,
first_offset=9,
),
] if version >= 4 else [],
preferred_read_replica=12 if version >= 11 else -1,
records=b"some records"
)
]
)
],
node_endpoints=[
NodeEndpoint(
node_id=12,
host='foo',
port=1000,
rack='ZZ',
),
] if version >= 16 else [],
)
encoded = response.encode(version=version)
decoded = FetchResponse.decode(encoded, version=version)
assert decoded == response


def test_fetch_request_v15_hex():
# Hex dump provided by user
expected_hex = "0001000f0000007b00096d792d636c69656e7400000001f400000001000003e800123456780000007f01010100"
expected_bytes = binascii.unhexlify(expected_hex)

Expand Down Expand Up @@ -34,95 +144,3 @@ def test_fetch_request_v15_hex():
assert decoded.session_id == 0x12345678
assert decoded.session_epoch == 0x7f
assert decoded.max_wait_ms == 500


@pytest.mark.parametrize("version", range(FetchRequest.min_version, FetchRequest.max_version + 1))
def test_fetch_request_roundtrip(version):
# Topic data needs to match the version's requirements (Topic vs TopicId)
topic_data = []
if version < 13:
topic_data = [
FetchRequest.FetchTopic(
topic="test-topic",
partitions=[
FetchRequest.FetchTopic.FetchPartition(
partition=0,
fetch_offset=100,
partition_max_bytes=1024
)
]
)
]
else:
topic_id = uuid.uuid4()
topic_data = [
FetchRequest.FetchTopic(
topic_id=topic_id,
partitions=[
FetchRequest.FetchTopic.FetchPartition(
partition=0,
fetch_offset=100,
partition_max_bytes=1024
)
]
)
]

data = FetchRequest(
replica_id=-1,
max_wait_ms=500,
min_bytes=1,
topics=topic_data
)

encoded = data.encode(version=version)
decoded = FetchRequest.decode(encoded, version=version)

assert decoded == data


@pytest.mark.parametrize("version", range(FetchResponse.min_version, FetchResponse.max_version + 1))
def test_fetch_response_roundtrip(version):
# Mocking some response data
resp_topic_data = []
if version < 13:
resp_topic_data = [
FetchResponse.FetchableTopicResponse(
topic="test-topic",
partitions=[
FetchResponse.FetchableTopicResponse.PartitionData(
partition_index=0,
error_code=0,
high_watermark=1000,
records=b"some records"
)
]
)
]
else:
topic_id = uuid.uuid4()
resp_topic_data = [
FetchResponse.FetchableTopicResponse(
topic_id=topic_id,
partitions=[
FetchResponse.FetchableTopicResponse.PartitionData(
partition_index=0,
error_code=0,
high_watermark=1000,
records=b"some records"
)
]
)
]

data = FetchResponse(
throttle_time_ms=100 if version >= 1 else 0,
error_code=0,
session_id=12345 if version >= 7 else 0,
responses=resp_topic_data
)

encoded = data.encode(version=version)
decoded = FetchResponse.decode(encoded, version=version)

assert decoded == data
Loading
Loading