Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,26 +540,22 @@ def _send_join_group_request(self):

# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
member_metadata = [
(protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
for protocol, metadata in self.group_protocols()
]
version = self._client.api_version(JoinGroupRequest, max_version=5)
if version == 0:
request = JoinGroupRequest[version](
self.group_id,
self.config['session_timeout_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
self.group_protocols())
elif version <= 4:
request = JoinGroupRequest[version](
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
self.group_protocols())
else:
request = JoinGroupRequest[version](
self.group_id,
Expand All @@ -568,7 +564,7 @@ def _send_join_group_request(self):
self._generation.member_id,
self.group_instance_id,
self.protocol_type(),
member_metadata)
self.group_protocols())

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
Expand Down Expand Up @@ -706,10 +702,6 @@ def _on_join_leader(self, response):
group_assignment = self._perform_assignment(response.leader_id,
response.group_protocol,
members)
for member_id, assignment in group_assignment.items():
if not isinstance(assignment, bytes):
group_assignment[member_id] = assignment.encode()

except Exception as e:
return Future().failure(e)

Expand Down
5 changes: 3 additions & 2 deletions kafka/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class Bytes(AbstractType):
def encode(cls, value):
if value is None:
return Int32.encode(-1)
else:
return Int32.encode(len(value)) + value
elif not isinstance(value, bytes):
value = value.encode()
return Int32.encode(len(value)) + value

@classmethod
def decode(cls, data):
Expand Down
11 changes: 10 additions & 1 deletion test/protocol/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from kafka.protocol.struct import Struct
from kafka.protocol.types import Schema, Int32, String, TaggedFields
from kafka.protocol.types import Schema, Int32, String, TaggedFields, Bytes


def test_schema_type():
Expand Down Expand Up @@ -52,3 +52,12 @@ def test_struct(args, kwargs):
data = struct(*args, **kwargs)
assert data.encode() == encoded
assert struct.decode(encoded) == data


def test_bytes_struct():
schema = Schema(('f1', Int32), ('f2', String()))
struct = type('TestStruct', (Struct,), {'SCHEMA': schema})
data = struct(f1=123, f2="bar")
bytes_encoded = Bytes.encode(data)
assert bytes_encoded[4:] == data.encode()
assert bytes_encoded[:4] == Int32.encode(len(data.encode()))
Loading