Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
fb14366
Dynamic protocol classes using upstream json schemas
dpkp Dec 15, 2025
4a9c6fd
Drop _version getattr checks from ApiStructData encode/decode
dpkp Mar 9, 2026
983e3d7
Set ApiMessage _class_version = None
dpkp Mar 9, 2026
2397c97
Field: import uuid
dpkp Mar 9, 2026
982ce14
Disable some pylint errors
dpkp Mar 9, 2026
499ebb7
Rename test_schema.py test_load_json.py
dpkp Mar 9, 2026
f6fae36
Disable more pylint errors
dpkp Mar 9, 2026
fee7dc6
mv test_api_compatibility to test/protocol/new/
dpkp Mar 9, 2026
b853335
Rename message tests -> test_new_foo.py
dpkp Mar 9, 2026
471dfba
expect_response()
dpkp Mar 10, 2026
c14fb48
Extract __doc__ and __license__ from json comments
dpkp Mar 10, 2026
c0f78fc
Simplify metaclasses with __init_subclass__; separate VersionSubscrip…
dpkp Mar 10, 2026
24f424f
Fix ApiArray None encoding
dpkp Mar 10, 2026
e6c49e8
Add ResponseClassRegistry for new protocol stack
dpkp Mar 10, 2026
611fcb0
Memoize Field default
dpkp Mar 10, 2026
d8e3121
Drop protocol_type=None config from JoinGroupResponse v5 test
dpkp Mar 10, 2026
464dbc4
Support BitFields; add json_patch support; alter MetadataResponse aut…
dpkp Mar 10, 2026
d315905
Drop unused imports; useNewApiVersionsRequest/Response in api compati…
dpkp Mar 10, 2026
ad6219a
Use Field as base class; add FieldBasicType
dpkp Mar 11, 2026
3033a6c
drop fields kwarg
dpkp Mar 11, 2026
337d3a5
ApiStruct fixups: accept any non-array with fields
dpkp Mar 11, 2026
8cca839
Separate ApiStructArray
dpkp Mar 11, 2026
0720821
Drop extra tags {} from test_new_parser (align with master)
dpkp Mar 11, 2026
143d6b1
reorg Field classes
dpkp Mar 15, 2026
187c581
Rename ApiStructData -> DataContainer
dpkp Mar 15, 2026
a901abe
Use StructField.set_data_class() to add class dynamically from DataCo…
dpkp Mar 16, 2026
526fb00
Drop top-level __init__ imports
dpkp Mar 16, 2026
b3c9256
Reorg kafka.protocol.new.messages; add missing admin msgs
dpkp Mar 16, 2026
14dfd9a
Drop .messages package; group messages by function
dpkp Mar 16, 2026
27c6bee
.schema -> .schemas
dpkp Mar 16, 2026
85e741e
OffsetForLeaderEpoch req/resp
dpkp Mar 16, 2026
fa869b0
Move json files to .schemas.resources; mv .field to .schemas.fields; …
dpkp Mar 16, 2026
62a4fd9
.schemas.fields.codecs
dpkp Mar 16, 2026
c98faf8
__all__ fixup
dpkp Mar 16, 2026
3ce00bd
update pyproject package data
dpkp Mar 16, 2026
0e75337
kafka.protocol.new.schemas.resources __init__
dpkp Mar 16, 2026
234da1e
do not test against __version__
dpkp Mar 16, 2026
1377fe8
Remove to_schema
dpkp Mar 16, 2026
3060f46
drop old TODO
dpkp Mar 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added kafka/protocol/new/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions kafka/protocol/new/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from .acl import *
from .cluster import *
from .groups import *
from .topics import *

