From a0662936a60f951e53409050330d52b39e3c0d8e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 22:44:34 -0700 Subject: [PATCH 1/4] Add CreateTopicsResponse test (wip) --- test/protocol/new/admin/test_new_admin.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/protocol/new/admin/test_new_admin.py b/test/protocol/new/admin/test_new_admin.py index 1ca8112de..3e562897e 100644 --- a/test/protocol/new/admin/test_new_admin.py +++ b/test/protocol/new/admin/test_new_admin.py @@ -40,6 +40,29 @@ def test_create_topics_request_roundtrip(version): assert decoded == request +@pytest.mark.parametrize("version", range(CreateTopicsResponse.min_version, CreateTopicsResponse.max_version + 1)) +def test_create_topics_response_roundtrip(version): + Topic = CreateTopicsResponse.CreatableTopicResult + topics = [ + Topic( + name="test-topic", + error_code=13, + error_message='foo' if version >= 1 else '', + topic_config_error_code=2 if version >= 5 else 0, + num_partitions=1 if version >= 5 else -1, + replication_factor=1 if version >= 5 else -1, + configs=[] + ) + ] + response = CreateTopicsResponse( + throttle_time_ms=123 if version >= 2 else 0, + topics=topics, + ) + encoded = response.encode(version=version) + decoded = CreateTopicsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DeleteTopicsRequest.min_version, DeleteTopicsRequest.max_version + 1)) def test_delete_topics_request_roundtrip(version): topic_names = ["topic-1", "topic-2"] From ddaefdbbc04fdf472f5439875b734549772db393 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 23:19:34 -0700 Subject: [PATCH 2/4] More admin response tests --- test/protocol/new/admin/test_new_admin.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/protocol/new/admin/test_new_admin.py b/test/protocol/new/admin/test_new_admin.py index 3e562897e..51e9d06f5 100644 --- a/test/protocol/new/admin/test_new_admin.py +++ b/test/protocol/new/admin/test_new_admin.py @@ -83,6 +83,24 @@ def test_delete_topics_request_roundtrip(version): assert decoded == request +@pytest.mark.parametrize("version", range(DeleteTopicsResponse.min_version, DeleteTopicsResponse.max_version + 1)) +def test_delete_topics_response_roundtrip(version): + Topic = DeleteTopicsResponse.DeletableTopicResult + response = DeleteTopicsResponse( + throttle_time_ms=123 if version >= 1 else 0, + responses=[ + Topic( + name="topic-1", + error_code=123, + error_message='foo' if version >= 5 else None, + ) + ], + ) + encoded = response.encode(version=version) + decoded = DeleteTopicsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DescribeGroupsRequest.min_version, DescribeGroupsRequest.max_version + 1)) def test_describe_groups_request_roundtrip(version): request = DescribeGroupsRequest( From 428cc8bb1760cb674a7163506bdcc52642a43ae9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 19 Mar 2026 08:26:42 -0700 Subject: [PATCH 3/4] patch bitfield types on admin responses --- kafka/protocol/new/admin/cluster.py | 6 +++++- kafka/protocol/new/admin/groups.py | 8 +++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/new/admin/cluster.py b/kafka/protocol/new/admin/cluster.py index 2613675c2..925c7b1b0 100644 --- a/kafka/protocol/new/admin/cluster.py +++ b/kafka/protocol/new/admin/cluster.py @@ -4,7 +4,11 @@ class DescribeClusterRequest(ApiMessage): pass -class DescribeClusterResponse(ApiMessage): pass +class DescribeClusterResponse(ApiMessage): + @classmethod + def json_patch(cls, json): + json['fields'][7]['type'] = 'bitfield' + return json class DescribeConfigsRequest(ApiMessage): pass class DescribeConfigsResponse(ApiMessage): pass diff --git a/kafka/protocol/new/admin/groups.py b/kafka/protocol/new/admin/groups.py index cdb12d408..dd8f56318 100644 --- a/kafka/protocol/new/admin/groups.py +++ b/kafka/protocol/new/admin/groups.py @@ -2,7 +2,13 @@ class DescribeGroupsRequest(ApiMessage): pass -class DescribeGroupsResponse(ApiMessage): pass +class DescribeGroupsResponse(ApiMessage): + @classmethod + def json_patch(cls, json): + # group authorized_operations + json['fields'][1]['fields'][7]['type'] = 'bitfield' + return json + class ListGroupsRequest(ApiMessage): pass class ListGroupsResponse(ApiMessage): pass From 2c083b19e650bc11bfb587167ca42a029f74d0de Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 19 Mar 2026 08:27:13 -0700 Subject: [PATCH 4/4] Finish admin response tests + cleanup --- test/protocol/new/admin/test_new_admin.py | 436 +++++++++++++++++----- 1 file changed, 347 insertions(+), 89 deletions(-) diff --git a/test/protocol/new/admin/test_new_admin.py b/test/protocol/new/admin/test_new_admin.py index 51e9d06f5..5fb0b1c31 100644 --- a/test/protocol/new/admin/test_new_admin.py +++ b/test/protocol/new/admin/test_new_admin.py @@ -21,17 +21,28 @@ @pytest.mark.parametrize("version", range(CreateTopicsRequest.min_version, CreateTopicsRequest.max_version + 1)) def test_create_topics_request_roundtrip(version): Topic = CreateTopicsRequest.CreatableTopic - topics = [ - Topic( - name="test-topic", - num_partitions=1, - replication_factor=1, - assignments=[], - configs=[] - ) - ] + Assignment = Topic.CreatableReplicaAssignment + Config = Topic.CreatableTopicConfig request = CreateTopicsRequest( - topics=topics, + topics=[ + Topic( + name="test-topic", + num_partitions=1, + replication_factor=1, + assignments=[ + Assignment( + partition_index=1, + broker_ids=[1, 2, 3], + ), + ], + configs=[ + Config( + name='foo', + value='bar', + ), + ] + ) + ], timeout_ms=10000, validate_only=False ) @@ -43,20 +54,29 @@ def test_create_topics_request_roundtrip(version): @pytest.mark.parametrize("version", range(CreateTopicsResponse.min_version, CreateTopicsResponse.max_version + 1)) def test_create_topics_response_roundtrip(version): Topic = CreateTopicsResponse.CreatableTopicResult - topics = [ - Topic( - name="test-topic", - error_code=13, - error_message='foo' if version >= 1 else '', - topic_config_error_code=2 if version >= 5 else 0, - num_partitions=1 if version >= 5 else -1, - replication_factor=1 if version >= 5 else -1, - configs=[] - ) - ] + Config = Topic.CreatableTopicConfigs response = CreateTopicsResponse( throttle_time_ms=123 if version >= 2 else 0, - topics=topics, + topics=[ + Topic( + name="test-topic", + topic_id=uuid.uuid4() if version >= 7 else uuid.UUID(int=0), + error_code=13, + error_message='foo' if version >= 1 else '', + topic_config_error_code=2 if version >= 5 else 0, + num_partitions=1 if version >= 5 else -1, + replication_factor=1 if version >= 5 else -1, + configs=[ + Config( + name='foo', + value='bar', + read_only=True, + config_source=3, + is_sensitive=True, + ), + ] if version >= 5 else [], + ) + ], ) encoded = response.encode(version=version) decoded = CreateTopicsResponse.decode(encoded, version=version) @@ -65,18 +85,14 @@ def test_create_topics_response_roundtrip(version): @pytest.mark.parametrize("version", range(DeleteTopicsRequest.min_version, DeleteTopicsRequest.max_version + 1)) def test_delete_topics_request_roundtrip(version): - topic_names = ["topic-1", "topic-2"] - Topic = DeleteTopicsRequest.DeleteTopicState - topics = [] - if version >= 6: - for t_name in topic_names: - topics.append(Topic(name=t_name, topic_id=uuid.uuid4())) - request = DeleteTopicsRequest( - topic_names=topic_names if version < 6 else [], + topics=[ + Topic(name="topic-1", topic_id=uuid.uuid4()), + Topic(name="topic-2", topic_id=uuid.uuid4()), + ] if version >= 6 else [], + topic_names=["topic-1", "topic-2"] if version < 6 else [], timeout_ms=10000, - topics=topics ) encoded = request.encode(version=version) decoded = DeleteTopicsRequest.decode(encoded, version=version) @@ -101,6 +117,46 @@ def test_delete_topics_response_roundtrip(version): assert decoded == response +@pytest.mark.parametrize("version", range(CreatePartitionsRequest.min_version, CreatePartitionsRequest.max_version + 1)) +def test_create_partitions_request_roundtrip(version): + TopicPartition = CreatePartitionsRequest.CreatePartitionsTopic + Assignment = TopicPartition.CreatePartitionsAssignment + request = CreatePartitionsRequest( + topics=[ + TopicPartition( + name="test-topic", + count=2, + assignments=[ + Assignment(broker_ids=[1, 2]) + ] + ) + ], + timeout_ms=10000, + validate_only=False + ) + encoded = request.encode(version=version) + decoded = CreatePartitionsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(CreatePartitionsResponse.min_version, CreatePartitionsResponse.max_version + 1)) +def test_create_partitions_response_roundtrip(version): + Result = CreatePartitionsResponse.CreatePartitionsTopicResult + response = CreatePartitionsResponse( + throttle_time_ms=123, + results=[ + Result( + name='topic-foo', + error_code=123, + error_message='error', + ), + ], + ) + encoded = response.encode(version=version) + decoded = CreatePartitionsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DescribeGroupsRequest.min_version, DescribeGroupsRequest.max_version + 1)) def test_describe_groups_request_roundtrip(version): request = DescribeGroupsRequest( @@ -112,6 +168,39 @@ def test_describe_groups_request_roundtrip(version): assert decoded == request +@pytest.mark.parametrize("version", range(DescribeGroupsResponse.min_version, DescribeGroupsResponse.max_version + 1)) +def test_describe_groups_response_roundtrip(version): + Group = DescribeGroupsResponse.DescribedGroup + Member = Group.DescribedGroupMember + response = DescribeGroupsResponse( + throttle_time_ms=123 if version >= 1 else 0, + groups=[ + Group( + error_code=3, + error_message='foo' if version >= 6 else None, + group_id='group-1', + group_state='good', + protocol_type='fizz', + protocol_data='buzz', + members=[ + Member( + member_id='abcd', + group_instance_id='efgh' if version >= 4 else None, + client_id='client', + client_host='localhost', + member_metadata=b'\xab\x12', + member_assignment=b'\x72\xff', + ), + ], + authorized_operations={1} if version >= 3 else None, + ) + ], + ) + encoded = DescribeGroupsResponse.encode(response, version=version) + decoded = DescribeGroupsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(ListGroupsRequest.min_version, ListGroupsRequest.max_version + 1)) def test_list_groups_request_roundtrip(version): request = ListGroupsRequest( @@ -122,6 +211,53 @@ def test_list_groups_request_roundtrip(version): assert decoded == request +@pytest.mark.parametrize("version", range(ListGroupsResponse.min_version, ListGroupsResponse.max_version + 1)) +def test_list_groups_response_roundtrip(version): + Group = ListGroupsResponse.ListedGroup + response = ListGroupsResponse( + throttle_time_ms=123 if version >= 1 else 0, + error_code=2, + groups=[ + Group( + group_id='group-1', + protocol_type='foobar', + group_state='stable' if version >= 4 else '', + group_type='type' if version >= 5 else '', + ) + ], + ) + encoded = response.encode(version=version) + decoded = ListGroupsResponse.decode(encoded, version=version) + assert decoded == response + + +@pytest.mark.parametrize("version", range(DeleteGroupsRequest.min_version, DeleteGroupsRequest.max_version + 1)) +def test_delete_groups_request_roundtrip(version): + request = DeleteGroupsRequest( + groups_names=['foo', 'bar'], + ) + encoded = request.encode(version=version) + decoded = DeleteGroupsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(DeleteGroupsResponse.min_version, DeleteGroupsResponse.max_version + 1)) +def test_delete_groups_response_roundtrip(version): + Result = DeleteGroupsResponse.DeletableGroupResult + response = DeleteGroupsResponse( + throttle_time_ms=123, + results=[ + Result( + group_id='group-1', + error_code=23, + ) + ], + ) + encoded = response.encode(version=version) + decoded = DeleteGroupsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DescribeClusterRequest.min_version, DescribeClusterRequest.max_version + 1)) def test_describe_cluster_request_roundtrip(version): request = DescribeClusterRequest( @@ -134,18 +270,43 @@ def test_describe_cluster_request_roundtrip(version): assert decoded == request +@pytest.mark.parametrize("version", range(DescribeClusterResponse.min_version, DescribeClusterResponse.max_version + 1)) +def test_describe_cluster_response_roundtrip(version): + Broker = DescribeClusterResponse.DescribeClusterBroker + response = DescribeClusterResponse( + throttle_time_ms=123, + error_code=3, + error_message='error', + endpoint_type=12 if version >= 1 else 1, + cluster_id='cluster', + controller_id=2, + brokers=[ + Broker( + broker_id=2, + host='localhost', + port=9000, + rack='AA', + is_fenced=True if version >= 2 else False, + ) + ], + cluster_authorized_operations={2}, + ) + encoded = response.encode(version=version) + decoded = DescribeClusterResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DescribeConfigsRequest.min_version, DescribeConfigsRequest.max_version + 1)) def test_describe_configs_request_roundtrip(version): Resource = DescribeConfigsRequest.DescribeConfigsResource - resources = [ - Resource( - resource_type=2, # TOPIC - resource_name="test-topic", - configuration_keys=["cleanup.policy"] - ) - ] request = DescribeConfigsRequest( - resources=resources, + resources=[ + Resource( + resource_type=2, + resource_name="test-topic", + configuration_keys=["cleanup.policy"] + ) + ], include_synonyms=True if version >= 1 else False, include_documentation=True if version >= 3 else False ) @@ -153,6 +314,9 @@ def test_describe_configs_request_roundtrip(version): decoded = DescribeConfigsRequest.decode(encoded, version=version) assert decoded == request + +@pytest.mark.parametrize("version", range(DescribeConfigsResponse.min_version, DescribeConfigsResponse.max_version + 1)) +def test_describe_configs_response_roundtrip(version): Result = DescribeConfigsResponse.DescribeConfigsResult Config = Result.DescribeConfigsResourceResult Synonym = Config.DescribeConfigsSynonym @@ -182,57 +346,144 @@ def test_describe_configs_request_roundtrip(version): ) ], ) - encoded = DescribeConfigsResponse.encode(response, version=version) decoded = DescribeConfigsResponse.decode(encoded, version=version) assert decoded == response +@pytest.mark.parametrize("version", range(AlterConfigsRequest.min_version, AlterConfigsRequest.max_version + 1)) +def test_alter_configs_request_roundtrip(version): + Resource = AlterConfigsRequest.AlterConfigsResource + Config = Resource.AlterableConfig + request = AlterConfigsRequest( + resources=[ + Resource( + resource_type=2, + resource_name="test-topic", + configs=[ + Config( + name='foo', + value='bar', + ), + ], + ) + ], + validate_only=True, + ) + encoded = request.encode(version=version) + decoded = AlterConfigsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(AlterConfigsResponse.min_version, AlterConfigsResponse.max_version + 1)) +def test_alter_configs_response_roundtrip(version): + Response = AlterConfigsResponse.AlterConfigsResourceResponse + response = AlterConfigsResponse( + throttle_time_ms=123, + responses=[ + Response( + error_code=22, + error_message='error', + resource_type=2, + resource_name="test-topic", + ) + ], + ) + encoded = response.encode(version=version) + decoded = AlterConfigsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(CreateAclsRequest.min_version, CreateAclsRequest.max_version + 1)) def test_create_acls_request_roundtrip(version): Creation = CreateAclsRequest.AclCreation - creations = [ - Creation( - resource_type=2, - resource_name="test-topic", - resource_pattern_type=3, - principal="User:alice", - host="*", - operation=3, - permission_type=3 - ) - ] request = CreateAclsRequest( - creations=creations + creations=[ + Creation( + resource_type=2, + resource_name="test-topic", + resource_pattern_type=3, + principal="User:alice", + host="*", + operation=3, + permission_type=3 + ) + ], ) encoded = request.encode(version=version) decoded = CreateAclsRequest.decode(encoded, version=version) assert decoded == request -@pytest.mark.parametrize("version", range(CreatePartitionsRequest.min_version, CreatePartitionsRequest.max_version + 1)) -def test_create_partitions_request_roundtrip(version): - TopicPartition = CreatePartitionsRequest.CreatePartitionsTopic - Assignment = TopicPartition.CreatePartitionsAssignment - topic_partitions = [ - TopicPartition( - name="test-topic", - count=2, - assignments=[ - Assignment(broker_ids=[1, 2]) - ] - ) - ] - request = CreatePartitionsRequest( - topics=topic_partitions, - timeout_ms=10000, - validate_only=False +@pytest.mark.parametrize("version", range(CreateAclsResponse.min_version, CreateAclsResponse.max_version + 1)) +def test_create_acls_response_roundtrip(version): + Result = CreateAclsResponse.AclCreationResult + response = CreateAclsResponse( + throttle_time_ms=123, + results=[ + Result( + error_code=2, + error_message='foo', + ), + ], + ) + encoded = response.encode(version=version) + decoded = CreateAclsResponse.decode(encoded, version=version) + assert decoded == response + + +@pytest.mark.parametrize("version", range(DeleteAclsRequest.min_version, DeleteAclsRequest.max_version + 1)) +def test_delete_acls_request_roundtrip(version): + Filter = DeleteAclsRequest.DeleteAclsFilter + request = DeleteAclsRequest( + filters=[ + Filter( + resource_type_filter=2, + resource_name_filter="test-topic", + pattern_type_filter=3 if version >= 1 else 3, + principal_filter="User:alice", + host_filter="*", + operation=3, + permission_type=3 + ) + ], ) encoded = request.encode(version=version) - decoded = CreatePartitionsRequest.decode(encoded, version=version) + decoded = DeleteAclsRequest.decode(encoded, version=version) assert decoded == request +@pytest.mark.parametrize("version", range(DeleteAclsResponse.min_version, DeleteAclsResponse.max_version + 1)) +def test_delete_acls_response_roundtrip(version): + Result = DeleteAclsResponse.DeleteAclsFilterResult + Acl = Result.DeleteAclsMatchingAcl + response = DeleteAclsResponse( + throttle_time_ms=123, + filter_results=[ + Result( + error_code=12, + error_message='error', + matching_acls=[ + Acl( + error_code=2, + error_message='fizz', + resource_type=2, + resource_name="test-topic", + pattern_type=3 if version >= 1 else 3, + principal="User:alice", + host="localhost", + operation=3, + permission_type=3 + ), + ] + ), + ] + ) + encoded = response.encode(version=version) + decoded = DeleteAclsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(DescribeAclsRequest.min_version, DescribeAclsRequest.max_version + 1)) def test_describe_acls_request_roundtrip(version): request = DescribeAclsRequest( @@ -249,23 +500,30 @@ def test_describe_acls_request_roundtrip(version): assert decoded == request -@pytest.mark.parametrize("version", range(DeleteAclsRequest.min_version, DeleteAclsRequest.max_version + 1)) -def test_delete_acls_request_roundtrip(version): - Filter = DeleteAclsRequest.DeleteAclsFilter - filters = [ - Filter( - resource_type_filter=2, - resource_name_filter="test-topic", - pattern_type_filter=3 if version >= 1 else 3, - principal_filter="User:alice", - host_filter="*", - operation=3, - permission_type=3 - ) - ] - request = DeleteAclsRequest( - filters=filters +@pytest.mark.parametrize("version", range(DescribeAclsResponse.min_version, DescribeAclsResponse.max_version + 1)) +def test_describe_acls_response_roundtrip(version): + Resource = DescribeAclsResponse.DescribeAclsResource + Acl = Resource.AclDescription + response = DescribeAclsResponse( + throttle_time_ms=123, + error_code=1, + error_message='error', + resources=[ + Resource( + resource_type=2, + resource_name="test-topic", + pattern_type=3 if version >= 1 else 3, + acls=[ + Acl( + principal="User:alice", + host="localhost", + operation=3, + permission_type=3 + ), + ], + ), + ], ) - encoded = request.encode(version=version) - decoded = DeleteAclsRequest.decode(encoded, version=version) - assert decoded == request + encoded = response.encode(version=version) + decoded = DescribeAclsResponse.decode(encoded, version=version) + assert decoded == response