From b0f3665eabd03fd83e5c23c9e3c6aeb2aa6745cb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 3 Mar 2026 15:48:33 -0800 Subject: [PATCH 1/3] get_response_class --- kafka/protocol/__init__.py | 53 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py index ff9c68306..578cf489a 100644 --- a/kafka/protocol/__init__.py +++ b/kafka/protocol/__init__.py @@ -1,3 +1,12 @@ +from . import ( + produce, fetch, list_offsets, metadata, + commit, find_coordinator, group, + sasl_handshake, api_versions, admin, + init_producer_id, offset_for_leader_epoch, + add_partitions_to_txn, add_offsets_to_txn, end_txn, + txn_offset_commit, sasl_authenticate, +) + API_KEYS = { 0: 'Produce', 1: 'Fetch', @@ -44,3 +53,47 @@ 46: 'ListPartitionReassignments', 48: 'DescribeClientQuotas', } + +# Mapping of Api_key to a tuple of (request_classes, response_classes) +REQUEST_TYPES = { + 0: (produce.ProduceRequest, produce.ProduceResponse), + 1: (fetch.FetchRequest, fetch.FetchResponse), + 2: (list_offsets.ListOffsetsRequest, list_offsets.ListOffsetsResponse), + 3: (metadata.MetadataRequest, metadata.MetadataResponse), + 8: (commit.OffsetCommitRequest, commit.OffsetCommitResponse), + 9: (commit.OffsetFetchRequest, commit.OffsetFetchResponse), + 10: (find_coordinator.FindCoordinatorRequest, find_coordinator.FindCoordinatorResponse), + 11: (group.JoinGroupRequest, group.JoinGroupResponse), + 12: (group.HeartbeatRequest, group.HeartbeatResponse), + 13: (group.LeaveGroupRequest, group.LeaveGroupResponse), + 14: (group.SyncGroupRequest, group.SyncGroupResponse), + 15: (admin.DescribeGroupsRequest, admin.DescribeGroupsResponse), + 16: (admin.ListGroupsRequest, admin.ListGroupsResponse), + 17: (sasl_handshake.SaslHandshakeRequest, sasl_handshake.SaslHandshakeResponse), + 18: (api_versions.ApiVersionsRequest, api_versions.ApiVersionsResponse), + 19: (admin.CreateTopicsRequest, admin.CreateTopicsResponse), + 20: (admin.DeleteTopicsRequest, admin.DeleteTopicsResponse), + 21: (admin.DeleteRecordsRequest, admin.DeleteRecordsResponse), + 22: (init_producer_id.InitProducerIdRequest, init_producer_id.InitProducerIdResponse), + 23: (offset_for_leader_epoch.OffsetForLeaderEpochRequest, offset_for_leader_epoch.OffsetForLeaderEpochResponse), + 24: (add_partitions_to_txn.AddPartitionsToTxnRequest, add_partitions_to_txn.AddPartitionsToTxnResponse), + 25: (add_offsets_to_txn.AddOffsetsToTxnRequest, add_offsets_to_txn.AddOffsetsToTxnResponse), + 26: (end_txn.EndTxnRequest, end_txn.EndTxnResponse), + 28: (txn_offset_commit.TxnOffsetCommitRequest, txn_offset_commit.TxnOffsetCommitResponse), + 29: (admin.DescribeAclsRequest, admin.DescribeAclsResponse), + 30: (admin.CreateAclsRequest, admin.CreateAclsResponse), + 31: (admin.DeleteAclsRequest, admin.DeleteAclsResponse), + 32: (admin.DescribeConfigsRequest, admin.DescribeConfigsResponse), + 33: (admin.AlterConfigsRequest, admin.AlterConfigsResponse), + 36: (sasl_authenticate.SaslAuthenticateRequest, sasl_authenticate.SaslAuthenticateResponse), + 37: (admin.CreatePartitionsRequest, admin.CreatePartitionsResponse), + 42: (admin.DeleteGroupsRequest, admin.DeleteGroupsResponse) +} + +def get_response_class(api_key, api_version): + request_type, response_type = REQUEST_TYPES.get(api_key, (None, None)) + if response_type: + if hasattr(response_type, '__getitem__'): + return response_type[api_version] + return response_type + return None From 75cf7e5e462fc2856b6d2aeeb97e82039129d36d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Mar 2026 18:57:46 -0800 Subject: [PATCH 2/3] Use request header to track in-flight-requests in protocol parser --- kafka/protocol/add_offsets_to_txn.py | 3 -- kafka/protocol/add_partitions_to_txn.py | 3 -- kafka/protocol/admin.py | 40 ----------------------- kafka/protocol/api.py | 5 --- kafka/protocol/api_versions.py | 5 --- kafka/protocol/commit.py | 14 -------- kafka/protocol/end_txn.py | 3 -- kafka/protocol/fetch.py | 12 ------- kafka/protocol/find_coordinator.py | 3 -- kafka/protocol/group.py | 18 ---------- kafka/protocol/init_producer_id.py | 2 -- kafka/protocol/list_offsets.py | 6 ---- kafka/protocol/metadata.py | 9 ----- kafka/protocol/offset_for_leader_epoch.py | 5 --- kafka/protocol/parser.py | 17 ++++++---- kafka/protocol/produce.py | 9 ----- kafka/protocol/sasl_authenticate.py | 2 -- kafka/protocol/sasl_handshake.py | 2 -- kafka/protocol/txn_offset_commit.py | 3 -- test/test_object_conversion.py | 8 ----- 20 files changed, 11 insertions(+), 158 deletions(-) diff --git a/kafka/protocol/add_offsets_to_txn.py b/kafka/protocol/add_offsets_to_txn.py index bc6805b83..36bf0ecb3 100644 --- a/kafka/protocol/add_offsets_to_txn.py +++ b/kafka/protocol/add_offsets_to_txn.py @@ -26,7 +26,6 @@ class AddOffsetsToTxnResponse_v2(Response): class AddOffsetsToTxnRequest_v0(Request): API_KEY = 25 API_VERSION = 0 - RESPONSE_TYPE = AddOffsetsToTxnResponse_v0 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('producer_id', Int64), @@ -38,14 +37,12 @@ class AddOffsetsToTxnRequest_v0(Request): class AddOffsetsToTxnRequest_v1(Request): API_KEY = 25 API_VERSION = 1 - RESPONSE_TYPE = AddOffsetsToTxnResponse_v1 SCHEMA = AddOffsetsToTxnRequest_v0.SCHEMA class AddOffsetsToTxnRequest_v2(Request): API_KEY = 25 API_VERSION = 2 - RESPONSE_TYPE = AddOffsetsToTxnResponse_v2 SCHEMA = AddOffsetsToTxnRequest_v1.SCHEMA diff --git a/kafka/protocol/add_partitions_to_txn.py b/kafka/protocol/add_partitions_to_txn.py index b6a95ceaf..70ceedc4d 100644 --- a/kafka/protocol/add_partitions_to_txn.py +++ b/kafka/protocol/add_partitions_to_txn.py @@ -29,7 +29,6 @@ class AddPartitionsToTxnResponse_v2(Response): class AddPartitionsToTxnRequest_v0(Request): API_KEY = 24 API_VERSION = 0 - RESPONSE_TYPE = AddPartitionsToTxnResponse_v0 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('producer_id', Int64), @@ -42,14 +41,12 @@ class AddPartitionsToTxnRequest_v0(Request): class AddPartitionsToTxnRequest_v1(Request): API_KEY = 24 API_VERSION = 1 - RESPONSE_TYPE = AddPartitionsToTxnResponse_v1 SCHEMA = AddPartitionsToTxnRequest_v0.SCHEMA class AddPartitionsToTxnRequest_v2(Request): API_KEY = 24 API_VERSION = 2 - RESPONSE_TYPE = AddPartitionsToTxnResponse_v2 SCHEMA = AddPartitionsToTxnRequest_v1.SCHEMA diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 56ef656e6..a52827ead 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -45,7 +45,6 @@ class CreateTopicsResponse_v3(Response): class CreateTopicsRequest_v0(Request): API_KEY = 19 API_VERSION = 0 - RESPONSE_TYPE = CreateTopicsResponse_v0 SCHEMA = Schema( ('create_topic_requests', Array( ('topic', String('utf-8')), @@ -64,7 +63,6 @@ class CreateTopicsRequest_v0(Request): class CreateTopicsRequest_v1(Request): API_KEY = 19 API_VERSION = 1 - RESPONSE_TYPE = CreateTopicsResponse_v1 SCHEMA = Schema( ('create_topic_requests', Array( ('topic', String('utf-8')), @@ -84,14 +82,12 @@ class CreateTopicsRequest_v1(Request): class CreateTopicsRequest_v2(Request): API_KEY = 19 API_VERSION = 2 - RESPONSE_TYPE = CreateTopicsResponse_v2 SCHEMA = CreateTopicsRequest_v1.SCHEMA class CreateTopicsRequest_v3(Request): API_KEY = 19 API_VERSION = 3 - RESPONSE_TYPE = CreateTopicsResponse_v3 SCHEMA = CreateTopicsRequest_v1.SCHEMA @@ -141,7 +137,6 @@ class DeleteTopicsResponse_v3(Response): class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 - RESPONSE_TYPE = DeleteTopicsResponse_v0 SCHEMA = Schema( ('topics', Array(String('utf-8'))), ('timeout', Int32) @@ -151,21 +146,18 @@ class DeleteTopicsRequest_v0(Request): class DeleteTopicsRequest_v1(Request): API_KEY = 20 API_VERSION = 1 - RESPONSE_TYPE = DeleteTopicsResponse_v1 SCHEMA = DeleteTopicsRequest_v0.SCHEMA class DeleteTopicsRequest_v2(Request): API_KEY = 20 API_VERSION = 2 - RESPONSE_TYPE = DeleteTopicsResponse_v2 SCHEMA = DeleteTopicsRequest_v0.SCHEMA class DeleteTopicsRequest_v3(Request): API_KEY = 20 API_VERSION = 3 - RESPONSE_TYPE = DeleteTopicsResponse_v3 SCHEMA = DeleteTopicsRequest_v0.SCHEMA @@ -196,7 +188,6 @@ class DeleteRecordsResponse_v0(Response): class DeleteRecordsRequest_v0(Request): API_KEY = 21 API_VERSION = 0 - RESPONSE_TYPE = DeleteRecordsResponse_v0 SCHEMA = Schema( ('topics', Array( ('name', String('utf-8')), @@ -242,20 +233,17 @@ class ListGroupsResponse_v2(Response): class ListGroupsRequest_v0(Request): API_KEY = 16 API_VERSION = 0 - RESPONSE_TYPE = ListGroupsResponse_v0 SCHEMA = Schema() class ListGroupsRequest_v1(Request): API_KEY = 16 API_VERSION = 1 - RESPONSE_TYPE = ListGroupsResponse_v1 SCHEMA = ListGroupsRequest_v0.SCHEMA class ListGroupsRequest_v2(Request): API_KEY = 16 API_VERSION = 1 - RESPONSE_TYPE = ListGroupsResponse_v2 SCHEMA = ListGroupsRequest_v0.SCHEMA @@ -338,7 +326,6 @@ class DescribeGroupsResponse_v3(Response): class DescribeGroupsRequest_v0(Request): API_KEY = 15 API_VERSION = 0 - RESPONSE_TYPE = DescribeGroupsResponse_v0 SCHEMA = Schema( ('groups', Array(String('utf-8'))) ) @@ -347,21 +334,18 @@ class DescribeGroupsRequest_v0(Request): class DescribeGroupsRequest_v1(Request): API_KEY = 15 API_VERSION = 1 - RESPONSE_TYPE = DescribeGroupsResponse_v1 SCHEMA = DescribeGroupsRequest_v0.SCHEMA class DescribeGroupsRequest_v2(Request): API_KEY = 15 API_VERSION = 2 - RESPONSE_TYPE = DescribeGroupsResponse_v2 SCHEMA = DescribeGroupsRequest_v0.SCHEMA class DescribeGroupsRequest_v3(Request): API_KEY = 15 API_VERSION = 3 - RESPONSE_TYPE = DescribeGroupsResponse_v3 SCHEMA = Schema( ('groups', Array(String('utf-8'))), ('include_authorized_operations', Boolean) @@ -424,7 +408,6 @@ class DescribeAclsResponse_v2(Response): class DescribeAclsRequest_v0(Request): API_KEY = 29 API_VERSION = 0 - RESPONSE_TYPE = DescribeAclsResponse_v0 SCHEMA = Schema( ('resource_type', Int8), ('resource_name', String('utf-8')), @@ -438,7 +421,6 @@ class DescribeAclsRequest_v0(Request): class DescribeAclsRequest_v1(Request): API_KEY = 29 API_VERSION = 1 - RESPONSE_TYPE = DescribeAclsResponse_v1 SCHEMA = Schema( ('resource_type', Int8), ('resource_name', String('utf-8')), @@ -456,7 +438,6 @@ class DescribeAclsRequest_v2(Request): """ API_KEY = 29 API_VERSION = 2 - RESPONSE_TYPE = DescribeAclsResponse_v2 SCHEMA = DescribeAclsRequest_v1.SCHEMA @@ -481,7 +462,6 @@ class CreateAclsResponse_v1(Response): class CreateAclsRequest_v0(Request): API_KEY = 30 API_VERSION = 0 - RESPONSE_TYPE = CreateAclsResponse_v0 SCHEMA = Schema( ('creations', Array( ('resource_type', Int8), @@ -495,7 +475,6 @@ class CreateAclsRequest_v0(Request): class CreateAclsRequest_v1(Request): API_KEY = 30 API_VERSION = 1 - RESPONSE_TYPE = CreateAclsResponse_v1 SCHEMA = Schema( ('creations', Array( ('resource_type', Int8), @@ -552,7 +531,6 @@ class DeleteAclsResponse_v1(Response): class DeleteAclsRequest_v0(Request): API_KEY = 31 API_VERSION = 0 - RESPONSE_TYPE = DeleteAclsResponse_v0 SCHEMA = Schema( ('filters', Array( ('resource_type', Int8), @@ -566,7 +544,6 @@ class DeleteAclsRequest_v0(Request): class DeleteAclsRequest_v1(Request): API_KEY = 31 API_VERSION = 1 - RESPONSE_TYPE = DeleteAclsResponse_v1 SCHEMA = Schema( ('filters', Array( ('resource_type', Int8), @@ -603,7 +580,6 @@ class AlterConfigsResponse_v1(Response): class AlterConfigsRequest_v0(Request): API_KEY = 33 API_VERSION = 0 - RESPONSE_TYPE = AlterConfigsResponse_v0 SCHEMA = Schema( ('resources', Array( ('resource_type', Int8), @@ -617,7 +593,6 @@ class AlterConfigsRequest_v0(Request): class AlterConfigsRequest_v1(Request): API_KEY = 33 API_VERSION = 1 - RESPONSE_TYPE = AlterConfigsResponse_v1 SCHEMA = AlterConfigsRequest_v0.SCHEMA AlterConfigsRequest = [AlterConfigsRequest_v0, AlterConfigsRequest_v1] @@ -689,7 +664,6 @@ class DescribeConfigsResponse_v2(Response): class DescribeConfigsRequest_v0(Request): API_KEY = 32 API_VERSION = 0 - RESPONSE_TYPE = DescribeConfigsResponse_v0 SCHEMA = Schema( ('resources', Array( ('resource_type', Int8), @@ -700,7 +674,6 @@ class DescribeConfigsRequest_v0(Request): class DescribeConfigsRequest_v1(Request): API_KEY = 32 API_VERSION = 1 - RESPONSE_TYPE = DescribeConfigsResponse_v1 SCHEMA = Schema( ('resources', Array( ('resource_type', Int8), @@ -713,7 +686,6 @@ class DescribeConfigsRequest_v1(Request): class DescribeConfigsRequest_v2(Request): API_KEY = 32 API_VERSION = 2 - RESPONSE_TYPE = DescribeConfigsResponse_v2 SCHEMA = DescribeConfigsRequest_v1.SCHEMA @@ -751,7 +723,6 @@ class DescribeLogDirsResponse_v0(Response): class DescribeLogDirsRequest_v0(Request): API_KEY = 35 API_VERSION = 0 - RESPONSE_TYPE = DescribeLogDirsResponse_v0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -792,7 +763,6 @@ class SaslAuthenticateResponse_v1(Response): class SaslAuthenticateRequest_v0(Request): API_KEY = 36 API_VERSION = 0 - RESPONSE_TYPE = SaslAuthenticateResponse_v0 SCHEMA = Schema( ('sasl_auth_bytes', Bytes) ) @@ -801,7 +771,6 @@ class SaslAuthenticateRequest_v0(Request): class SaslAuthenticateRequest_v1(Request): API_KEY = 36 API_VERSION = 1 - RESPONSE_TYPE = SaslAuthenticateResponse_v1 SCHEMA = SaslAuthenticateRequest_v0.SCHEMA @@ -834,7 +803,6 @@ class CreatePartitionsResponse_v1(Response): class CreatePartitionsRequest_v0(Request): API_KEY = 37 API_VERSION = 0 - RESPONSE_TYPE = CreatePartitionsResponse_v0 SCHEMA = Schema( ('topic_partitions', Array( ('topic', String('utf-8')), @@ -850,7 +818,6 @@ class CreatePartitionsRequest_v1(Request): API_KEY = 37 API_VERSION = 1 SCHEMA = CreatePartitionsRequest_v0.SCHEMA - RESPONSE_TYPE = CreatePartitionsResponse_v1 CreatePartitionsRequest = [ @@ -881,7 +848,6 @@ class DeleteGroupsResponse_v1(Response): class DeleteGroupsRequest_v0(Request): API_KEY = 42 API_VERSION = 0 - RESPONSE_TYPE = DeleteGroupsResponse_v0 SCHEMA = Schema( ("groups_names", Array(String("utf-8"))) ) @@ -890,7 +856,6 @@ class DeleteGroupsRequest_v0(Request): class DeleteGroupsRequest_v1(Request): API_KEY = 42 API_VERSION = 1 - RESPONSE_TYPE = DeleteGroupsResponse_v1 SCHEMA = DeleteGroupsRequest_v0.SCHEMA @@ -923,7 +888,6 @@ class DescribeClientQuotasResponse_v0(Response): class DescribeClientQuotasRequest_v0(Request): API_KEY = 48 API_VERSION = 0 - RESPONSE_TYPE = DescribeClientQuotasResponse_v0 SCHEMA = Schema( ('components', Array( ('entity_type', String('utf-8')), @@ -969,7 +933,6 @@ class AlterPartitionReassignmentsRequest_v0(Request): FLEXIBLE_VERSION = True API_KEY = 45 API_VERSION = 0 - RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0 SCHEMA = Schema( ("timeout_ms", Int32), ("topics", CompactArray( @@ -1017,7 +980,6 @@ class ListPartitionReassignmentsRequest_v0(Request): FLEXIBLE_VERSION = True API_KEY = 46 API_VERSION = 0 - RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0 SCHEMA = Schema( ("timeout_ms", Int32), ("topics", CompactArray( @@ -1054,7 +1016,6 @@ class ElectLeadersResponse_v0(Response): class ElectLeadersRequest_v0(Request): API_KEY = 43 API_VERSION = 1 - RESPONSE_TYPE = ElectLeadersResponse_v0 SCHEMA = Schema( ('election_type', Int8), ('topic_partitions', Array( @@ -1085,7 +1046,6 @@ class ElectLeadersResponse_v1(Response): class ElectLeadersRequest_v1(Request): API_KEY = 43 API_VERSION = 1 - RESPONSE_TYPE = ElectLeadersResponse_v1 SCHEMA = Schema( ('election_type', Int8), ('topic_partitions', Array( diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 0f571bfad..60e6c6b6d 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -109,11 +109,6 @@ def __eq__(self, other): class Request(RequestResponse): - @abc.abstractproperty - def RESPONSE_TYPE(self): - """The Response class associated with the api request""" - pass - @classmethod def is_request(cls): return True diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py index 0ec02fd9c..c6ef407cb 100644 --- a/kafka/protocol/api_versions.py +++ b/kafka/protocol/api_versions.py @@ -88,28 +88,24 @@ class ApiVersionsResponse_v4(BaseApiVersionsResponse): class ApiVersionsRequest_v0(Request): API_KEY = 18 API_VERSION = 0 - RESPONSE_TYPE = ApiVersionsResponse_v0 SCHEMA = Schema() class ApiVersionsRequest_v1(Request): API_KEY = 18 API_VERSION = 1 - RESPONSE_TYPE = ApiVersionsResponse_v1 SCHEMA = ApiVersionsRequest_v0.SCHEMA class ApiVersionsRequest_v2(Request): API_KEY = 18 API_VERSION = 2 - RESPONSE_TYPE = ApiVersionsResponse_v2 SCHEMA = ApiVersionsRequest_v1.SCHEMA class ApiVersionsRequest_v3(Request): API_KEY = 18 API_VERSION = 3 - RESPONSE_TYPE = ApiVersionsResponse_v3 SCHEMA = Schema( ('client_software_name', CompactString('utf-8')), ('client_software_version', CompactString('utf-8')), @@ -121,7 +117,6 @@ class ApiVersionsRequest_v3(Request): class ApiVersionsRequest_v4(Request): API_KEY = 18 API_VERSION = 4 - RESPONSE_TYPE = ApiVersionsResponse_v4 SCHEMA = ApiVersionsRequest_v3.SCHEMA FLEXIBLE_VERSION = True diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 74141184b..beffe1d52 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -66,7 +66,6 @@ class OffsetCommitResponse_v7(Response): class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage - RESPONSE_TYPE = OffsetCommitResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -81,7 +80,6 @@ class OffsetCommitRequest_v0(Request): class OffsetCommitRequest_v1(Request): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage - RESPONSE_TYPE = OffsetCommitResponse_v1 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -99,7 +97,6 @@ class OffsetCommitRequest_v1(Request): class OffsetCommitRequest_v2(Request): API_KEY = 8 API_VERSION = 2 - RESPONSE_TYPE = OffsetCommitResponse_v2 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -118,7 +115,6 @@ class OffsetCommitRequest_v2(Request): class OffsetCommitRequest_v3(Request): API_KEY = 8 API_VERSION = 3 - RESPONSE_TYPE = OffsetCommitResponse_v3 SCHEMA = OffsetCommitRequest_v2.SCHEMA DEFAULT_RETENTION_TIME = -1 @@ -126,7 +122,6 @@ class OffsetCommitRequest_v3(Request): class OffsetCommitRequest_v4(Request): API_KEY = 8 API_VERSION = 4 - RESPONSE_TYPE = OffsetCommitResponse_v4 SCHEMA = OffsetCommitRequest_v3.SCHEMA DEFAULT_RETENTION_TIME = -1 @@ -134,7 +129,6 @@ class OffsetCommitRequest_v4(Request): class OffsetCommitRequest_v5(Request): API_KEY = 8 API_VERSION = 5 # drops retention_time - RESPONSE_TYPE = OffsetCommitResponse_v5 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -151,7 +145,6 @@ class OffsetCommitRequest_v5(Request): class OffsetCommitRequest_v6(Request): API_KEY = 8 API_VERSION = 6 - RESPONSE_TYPE = OffsetCommitResponse_v6 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -169,7 +162,6 @@ class OffsetCommitRequest_v6(Request): class OffsetCommitRequest_v7(Request): API_KEY = 8 API_VERSION = 7 - RESPONSE_TYPE = OffsetCommitResponse_v7 SCHEMA = Schema( ('group_id', String('utf-8')), ('generation_id', Int32), @@ -275,7 +267,6 @@ class OffsetFetchResponse_v5(Response): class OffsetFetchRequest_v0(Request): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage - RESPONSE_TYPE = OffsetFetchResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -287,7 +278,6 @@ class OffsetFetchRequest_v0(Request): class OffsetFetchRequest_v1(Request): API_KEY = 9 API_VERSION = 1 # kafka-backed storage - RESPONSE_TYPE = OffsetFetchResponse_v1 SCHEMA = OffsetFetchRequest_v0.SCHEMA @@ -297,28 +287,24 @@ class OffsetFetchRequest_v2(Request): # the group is currently consuming that partition. API_KEY = 9 API_VERSION = 2 - RESPONSE_TYPE = OffsetFetchResponse_v2 SCHEMA = OffsetFetchRequest_v1.SCHEMA class OffsetFetchRequest_v3(Request): API_KEY = 9 API_VERSION = 3 - RESPONSE_TYPE = OffsetFetchResponse_v3 SCHEMA = OffsetFetchRequest_v2.SCHEMA class OffsetFetchRequest_v4(Request): API_KEY = 9 API_VERSION = 4 - RESPONSE_TYPE = OffsetFetchResponse_v4 SCHEMA = OffsetFetchRequest_v3.SCHEMA class OffsetFetchRequest_v5(Request): API_KEY = 9 API_VERSION = 5 - RESPONSE_TYPE = OffsetFetchResponse_v5 SCHEMA = OffsetFetchRequest_v4.SCHEMA diff --git a/kafka/protocol/end_txn.py b/kafka/protocol/end_txn.py index 5a4089fa6..c355ed9c2 100644 --- a/kafka/protocol/end_txn.py +++ b/kafka/protocol/end_txn.py @@ -26,7 +26,6 @@ class EndTxnResponse_v2(Response): class EndTxnRequest_v0(Request): API_KEY = 26 API_VERSION = 0 - RESPONSE_TYPE = EndTxnResponse_v0 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('producer_id', Int64), @@ -37,14 +36,12 @@ class EndTxnRequest_v0(Request): class EndTxnRequest_v1(Request): API_KEY = 26 API_VERSION = 1 - RESPONSE_TYPE = EndTxnResponse_v1 SCHEMA = EndTxnRequest_v0.SCHEMA class EndTxnRequest_v2(Request): API_KEY = 26 API_VERSION = 2 - RESPONSE_TYPE = EndTxnResponse_v2 SCHEMA = EndTxnRequest_v1.SCHEMA diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index cc8652774..8834c8cd6 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -168,7 +168,6 @@ class FetchResponse_v11(Response): class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 - RESPONSE_TYPE = FetchResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -185,21 +184,18 @@ class FetchRequest_v0(Request): class FetchRequest_v1(Request): API_KEY = 1 API_VERSION = 1 - RESPONSE_TYPE = FetchResponse_v1 SCHEMA = FetchRequest_v0.SCHEMA class FetchRequest_v2(Request): API_KEY = 1 API_VERSION = 2 - RESPONSE_TYPE = FetchResponse_v2 SCHEMA = FetchRequest_v1.SCHEMA class FetchRequest_v3(Request): API_KEY = 1 API_VERSION = 3 - RESPONSE_TYPE = FetchResponse_v3 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -219,7 +215,6 @@ class FetchRequest_v4(Request): # Adds message format v2 API_KEY = 1 API_VERSION = 4 - RESPONSE_TYPE = FetchResponse_v4 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -239,7 +234,6 @@ class FetchRequest_v5(Request): # This may only be used in broker-broker api calls API_KEY = 1 API_VERSION = 5 - RESPONSE_TYPE = FetchResponse_v5 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -264,7 +258,6 @@ class FetchRequest_v6(Request): """ API_KEY = 1 API_VERSION = 6 - RESPONSE_TYPE = FetchResponse_v6 SCHEMA = FetchRequest_v5.SCHEMA @@ -274,7 +267,6 @@ class FetchRequest_v7(Request): """ API_KEY = 1 API_VERSION = 7 - RESPONSE_TYPE = FetchResponse_v7 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -303,7 +295,6 @@ class FetchRequest_v8(Request): """ API_KEY = 1 API_VERSION = 8 - RESPONSE_TYPE = FetchResponse_v8 SCHEMA = FetchRequest_v7.SCHEMA @@ -313,7 +304,6 @@ class FetchRequest_v9(Request): """ API_KEY = 1 API_VERSION = 9 - RESPONSE_TYPE = FetchResponse_v9 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -343,7 +333,6 @@ class FetchRequest_v10(Request): """ API_KEY = 1 API_VERSION = 10 - RESPONSE_TYPE = FetchResponse_v10 SCHEMA = FetchRequest_v9.SCHEMA @@ -353,7 +342,6 @@ class FetchRequest_v11(Request): """ API_KEY = 1 API_VERSION = 11 - RESPONSE_TYPE = FetchResponse_v11 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), diff --git a/kafka/protocol/find_coordinator.py b/kafka/protocol/find_coordinator.py index caad15bfd..fbb4b500d 100644 --- a/kafka/protocol/find_coordinator.py +++ b/kafka/protocol/find_coordinator.py @@ -35,7 +35,6 @@ class FindCoordinatorResponse_v2(Response): class FindCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0 - RESPONSE_TYPE = FindCoordinatorResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')) ) @@ -44,7 +43,6 @@ class FindCoordinatorRequest_v0(Request): class FindCoordinatorRequest_v1(Request): API_KEY = 10 API_VERSION = 1 - RESPONSE_TYPE = FindCoordinatorResponse_v1 SCHEMA = Schema( ('coordinator_key', String('utf-8')), ('coordinator_type', Int8) # 0: consumer, 1: transaction @@ -54,7 +52,6 @@ class FindCoordinatorRequest_v1(Request): class FindCoordinatorRequest_v2(Request): API_KEY = 10 API_VERSION = 2 - RESPONSE_TYPE = FindCoordinatorResponse_v2 SCHEMA = FindCoordinatorRequest_v1.SCHEMA diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index a56bd48dc..1c40838b2 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -81,7 +81,6 @@ class JoinGroupResponse_v5(Response): class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 - RESPONSE_TYPE = JoinGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('session_timeout', Int32), @@ -96,7 +95,6 @@ class JoinGroupRequest_v0(Request): class JoinGroupRequest_v1(Request): API_KEY = 11 API_VERSION = 1 - RESPONSE_TYPE = JoinGroupResponse_v1 SCHEMA = Schema( ('group', String('utf-8')), ('session_timeout', Int32), @@ -112,28 +110,24 @@ class JoinGroupRequest_v1(Request): class JoinGroupRequest_v2(Request): API_KEY = 11 API_VERSION = 2 - RESPONSE_TYPE = JoinGroupResponse_v2 SCHEMA = JoinGroupRequest_v1.SCHEMA class JoinGroupRequest_v3(Request): API_KEY = 11 API_VERSION = 3 - RESPONSE_TYPE = JoinGroupResponse_v3 SCHEMA = JoinGroupRequest_v2.SCHEMA class JoinGroupRequest_v4(Request): API_KEY = 11 API_VERSION = 4 - RESPONSE_TYPE = JoinGroupResponse_v4 SCHEMA = JoinGroupRequest_v3.SCHEMA class JoinGroupRequest_v5(Request): API_KEY = 11 API_VERSION = 5 - RESPONSE_TYPE = JoinGroupResponse_v5 SCHEMA = Schema( ('group', String('utf-8')), ('session_timeout', Int32), @@ -201,7 +195,6 @@ class SyncGroupResponse_v3(Response): class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 - RESPONSE_TYPE = SyncGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -215,21 +208,18 @@ class SyncGroupRequest_v0(Request): class SyncGroupRequest_v1(Request): API_KEY = 14 API_VERSION = 1 - RESPONSE_TYPE = SyncGroupResponse_v1 SCHEMA = SyncGroupRequest_v0.SCHEMA class SyncGroupRequest_v2(Request): API_KEY = 14 API_VERSION = 2 - RESPONSE_TYPE = SyncGroupResponse_v2 SCHEMA = SyncGroupRequest_v1.SCHEMA class SyncGroupRequest_v3(Request): API_KEY = 14 API_VERSION = 3 - RESPONSE_TYPE = SyncGroupResponse_v3 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -294,7 +284,6 @@ class HeartbeatResponse_v3(Response): class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 - RESPONSE_TYPE = HeartbeatResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -305,21 +294,18 @@ class HeartbeatRequest_v0(Request): class HeartbeatRequest_v1(Request): API_KEY = 12 API_VERSION = 1 - RESPONSE_TYPE = HeartbeatResponse_v1 SCHEMA = HeartbeatRequest_v0.SCHEMA class HeartbeatRequest_v2(Request): API_KEY = 12 API_VERSION = 2 - RESPONSE_TYPE = HeartbeatResponse_v2 SCHEMA = HeartbeatRequest_v1.SCHEMA class HeartbeatRequest_v3(Request): API_KEY = 12 API_VERSION = 3 - RESPONSE_TYPE = HeartbeatResponse_v3 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -377,7 +363,6 @@ class LeaveGroupResponse_v3(Response): class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 - RESPONSE_TYPE = LeaveGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('member_id', String('utf-8')) @@ -387,21 +372,18 @@ class LeaveGroupRequest_v0(Request): class LeaveGroupRequest_v1(Request): API_KEY = 13 API_VERSION = 1 - RESPONSE_TYPE = LeaveGroupResponse_v1 SCHEMA = LeaveGroupRequest_v0.SCHEMA class LeaveGroupRequest_v2(Request): API_KEY = 13 API_VERSION = 2 - RESPONSE_TYPE = LeaveGroupResponse_v2 SCHEMA = LeaveGroupRequest_v1.SCHEMA class LeaveGroupRequest_v3(Request): API_KEY = 13 API_VERSION = 3 - RESPONSE_TYPE = LeaveGroupResponse_v3 SCHEMA = Schema( ('group', String('utf-8')), ('members', Array( diff --git a/kafka/protocol/init_producer_id.py b/kafka/protocol/init_producer_id.py index 102ea4770..c9d0a2d39 100644 --- a/kafka/protocol/init_producer_id.py +++ b/kafka/protocol/init_producer_id.py @@ -22,7 +22,6 @@ class InitProducerIdResponse_v1(Response): class InitProducerIdRequest_v0(Request): API_KEY = 22 API_VERSION = 0 - RESPONSE_TYPE = InitProducerIdResponse_v0 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('transaction_timeout_ms', Int32), @@ -32,7 +31,6 @@ class InitProducerIdRequest_v0(Request): class InitProducerIdRequest_v1(Request): API_KEY = 22 API_VERSION = 1 - RESPONSE_TYPE = InitProducerIdResponse_v1 SCHEMA = InitProducerIdRequest_v0.SCHEMA diff --git a/kafka/protocol/list_offsets.py b/kafka/protocol/list_offsets.py index 99f85f12e..bd81b7f14 100644 --- a/kafka/protocol/list_offsets.py +++ b/kafka/protocol/list_offsets.py @@ -91,7 +91,6 @@ class ListOffsetsResponse_v5(Response): class ListOffsetsRequest_v0(Request): API_KEY = 2 API_VERSION = 0 - RESPONSE_TYPE = ListOffsetsResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -108,7 +107,6 @@ class ListOffsetsRequest_v0(Request): class ListOffsetsRequest_v1(Request): API_KEY = 2 API_VERSION = 1 - RESPONSE_TYPE = ListOffsetsResponse_v1 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -125,7 +123,6 @@ class ListOffsetsRequest_v1(Request): class ListOffsetsRequest_v2(Request): API_KEY = 2 API_VERSION = 2 - RESPONSE_TYPE = ListOffsetsResponse_v2 SCHEMA = Schema( ('replica_id', Int32), ('isolation_level', Int8), # <- added isolation_level @@ -143,7 +140,6 @@ class ListOffsetsRequest_v2(Request): class ListOffsetsRequest_v3(Request): API_KEY = 2 API_VERSION = 3 - RESPONSE_TYPE = ListOffsetsResponse_v3 SCHEMA = ListOffsetsRequest_v2.SCHEMA DEFAULTS = { 'replica_id': -1 @@ -156,7 +152,6 @@ class ListOffsetsRequest_v4(Request): """ API_KEY = 2 API_VERSION = 4 - RESPONSE_TYPE = ListOffsetsResponse_v4 SCHEMA = Schema( ('replica_id', Int32), ('isolation_level', Int8), # <- added isolation_level @@ -175,7 +170,6 @@ class ListOffsetsRequest_v4(Request): class ListOffsetsRequest_v5(Request): API_KEY = 2 API_VERSION = 5 - RESPONSE_TYPE = ListOffsetsResponse_v5 SCHEMA = ListOffsetsRequest_v4.SCHEMA DEFAULTS = { 'replica_id': -1 diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 715b48595..f3d588133 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -195,7 +195,6 @@ class MetadataResponse_v8(Response): class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 - RESPONSE_TYPE = MetadataResponse_v0 SCHEMA = Schema( ('topics', Array(String('utf-8'))) ) @@ -206,7 +205,6 @@ class MetadataRequest_v0(Request): class MetadataRequest_v1(Request): API_KEY = 3 API_VERSION = 1 - RESPONSE_TYPE = MetadataResponse_v1 SCHEMA = MetadataRequest_v0.SCHEMA ALL_TOPICS = None # Null Array (len -1) for topics returns all topics NO_TOPICS = [] # Empty array (len 0) for topics returns no topics @@ -215,7 +213,6 @@ class MetadataRequest_v1(Request): class MetadataRequest_v2(Request): API_KEY = 3 API_VERSION = 2 - RESPONSE_TYPE = MetadataResponse_v2 SCHEMA = MetadataRequest_v1.SCHEMA ALL_TOPICS = None NO_TOPICS = [] @@ -224,7 +221,6 @@ class MetadataRequest_v2(Request): class MetadataRequest_v3(Request): API_KEY = 3 API_VERSION = 3 - RESPONSE_TYPE = MetadataResponse_v3 SCHEMA = MetadataRequest_v1.SCHEMA ALL_TOPICS = None NO_TOPICS = [] @@ -233,7 +229,6 @@ class MetadataRequest_v3(Request): class MetadataRequest_v4(Request): API_KEY = 3 API_VERSION = 4 - RESPONSE_TYPE = MetadataResponse_v4 SCHEMA = Schema( ('topics', Array(String('utf-8'))), ('allow_auto_topic_creation', Boolean) @@ -249,7 +244,6 @@ class MetadataRequest_v5(Request): """ API_KEY = 3 API_VERSION = 5 - RESPONSE_TYPE = MetadataResponse_v5 SCHEMA = MetadataRequest_v4.SCHEMA ALL_TOPICS = None NO_TOPICS = [] @@ -258,7 +252,6 @@ class MetadataRequest_v5(Request): class MetadataRequest_v6(Request): API_KEY = 3 API_VERSION = 6 - RESPONSE_TYPE = MetadataResponse_v6 SCHEMA = MetadataRequest_v5.SCHEMA ALL_TOPICS = None NO_TOPICS = [] @@ -267,7 +260,6 @@ class MetadataRequest_v6(Request): class MetadataRequest_v7(Request): API_KEY = 3 API_VERSION = 7 - RESPONSE_TYPE = MetadataResponse_v7 SCHEMA = MetadataRequest_v6.SCHEMA ALL_TOPICS = None NO_TOPICS = [] @@ -276,7 +268,6 @@ class MetadataRequest_v7(Request): class MetadataRequest_v8(Request): API_KEY = 3 API_VERSION = 8 - RESPONSE_TYPE = MetadataResponse_v8 SCHEMA = Schema( ('topics', Array(String('utf-8'))), ('allow_auto_topic_creation', Boolean), diff --git a/kafka/protocol/offset_for_leader_epoch.py b/kafka/protocol/offset_for_leader_epoch.py index 64c9d1751..db7533d7c 100644 --- a/kafka/protocol/offset_for_leader_epoch.py +++ b/kafka/protocol/offset_for_leader_epoch.py @@ -67,7 +67,6 @@ class OffsetForLeaderEpochResponse_v4(Response): class OffsetForLeaderEpochRequest_v0(Request): API_KEY = 23 API_VERSION = 0 - RESPONSE_TYPE = OffsetForLeaderEpochResponse_v0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -79,14 +78,12 @@ class OffsetForLeaderEpochRequest_v0(Request): class OffsetForLeaderEpochRequest_v1(Request): API_KEY = 23 API_VERSION = 1 - RESPONSE_TYPE = OffsetForLeaderEpochResponse_v1 SCHEMA = OffsetForLeaderEpochRequest_v0.SCHEMA class OffsetForLeaderEpochRequest_v2(Request): API_KEY = 23 API_VERSION = 2 - RESPONSE_TYPE = OffsetForLeaderEpochResponse_v2 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -99,7 +96,6 @@ class OffsetForLeaderEpochRequest_v2(Request): class OffsetForLeaderEpochRequest_v3(Request): API_KEY = 23 API_VERSION = 3 - RESPONSE_TYPE = OffsetForLeaderEpochResponse_v3 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -113,7 +109,6 @@ class OffsetForLeaderEpochRequest_v3(Request): class OffsetForLeaderEpochRequest_v4(Request): API_KEY = 23 API_VERSION = 4 - RESPONSE_TYPE = OffsetForLeaderEpochResponse_v4 SCHEMA = Schema( ('replica_id', Int32), ('topics', CompactArray( diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 0ebe5ace7..8fad5b0c9 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -2,6 +2,7 @@ import logging import kafka.errors as Errors +from kafka.protocol import get_response_class from kafka.protocol.find_coordinator import FindCoordinatorResponse from kafka.protocol.frame import KafkaBytes from kafka.protocol.types import Int32 @@ -62,8 +63,7 @@ def send_request(self, request, correlation_id=None): data = request.encode(framed=True, header=True) self.bytes_to_send.append(data) if request.expect_response(): - ifr = (correlation_id, request) - self.in_flight_requests.append(ifr) + self.in_flight_requests.append(header) return correlation_id def send_bytes(self): @@ -136,8 +136,13 @@ def receive_bytes(self, data): def _process_response(self, read_buffer): if not self.in_flight_requests: raise Errors.CorrelationIdError('No in-flight-request found for server response') - (correlation_id, request) = self.in_flight_requests.popleft() - response_type = request.RESPONSE_TYPE + header = self.in_flight_requests.popleft() + correlation_id = header.correlation_id + response_type = get_response_class(header.api_key, header.api_version) + if response_type is None: + log.error('Unable to find ResponseType for api_key=%d api_version=%d', + header.api_key, header.api_version) + raise Errors.KafkaProtocolError('Unable to find response type for api %d v%d' % (header.api_key, header.api_version)) response_header = response_type.parse_header(read_buffer) recv_correlation_id = response_header.correlation_id # 0.8.2 quirk @@ -162,10 +167,10 @@ def _process_response(self, read_buffer): except ValueError: read_buffer.seek(0) buf = read_buffer.read() - log.error('Response %d [ResponseType: %s Request: %s]:' + log.error('Response %d [ResponseType: %s RequestHeader: %s]:' ' Unable to decode %d-byte buffer: %r', correlation_id, response_type, - request, len(buf), buf) + header, len(buf), buf) raise Errors.KafkaProtocolError('Unable to decode response') log.debug('%s Received response %d %s', self._ident, correlation_id, response) diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 4f07ca002..b13358f9d 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -132,7 +132,6 @@ def expect_response(self): class ProduceRequest_v0(ProduceRequest): API_VERSION = 0 - RESPONSE_TYPE = ProduceResponse_v0 SCHEMA = Schema( ('required_acks', Int16), ('timeout', Int32), @@ -146,20 +145,17 @@ class ProduceRequest_v0(ProduceRequest): class ProduceRequest_v1(ProduceRequest): API_VERSION = 1 - RESPONSE_TYPE = ProduceResponse_v1 SCHEMA = ProduceRequest_v0.SCHEMA class ProduceRequest_v2(ProduceRequest): API_VERSION = 2 - RESPONSE_TYPE = ProduceResponse_v2 SCHEMA = ProduceRequest_v1.SCHEMA class ProduceRequest_v3(ProduceRequest): # Adds support for message format v2 API_VERSION = 3 - RESPONSE_TYPE = ProduceResponse_v3 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('required_acks', Int16), @@ -178,7 +174,6 @@ class ProduceRequest_v4(ProduceRequest): The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3 """ API_VERSION = 4 - RESPONSE_TYPE = ProduceResponse_v4 SCHEMA = ProduceRequest_v3.SCHEMA @@ -188,7 +183,6 @@ class ProduceRequest_v5(ProduceRequest): partition level field: the log_start_offset. """ API_VERSION = 5 - RESPONSE_TYPE = ProduceResponse_v5 SCHEMA = ProduceRequest_v4.SCHEMA @@ -197,7 +191,6 @@ class ProduceRequest_v6(ProduceRequest): The version number is bumped to indicate that on quota violation brokers send out responses before throttling. """ API_VERSION = 6 - RESPONSE_TYPE = ProduceResponse_v6 SCHEMA = ProduceRequest_v5.SCHEMA @@ -206,7 +199,6 @@ class ProduceRequest_v7(ProduceRequest): V7 bumped up to indicate ZStandard capability. (see KIP-110) """ API_VERSION = 7 - RESPONSE_TYPE = ProduceResponse_v7 SCHEMA = ProduceRequest_v6.SCHEMA @@ -216,7 +208,6 @@ class ProduceRequest_v8(ProduceRequest): (See KIP-467) """ API_VERSION = 8 - RESPONSE_TYPE = ProduceResponse_v8 SCHEMA = ProduceRequest_v7.SCHEMA diff --git a/kafka/protocol/sasl_authenticate.py b/kafka/protocol/sasl_authenticate.py index 0be8b54a4..588038b5b 100644 --- a/kafka/protocol/sasl_authenticate.py +++ b/kafka/protocol/sasl_authenticate.py @@ -24,7 +24,6 @@ class SaslAuthenticateResponse_v1(Response): class SaslAuthenticateRequest_v0(Request): API_KEY = 36 API_VERSION = 0 - RESPONSE_TYPE = SaslAuthenticateResponse_v0 SCHEMA = Schema( ('auth_bytes', Bytes)) @@ -32,7 +31,6 @@ class SaslAuthenticateRequest_v0(Request): class SaslAuthenticateRequest_v1(Request): API_KEY = 36 API_VERSION = 1 - RESPONSE_TYPE = SaslAuthenticateResponse_v1 SCHEMA = SaslAuthenticateRequest_v0.SCHEMA diff --git a/kafka/protocol/sasl_handshake.py b/kafka/protocol/sasl_handshake.py index fa792cf67..cd7f93e70 100644 --- a/kafka/protocol/sasl_handshake.py +++ b/kafka/protocol/sasl_handshake.py @@ -20,7 +20,6 @@ class SaslHandshakeResponse_v1(Response): class SaslHandshakeRequest_v0(Request): API_KEY = 17 API_VERSION = 0 - RESPONSE_TYPE = SaslHandshakeResponse_v0 SCHEMA = Schema( ('mechanism', String('utf-8')) ) @@ -29,7 +28,6 @@ class SaslHandshakeRequest_v0(Request): class SaslHandshakeRequest_v1(Request): API_KEY = 17 API_VERSION = 1 - RESPONSE_TYPE = SaslHandshakeResponse_v1 SCHEMA = SaslHandshakeRequest_v0.SCHEMA diff --git a/kafka/protocol/txn_offset_commit.py b/kafka/protocol/txn_offset_commit.py index cea96e9f8..fc9998779 100644 --- a/kafka/protocol/txn_offset_commit.py +++ b/kafka/protocol/txn_offset_commit.py @@ -29,7 +29,6 @@ class TxnOffsetCommitResponse_v2(Response): class TxnOffsetCommitRequest_v0(Request): API_KEY = 28 API_VERSION = 0 - RESPONSE_TYPE = TxnOffsetCommitResponse_v0 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('group_id', String('utf-8')), @@ -46,14 +45,12 @@ class TxnOffsetCommitRequest_v0(Request): class TxnOffsetCommitRequest_v1(Request): API_KEY = 28 API_VERSION = 1 - RESPONSE_TYPE = TxnOffsetCommitResponse_v1 SCHEMA = TxnOffsetCommitRequest_v0.SCHEMA class TxnOffsetCommitRequest_v2(Request): API_KEY = 28 API_VERSION = 2 - RESPONSE_TYPE = TxnOffsetCommitResponse_v2 SCHEMA = Schema( ('transactional_id', String('utf-8')), ('group_id', String('utf-8')), diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py index f1d584091..bd2c51b47 100644 --- a/test/test_object_conversion.py +++ b/test/test_object_conversion.py @@ -13,7 +13,6 @@ def test_get_item(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myobject', Int16)) @@ -26,7 +25,6 @@ def test_with_empty_schema(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema() tc = TestClass() @@ -37,7 +35,6 @@ def test_with_basic_schema(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myobject', Int16)) @@ -49,7 +46,6 @@ def test_with_basic_array_schema(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myarray', Array(Int16))) @@ -61,7 +57,6 @@ def test_with_complex_array_schema(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myarray', Array( ('subobject', Int16), @@ -80,7 +75,6 @@ def test_with_array_and_other(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myarray', Array( ('subobject', Int16), @@ -102,7 +96,6 @@ def test_with_nested_array(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myarray', Array( ('subarray', Array(Int16)), @@ -128,7 +121,6 @@ def test_with_complex_nested_array(self, superclass): class TestClass(superclass): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC SCHEMA = Schema( ('myarray', Array( ('subarray', Array( From ce4c150b80225c23cc56e54e273dc1cff2846e4f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2026 17:26:55 -0700 Subject: [PATCH 3/3] Avoid false positive security issue in api error log; fixup header ifr --- kafka/protocol/parser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 8fad5b0c9..847a47dc0 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -63,7 +63,7 @@ def send_request(self, request, correlation_id=None): data = request.encode(framed=True, header=True) self.bytes_to_send.append(data) if request.expect_response(): - self.in_flight_requests.append(header) + self.in_flight_requests.append(request.header) return correlation_id def send_bytes(self): @@ -140,7 +140,7 @@ def _process_response(self, read_buffer): correlation_id = header.correlation_id response_type = get_response_class(header.api_key, header.api_version) if response_type is None: - log.error('Unable to find ResponseType for api_key=%d api_version=%d', + log.error('Unable to find ResponseType for api=%d version=%d', header.api_key, header.api_version) raise Errors.KafkaProtocolError('Unable to find response type for api %d v%d' % (header.api_key, header.api_version)) response_header = response_type.parse_header(read_buffer)