diff --git a/kafka/protocol/new/data_container.py b/kafka/protocol/new/data_container.py index 56ca3e296..2b8a5e39b 100644 --- a/kafka/protocol/new/data_container.py +++ b/kafka/protocol/new/data_container.py @@ -110,6 +110,34 @@ def __iter__(self): raise RuntimeError('DataContainer Iteration not supported without _version') return iter([getattr(self, field.name) for field in self._struct.untagged_fields(self._version)]) + def _to_dict_vals(self, meta=False, json=True): + if meta: + yield ('_type', self.__class__.__name__) + yield ('_version', self._version) + if meta != 'all': + meta=False + for field in self._struct._fields: + if self._version is not None and not field.for_version_q(self._version): + continue + if field.is_struct(): + yield (field.name, dict(getattr(self, field.name)._to_dict_vals(meta=meta, json=json))) + elif field.is_struct_array(): + yield (field.name, [dict(val._to_dict_vals(meta=meta, json=json)) for val in getattr(self, field.name)]) + else: + val = getattr(self, field.name) + if json: + if isinstance(val, bytes): + val = val.decode() + elif isinstance(val, set): + val = list(val) + yield (field.name, val) + + def to_dict(self, meta=False, json=True): + """Use meta=True to include top-level version; meta='all' to include all internal versions + json=False to return raw encoding; json=True (default) to convert values to be json-serializable + """ + return dict(self._to_dict_vals(meta=meta, json=json)) + def __getitem__(self, key): if self._version is None: raise RuntimeError('DataContainer subscript not supported without _version')