__all__ = [
'CreateAclsRequest', 'CreateAclsResponse',
'DeleteAclsRequest', 'DeleteAclsResponse',
'DescribeAclsRequest', 'DescribeAclsResponse',
'ACLResourceType', 'ACLOperation',
'ACLPermissionType', 'ACLResourcePatternType',

'DescribeClusterRequest', 'DescribeClusterResponse',
'DescribeConfigsRequest', 'DescribeConfigsResponse',
'AlterConfigsRequest', 'AlterConfigsResponse',
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType',

'DescribeGroupsRequest', 'DescribeGroupsResponse',
'ListGroupsRequest', 'ListGroupsResponse',
'DeleteGroupsRequest', 'DeleteGroupsResponse',

'CreateTopicsRequest', 'CreateTopicsResponse',
'DeleteTopicsRequest', 'DeleteTopicsResponse',
'CreatePartitionsRequest', 'CreatePartitionsResponse',
'DeleteRecordsRequest', 'DeleteRecordsResponse',
]
83 changes: 83 additions & 0 deletions kafka/protocol/new/admin/acl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from enum import IntEnum

from ..api_message import ApiMessage


class CreateAclsRequest(ApiMessage): pass
class CreateAclsResponse(ApiMessage): pass

class DeleteAclsRequest(ApiMessage): pass
class DeleteAclsResponse(ApiMessage): pass

class DescribeAclsRequest(ApiMessage): pass
class DescribeAclsResponse(ApiMessage): pass


class ACLResourceType(IntEnum):
"""Type of kafka resource to set ACL for

The ANY value is only valid in a filter context
"""
UNKNOWN = 0,
ANY = 1,
CLUSTER = 4,
DELEGATION_TOKEN = 6,
GROUP = 3,
TOPIC = 2,
TRANSACTIONAL_ID = 5


class ACLOperation(IntEnum):
"""Type of operation

The ANY value is only valid in a filter context
"""
UNKNOWN = 0,
ANY = 1,
ALL = 2,
READ = 3,
WRITE = 4,
CREATE = 5,
DELETE = 6,
ALTER = 7,
DESCRIBE = 8,
CLUSTER_ACTION = 9,
DESCRIBE_CONFIGS = 10,
ALTER_CONFIGS = 11,
IDEMPOTENT_WRITE = 12,
CREATE_TOKENS = 13,
DESCRIBE_TOKENS = 14


class ACLPermissionType(IntEnum):
"""An enumerated type of permissions

The ANY value is only valid in a filter context
"""
UNKNOWN = 0,
ANY = 1,
DENY = 2,
ALLOW = 3


class ACLResourcePatternType(IntEnum):
"""An enumerated type of resource patterns

More details on the pattern types and how they work
can be found in KIP-290 (Support for prefixed ACLs)
https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
"""
UNKNOWN = 0,
ANY = 1,
MATCH = 2,
LITERAL = 3,
PREFIXED = 4


__all__ = [
'CreateAclsRequest', 'CreateAclsResponse',
'DeleteAclsRequest', 'DeleteAclsResponse',
'DescribeAclsRequest', 'DescribeAclsResponse',
'ACLResourceType', 'ACLOperation',
'ACLPermissionType', 'ACLResourcePatternType',
]
35 changes: 35 additions & 0 deletions kafka/protocol/new/admin/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from enum import IntEnum

from ..api_message import ApiMessage


class DescribeClusterRequest(ApiMessage): pass
class DescribeClusterResponse(ApiMessage): pass

class DescribeConfigsRequest(ApiMessage): pass
class DescribeConfigsResponse(ApiMessage): pass

class AlterConfigsRequest(ApiMessage): pass
class AlterConfigsResponse(ApiMessage): pass

class DescribeLogDirsRequest(ApiMessage): pass
class DescribeLogDirsResponse(ApiMessage): pass

class ElectLeadersRequest(ApiMessage): pass
class ElectLeadersResponse(ApiMessage): pass

class ElectionType(IntEnum):
""" Leader election type
"""

PREFERRED = 0,
UNCLEAN = 1


