From 9d4cf51bb6ab2048710cd15709cb893da56bdbb9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 10:56:24 -0700 Subject: [PATCH 01/11] kafka.protocol.struct ALIASES --- kafka/protocol/struct.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 2de6af170..aeb9bbb69 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -14,6 +14,17 @@ def SCHEMA(self): """An instance of Schema() representing the structure""" pass + ALIASES = {} # for compatibility with new protocol defs from json + def __getattr__(self, name): + if name in self.ALIASES: + return getattr(self, self.ALIASES[name]) + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") + + def __setattr__(self, name, value): + if name in self.ALIASES: + name = self.ALIASES[name] + return super().__setattr__(name, value) + def __init__(self, *args, **kwargs): if self.SCHEMA.has_tagged_fields(): # Dont require TaggedFields value in *args @@ -30,6 +41,9 @@ def __init__(self, *args, **kwargs): if self.SCHEMA.has_tagged_fields(): if kwargs.get('tags') is None: kwargs['tags'] = {} + for name in self.ALIASES: + if name in kwargs: + kwargs[self.ALIASES[name]] = kwargs.pop(name) for name in self.SCHEMA.names: setattr(self, name, kwargs.pop(name, None)) if kwargs: From b949e98f0f149cd8bf570051b33d2ed9897c8c00 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 11:28:10 -0700 Subject: [PATCH 02/11] old proto ApiVersionsResponse ALIASES --- kafka/protocol/api_versions.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py index b7571e1cd..9fe295ba1 100644 --- a/kafka/protocol/api_versions.py +++ b/kafka/protocol/api_versions.py @@ -9,11 +9,12 @@ class BaseApiVersionsResponse(Response): API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), - ('api_versions', Array( + ('api_keys', Array( ('api_key', Int16), ('min_version', Int16), ('max_version', Int16))) ) + ALIASES = {'api_versions': 'api_keys'} @classmethod def decode(cls, data, header=False, framed=False): @@ -37,11 +38,12 @@ class ApiVersionsResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), - ('api_versions', Array( + ('api_keys', Array( ('api_key', Int16), ('min_version', Int16), ('max_version', Int16))) ) + ALIASES = {'api_versions': 'api_keys'} class ApiVersionsResponse_v1(BaseApiVersionsResponse): @@ -49,7 +51,7 @@ class ApiVersionsResponse_v1(BaseApiVersionsResponse): API_VERSION = 1 SCHEMA = Schema( ('error_code', Int16), - ('api_versions', Array( + ('api_keys', Array( ('api_key', Int16), ('min_version', Int16), ('max_version', Int16))), @@ -68,7 +70,7 @@ class ApiVersionsResponse_v3(BaseApiVersionsResponse): API_VERSION = 3 SCHEMA = Schema( ('error_code', Int16), - ('api_versions', CompactArray( + ('api_keys', CompactArray( ('api_key', Int16), ('min_version', Int16), ('max_version', Int16), From 5fe58ebc02201fc4db3be550324885a2f2fcffd0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 11:29:22 -0700 Subject: [PATCH 03/11] old proto SaslHandshakeResponse ALIASES --- kafka/protocol/sasl_handshake.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/protocol/sasl_handshake.py b/kafka/protocol/sasl_handshake.py index cd7f93e70..2c68f1b8c 100644 --- a/kafka/protocol/sasl_handshake.py +++ b/kafka/protocol/sasl_handshake.py @@ -7,14 +7,18 @@ class SaslHandshakeResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), - ('enabled_mechanisms', Array(String('utf-8'))) + ('mechanisms', Array(String('utf-8'))) ) + ALIASES = { + 'enabled_mechanisms': 'mechanisms', + } class SaslHandshakeResponse_v1(Response): API_KEY = 17 API_VERSION = 1 SCHEMA = SaslHandshakeResponse_v0.SCHEMA + ALIASES = SaslHandshakeResponse_v0.ALIASES class SaslHandshakeRequest_v0(Request): From 356cecff97f26a04de88cb3554c9d6b66bb66b66 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 11:30:42 -0700 Subject: [PATCH 04/11] old proto Fetch ALIASES --- kafka/protocol/fetch.py | 114 +++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 43 deletions(-) diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 8834c8cd6..d4f1626fc 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -12,14 +12,17 @@ class FetchResponse_v0(Response): API_KEY = 1 API_VERSION = 0 SCHEMA = Schema( - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('records', Bytes))))) ) + ALIASES = { + 'topics': 'responses', + } class FetchResponse_v1(Response): @@ -27,26 +30,29 @@ class FetchResponse_v1(Response): API_VERSION = 1 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('records', Bytes))))) ) + ALIASES = FetchResponse_v0.ALIASES class FetchResponse_v2(Response): API_KEY = 1 API_VERSION = 2 SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally + ALIASES = FetchResponse_v1.ALIASES class FetchResponse_v3(Response): API_KEY = 1 API_VERSION = 3 SCHEMA = FetchResponse_v2.SCHEMA + ALIASES = FetchResponse_v2.ALIASES class FetchResponse_v4(Response): @@ -55,18 +61,19 @@ class FetchResponse_v4(Response): API_VERSION = 4 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('last_stable_offset', Int64), ('aborted_transactions', Array( ('producer_id', Int64), ('first_offset', Int64))), ('records', Bytes))))) ) + ALIASES = FetchResponse_v3.ALIASES class FetchResponse_v5(Response): @@ -74,12 +81,12 @@ class FetchResponse_v5(Response): API_VERSION = 5 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('last_stable_offset', Int64), ('log_start_offset', Int64), ('aborted_transactions', Array( @@ -87,6 +94,7 @@ class FetchResponse_v5(Response): ('first_offset', Int64))), ('records', Bytes))))) ) + ALIASES = FetchResponse_v4.ALIASES class FetchResponse_v6(Response): @@ -97,6 +105,7 @@ class FetchResponse_v6(Response): API_KEY = 1 API_VERSION = 6 SCHEMA = FetchResponse_v5.SCHEMA + ALIASES = FetchResponse_v5.ALIASES class FetchResponse_v7(Response): @@ -109,12 +118,12 @@ class FetchResponse_v7(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('session_id', Int32), - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('last_stable_offset', Int64), ('log_start_offset', Int64), ('aborted_transactions', Array( @@ -122,24 +131,28 @@ class FetchResponse_v7(Response): ('first_offset', Int64))), ('records', Bytes))))) ) + ALIASES = FetchResponse_v6.ALIASES class FetchResponse_v8(Response): API_KEY = 1 API_VERSION = 8 SCHEMA = FetchResponse_v7.SCHEMA + ALIASES = FetchResponse_v7.ALIASES class FetchResponse_v9(Response): API_KEY = 1 API_VERSION = 9 - SCHEMA = FetchResponse_v7.SCHEMA + SCHEMA = FetchResponse_v8.SCHEMA + ALIASES = FetchResponse_v8.ALIASES class FetchResponse_v10(Response): API_KEY = 1 API_VERSION = 10 - SCHEMA = FetchResponse_v7.SCHEMA + SCHEMA = FetchResponse_v9.SCHEMA + ALIASES = FetchResponse_v9.ALIASES class FetchResponse_v11(Response): @@ -149,12 +162,12 @@ class FetchResponse_v11(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('session_id', Int32), - ('topics', Array( - ('topics', String('utf-8')), + ('responses', Array( + ('topic', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('highwater_offset', Int64), + ('high_watermark', Int64), ('last_stable_offset', Int64), ('log_start_offset', Int64), ('aborted_transactions', Array( @@ -163,6 +176,7 @@ class FetchResponse_v11(Response): ('preferred_read_replica', Int32), ('records', Bytes))))) ) + ALIASES = FetchResponse_v10.ALIASES class FetchRequest_v0(Request): @@ -170,27 +184,32 @@ class FetchRequest_v0(Request): API_VERSION = 0 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('offset', Int64), - ('max_bytes', Int32))))) + ('fetch_offset', Int64), + ('partition_max_bytes', Int32))))) ) + ALIASES = { + 'max_wait_time': 'max_wait_ms', + } class FetchRequest_v1(Request): API_KEY = 1 API_VERSION = 1 SCHEMA = FetchRequest_v0.SCHEMA + ALIASES = FetchRequest_v0.ALIASES class FetchRequest_v2(Request): API_KEY = 1 API_VERSION = 2 SCHEMA = FetchRequest_v1.SCHEMA + ALIASES = FetchRequest_v1.ALIASES class FetchRequest_v3(Request): @@ -198,16 +217,17 @@ class FetchRequest_v3(Request): API_VERSION = 3 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), # This new field is only difference from FR_v2 ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('offset', Int64), - ('max_bytes', Int32))))) + ('fetch_offset', Int64), + ('partition_max_bytes', Int32))))) ) + ALIASES = FetchRequest_v2.ALIASES class FetchRequest_v4(Request): @@ -217,7 +237,7 @@ class FetchRequest_v4(Request): API_VERSION = 4 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), ('isolation_level', Int8), @@ -225,9 +245,10 @@ class FetchRequest_v4(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('offset', Int64), - ('max_bytes', Int32))))) + ('fetch_offset', Int64), + ('partition_max_bytes', Int32))))) ) + ALIASES = FetchRequest_v3.ALIASES class FetchRequest_v5(Request): @@ -236,7 +257,7 @@ class FetchRequest_v5(Request): API_VERSION = 5 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), ('isolation_level', Int8), @@ -246,8 +267,9 @@ class FetchRequest_v5(Request): ('partition', Int32), ('fetch_offset', Int64), ('log_start_offset', Int64), - ('max_bytes', Int32))))) + ('partition_max_bytes', Int32))))) ) + ALIASES = FetchRequest_v4.ALIASES class FetchRequest_v6(Request): @@ -259,6 +281,7 @@ class FetchRequest_v6(Request): API_KEY = 1 API_VERSION = 6 SCHEMA = FetchRequest_v5.SCHEMA + ALIASES = FetchRequest_v5.ALIASES class FetchRequest_v7(Request): @@ -269,7 +292,7 @@ class FetchRequest_v7(Request): API_VERSION = 7 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), ('isolation_level', Int8), @@ -281,12 +304,13 @@ class FetchRequest_v7(Request): ('partition', Int32), ('fetch_offset', Int64), ('log_start_offset', Int64), - ('max_bytes', Int32))))), + ('partition_max_bytes', Int32))))), ('forgotten_topics_data', Array( ('topic', String('utf-8')), ('partitions', Array(Int32)) )), ) + ALIASES = FetchRequest_v6.ALIASES class FetchRequest_v8(Request): @@ -296,6 +320,7 @@ class FetchRequest_v8(Request): API_KEY = 1 API_VERSION = 8 SCHEMA = FetchRequest_v7.SCHEMA + ALIASES = FetchRequest_v7.ALIASES class FetchRequest_v9(Request): @@ -306,7 +331,7 @@ class FetchRequest_v9(Request): API_VERSION = 9 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), ('isolation_level', Int8), @@ -319,12 +344,13 @@ class FetchRequest_v9(Request): ('current_leader_epoch', Int32), ('fetch_offset', Int64), ('log_start_offset', Int64), - ('max_bytes', Int32))))), + ('partition_max_bytes', Int32))))), ('forgotten_topics_data', Array( ('topic', String('utf-8')), ('partitions', Array(Int32)), )), ) + ALIASES = FetchRequest_v8.ALIASES class FetchRequest_v10(Request): @@ -334,6 +360,7 @@ class FetchRequest_v10(Request): API_KEY = 1 API_VERSION = 10 SCHEMA = FetchRequest_v9.SCHEMA + ALIASES = FetchRequest_v9.ALIASES class FetchRequest_v11(Request): @@ -344,7 +371,7 @@ class FetchRequest_v11(Request): API_VERSION = 11 SCHEMA = Schema( ('replica_id', Int32), - ('max_wait_time', Int32), + ('max_wait_ms', Int32), ('min_bytes', Int32), ('max_bytes', Int32), ('isolation_level', Int8), @@ -357,13 +384,14 @@ class FetchRequest_v11(Request): ('current_leader_epoch', Int32), ('fetch_offset', Int64), ('log_start_offset', Int64), - ('max_bytes', Int32))))), + ('partition_max_bytes', Int32))))), ('forgotten_topics_data', Array( ('topic', String('utf-8')), ('partitions', Array(Int32)) )), ('rack_id', String('utf-8')), ) + ALIASES = FetchRequest_v10.ALIASES FetchRequest = [ From ccec57a6f630044b7c62cc091671a702dcd011a1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 17 Mar 2026 15:19:10 -0700 Subject: [PATCH 05/11] old proto ListOffsets internal field renames --- kafka/protocol/list_offsets.py | 40 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/kafka/protocol/list_offsets.py b/kafka/protocol/list_offsets.py index bd81b7f14..5afed6af2 100644 --- a/kafka/protocol/list_offsets.py +++ b/kafka/protocol/list_offsets.py @@ -15,11 +15,11 @@ class ListOffsetsResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), - ('offsets', Array(Int64)))))) + ('old_style_offsets', Array(Int64)))))) ) class ListOffsetsResponse_v1(Response): @@ -27,9 +27,9 @@ class ListOffsetsResponse_v1(Response): API_VERSION = 1 SCHEMA = Schema( ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), ('timestamp', Int64), ('offset', Int64))))) @@ -42,9 +42,9 @@ class ListOffsetsResponse_v2(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), ('timestamp', Int64), ('offset', Int64))))) @@ -69,9 +69,9 @@ class ListOffsetsResponse_v4(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16), ('timestamp', Int64), ('offset', Int64), @@ -94,11 +94,11 @@ class ListOffsetsRequest_v0(Request): SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('timestamp', Int64), - ('max_offsets', Int32))))) + ('max_num_offsets', Int32))))) ) DEFAULTS = { 'replica_id': -1 @@ -110,9 +110,9 @@ class ListOffsetsRequest_v1(Request): SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('timestamp', Int64))))) ) DEFAULTS = { @@ -125,11 +125,11 @@ class ListOffsetsRequest_v2(Request): API_VERSION = 2 SCHEMA = Schema( ('replica_id', Int32), - ('isolation_level', Int8), # <- added isolation_level + ('isolation_level', Int8), # <- added isolation_level ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('timestamp', Int64))))) ) DEFAULTS = { @@ -154,11 +154,11 @@ class ListOffsetsRequest_v4(Request): API_VERSION = 4 SCHEMA = Schema( ('replica_id', Int32), - ('isolation_level', Int8), # <- added isolation_level + ('isolation_level', Int8), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('current_leader_epoch', Int32), ('timestamp', Int64))))) ) From 6085b0485a91d4f2f6b1093cfc87d2eef19f8419 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 11:34:32 -0700 Subject: [PATCH 06/11] old proto FindCoordinator ALIASES --- kafka/protocol/find_coordinator.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/kafka/protocol/find_coordinator.py b/kafka/protocol/find_coordinator.py index fbb4b500d..bdeeb259a 100644 --- a/kafka/protocol/find_coordinator.py +++ b/kafka/protocol/find_coordinator.py @@ -7,10 +7,13 @@ class FindCoordinatorResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), - ('coordinator_id', Int32), + ('node_id', Int32), ('host', String('utf-8')), ('port', Int32) ) + ALIASES = { + 'coordinator_id': 'node_id', + } class FindCoordinatorResponse_v1(Response): @@ -20,39 +23,49 @@ class FindCoordinatorResponse_v1(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('error_message', String('utf-8')), - ('coordinator_id', Int32), + ('node_id', Int32), ('host', String('utf-8')), ('port', Int32) ) + ALIASES = FindCoordinatorResponse_v0.ALIASES class FindCoordinatorResponse_v2(Response): API_KEY = 10 API_VERSION = 2 SCHEMA = FindCoordinatorResponse_v1.SCHEMA + ALIASES = FindCoordinatorResponse_v1.ALIASES class FindCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0 SCHEMA = Schema( - ('consumer_group', String('utf-8')) + ('key', String('utf-8')) ) + ALIASES = { + 'consumer_group': 'key', + } class FindCoordinatorRequest_v1(Request): API_KEY = 10 API_VERSION = 1 SCHEMA = Schema( - ('coordinator_key', String('utf-8')), - ('coordinator_type', Int8) # 0: consumer, 1: transaction + ('key', String('utf-8')), + ('key_type', Int8) # 0: consumer, 1: transaction ) + ALIASES = { + 'coordinator_key': 'key', + 'coordinator_type': 'key_type', + } class FindCoordinatorRequest_v2(Request): API_KEY = 10 API_VERSION = 2 SCHEMA = FindCoordinatorRequest_v1.SCHEMA + ALIASES = FindCoordinatorRequest_v1.ALIASES FindCoordinatorRequest = [FindCoordinatorRequest_v0, FindCoordinatorRequest_v1, FindCoordinatorRequest_v2] From fb274e392bd9f414e71198c51405454ddebb61a3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 11:24:06 -0700 Subject: [PATCH 07/11] old proto JoinGroup/SyncGroup ALIASES --- kafka/protocol/group.py | 125 ++++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 38 deletions(-) diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 1c40838b2..71da0b6fe 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -18,19 +18,24 @@ class JoinGroupResponse_v0(Response): SCHEMA = Schema( ('error_code', Int16), ('generation_id', Int32), - ('group_protocol', String('utf-8')), - ('leader_id', String('utf-8')), + ('protocol_name', String('utf-8')), + ('leader', String('utf-8')), ('member_id', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), - ('member_metadata', Bytes))) + ('metadata', Bytes))) ) + ALIASES = { + 'group_protocol': 'protocol_name', + 'leader_id': 'leader', + } class JoinGroupResponse_v1(Response): API_KEY = 11 API_VERSION = 1 SCHEMA = JoinGroupResponse_v0.SCHEMA + ALIASES = JoinGroupResponse_v0.ALIASES class JoinGroupResponse_v2(Response): @@ -40,25 +45,28 @@ class JoinGroupResponse_v2(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('generation_id', Int32), - ('group_protocol', String('utf-8')), - ('leader_id', String('utf-8')), + ('protocol_name', String('utf-8')), + ('leader', String('utf-8')), ('member_id', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), - ('member_metadata', Bytes))) + ('metadata', Bytes))) ) + ALIASES = JoinGroupResponse_v1.ALIASES class JoinGroupResponse_v3(Response): API_KEY = 11 API_VERSION = 3 SCHEMA = JoinGroupResponse_v2.SCHEMA + ALIASES = JoinGroupResponse_v2.ALIASES class JoinGroupResponse_v4(Response): API_KEY = 11 API_VERSION = 4 SCHEMA = JoinGroupResponse_v3.SCHEMA + ALIASES = JoinGroupResponse_v3.ALIASES class JoinGroupResponse_v5(Response): @@ -68,77 +76,93 @@ class JoinGroupResponse_v5(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('generation_id', Int32), - ('group_protocol', String('utf-8')), - ('leader_id', String('utf-8')), + ('protocol_name', String('utf-8')), + ('leader', String('utf-8')), ('member_id', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')), - ('member_metadata', Bytes))) + ('metadata', Bytes))) ) + ALIASES = JoinGroupResponse_v4.ALIASES class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 SCHEMA = Schema( - ('group', String('utf-8')), - ('session_timeout', Int32), + ('group_id', String('utf-8')), + ('session_timeout_ms', Int32), ('member_id', String('utf-8')), ('protocol_type', String('utf-8')), - ('group_protocols', Array( - ('protocol_name', String('utf-8')), - ('protocol_metadata', Bytes))) + ('protocols', Array( + ('name', String('utf-8')), + ('metadata', Bytes))) ) + ALIASES = { + 'group': 'group_id', + 'session_timeout': 'session_timeout_ms', + 'group_protocols': 'protocols', + } class JoinGroupRequest_v1(Request): API_KEY = 11 API_VERSION = 1 SCHEMA = Schema( - ('group', String('utf-8')), - ('session_timeout', Int32), - ('rebalance_timeout', Int32), + ('group_id', String('utf-8')), + ('session_timeout_ms', Int32), + ('rebalance_timeout_ms', Int32), ('member_id', String('utf-8')), ('protocol_type', String('utf-8')), - ('group_protocols', Array( - ('protocol_name', String('utf-8')), - ('protocol_metadata', Bytes))) + ('protocols', Array( + ('name', String('utf-8')), + ('metadata', Bytes))) ) + ALIASES = { + 'group': 'group_id', + 'session_timeout': 'session_timeout_ms', + 'rebalance_timeout': 'rebalance_timeout_ms', + 'group_protocols': 'protocols', + } class JoinGroupRequest_v2(Request): API_KEY = 11 API_VERSION = 2 SCHEMA = JoinGroupRequest_v1.SCHEMA + ALIASES = JoinGroupRequest_v1.ALIASES class JoinGroupRequest_v3(Request): API_KEY = 11 API_VERSION = 3 SCHEMA = JoinGroupRequest_v2.SCHEMA + ALIASES = JoinGroupRequest_v2.ALIASES class JoinGroupRequest_v4(Request): API_KEY = 11 API_VERSION = 4 SCHEMA = JoinGroupRequest_v3.SCHEMA + ALIASES = JoinGroupRequest_v3.ALIASES class JoinGroupRequest_v5(Request): API_KEY = 11 API_VERSION = 5 SCHEMA = Schema( - ('group', String('utf-8')), - ('session_timeout', Int32), - ('rebalance_timeout', Int32), + ('group_id', String('utf-8')), + ('session_timeout_ms', Int32), + ('rebalance_timeout_ms', Int32), ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')), ('protocol_type', String('utf-8')), - ('group_protocols', Array( - ('protocol_name', String('utf-8')), - ('protocol_metadata', Bytes))) + ('protocols', Array( + ('name', String('utf-8')), + ('metadata', Bytes))) ) + ALIASES = JoinGroupRequest_v4.ALIASES JoinGroupRequest = [ @@ -166,8 +190,11 @@ class SyncGroupResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), - ('member_assignment', Bytes) + ('assignment', Bytes) ) + ALIASES = { + 'member_assignment': 'assignment', + } class SyncGroupResponse_v1(Response): @@ -176,59 +203,69 @@ class SyncGroupResponse_v1(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('error_code', Int16), - ('member_assignment', Bytes) + ('assignment', Bytes) ) + ALIASES = SyncGroupResponse_v0.ALIASES class SyncGroupResponse_v2(Response): API_KEY = 14 API_VERSION = 2 SCHEMA = SyncGroupResponse_v1.SCHEMA + ALIASES = SyncGroupResponse_v1.ALIASES class SyncGroupResponse_v3(Response): API_KEY = 14 API_VERSION = 3 SCHEMA = SyncGroupResponse_v2.SCHEMA + ALIASES = SyncGroupResponse_v2.ALIASES class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('generation_id', Int32), ('member_id', String('utf-8')), - ('group_assignment', Array( + ('assignments', Array( ('member_id', String('utf-8')), - ('member_metadata', Bytes))) + ('assignment', Bytes))) ) + ALIASES = { + 'group': 'group_id', + 'group_assignment': 'assignments', + } class SyncGroupRequest_v1(Request): API_KEY = 14 API_VERSION = 1 SCHEMA = SyncGroupRequest_v0.SCHEMA + ALIASES = SyncGroupRequest_v0.ALIASES class SyncGroupRequest_v2(Request): API_KEY = 14 API_VERSION = 2 SCHEMA = SyncGroupRequest_v1.SCHEMA + ALIASES = SyncGroupRequest_v1.ALIASES class SyncGroupRequest_v3(Request): API_KEY = 14 API_VERSION = 3 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('generation_id', Int32), ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')), - ('group_assignment', Array( + ('assignments', Array( ('member_id', String('utf-8')), - ('member_metadata', Bytes))) + ('assignment', Bytes))) ) + ALIASES = SyncGroupRequest_v2.ALIASES SyncGroupRequest = [ @@ -285,33 +322,39 @@ class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('generation_id', Int32), ('member_id', String('utf-8')) ) + ALIASES = { + 'group': 'group_id', + } class HeartbeatRequest_v1(Request): API_KEY = 12 API_VERSION = 1 SCHEMA = HeartbeatRequest_v0.SCHEMA + ALIASES = HeartbeatRequest_v0.ALIASES class HeartbeatRequest_v2(Request): API_KEY = 12 API_VERSION = 2 SCHEMA = HeartbeatRequest_v1.SCHEMA + ALIASES = HeartbeatRequest_v1.ALIASES class HeartbeatRequest_v3(Request): API_KEY = 12 API_VERSION = 3 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('generation_id', Int32), ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')) ) + ALIASES = HeartbeatRequest_v2.ALIASES HeartbeatRequest = [ @@ -364,32 +407,38 @@ class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('member_id', String('utf-8')) ) + ALIASES = { + 'group': 'group_id', + } class LeaveGroupRequest_v1(Request): API_KEY = 13 API_VERSION = 1 SCHEMA = LeaveGroupRequest_v0.SCHEMA + ALIASES = LeaveGroupRequest_v0.ALIASES class LeaveGroupRequest_v2(Request): API_KEY = 13 API_VERSION = 2 SCHEMA = LeaveGroupRequest_v1.SCHEMA + ALIASES = LeaveGroupRequest_v1.ALIASES class LeaveGroupRequest_v3(Request): API_KEY = 13 API_VERSION = 3 SCHEMA = Schema( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')))) ) + ALIASES = LeaveGroupRequest_v2.ALIASES LeaveGroupRequest = [ From 082bd7e66a18277848da73a6fdc9add8d5fdea54 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 15:24:08 -0700 Subject: [PATCH 08/11] old proto OffsetCommit/OffsetFetch ALIASES --- kafka/protocol/commit.py | 159 ++++++++++++++++++++++++--------------- 1 file changed, 97 insertions(+), 62 deletions(-) diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index beffe1d52..d6093f303 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -7,9 +7,9 @@ class OffsetCommitResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16))))) ) @@ -32,9 +32,9 @@ class OffsetCommitResponse_v3(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16))))) ) @@ -67,49 +67,63 @@ class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage SCHEMA = Schema( - ('consumer_group', String('utf-8')), + ('group_id', String('utf-8')), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_metadata', String('utf-8')))))) ) + ALIASES = { + 'consumer_group': 'group_id', + } class OffsetCommitRequest_v1(Request): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage SCHEMA = Schema( - ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), + ('group_id', String('utf-8')), + ('generation_id_or_member_epoch', Int32), + ('member_id', String('utf-8')), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('timestamp', Int64), - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('commit_timestamp', Int64), + ('committed_metadata', String('utf-8')))))) ) + ALIASES = { + 'consumer_group': 'group_id', + 'consumer_group_generation_id': 'generation_id_or_member_epoch', + 'consumer_id': 'member_id', + } class OffsetCommitRequest_v2(Request): API_KEY = 8 API_VERSION = 2 SCHEMA = Schema( - ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), - ('retention_time', Int64), # added retention_time, dropped timestamp + ('group_id', String('utf-8')), + ('generation_id_or_member_epoch', Int32), + ('member_id', String('utf-8')), + ('retention_time_ms', Int64), # added retention_time_ms, dropped timestamp ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_metadata', String('utf-8')))))) ) DEFAULT_RETENTION_TIME = -1 + ALIASES = { + 'consumer_group': 'group_id', + 'consumer_group_generation_id': 'generation_id_or_member_epoch', + 'consumer_id': 'member_id', + 'retention_time': 'retention_time_ms', + } class OffsetCommitRequest_v3(Request): @@ -117,6 +131,7 @@ class OffsetCommitRequest_v3(Request): API_VERSION = 3 SCHEMA = OffsetCommitRequest_v2.SCHEMA DEFAULT_RETENTION_TIME = -1 + ALIASES = OffsetCommitRequest_v2.ALIASES class OffsetCommitRequest_v4(Request): @@ -124,39 +139,46 @@ class OffsetCommitRequest_v4(Request): API_VERSION = 4 SCHEMA = OffsetCommitRequest_v3.SCHEMA DEFAULT_RETENTION_TIME = -1 + ALIASES = OffsetCommitRequest_v3.ALIASES class OffsetCommitRequest_v5(Request): API_KEY = 8 API_VERSION = 5 # drops retention_time SCHEMA = Schema( - ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), + ('group_id', String('utf-8')), + ('generation_id_or_member_epoch', Int32), + ('member_id', String('utf-8')), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_metadata', String('utf-8')))))) ) + ALIASES = { + 'consumer_group': 'group_id', + 'consumer_group_generation_id': 'generation_id_or_member_epoch', + 'consumer_id': 'member_id', + } class OffsetCommitRequest_v6(Request): API_KEY = 8 API_VERSION = 6 SCHEMA = Schema( - ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), + ('group_id', String('utf-8')), + ('generation_id_or_member_epoch', Int32), + ('member_id', String('utf-8')), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('leader_epoch', Int32), # added for fencing / kip-320. default -1 - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_leader_epoch', Int32), # added for fencing / kip-320. default -1 + ('committed_metadata', String('utf-8')))))) ) + ALIASES = OffsetCommitRequest_v5.ALIASES class OffsetCommitRequest_v7(Request): @@ -164,17 +186,22 @@ class OffsetCommitRequest_v7(Request): API_VERSION = 7 SCHEMA = Schema( ('group_id', String('utf-8')), - ('generation_id', Int32), + ('generation_id_or_member_epoch', Int32), ('member_id', String('utf-8')), ('group_instance_id', String('utf-8')), # added for static membership / kip-345 ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('leader_epoch', Int32), - ('metadata', String('utf-8')))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_leader_epoch', Int32), + ('committed_metadata', String('utf-8')))))) ) + ALIASES = { + 'consumer_group': 'group_id', + 'generation_id': 'generation_id_or_member_epoch', + 'consumer_id': 'member_id', + } OffsetCommitRequest = [ @@ -194,10 +221,10 @@ class OffsetFetchResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), + ('partition_index', Int32), + ('committed_offset', Int64), ('metadata', String('utf-8')), ('error_code', Int16))))) ) @@ -215,10 +242,10 @@ class OffsetFetchResponse_v2(Response): API_VERSION = 2 SCHEMA = Schema( ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), + ('partition_index', Int32), + ('committed_offset', Int64), ('metadata', String('utf-8')), ('error_code', Int16))))), ('error_code', Int16) @@ -231,10 +258,10 @@ class OffsetFetchResponse_v3(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), + ('partition_index', Int32), + ('committed_offset', Int64), ('metadata', String('utf-8')), ('error_code', Int16))))), ('error_code', Int16) @@ -253,11 +280,11 @@ class OffsetFetchResponse_v5(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('leader_epoch', Int32), + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_leader_epoch', Int32), ('metadata', String('utf-8')), ('error_code', Int16))))), ('error_code', Int16) @@ -268,17 +295,21 @@ class OffsetFetchRequest_v0(Request): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage SCHEMA = Schema( - ('consumer_group', String('utf-8')), + ('group_id', String('utf-8')), ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array(Int32)))) + ('name', String('utf-8')), + ('partition_indexes', Array(Int32)))) ) + ALIASES = { + 'consumer_group': 'group_id', + } class OffsetFetchRequest_v1(Request): API_KEY = 9 API_VERSION = 1 # kafka-backed storage SCHEMA = OffsetFetchRequest_v0.SCHEMA + ALIASES = OffsetFetchRequest_v0.ALIASES class OffsetFetchRequest_v2(Request): @@ -288,24 +319,28 @@ class OffsetFetchRequest_v2(Request): API_KEY = 9 API_VERSION = 2 SCHEMA = OffsetFetchRequest_v1.SCHEMA + ALIASES = OffsetFetchRequest_v1.ALIASES class OffsetFetchRequest_v3(Request): API_KEY = 9 API_VERSION = 3 SCHEMA = OffsetFetchRequest_v2.SCHEMA + ALIASES = OffsetFetchRequest_v2.ALIASES class OffsetFetchRequest_v4(Request): API_KEY = 9 API_VERSION = 4 SCHEMA = OffsetFetchRequest_v3.SCHEMA + ALIASES = OffsetFetchRequest_v3.ALIASES class OffsetFetchRequest_v5(Request): API_KEY = 9 API_VERSION = 5 SCHEMA = OffsetFetchRequest_v4.SCHEMA + ALIASES = OffsetFetchRequest_v4.ALIASES OffsetFetchRequest = [ From 120e9f3db5c132bddd14697f8f8fb6bfb9132620 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 16:29:34 -0700 Subject: [PATCH 09/11] old proto Admin api ALIASES --- kafka/protocol/admin.py | 404 +++++++++++++++++++++------------------- 1 file changed, 211 insertions(+), 193 deletions(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index a52827ead..5cdd2b0fd 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -8,21 +8,25 @@ class CreateTopicsResponse_v0(Response): API_KEY = 19 API_VERSION = 0 SCHEMA = Schema( - ('topic_errors', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), ('error_code', Int16))) ) + ALIASES = { + 'topic_errors': 'topics', + } class CreateTopicsResponse_v1(Response): API_KEY = 19 API_VERSION = 1 SCHEMA = Schema( - ('topic_errors', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), ('error_code', Int16), ('error_message', String('utf-8')))) ) + ALIASES = CreateTopicsResponse_v0.ALIASES class CreateTopicsResponse_v2(Response): @@ -30,65 +34,75 @@ class CreateTopicsResponse_v2(Response): API_VERSION = 2 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topic_errors', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), ('error_code', Int16), ('error_message', String('utf-8')))) ) + ALIASES = CreateTopicsResponse_v1.ALIASES + class CreateTopicsResponse_v3(Response): API_KEY = 19 API_VERSION = 3 SCHEMA = CreateTopicsResponse_v2.SCHEMA + ALIASES = CreateTopicsResponse_v2.ALIASES class CreateTopicsRequest_v0(Request): API_KEY = 19 API_VERSION = 0 SCHEMA = Schema( - ('create_topic_requests', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), ('num_partitions', Int32), ('replication_factor', Int16), - ('replica_assignment', Array( - ('partition_id', Int32), - ('replicas', Array(Int32)))), + ('assignments', Array( + ('partition_index', Int32), + ('broker_ids', Array(Int32)))), ('configs', Array( - ('config_key', String('utf-8')), - ('config_value', String('utf-8')))))), - ('timeout', Int32) + ('name', String('utf-8')), + ('value', String('utf-8')))))), + ('timeout_ms', Int32) ) + ALIASES = { + 'create_topic_requests': 'topics', + 'timeout': 'timeout_ms', + } class CreateTopicsRequest_v1(Request): API_KEY = 19 API_VERSION = 1 SCHEMA = Schema( - ('create_topic_requests', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), ('num_partitions', Int32), ('replication_factor', Int16), - ('replica_assignment', Array( - ('partition_id', Int32), - ('replicas', Array(Int32)))), + ('assignments', Array( + ('partition_index', Int32), + ('broker_ids', Array(Int32)))), ('configs', Array( - ('config_key', String('utf-8')), - ('config_value', String('utf-8')))))), - ('timeout', Int32), + ('name', String('utf-8')), + ('value', String('utf-8')))))), + ('timeout_ms', Int32), ('validate_only', Boolean) ) + ALIASES = CreateTopicsRequest_v0.ALIASES class CreateTopicsRequest_v2(Request): API_KEY = 19 API_VERSION = 2 SCHEMA = CreateTopicsRequest_v1.SCHEMA + ALIASES = CreateTopicsRequest_v1.ALIASES class CreateTopicsRequest_v3(Request): API_KEY = 19 API_VERSION = 3 SCHEMA = CreateTopicsRequest_v1.SCHEMA + ALIASES = CreateTopicsRequest_v1.ALIASES CreateTopicsRequest = [ @@ -105,10 +119,13 @@ class DeleteTopicsResponse_v0(Response): API_KEY = 20 API_VERSION = 0 SCHEMA = Schema( - ('topic_error_codes', Array( - ('topic', String('utf-8')), + ('responses', Array( + ('name', String('utf-8')), ('error_code', Int16))) ) + ALIASES = { + 'topic_error_codes': 'responses', + } class DeleteTopicsResponse_v1(Response): @@ -116,49 +133,59 @@ class DeleteTopicsResponse_v1(Response): API_VERSION = 1 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topic_error_codes', Array( - ('topic', String('utf-8')), + ('responses', Array( + ('name', String('utf-8')), ('error_code', Int16))) ) + ALIASES = DeleteTopicsResponse_v0.ALIASES class DeleteTopicsResponse_v2(Response): API_KEY = 20 API_VERSION = 2 SCHEMA = DeleteTopicsResponse_v1.SCHEMA + ALIASES = DeleteTopicsResponse_v1.ALIASES class DeleteTopicsResponse_v3(Response): API_KEY = 20 API_VERSION = 3 SCHEMA = DeleteTopicsResponse_v1.SCHEMA + ALIASES = DeleteTopicsResponse_v1.ALIASES class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 SCHEMA = Schema( - ('topics', Array(String('utf-8'))), - ('timeout', Int32) + ('topic_names', Array(String('utf-8'))), + ('timeout_ms', Int32) ) + ALIASES = { + 'topics': 'topic_names', + 'timeout': 'timeout_ms', + } class DeleteTopicsRequest_v1(Request): API_KEY = 20 API_VERSION = 1 SCHEMA = DeleteTopicsRequest_v0.SCHEMA + ALIASES = DeleteTopicsRequest_v0.ALIASES class DeleteTopicsRequest_v2(Request): API_KEY = 20 API_VERSION = 2 SCHEMA = DeleteTopicsRequest_v0.SCHEMA + ALIASES = DeleteTopicsRequest_v0.ALIASES class DeleteTopicsRequest_v3(Request): API_KEY = 20 API_VERSION = 3 SCHEMA = DeleteTopicsRequest_v0.SCHEMA + ALIASES = DeleteTopicsRequest_v0.ALIASES DeleteTopicsRequest = [ @@ -208,7 +235,7 @@ class ListGroupsResponse_v0(Response): SCHEMA = Schema( ('error_code', Int16), ('groups', Array( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('protocol_type', String('utf-8')))) ) @@ -220,7 +247,7 @@ class ListGroupsResponse_v1(Response): ('throttle_time_ms', Int32), ('error_code', Int16), ('groups', Array( - ('group', String('utf-8')), + ('group_id', String('utf-8')), ('protocol_type', String('utf-8')))) ) @@ -263,10 +290,10 @@ class DescribeGroupsResponse_v0(Response): SCHEMA = Schema( ('groups', Array( ('error_code', Int16), - ('group', String('utf-8')), - ('state', String('utf-8')), + ('group_id', String('utf-8')), + ('group_state', String('utf-8')), ('protocol_type', String('utf-8')), - ('protocol', String('utf-8')), + ('protocol_data', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), ('client_id', String('utf-8')), @@ -283,10 +310,10 @@ class DescribeGroupsResponse_v1(Response): ('throttle_time_ms', Int32), ('groups', Array( ('error_code', Int16), - ('group', String('utf-8')), - ('state', String('utf-8')), + ('group_id', String('utf-8')), + ('group_state', String('utf-8')), ('protocol_type', String('utf-8')), - ('protocol', String('utf-8')), + ('protocol_data', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), ('client_id', String('utf-8')), @@ -309,10 +336,10 @@ class DescribeGroupsResponse_v3(Response): ('throttle_time_ms', Int32), ('groups', Array( ('error_code', Int16), - ('group', String('utf-8')), - ('state', String('utf-8')), + ('group_id', String('utf-8')), + ('group_state', String('utf-8')), ('protocol_type', String('utf-8')), - ('protocol', String('utf-8')), + ('protocol_data', String('utf-8')), ('members', Array( ('member_id', String('utf-8')), ('client_id', String('utf-8')), @@ -390,7 +417,7 @@ class DescribeAclsResponse_v1(Response): ('resources', Array( ('resource_type', Int8), ('resource_name', String('utf-8')), - ('resource_pattern_type', Int8), + ('pattern_type', Int8), ('acls', Array( ('principal', String('utf-8')), ('host', String('utf-8')), @@ -409,27 +436,40 @@ class DescribeAclsRequest_v0(Request): API_KEY = 29 API_VERSION = 0 SCHEMA = Schema( - ('resource_type', Int8), - ('resource_name', String('utf-8')), - ('principal', String('utf-8')), - ('host', String('utf-8')), + ('resource_type_filter', Int8), + ('resource_name_filter', String('utf-8')), + ('principal_filter', String('utf-8')), + ('host_filter', String('utf-8')), ('operation', Int8), ('permission_type', Int8) ) + ALIASES = { + 'resource_type': 'resource_type_filter', + 'resource_name': 'resource_name_filter', + 'principal': 'principal_filter', + 'host': 'host_filter', + } class DescribeAclsRequest_v1(Request): API_KEY = 29 API_VERSION = 1 SCHEMA = Schema( - ('resource_type', Int8), - ('resource_name', String('utf-8')), - ('resource_pattern_type_filter', Int8), - ('principal', String('utf-8')), - ('host', String('utf-8')), + ('resource_type_filter', Int8), + ('resource_name_filter', String('utf-8')), + ('pattern_type_filter', Int8), + ('principal_filter', String('utf-8')), + ('host_filter', String('utf-8')), ('operation', Int8), ('permission_type', Int8) ) + ALIASES = { + 'resource_type': 'resource_type_filter', + 'resource_name': 'resource_name_filter', + 'resource_pattern_type_filter': 'pattern_type_filter', + 'principal': 'principal_filter', + 'host': 'host_filter', + } class DescribeAclsRequest_v2(Request): @@ -444,20 +484,27 @@ class DescribeAclsRequest_v2(Request): DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1, DescribeAclsRequest_v2] DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1, DescribeAclsResponse_v2] + class CreateAclsResponse_v0(Response): API_KEY = 30 API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('creation_responses', Array( + ('results', Array( ('error_code', Int16), ('error_message', String('utf-8')))) ) + ALIASES = { + 'creation_responses': 'results', + } + class CreateAclsResponse_v1(Response): API_KEY = 30 API_VERSION = 1 SCHEMA = CreateAclsResponse_v0.SCHEMA + ALIASES = CreateAclsResponse_v0.ALIASES + class CreateAclsRequest_v0(Request): API_KEY = 30 @@ -472,6 +519,7 @@ class CreateAclsRequest_v0(Request): ('permission_type', Int8))) ) + class CreateAclsRequest_v1(Request): API_KEY = 30 API_VERSION = 1 @@ -486,15 +534,17 @@ class CreateAclsRequest_v1(Request): ('permission_type', Int8))) ) + CreateAclsRequest = [CreateAclsRequest_v0, CreateAclsRequest_v1] CreateAclsResponse = [CreateAclsResponse_v0, CreateAclsResponse_v1] + class DeleteAclsResponse_v0(Response): API_KEY = 31 API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('filter_responses', Array( + ('filter_results', Array( ('error_code', Int16), ('error_message', String('utf-8')), ('matching_acls', Array( @@ -507,13 +557,17 @@ class DeleteAclsResponse_v0(Response): ('operation', Int8), ('permission_type', Int8))))) ) + ALIASES = { + 'filter_responses': 'filter_results', + } + class DeleteAclsResponse_v1(Response): API_KEY = 31 API_VERSION = 1 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('filter_responses', Array( + ('filter_results', Array( ('error_code', Int16), ('error_message', String('utf-8')), ('matching_acls', Array( @@ -521,60 +575,69 @@ class DeleteAclsResponse_v1(Response): ('error_message', String('utf-8')), ('resource_type', Int8), ('resource_name', String('utf-8')), - ('resource_pattern_type', Int8), + ('pattern_type', Int8), ('principal', String('utf-8')), ('host', String('utf-8')), ('operation', Int8), ('permission_type', Int8))))) ) + ALIASES = DeleteAclsResponse_v0.ALIASES + class DeleteAclsRequest_v0(Request): API_KEY = 31 API_VERSION = 0 SCHEMA = Schema( ('filters', Array( - ('resource_type', Int8), - ('resource_name', String('utf-8')), - ('principal', String('utf-8')), - ('host', String('utf-8')), + ('resource_type_filter', Int8), + ('resource_name_filter', String('utf-8')), + ('principal_filter', String('utf-8')), + ('host_filter', String('utf-8')), ('operation', Int8), ('permission_type', Int8))) ) + class DeleteAclsRequest_v1(Request): API_KEY = 31 API_VERSION = 1 SCHEMA = Schema( ('filters', Array( - ('resource_type', Int8), - ('resource_name', String('utf-8')), - ('resource_pattern_type_filter', Int8), - ('principal', String('utf-8')), - ('host', String('utf-8')), + ('resource_type_filter', Int8), + ('resource_name_filter', String('utf-8')), + ('pattern_type_filter', Int8), + ('principal_filter', String('utf-8')), + ('host_filter', String('utf-8')), ('operation', Int8), ('permission_type', Int8))) ) + DeleteAclsRequest = [DeleteAclsRequest_v0, DeleteAclsRequest_v1] DeleteAclsResponse = [DeleteAclsResponse_v0, DeleteAclsResponse_v1] + class AlterConfigsResponse_v0(Response): API_KEY = 33 API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('resources', Array( + ('responses', Array( ('error_code', Int16), ('error_message', String('utf-8')), ('resource_type', Int8), ('resource_name', String('utf-8')))) ) + ALIASES = { + 'resources': 'responses', + } class AlterConfigsResponse_v1(Response): API_KEY = 33 API_VERSION = 1 SCHEMA = AlterConfigsResponse_v0.SCHEMA + ALIASES = AlterConfigsResponse_v0.ALIASES class AlterConfigsRequest_v0(Request): @@ -584,17 +647,19 @@ class AlterConfigsRequest_v0(Request): ('resources', Array( ('resource_type', Int8), ('resource_name', String('utf-8')), - ('config_entries', Array( - ('config_name', String('utf-8')), - ('config_value', String('utf-8')))))), + ('configs', Array( + ('name', String('utf-8')), + ('value', String('utf-8')))))), ('validate_only', Boolean) ) + class AlterConfigsRequest_v1(Request): API_KEY = 33 API_VERSION = 1 SCHEMA = AlterConfigsRequest_v0.SCHEMA + AlterConfigsRequest = [AlterConfigsRequest_v0, AlterConfigsRequest_v1] AlterConfigsResponse = [AlterConfigsResponse_v0, AlterConfigsRequest_v1] @@ -604,62 +669,53 @@ class DescribeConfigsResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('resources', Array( + ('results', Array( ('error_code', Int16), ('error_message', String('utf-8')), ('resource_type', Int8), ('resource_name', String('utf-8')), - ('config_entries', Array( - ('config_names', String('utf-8')), - ('config_value', String('utf-8')), + ('configs', Array( + ('name', String('utf-8')), + ('value', String('utf-8')), ('read_only', Boolean), ('is_default', Boolean), ('is_sensitive', Boolean))))) ) + ALIASES = { + 'resources': 'results', + } + class DescribeConfigsResponse_v1(Response): API_KEY = 32 API_VERSION = 1 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('resources', Array( + ('results', Array( ('error_code', Int16), ('error_message', String('utf-8')), ('resource_type', Int8), ('resource_name', String('utf-8')), - ('config_entries', Array( - ('config_names', String('utf-8')), - ('config_value', String('utf-8')), + ('configs', Array( + ('name', String('utf-8')), + ('value', String('utf-8')), ('read_only', Boolean), ('config_source', Int8), ('is_sensitive', Boolean), - ('config_synonyms', Array( - ('config_name', String('utf-8')), - ('config_value', String('utf-8')), - ('config_source', Int8))))))) + ('synonyms', Array( + ('name', String('utf-8')), + ('value', String('utf-8')), + ('source', Int8))))))) ) + ALIASES = DescribeConfigsResponse_v0.ALIASES + class DescribeConfigsResponse_v2(Response): API_KEY = 32 API_VERSION = 2 - SCHEMA = Schema( - ('throttle_time_ms', Int32), - ('resources', Array( - ('error_code', Int16), - ('error_message', String('utf-8')), - ('resource_type', Int8), - ('resource_name', String('utf-8')), - ('config_entries', Array( - ('config_names', String('utf-8')), - ('config_value', String('utf-8')), - ('read_only', Boolean), - ('config_source', Int8), - ('is_sensitive', Boolean), - ('config_synonyms', Array( - ('config_name', String('utf-8')), - ('config_value', String('utf-8')), - ('config_source', Int8))))))) - ) + SCHEMA = DescribeConfigsResponse_v1.SCHEMA + ALIASES = DescribeConfigsResponse_v1.ALIASES + class DescribeConfigsRequest_v0(Request): API_KEY = 32 @@ -668,7 +724,7 @@ class DescribeConfigsRequest_v0(Request): ('resources', Array( ('resource_type', Int8), ('resource_name', String('utf-8')), - ('config_names', Array(String('utf-8'))))) + ('configuration_keys', Array(String('utf-8'))))) ) class DescribeConfigsRequest_v1(Request): @@ -678,7 +734,7 @@ class DescribeConfigsRequest_v1(Request): ('resources', Array( ('resource_type', Int8), ('resource_name', String('utf-8')), - ('config_names', Array(String('utf-8'))))), + ('configuration_keys', Array(String('utf-8'))))), ('include_synonyms', Boolean) ) @@ -704,7 +760,7 @@ class DescribeLogDirsResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('log_dirs', Array( + ('results', Array( ('error_code', Int16), ('log_dir', String('utf-8')), ('topics', Array( @@ -713,22 +769,21 @@ class DescribeLogDirsResponse_v0(Response): ('partition_index', Int32), ('partition_size', Int64), ('offset_lag', Int64), - ('is_future_key', Boolean) - )) - )) - )) + ('is_future_key', Boolean))))))) ) + ALIASES = { + 'log_dirs': 'results', + } class DescribeLogDirsRequest_v0(Request): API_KEY = 35 API_VERSION = 0 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Int32) - )) - ) + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Int32))) + ) DescribeLogDirsResponse = [ @@ -739,85 +794,52 @@ class DescribeLogDirsRequest_v0(Request): ] -class SaslAuthenticateResponse_v0(Response): - API_KEY = 36 - API_VERSION = 0 - SCHEMA = Schema( - ('error_code', Int16), - ('error_message', String('utf-8')), - ('sasl_auth_bytes', Bytes) - ) - - -class SaslAuthenticateResponse_v1(Response): - API_KEY = 36 - API_VERSION = 1 - SCHEMA = Schema( - ('error_code', Int16), - ('error_message', String('utf-8')), - ('sasl_auth_bytes', Bytes), - ('session_lifetime_ms', Int64) - ) - - -class SaslAuthenticateRequest_v0(Request): - API_KEY = 36 - API_VERSION = 0 - SCHEMA = Schema( - ('sasl_auth_bytes', Bytes) - ) - - -class SaslAuthenticateRequest_v1(Request): - API_KEY = 36 - API_VERSION = 1 - SCHEMA = SaslAuthenticateRequest_v0.SCHEMA - - -SaslAuthenticateRequest = [ - SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1, -] -SaslAuthenticateResponse = [ - SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1, -] - - class CreatePartitionsResponse_v0(Response): API_KEY = 37 API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topic_errors', Array( - ('topic', String('utf-8')), + ('results', Array( + ('name', String('utf-8')), ('error_code', Int16), ('error_message', String('utf-8')))) ) + ALIASES = { + 'topic_errors': 'results', + } class CreatePartitionsResponse_v1(Response): API_KEY = 37 API_VERSION = 1 SCHEMA = CreatePartitionsResponse_v0.SCHEMA + ALIASES = CreatePartitionsResponse_v0.ALIASES class CreatePartitionsRequest_v0(Request): API_KEY = 37 API_VERSION = 0 SCHEMA = Schema( - ('topic_partitions', Array( - ('topic', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), + # This is not compatible with new protocol ('new_partitions', Schema( ('count', Int32), ('assignment', Array(Array(Int32))))))), - ('timeout', Int32), + ('timeout_ms', Int32), ('validate_only', Boolean) ) + ALIASES = { + 'topic_partitions': 'topics', + 'timeout': 'timeout_ms', + } class CreatePartitionsRequest_v1(Request): API_KEY = 37 API_VERSION = 1 SCHEMA = CreatePartitionsRequest_v0.SCHEMA + ALIASES = CreatePartitionsRequest_v0.ALIASES CreatePartitionsRequest = [ @@ -862,12 +884,12 @@ class DeleteGroupsRequest_v1(Request): DeleteGroupsRequest = [ DeleteGroupsRequest_v0, DeleteGroupsRequest_v1 ] - DeleteGroupsResponse = [ DeleteGroupsResponse_v0, DeleteGroupsResponse_v1 ] +# Not used by kafka.admin class DescribeClientQuotasResponse_v0(Response): API_KEY = 48 API_VERSION = 0 @@ -880,7 +902,7 @@ class DescribeClientQuotasResponse_v0(Response): ('entity_type', String('utf-8')), ('entity_name', String('utf-8')))), ('values', Array( - ('name', String('utf-8')), + ('key', String('utf-8')), ('value', Float64))))), ) @@ -892,8 +914,7 @@ class DescribeClientQuotasRequest_v0(Request): ('components', Array( ('entity_type', String('utf-8')), ('match_type', Int8), - ('match', String('utf-8')), - )), + ('match', String('utf-8')))), ('strict', Boolean) ) @@ -901,12 +922,12 @@ class DescribeClientQuotasRequest_v0(Request): DescribeClientQuotasRequest = [ DescribeClientQuotasRequest_v0, ] - DescribeClientQuotasResponse = [ DescribeClientQuotasResponse_v0, ] +# Not used by kafka.admin class AlterPartitionReassignmentsResponse_v0(Response): API_KEY = 45 API_VERSION = 0 @@ -949,10 +970,10 @@ class AlterPartitionReassignmentsRequest_v0(Request): AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0] - AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0] +# Not used by kafka.admin class ListPartitionReassignmentsResponse_v0(Response): API_KEY = 46 API_VERSION = 0 @@ -984,7 +1005,7 @@ class ListPartitionReassignmentsRequest_v0(Request): ("timeout_ms", Int32), ("topics", CompactArray( ("name", CompactString("utf-8")), - ("partition_index", CompactArray(Int32)), + ("partition_indexes", CompactArray(Int32)), ("tags", TaggedFields) )), ("tags", TaggedFields) @@ -992,38 +1013,24 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] - ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] class ElectLeadersResponse_v0(Response): API_KEY = 43 - API_VERSION = 1 + API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('error_code', Int16), - ('replication_election_results', Array( + ('replica_election_results', Array( ('topic', String('utf-8')), ('partition_result', Array( ('partition_id', Int32), ('error_code', Int16), - ('error_message', String('utf-8')) - )) - )) - ) - - -class ElectLeadersRequest_v0(Request): - API_KEY = 43 - API_VERSION = 1 - SCHEMA = Schema( - ('election_type', Int8), - ('topic_partitions', Array( - ('topic', String('utf-8')), - ('partition_ids', Array(Int32)) - )), - ('timeout', Int32), + ('error_message', String('utf-8')))))) ) + ALIASES = { + 'replication_election_results': 'replica_election_results', + } class ElectLeadersResponse_v1(Response): @@ -1032,15 +1039,28 @@ class ElectLeadersResponse_v1(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('error_code', Int16), - ('replication_election_results', Array( + ('replica_election_results', Array( ('topic', String('utf-8')), ('partition_result', Array( ('partition_id', Int32), ('error_code', Int16), - ('error_message', String('utf-8')) - )) - )) + ('error_message', String('utf-8')))))) ) + ALIASES = ElectLeadersResponse_v0.ALIASES + + +class ElectLeadersRequest_v0(Request): + API_KEY = 43 + API_VERSION = 0 + SCHEMA = Schema( + ('topic_partitions', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))), + ('timeout_ms', Int32), + ) + ALIASES = { + 'timeout': 'timeout_ms', + } class ElectLeadersRequest_v1(Request): @@ -1050,16 +1070,14 @@ class ElectLeadersRequest_v1(Request): ('election_type', Int8), ('topic_partitions', Array( ('topic', String('utf-8')), - ('partition_ids', Array(Int32)) - )), - ('timeout', Int32), + ('partitions', Array(Int32)))), + ('timeout_ms', Int32), ) + ALIASES = ElectLeadersRequest_v0.ALIASES class ElectionType(IntEnum): - """ Leader election type - """ - + """Leader election type""" PREFERRED = 0, UNCLEAN = 1 From c7fa8ecf2b62558ecd91351ad4b26c0bb739614d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 16:30:12 -0700 Subject: [PATCH 10/11] old proto producer message ALIASES --- kafka/protocol/add_partitions_to_txn.py | 32 ++++--- kafka/protocol/produce.py | 106 +++++++++++++++--------- kafka/protocol/txn_offset_commit.py | 22 ++--- 3 files changed, 98 insertions(+), 62 deletions(-) diff --git a/kafka/protocol/add_partitions_to_txn.py b/kafka/protocol/add_partitions_to_txn.py index 70ceedc4d..b8cd14f93 100644 --- a/kafka/protocol/add_partitions_to_txn.py +++ b/kafka/protocol/add_partitions_to_txn.py @@ -7,47 +7,59 @@ class AddPartitionsToTxnResponse_v0(Response): API_VERSION = 0 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('results', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('error_code', Int16)))))) + ('results_by_topic_v3_and_below', Array( + ('name', String('utf-8')), + ('results_by_partition', Array( + ('partition_index', Int32), + ('partition_error_code', Int16)))))) + ALIASES = { + 'results': 'results_by_topic_v3_and_below', + } class AddPartitionsToTxnResponse_v1(Response): API_KEY = 24 API_VERSION = 1 SCHEMA = AddPartitionsToTxnResponse_v0.SCHEMA + ALIASES = AddPartitionsToTxnResponse_v0.ALIASES class AddPartitionsToTxnResponse_v2(Response): API_KEY = 24 API_VERSION = 2 SCHEMA = AddPartitionsToTxnResponse_v1.SCHEMA + ALIASES = AddPartitionsToTxnResponse_v1.ALIASES class AddPartitionsToTxnRequest_v0(Request): API_KEY = 24 API_VERSION = 0 SCHEMA = Schema( - ('transactional_id', String('utf-8')), - ('producer_id', Int64), - ('producer_epoch', Int16), - ('topics', Array( - ('topic', String('utf-8')), + ('v3_and_below_transactional_id', String('utf-8')), + ('v3_and_below_producer_id', Int64), + ('v3_and_below_producer_epoch', Int16), + ('v3_and_below_topics', Array( + ('name', String('utf-8')), ('partitions', Array(Int32))))) + ALIASES = { + 'transactional_id': 'v3_and_below_transactional_id', + 'producer_id': 'v3_and_below_producer_id', + 'producer_epoch': 'v3_and_below_producer_epoch', + } class AddPartitionsToTxnRequest_v1(Request): API_KEY = 24 API_VERSION = 1 SCHEMA = AddPartitionsToTxnRequest_v0.SCHEMA + ALIASES = AddPartitionsToTxnRequest_v0.ALIASES class AddPartitionsToTxnRequest_v2(Request): API_KEY = 24 API_VERSION = 2 SCHEMA = AddPartitionsToTxnRequest_v1.SCHEMA + ALIASES = AddPartitionsToTxnRequest_v1.ALIASES AddPartitionsToTxnRequest = [ diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index b13358f9d..665ecd037 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -6,42 +6,47 @@ class ProduceResponse_v0(Response): API_KEY = 0 API_VERSION = 0 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('responses', Array( + ('name', String('utf-8')), + ('partition_responses', Array( + ('index', Int32), ('error_code', Int16), - ('offset', Int64))))) + ('base_offset', Int64))))) ) + ALIASES = { + 'topics': 'responses', + } class ProduceResponse_v1(Response): API_KEY = 0 API_VERSION = 1 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('responses', Array( + ('name', String('utf-8')), + ('partition_responses', Array( + ('index', Int32), ('error_code', Int16), - ('offset', Int64))))), + ('base_offset', Int64))))), ('throttle_time_ms', Int32) ) + ALIASES = ProduceResponse_v0.ALIASES class ProduceResponse_v2(Response): API_KEY = 0 API_VERSION = 2 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('responses', Array( + ('name', String('utf-8')), + ('partition_responses', Array( + ('index', Int32), ('error_code', Int16), - ('offset', Int64), - ('timestamp', Int64))))), + ('base_offset', Int64), + ('log_append_time_ms', Int64))))), ('throttle_time_ms', Int32) ) + ALIASES = ProduceResponse_v1.ALIASES class ProduceResponse_v3(Response): @@ -49,6 +54,7 @@ class ProduceResponse_v3(Response): API_KEY = 0 API_VERSION = 3 SCHEMA = ProduceResponse_v2.SCHEMA + ALIASES = ProduceResponse_v2.ALIASES class ProduceResponse_v4(Response): @@ -59,22 +65,24 @@ class ProduceResponse_v4(Response): API_KEY = 0 API_VERSION = 4 SCHEMA = ProduceResponse_v3.SCHEMA + ALIASES = ProduceResponse_v3.ALIASES class ProduceResponse_v5(Response): API_KEY = 0 API_VERSION = 5 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('responses', Array( + ('name', String('utf-8')), + ('partition_responses', Array( + ('index', Int32), ('error_code', Int16), - ('offset', Int64), - ('timestamp', Int64), + ('base_offset', Int64), + ('log_append_time_ms', Int64), ('log_start_offset', Int64))))), ('throttle_time_ms', Int32) ) + ALIASES = ProduceResponse_v4.ALIASES class ProduceResponse_v6(Response): @@ -84,6 +92,7 @@ class ProduceResponse_v6(Response): API_KEY = 0 API_VERSION = 6 SCHEMA = ProduceResponse_v5.SCHEMA + ALIASES = ProduceResponse_v5.ALIASES class ProduceResponse_v7(Response): @@ -93,6 +102,7 @@ class ProduceResponse_v7(Response): API_KEY = 0 API_VERSION = 7 SCHEMA = ProduceResponse_v6.SCHEMA + ALIASES = ProduceResponse_v6.ALIASES class ProduceResponse_v8(Response): @@ -103,13 +113,13 @@ class ProduceResponse_v8(Response): API_KEY = 0 API_VERSION = 8 SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('responses', Array( + ('name', String('utf-8')), + ('partition_responses', Array( + ('index', Int32), ('error_code', Int16), - ('offset', Int64), - ('timestamp', Int64), + ('base_offset', Int64), + ('log_append_time_ms', Int64), ('log_start_offset', Int64), ('record_errors', (Array( ('batch_index', Int32), @@ -119,13 +129,14 @@ class ProduceResponse_v8(Response): ))), ('throttle_time_ms', Int32) ) + ALIASES = ProduceResponse_v7.ALIASES class ProduceRequest(Request): API_KEY = 0 def expect_response(self): - if self.required_acks == 0: # pylint: disable=no-member + if self.acks == 0: # pylint: disable=no-member return False return True @@ -133,24 +144,31 @@ def expect_response(self): class ProduceRequest_v0(ProduceRequest): API_VERSION = 0 SCHEMA = Schema( - ('required_acks', Int16), - ('timeout', Int32), - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('acks', Int16), + ('timeout_ms', Int32), + ('topic_data', Array( + ('name', String('utf-8')), + ('partition_data', Array( + ('index', Int32), ('records', Bytes))))) ) + ALIASES = { + 'required_acks': 'acks', + 'timeout': 'timeout_ms', + 'topics': 'topic_data', + } class ProduceRequest_v1(ProduceRequest): API_VERSION = 1 SCHEMA = ProduceRequest_v0.SCHEMA + ALIASES = ProduceRequest_v0.ALIASES class ProduceRequest_v2(ProduceRequest): API_VERSION = 2 SCHEMA = ProduceRequest_v1.SCHEMA + ALIASES = ProduceRequest_v1.ALIASES class ProduceRequest_v3(ProduceRequest): @@ -158,14 +176,15 @@ class ProduceRequest_v3(ProduceRequest): API_VERSION = 3 SCHEMA = Schema( ('transactional_id', String('utf-8')), - ('required_acks', Int16), - ('timeout', Int32), - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), + ('acks', Int16), + ('timeout_ms', Int32), + ('topic_data', Array( + ('name', String('utf-8')), + ('partition_data', Array( + ('index', Int32), ('records', Bytes))))) ) + ALIASES = ProduceRequest_v2.ALIASES class ProduceRequest_v4(ProduceRequest): @@ -175,6 +194,7 @@ class ProduceRequest_v4(ProduceRequest): """ API_VERSION = 4 SCHEMA = ProduceRequest_v3.SCHEMA + ALIASES = ProduceRequest_v3.ALIASES class ProduceRequest_v5(ProduceRequest): @@ -184,6 +204,7 @@ class ProduceRequest_v5(ProduceRequest): """ API_VERSION = 5 SCHEMA = ProduceRequest_v4.SCHEMA + ALIASES = ProduceRequest_v4.ALIASES class ProduceRequest_v6(ProduceRequest): @@ -192,6 +213,7 @@ class ProduceRequest_v6(ProduceRequest): """ API_VERSION = 6 SCHEMA = ProduceRequest_v5.SCHEMA + ALIASES = ProduceRequest_v5.ALIASES class ProduceRequest_v7(ProduceRequest): @@ -200,6 +222,7 @@ class ProduceRequest_v7(ProduceRequest): """ API_VERSION = 7 SCHEMA = ProduceRequest_v6.SCHEMA + ALIASES = ProduceRequest_v6.ALIASES class ProduceRequest_v8(ProduceRequest): @@ -209,6 +232,7 @@ class ProduceRequest_v8(ProduceRequest): """ API_VERSION = 8 SCHEMA = ProduceRequest_v7.SCHEMA + ALIASES = ProduceRequest_v7.ALIASES ProduceRequest = [ diff --git a/kafka/protocol/txn_offset_commit.py b/kafka/protocol/txn_offset_commit.py index fc9998779..0fbaf4694 100644 --- a/kafka/protocol/txn_offset_commit.py +++ b/kafka/protocol/txn_offset_commit.py @@ -8,9 +8,9 @@ class TxnOffsetCommitResponse_v0(Response): SCHEMA = Schema( ('throttle_time_ms', Int32), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), + ('partition_index', Int32), ('error_code', Int16)))))) @@ -35,11 +35,11 @@ class TxnOffsetCommitRequest_v0(Request): ('producer_id', Int64), ('producer_epoch', Int16), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8'))))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_metadata', String('utf-8'))))))) class TxnOffsetCommitRequest_v1(Request): @@ -57,12 +57,12 @@ class TxnOffsetCommitRequest_v2(Request): ('producer_id', Int64), ('producer_epoch', Int16), ('topics', Array( - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('leader_epoch', Int32), - ('metadata', String('utf-8'))))))) + ('partition_index', Int32), + ('committed_offset', Int64), + ('committed_leader_epoch', Int32), + ('committed_metadata', String('utf-8'))))))) TxnOffsetCommitRequest = [ From d3bafe8d73ca2276ba17d50e89c414ffb3cf492d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 21:58:51 -0700 Subject: [PATCH 11/11] Fixup AddPartitionsToTxnRequest topics alias --- kafka/protocol/add_partitions_to_txn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/protocol/add_partitions_to_txn.py b/kafka/protocol/add_partitions_to_txn.py index b8cd14f93..f4641f587 100644 --- a/kafka/protocol/add_partitions_to_txn.py +++ b/kafka/protocol/add_partitions_to_txn.py @@ -45,6 +45,7 @@ class AddPartitionsToTxnRequest_v0(Request): 'transactional_id': 'v3_and_below_transactional_id', 'producer_id': 'v3_and_below_producer_id', 'producer_epoch': 'v3_and_below_producer_epoch', + 'topics': 'v3_and_below_topics', }