diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py index a3d89d717464b..d4e013a334252 100644 --- a/iotdb-client/client-py/iotdb/Session.py +++ b/iotdb-client/client-py/iotdb/Session.py @@ -1680,6 +1680,18 @@ def value_to_bytes(data_types, values): values_tobe_packed.append(b"\x0b") values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) + # OBJECT + elif data_type == 12: + if isinstance(value, str): + value_bytes = bytes(value, "utf-8") + else: + value_bytes = value + format_str_list.append("ci") + format_str_list.append(str(len(value_bytes))) + format_str_list.append("s") + values_tobe_packed.append(b"\x0c") + values_tobe_packed.append(len(value_bytes)) + values_tobe_packed.append(value_bytes) else: raise RuntimeError("Unsupported data type:" + str(data_type)) format_str = "".join(format_str_list) diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py index cc5577e885c62..82d580210d3cd 100644 --- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py +++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py @@ -17,6 +17,7 @@ # import numpy as np +from iotdb.utils.IoTDBConstants import TSDataType # Serialized tsBlock: # +-------------+---------------+---------+------------+-----------+----------+ @@ -25,6 +26,26 @@ # | int32 | list[byte] | int32 | list[byte] | bytes | byte | # +-------------+---------------+---------+------------+-----------+----------+ +VALID_TS_DATA_TYPES = ( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.TIMESTAMP, + TSDataType.DATE, + TSDataType.BLOB, + TSDataType.STRING, + TSDataType.OBJECT, +) +BINARY_TS_DATA_TYPES = ( + TSDataType.TEXT, + TSDataType.BLOB, + TSDataType.STRING, + TSDataType.OBJECT, +) + def deserialize(buffer): value_column_count, buffer = read_int_from_buffer(buffer) @@ -73,7 +94,7 @@ def read_from_buffer(buffer, size): def read_column_types(buffer, value_column_count): data_types = np.frombuffer(buffer, dtype=np.uint8, count=value_column_count) new_buffer = buffer[value_column_count:] - if not np.all(np.isin(data_types, (0, 1, 2, 3, 4, 5, 8, 9, 10, 11))): + if not np.all(np.isin(data_types, VALID_TS_DATA_TYPES)): raise Exception("Invalid data type encountered: " + str(data_types)) return data_types, new_buffer @@ -113,9 +134,9 @@ def read_int64_column(buffer, data_type, position_count): else: size = np.count_nonzero(~null_indicators) - if data_type == 2: + if data_type == TSDataType.INT64: dtype = ">i8" - elif data_type == 4: + elif data_type == TSDataType.DOUBLE: dtype = ">f8" else: raise Exception("Invalid data type: " + str(data_type)) @@ -139,9 +160,9 @@ def read_int32_column(buffer, data_type, position_count): else: size = np.count_nonzero(~null_indicators) - if (data_type == 1) or (data_type == 9): + if (data_type == TSDataType.INT32) or (data_type == TSDataType.DATE): dtype = ">i4" - elif data_type == 3: + elif data_type == TSDataType.FLOAT: dtype = ">f4" else: raise Exception("Invalid data type: " + str(data_type)) @@ -159,8 +180,8 @@ def read_int32_column(buffer, data_type, position_count): def read_byte_column(buffer, data_type, position_count): - if data_type != 0: - raise Exception("Invalid data type: " + data_type) + if data_type != TSDataType.BOOLEAN: + raise Exception("Invalid data type: " + str(data_type)) null_indicators, buffer = deserialize_null_indicators(buffer, position_count) res, buffer = deserialize_from_boolean_array(buffer, position_count) return res, null_indicators, buffer @@ -190,8 +211,8 @@ def deserialize_from_boolean_array(buffer, size): def read_binary_column(buffer, data_type, position_count): - if data_type != 5: - raise Exception("Invalid data type: " + data_type) + if data_type not in BINARY_TS_DATA_TYPES: + raise Exception("Invalid data type: " + str(data_type)) null_indicators, buffer = deserialize_null_indicators(buffer, position_count) if null_indicators is None: @@ -229,10 +250,18 @@ def read_run_length_column(buffer, data_type, position_count): def repeat(column, data_type, position_count): - if data_type in (0, 5): + if data_type in ( + TSDataType.BOOLEAN, + TSDataType.TEXT, + TSDataType.BLOB, + TSDataType.STRING, + TSDataType.OBJECT, + ): if column.size == 1: return np.full( - position_count, column[0], dtype=(bool if data_type == 0 else object) + position_count, + column[0], + dtype=(bool if data_type == TSDataType.BOOLEAN else object), ) else: return np.array(column * position_count, dtype=object) diff --git a/iotdb-client/client-py/iotdb/utils/Field.py b/iotdb-client/client-py/iotdb/utils/Field.py index d9a0ee77776ec..9ee1455039f61 100644 --- a/iotdb-client/client-py/iotdb/utils/Field.py +++ b/iotdb-client/client-py/iotdb/utils/Field.py @@ -24,6 +24,25 @@ import pandas as pd +def parse_object_byte_array_to_string(value: bytes) -> str: + if value is None: + return "None" + if len(value) < 8: + object_size = 0 + else: + # tsfile BytesUtils.bytesToLong builds an unsigned long from bytes. + # Using signed=False keeps it consistent for non-negative object sizes. + object_size = int.from_bytes(value[:8], byteorder="big", signed=False) + + if object_size < 1024: + return "(Object) {} B".format(object_size) + if object_size < 1048576: # 1024^2 + return "(Object) {:.2f} KB".format(object_size / 1024.0) + if object_size < 1073741824: # 1024^3 + return "(Object) {:.2f} MB".format(object_size / 1048576.0) + return "(Object) {:.2f} GB".format(object_size / 1073741824.0) + + class Field(object): def __init__(self, data_type, value=None, timezone=None, precision=None): """ @@ -58,6 +77,7 @@ def copy(field): output.get_data_type() == TSDataType.TEXT or output.get_data_type() == TSDataType.STRING or output.get_data_type() == TSDataType.BLOB + or output.get_data_type() == TSDataType.OBJECT ): output.set_binary_value(field.get_binary_value()) else: @@ -154,6 +174,7 @@ def get_binary_value(self): self.__data_type != TSDataType.TEXT and self.__data_type != TSDataType.STRING and self.__data_type != TSDataType.BLOB + and self.__data_type != TSDataType.OBJECT or self.value is None or self.value is pd.NA ): @@ -186,13 +207,16 @@ def get_string_value(self): if self.__data_type is None or self.value is None or self.value is pd.NA: return "None" # TEXT, STRING - if self.__data_type == 5 or self.__data_type == 11: + if self.__data_type == TSDataType.TEXT or self.__data_type == TSDataType.STRING: return self.value.decode("utf-8") + # OBJECT + elif self.__data_type == TSDataType.OBJECT: + return parse_object_byte_array_to_string(self.value) # BLOB - elif self.__data_type == 10: + elif self.__data_type == TSDataType.BLOB: return str(hex(int.from_bytes(self.value, byteorder="big"))) # TIMESTAMP - elif self.__data_type == 8: + elif self.__data_type == TSDataType.TIMESTAMP: return isoformat( convert_to_timestamp(self.value, self.__precision, self.__timezone), self.__precision, @@ -210,24 +234,26 @@ def get_object_value(self, data_type): """ if self.__data_type is None or self.value is None or self.value is pd.NA: return None - if data_type == 0: + if data_type == TSDataType.BOOLEAN: return bool(self.value) - elif data_type == 1: + elif data_type == TSDataType.INT32: return np.int32(self.value) - elif data_type == 2: + elif data_type == TSDataType.INT64: return np.int64(self.value) - elif data_type == 3: + elif data_type == TSDataType.FLOAT: return np.float32(self.value) - elif data_type == 4: + elif data_type == TSDataType.DOUBLE: return np.float64(self.value) - elif data_type == 8: + elif data_type == TSDataType.TIMESTAMP: return convert_to_timestamp(self.value, self.__precision, self.__timezone) - elif data_type == 9: + elif data_type == TSDataType.DATE: return parse_int_to_date(self.value) - elif data_type == 5 or data_type == 11: + elif data_type == TSDataType.TEXT or data_type == TSDataType.STRING: return self.value.decode("utf-8") - elif data_type == 10: + elif data_type == TSDataType.BLOB: return self.value + elif data_type == TSDataType.OBJECT: + return parse_object_byte_array_to_string(self.value) else: raise RuntimeError("Unsupported data type:" + str(data_type)) diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py index 4b9082b5353fd..5fa34e1d09763 100644 --- a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py +++ b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py @@ -32,6 +32,7 @@ class TSDataType(IntEnum): DATE = 9 BLOB = 10 STRING = 11 + OBJECT = 12 def np_dtype(self): return { @@ -44,6 +45,7 @@ def np_dtype(self): TSDataType.TIMESTAMP: np.dtype(">i8"), TSDataType.DATE: date, TSDataType.BLOB: bytes, + TSDataType.OBJECT: bytes, TSDataType.STRING: str, }[self] diff --git a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py index 9a0860d3a43e9..b5b1a89929fa7 100644 --- a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py +++ b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py @@ -141,8 +141,10 @@ def get_binary_values(self): or data_type == 8 ): bs = value.tobytes() - # TEXT, STRING, BLOB - elif data_type == 5 or data_type == 11 or data_type == 10: + # TEXT, STRING, BLOB, OBJECT + elif ( + data_type == 5 or data_type == 11 or data_type == 10 or data_type == 12 + ): format_str_list = [">"] values_tobe_packed = [] for str_list in value: diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 9c5b3ec06ddc5..a19ff49f70a68 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -189,6 +189,7 @@ def get_typed_point(field: Field, none_value=None): TSDataType.STRING: lambda f: f.get_string_value(), TSDataType.DATE: lambda f: f.get_date_value(), TSDataType.BLOB: lambda f: f.get_binary_value(), + TSDataType.OBJECT: lambda f: f.get_string_value(), } result_next_type: TSDataType = field.get_data_type() diff --git a/iotdb-client/client-py/iotdb/utils/Tablet.py b/iotdb-client/client-py/iotdb/utils/Tablet.py index 9b241723fe5b7..8b6257bf5eb47 100644 --- a/iotdb-client/client-py/iotdb/utils/Tablet.py +++ b/iotdb-client/client-py/iotdb/utils/Tablet.py @@ -189,8 +189,10 @@ def get_binary_values(self): values_tobe_packed.append(0) self.__mark_none_value(bitmaps, i, j) has_none = True - # TEXT, STRING, BLOB - elif data_type == 5 or data_type == 11 or data_type == 10: + # TEXT, STRING, BLOB, OBJECT + elif ( + data_type == 5 or data_type == 11 or data_type == 10 or data_type == 12 + ): for j in range(self.__row_number): if self.__values[j][i] is not None: if isinstance(self.__values[j][i], str): @@ -203,7 +205,7 @@ def get_binary_values(self): values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) else: - value_bytes = bytes("", "utf-8") + value_bytes = b"" format_str_list.append("i") format_str_list.append(str(len(value_bytes))) format_str_list.append("s") diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index 0edc76f68fd6d..dc16aad2c3c41 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -346,6 +346,9 @@ def _process_buffer(self): # TEXT, STRING elif data_type in (5, 11): data_array = np.array([x.decode("utf-8") for x in column_array]) + # OBJECT (kept as bytes; Field will format it later) + elif data_type == 12: + data_array = column_array # TIMESTAMP elif data_type == 8: data_array = pd.Series( @@ -381,6 +384,7 @@ def _process_buffer(self): data_type == 5 or data_type == 11 or data_type == 10 + or data_type == 12 or data_type == 9 or data_type == 8 ): diff --git a/iotdb-client/client-py/tests/unit/test_object_serialization_unit.py b/iotdb-client/client-py/tests/unit/test_object_serialization_unit.py new file mode 100644 index 0000000000000..9f8b74d39cf52 --- /dev/null +++ b/iotdb-client/client-py/tests/unit/test_object_serialization_unit.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import struct + +from iotdb.Session import Session +from iotdb.utils.Field import Field, parse_object_byte_array_to_string +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.Tablet import Tablet + + +def test_object_serialization_and_formatting_should_match_tsfile_semantics(): + # The server-side parse rule only cares about the first 8 bytes: + # BytesUtils.parseObjectByteArrayToString -> objectSize (big-endian unsigned long) + object_size = 67108864 # 64MB + stored_object_bytes = ( + object_size.to_bytes(8, byteorder="big", signed=False) + b"rest" + ) + expected_string = parse_object_byte_array_to_string(stored_object_bytes) + + # 1) Parsing/formatting (no server) + field = Field(TSDataType.OBJECT, stored_object_bytes) + assert field.get_binary_value() == stored_object_bytes + assert field.get_string_value() == expected_string + assert field.get_object_value(TSDataType.OBJECT) == expected_string + + # 2) Tablet serialization (insertTablet) + # For OBJECT-like binary columns, Tablet packs: [int length][bytes] per row. + tablet = Tablet( + "root.sg_py_object_unit", + ["obj_m1"], + [TSDataType.OBJECT], + [[stored_object_bytes]], + [100], + ) + actual_tablet_binary_values = tablet.get_binary_values() + expected_tablet_binary_values = struct.pack( + f">i{len(stored_object_bytes)}s", + len(stored_object_bytes), + stored_object_bytes, + ) + assert actual_tablet_binary_values == expected_tablet_binary_values + + # 3) Record serialization (insertRecords style): Session.value_to_bytes + # OBJECT branch uses an extra type marker: b"\x0c" followed by [int length][bytes]. + actual_record_bytes = Session.value_to_bytes( + [TSDataType.OBJECT], [stored_object_bytes] + ) + expected_record_bytes = struct.pack( + f">ci{len(stored_object_bytes)}s", + b"\x0c", + len(stored_object_bytes), + stored_object_bytes, + ) + assert actual_record_bytes == expected_record_bytes