__all__ = [
'DescribeClusterRequest', 'DescribeClusterResponse',
'DescribeConfigsRequest', 'DescribeConfigsResponse',
'AlterConfigsRequest', 'AlterConfigsResponse',
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType',
]
18 changes: 18 additions & 0 deletions kafka/protocol/new/admin/groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from ..api_message import ApiMessage


class DescribeGroupsRequest(ApiMessage): pass
class DescribeGroupsResponse(ApiMessage): pass

class ListGroupsRequest(ApiMessage): pass
class ListGroupsResponse(ApiMessage): pass

class DeleteGroupsRequest(ApiMessage): pass
class DeleteGroupsResponse(ApiMessage): pass


__all__ = [
'DescribeGroupsRequest', 'DescribeGroupsResponse',
'ListGroupsRequest', 'ListGroupsResponse',
'DeleteGroupsRequest', 'DeleteGroupsResponse',
]
22 changes: 22 additions & 0 deletions kafka/protocol/new/admin/topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from ..api_message import ApiMessage


class CreateTopicsRequest(ApiMessage): pass
class CreateTopicsResponse(ApiMessage): pass

class DeleteTopicsRequest(ApiMessage): pass
class DeleteTopicsResponse(ApiMessage): pass

class CreatePartitionsRequest(ApiMessage): pass
class CreatePartitionsResponse(ApiMessage): pass

class DeleteRecordsRequest(ApiMessage): pass
class DeleteRecordsResponse(ApiMessage): pass


__all__ = [
'CreateTopicsRequest', 'CreateTopicsResponse',
'DeleteTopicsRequest', 'DeleteTopicsResponse',
'CreatePartitionsRequest', 'CreatePartitionsResponse',
'DeleteRecordsRequest', 'DeleteRecordsResponse',
]
59 changes: 59 additions & 0 deletions kafka/protocol/new/api_header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from .data_container import DataContainer, SlotsBuilder
from .schemas import BaseField, StructField, load_json


class ApiHeaderMeta(SlotsBuilder):
def __new__(metacls, name, bases, attrs, **kw):
if kw.get('init', True):
json = load_json(name)
attrs['_json'] = json
attrs['_struct'] = StructField(json)
return super().__new__(metacls, name, bases, attrs, **kw)


class ApiHeader(DataContainer, metaclass=ApiHeaderMeta, init=False):
__slots__ = ()

def __init_subclass__(cls, **kw):
super().__init_subclass__(**kw)
if kw.get('init', True):
# pylint: disable=E1101
assert cls._json['type'] == 'header'
cls._flexible_versions = BaseField.parse_versions(cls._json['flexibleVersions'])
cls._valid_versions = BaseField.parse_versions(cls._json['validVersions'])

def encode(self, flexible=False):
# Request versions are 1-2, Response versions are 0-1
version = self._flexible_versions[0] if flexible else self._valid_versions[0] # pylint: disable=E1136
# compact=False is probably wrong,
# but it works to make sure that the client_id request header field
# is never encoded as compact (required to support ApiVersionsRequest for unsupported version)
return super().encode(version=version, compact=False, tagged=flexible)

@classmethod
def decode(cls, data, flexible=False):
# Request versions are 1-2, Response versions are 0-1
version = cls._flexible_versions[0] if flexible else cls._valid_versions[0] # pylint: disable=E1136
return cls._struct.decode(data, version=version, compact=False, tagged=flexible, data_class=cls)


class ResponseClassRegistry:
_response_class_registry = {}

@classmethod
def register_response_class(cls, response_class):
cls._response_class_registry[response_class.API_KEY] = response_class

@classmethod
def get_response_class(cls, request_header):
response_class = cls._response_class_registry.get(request_header.request_api_key)
if response_class is not None:
return response_class[request_header.request_api_version]


class RequestHeader(ApiHeader):
def get_response_class(self):
return ResponseClassRegistry.get_response_class(self)


class ResponseHeader(ApiHeader): pass
Loading
Loading