Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
from . import (
produce, fetch, list_offsets, metadata,
commit, find_coordinator, group,
sasl_handshake, api_versions, admin,
init_producer_id, offset_for_leader_epoch,
add_partitions_to_txn, add_offsets_to_txn, end_txn,
txn_offset_commit, sasl_authenticate,
)

API_KEYS = {
0: 'Produce',
1: 'Fetch',
Expand Down Expand Up @@ -44,3 +53,47 @@
46: 'ListPartitionReassignments',
48: 'DescribeClientQuotas',
}

# Mapping of Api_key to a tuple of (request_classes, response_classes)
REQUEST_TYPES = {
0: (produce.ProduceRequest, produce.ProduceResponse),
1: (fetch.FetchRequest, fetch.FetchResponse),
2: (list_offsets.ListOffsetsRequest, list_offsets.ListOffsetsResponse),
3: (metadata.MetadataRequest, metadata.MetadataResponse),
8: (commit.OffsetCommitRequest, commit.OffsetCommitResponse),
9: (commit.OffsetFetchRequest, commit.OffsetFetchResponse),
10: (find_coordinator.FindCoordinatorRequest, find_coordinator.FindCoordinatorResponse),
11: (group.JoinGroupRequest, group.JoinGroupResponse),
12: (group.HeartbeatRequest, group.HeartbeatResponse),
13: (group.LeaveGroupRequest, group.LeaveGroupResponse),
14: (group.SyncGroupRequest, group.SyncGroupResponse),
15: (admin.DescribeGroupsRequest, admin.DescribeGroupsResponse),
16: (admin.ListGroupsRequest, admin.ListGroupsResponse),
17: (sasl_handshake.SaslHandshakeRequest, sasl_handshake.SaslHandshakeResponse),
18: (api_versions.ApiVersionsRequest, api_versions.ApiVersionsResponse),
19: (admin.CreateTopicsRequest, admin.CreateTopicsResponse),
20: (admin.DeleteTopicsRequest, admin.DeleteTopicsResponse),
21: (admin.DeleteRecordsRequest, admin.DeleteRecordsResponse),
22: (init_producer_id.InitProducerIdRequest, init_producer_id.InitProducerIdResponse),
23: (offset_for_leader_epoch.OffsetForLeaderEpochRequest, offset_for_leader_epoch.OffsetForLeaderEpochResponse),
24: (add_partitions_to_txn.AddPartitionsToTxnRequest, add_partitions_to_txn.AddPartitionsToTxnResponse),
25: (add_offsets_to_txn.AddOffsetsToTxnRequest, add_offsets_to_txn.AddOffsetsToTxnResponse),
26: (end_txn.EndTxnRequest, end_txn.EndTxnResponse),
28: (txn_offset_commit.TxnOffsetCommitRequest, txn_offset_commit.TxnOffsetCommitResponse),
29: (admin.DescribeAclsRequest, admin.DescribeAclsResponse),
30: (admin.CreateAclsRequest, admin.CreateAclsResponse),
31: (admin.DeleteAclsRequest, admin.DeleteAclsResponse),
32: (admin.DescribeConfigsRequest, admin.DescribeConfigsResponse),
33: (admin.AlterConfigsRequest, admin.AlterConfigsResponse),
36: (sasl_authenticate.SaslAuthenticateRequest, sasl_authenticate.SaslAuthenticateResponse),
37: (admin.CreatePartitionsRequest, admin.CreatePartitionsResponse),
42: (admin.DeleteGroupsRequest, admin.DeleteGroupsResponse)
}

def get_response_class(api_key, api_version):
request_type, response_type = REQUEST_TYPES.get(api_key, (None, None))
if response_type:
if hasattr(response_type, '__getitem__'):
return response_type[api_version]
return response_type
return None
3 changes: 0 additions & 3 deletions kafka/protocol/add_offsets_to_txn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class AddOffsetsToTxnResponse_v2(Response):
class AddOffsetsToTxnRequest_v0(Request):
API_KEY = 25
API_VERSION = 0
RESPONSE_TYPE = AddOffsetsToTxnResponse_v0
SCHEMA = Schema(
('transactional_id', String('utf-8')),
('producer_id', Int64),
Expand All @@ -38,14 +37,12 @@ class AddOffsetsToTxnRequest_v0(Request):
class AddOffsetsToTxnRequest_v1(Request):
API_KEY = 25
API_VERSION = 1
RESPONSE_TYPE = AddOffsetsToTxnResponse_v1
SCHEMA = AddOffsetsToTxnRequest_v0.SCHEMA


class AddOffsetsToTxnRequest_v2(Request):
API_KEY = 25
API_VERSION = 2
RESPONSE_TYPE = AddOffsetsToTxnResponse_v2
SCHEMA = AddOffsetsToTxnRequest_v1.SCHEMA


Expand Down
3 changes: 0 additions & 3 deletions kafka/protocol/add_partitions_to_txn.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class AddPartitionsToTxnResponse_v2(Response):
class AddPartitionsToTxnRequest_v0(Request):
API_KEY = 24
API_VERSION = 0
RESPONSE_TYPE = AddPartitionsToTxnResponse_v0
SCHEMA = Schema(
('transactional_id', String('utf-8')),
('producer_id', Int64),
Expand All @@ -42,14 +41,12 @@ class AddPartitionsToTxnRequest_v0(Request):
class AddPartitionsToTxnRequest_v1(Request):
API_KEY = 24
API_VERSION = 1
RESPONSE_TYPE = AddPartitionsToTxnResponse_v1
SCHEMA = AddPartitionsToTxnRequest_v0.SCHEMA


class AddPartitionsToTxnRequest_v2(Request):
API_KEY = 24
API_VERSION = 2
RESPONSE_TYPE = AddPartitionsToTxnResponse_v2
SCHEMA = AddPartitionsToTxnRequest_v1.SCHEMA


Expand Down
Loading
Loading