From 6c81a15fb0d1a6652754dbbae275e94e0d38eb15 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 26 Mar 2026 18:33:48 +0800 Subject: [PATCH 1/3] Add tsblock tsfile writer --- .../chunk/AlignedChunkGroupWriterImpl.java | 24 +- .../chunk/TableChunkGroupWriterImpl.java | 67 +++++ .../write/v4/TableTsBlock2TsFileWriter.java | 266 ++++++++++++++++++ .../writer/TableTsBlock2TsFileWriterTest.java | 209 ++++++++++++++ 4 files changed, 554 insertions(+), 12 deletions(-) create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java index 5fd9dde7d..87464239f 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java @@ -51,20 +51,20 @@ import java.util.stream.Collectors; public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { - private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class); + protected static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class); - private final IDeviceID deviceId; + protected final IDeviceID deviceId; // measurementID -> ValueChunkWriter - private final Map valueChunkWriterMap = new LinkedHashMap<>(); + protected final Map valueChunkWriterMap = new LinkedHashMap<>(); - private final TimeChunkWriter timeChunkWriter; + protected final TimeChunkWriter timeChunkWriter; - private final EncryptParameter encryprParam; + protected final EncryptParameter encryprParam; - private long lastTime = Long.MIN_VALUE; - private boolean isInitLastTime = false; - private boolean convertColumnNameToLowerCase = false; + protected long lastTime = Long.MIN_VALUE; + protected boolean isInitLastTime = false; + protected boolean convertColumnNameToLowerCase = false; public AlignedChunkGroupWriterImpl(IDeviceID deviceId) { this.deviceId = deviceId; @@ -392,7 +392,7 @@ private void writeEmptyDataInOneRow(List valueChunkWriterList) * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it * to pageBuffer */ - private boolean checkPageSizeAndMayOpenANewPage() { + protected boolean checkPageSizeAndMayOpenANewPage() { if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) { return true; } @@ -404,21 +404,21 @@ private boolean checkPageSizeAndMayOpenANewPage() { return false; } - private void writePageToPageBuffer() { + protected void writePageToPageBuffer() { timeChunkWriter.writePageToPageBuffer(); for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) { valueChunkWriter.writePageToPageBuffer(); } } - private void sealAllChunks() { + protected void sealAllChunks() { timeChunkWriter.sealCurrentPage(); for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) { valueChunkWriter.sealCurrentPage(); } } - private void checkIsHistoryData(long time) throws WriteProcessException { + protected void checkIsHistoryData(long time) throws WriteProcessException { if (isInitLastTime && time <= lastTime) { throw new WriteProcessException( "Not allowed to write out-of-order data in timeseries " diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java index 3e987b662..bf683c772 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java @@ -19,8 +19,14 @@ package org.apache.tsfile.write.chunk; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.IOException; +import java.util.List; public class TableChunkGroupWriterImpl extends AlignedChunkGroupWriterImpl { @@ -33,4 +39,65 @@ public TableChunkGroupWriterImpl(IDeviceID deviceId, EncryptParameter encryptPar super(deviceId, encryptParam); setConvertColumnNameToLowerCase(true); } + + public int write( + Column timeColumn, + Column[] valueColumns, + List measurementSchemas, + int startRowIndex, + int endRowIndex) + throws IOException { + int pointCount = 0; + ValueChunkWriter[] valueChunkWriters = new ValueChunkWriter[valueColumns.length]; + for (int i = 0; i < measurementSchemas.size(); i++) { + valueChunkWriters[i] = tryToAddSeriesWriterInternal(measurementSchemas.get(i)); + } + for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) { + long time = timeColumn.getLong(rowIndex); + for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) { + Column valueColumn = valueColumns[valueColumnIndex]; + IMeasurementSchema measurementSchema = measurementSchemas.get(valueColumnIndex); + ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex]; + boolean isNull = valueColumn.isNull(rowIndex); + switch (measurementSchema.getType()) { + case BOOLEAN: + valueChunkWriter.write(time, isNull ? false : valueColumn.getBoolean(rowIndex), isNull); + break; + case INT32: + case DATE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getInt(rowIndex), isNull); + break; + case INT64: + case TIMESTAMP: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getLong(rowIndex), isNull); + break; + case FLOAT: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getFloat(rowIndex), isNull); + break; + case DOUBLE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getDouble(rowIndex), isNull); + break; + case TEXT: + case BLOB: + case STRING: + case OBJECT: + valueChunkWriter.write(time, isNull ? null : valueColumn.getBinary(rowIndex), isNull); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Data type %s is not supported.", + measurementSchemas.get(valueColumnIndex).getType())); + } + } + timeChunkWriter.write(time); + lastTime = time; + isInitLastTime = true; + if (checkPageSizeAndMayOpenANewPage()) { + writePageToPageBuffer(); + } + pointCount++; + } + return pointCount; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java new file mode 100644 index 000000000..12429f77b --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl; +import org.apache.tsfile.write.chunk.IChunkGroupWriter; +import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl; +import org.apache.tsfile.write.chunk.ValueChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Convert TsBlock (table model) into TsFile format. Core responsibilities: 1. Split TsBlock by + * device (based on tag columns) 2. Optionally generate a new time column per device 3. Dispatch + * rows to corresponding ChunkGroupWriter + */ +public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter { + + private final boolean generateNewTimeColumn; + private final int timeColumnIndexInTsBlock; + private final int[] tagColumnIndexInTsBlock; + private final int[] fieldColumnIndexInTsBlock; + private final IMeasurementSchema[] fieldColumnSchemas; + + private final String tableName; + private final Map deviceRowCountMap; + + private int rowCount = 0; + + public TableTsBlock2TsFileWriter( + File file, + TableSchema tableSchema, + long memoryThreshold, + boolean generateNewTimeColumn, + int timeColumnIndexInTsBlock, + int[] tagColumnIndexesInTsBlock, + int[] fieldColumnIndexesInTsBlock, + IMeasurementSchema[] fieldColumnSchemas) + throws IOException { + super(file, tableSchema, memoryThreshold); + this.tableName = tableSchema.getTableName(); + this.generateNewTimeColumn = generateNewTimeColumn; + this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock; + this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock; + this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock; + this.deviceRowCountMap = generateNewTimeColumn ? new HashMap<>() : null; + this.fieldColumnSchemas = fieldColumnSchemas; + } + + public void write(TsBlock tsBlock) throws IOException, WriteProcessException { + if (tsBlock == null || tsBlock.isEmpty()) { + return; + } + // Split TsBlock into device partitions and prepare time column + Pair>> timeColumnAndDeviceIdEndIndexPairs = + splitTsBlockByDeviceAndGetTimeColumn(tsBlock); + Column timeColumn = timeColumnAndDeviceIdEndIndexPairs.left; + // Extract value columns according to schema mapping + Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length]; + for (int i = 0; i < valueColumns.length; i++) { + valueColumns[i] = tsBlock.getColumn(fieldColumnIndexInTsBlock[i]); + } + List> deviceIdEndIndexPairs = timeColumnAndDeviceIdEndIndexPairs.right; + int startIndex = 0; + + // Iterate each device segment and write data into its ChunkGroup + for (Pair pair : deviceIdEndIndexPairs) { + TableTsBlockChunkGroupWriterImpl chunkGroupWriter = + (TableTsBlockChunkGroupWriterImpl) tryToInitialGroupWriter(pair.left, true, true); + int writeCount = chunkGroupWriter.write(timeColumn, valueColumns, startIndex, pair.right); + rowCount += writeCount; + recordCount += writeCount; + startIndex = pair.right; + } + + this.checkMemorySizeAndMayFlushChunks(); + } + + /** + * Split TsBlock by device boundary. If generateNewTimeColumn is true, generate a monotonically + * increasing time column per device using deviceRowCountMap. + * + * @return Pair of (time column, device -> end index list) + */ + private Pair>> splitTsBlockByDeviceAndGetTimeColumn( + TsBlock tsBlock) { + long[] timestamps = null; + if (generateNewTimeColumn) { + timestamps = new long[tsBlock.getPositionCount()]; + } + List> deviceSplitResult = new ArrayList<>(); + IDeviceID lastDeviceID = null; + long lastDeviceCount = 0; + + // Iterate rows and detect device boundary changes + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + IDeviceID currDeviceID = getDeviceId(tsBlock, i); + // Device changed, flush previous segment + if (!currDeviceID.equals(lastDeviceID)) { + if (lastDeviceID != null) { + deviceSplitResult.add(new Pair(lastDeviceID, i)); + if (generateNewTimeColumn) { + deviceRowCountMap.put(lastDeviceID, lastDeviceCount); + } + } + lastDeviceID = currDeviceID; + if (generateNewTimeColumn) { + lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L); + } + } + // Generate synthetic time if required + if (generateNewTimeColumn) { + timestamps[i] = lastDeviceCount++; + } + } + + deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount())); + if (generateNewTimeColumn) { + deviceRowCountMap.put(lastDeviceID, lastDeviceCount); + return new Pair<>(new TimeColumn(timestamps.length, timestamps), deviceSplitResult); + } else { + return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock), deviceSplitResult); + } + } + + private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) { + String[] segments = new String[tagColumnIndexInTsBlock.length + 1]; + segments[0] = tableName; + for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) { + segments[i + 1] = + tsBlock + .getValueColumns()[tagColumnIndexInTsBlock[i]] + .getBinary(rowIdx) + .getStringValue(TSFileConfig.STRING_CHARSET); + } + return new StringArrayDeviceID(segments); + } + + @Override + protected IChunkGroupWriter tryToInitialGroupWriter( + IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException { + IChunkGroupWriter groupWriter = groupWriters.get(deviceId); + if (groupWriter == null) { + groupWriter = new TableTsBlockChunkGroupWriterImpl(deviceId); + ((AlignedChunkGroupWriterImpl) groupWriter) + .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); + groupWriters.put(deviceId, groupWriter); + } + return groupWriter; + } + + public int getDeviceCount() { + return alignedDeviceLastTimeMap.size(); + } + + public int getRowCount() { + return rowCount; + } + + /** + * ChunkGroup writer for a single device. Responsible for writing time column and multiple value + * columns. + */ + private class TableTsBlockChunkGroupWriterImpl extends TableChunkGroupWriterImpl { + private final ValueChunkWriter[] valueChunkWriters; + + public TableTsBlockChunkGroupWriterImpl(IDeviceID deviceId) throws IOException { + super(deviceId); + // Initialize ValueChunkWriter for each measurement + this.valueChunkWriters = new ValueChunkWriter[fieldColumnSchemas.length]; + for (int i = 0; i < fieldColumnSchemas.length; i++) { + valueChunkWriters[i] = tryToAddSeriesWriterInternal(fieldColumnSchemas[i]); + } + } + + /** + * Write a range of rows into chunk group. + * + * @param startRowIndex inclusive + * @param endRowIndex exclusive + */ + public int write(Column timeColumn, Column[] valueColumns, int startRowIndex, int endRowIndex) + throws WriteProcessException { + int pointCount = 0; + for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) { + long time = timeColumn.getLong(rowIndex); + checkIsHistoryData(time); + for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) { + Column valueColumn = valueColumns[valueColumnIndex]; + ValueChunkWriter valueChunkWriter = valueChunkWriters[valueColumnIndex]; + boolean isNull = valueColumn.isNull(rowIndex); + switch (valueChunkWriter.getDataType()) { + case BOOLEAN: + valueChunkWriter.write( + time, isNull ? false : valueColumn.getBoolean(rowIndex), isNull); + break; + case INT32: + case DATE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getInt(rowIndex), isNull); + break; + case INT64: + case TIMESTAMP: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getLong(rowIndex), isNull); + break; + case FLOAT: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getFloat(rowIndex), isNull); + break; + case DOUBLE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getDouble(rowIndex), isNull); + break; + case TEXT: + case BLOB: + case STRING: + valueChunkWriter.write(time, isNull ? null : valueColumn.getBinary(rowIndex), isNull); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Data type %s is not supported.", valueChunkWriter.getDataType().getType())); + } + } + timeChunkWriter.write(time); + lastTime = time; + isInitLastTime = true; + if (checkPageSizeAndMayOpenANewPage()) { + writePageToPageBuffer(); + } + pointCount++; + } + return pointCount; + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java new file mode 100644 index 000000000..fac64cf43 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.writer; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.TsFileGeneratorForTest; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class TableTsBlock2TsFileWriterTest { + + private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0); + + @After + public void tearDown() throws Exception { + Files.deleteIfExists(Paths.get(filePath)); + } + + @Test + public void test1() throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TsBlock tsBlock = getTsBlock(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + false, + 0, + new int[] {1}, + new int[] {2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(tsBlock); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(10, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(10, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(4, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(9, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + @Test + public void test2() throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + true, + -1, + new int[] {1}, + new int[] {0, 2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("t", TSDataType.TIMESTAMP), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(getTsBlock()); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(10, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(10, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(5, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(9, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + @Test + public void test3() throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + true, + -1, + new int[0], + new int[] {0, 1, 2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("t", TSDataType.TIMESTAMP), + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(getTsBlock()); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(1, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(1, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(6, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(99, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + private TableSchema getTableSchema() { + return new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)), + Arrays.asList( + ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD, ColumnCategory.FIELD)); + } + + private TsBlock getTsBlock() { + TsBlockBuilder tsBlockBuilder = + new TsBlockBuilder( + Arrays.asList( + TSDataType.TIMESTAMP, + TSDataType.STRING, + TSDataType.INT32, + TSDataType.INT32, + TSDataType.INT32)); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + tsBlockBuilder.getTimeColumnBuilder().writeLong(0); + tsBlockBuilder.getValueColumnBuilders()[0].writeLong(j); + tsBlockBuilder.getValueColumnBuilders()[1].writeBinary( + new Binary("device" + i, TSFileConfig.STRING_CHARSET)); + tsBlockBuilder.getValueColumnBuilders()[2].writeInt(j); + tsBlockBuilder.getValueColumnBuilders()[3].writeInt(j); + tsBlockBuilder.getValueColumnBuilders()[4].writeInt(j); + tsBlockBuilder.declarePosition(); + } + } + return tsBlockBuilder.build(); + } +} From 71fe7827cda5623ff0656668d686b08d20d77a1f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 27 Mar 2026 09:41:09 +0800 Subject: [PATCH 2/3] fix bugs --- .../chunk/TableChunkGroupWriterImpl.java | 67 ------------------- .../write/v4/TableTsBlock2TsFileWriter.java | 14 ++-- 2 files changed, 9 insertions(+), 72 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java index bf683c772..3e987b662 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java @@ -19,14 +19,8 @@ package org.apache.tsfile.write.chunk; -import org.apache.tsfile.block.column.Column; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.write.UnSupportedDataTypeException; -import org.apache.tsfile.write.schema.IMeasurementSchema; - -import java.io.IOException; -import java.util.List; public class TableChunkGroupWriterImpl extends AlignedChunkGroupWriterImpl { @@ -39,65 +33,4 @@ public TableChunkGroupWriterImpl(IDeviceID deviceId, EncryptParameter encryptPar super(deviceId, encryptParam); setConvertColumnNameToLowerCase(true); } - - public int write( - Column timeColumn, - Column[] valueColumns, - List measurementSchemas, - int startRowIndex, - int endRowIndex) - throws IOException { - int pointCount = 0; - ValueChunkWriter[] valueChunkWriters = new ValueChunkWriter[valueColumns.length]; - for (int i = 0; i < measurementSchemas.size(); i++) { - valueChunkWriters[i] = tryToAddSeriesWriterInternal(measurementSchemas.get(i)); - } - for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) { - long time = timeColumn.getLong(rowIndex); - for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) { - Column valueColumn = valueColumns[valueColumnIndex]; - IMeasurementSchema measurementSchema = measurementSchemas.get(valueColumnIndex); - ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex]; - boolean isNull = valueColumn.isNull(rowIndex); - switch (measurementSchema.getType()) { - case BOOLEAN: - valueChunkWriter.write(time, isNull ? false : valueColumn.getBoolean(rowIndex), isNull); - break; - case INT32: - case DATE: - valueChunkWriter.write(time, isNull ? 0 : valueColumn.getInt(rowIndex), isNull); - break; - case INT64: - case TIMESTAMP: - valueChunkWriter.write(time, isNull ? 0 : valueColumn.getLong(rowIndex), isNull); - break; - case FLOAT: - valueChunkWriter.write(time, isNull ? 0 : valueColumn.getFloat(rowIndex), isNull); - break; - case DOUBLE: - valueChunkWriter.write(time, isNull ? 0 : valueColumn.getDouble(rowIndex), isNull); - break; - case TEXT: - case BLOB: - case STRING: - case OBJECT: - valueChunkWriter.write(time, isNull ? null : valueColumn.getBinary(rowIndex), isNull); - break; - default: - throw new UnSupportedDataTypeException( - String.format( - "Data type %s is not supported.", - measurementSchemas.get(valueColumnIndex).getType())); - } - } - timeChunkWriter.write(time); - lastTime = time; - isInitLastTime = true; - if (checkPageSizeAndMayOpenANewPage()) { - writePageToPageBuffer(); - } - pointCount++; - } - return pointCount; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java index 12429f77b..17c610e8d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java @@ -160,11 +160,12 @@ private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) { String[] segments = new String[tagColumnIndexInTsBlock.length + 1]; segments[0] = tableName; for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) { - segments[i + 1] = - tsBlock - .getValueColumns()[tagColumnIndexInTsBlock[i]] - .getBinary(rowIdx) - .getStringValue(TSFileConfig.STRING_CHARSET); + Column tagColumn = tsBlock.getColumn(tagColumnIndexInTsBlock[i]); + if (tagColumn.isNull(rowIdx)) { + segments[i + 1] = null; + } else { + segments[i + 1] = tagColumn.getBinary(rowIdx).getStringValue(TSFileConfig.STRING_CHARSET); + } } return new StringArrayDeviceID(segments); } @@ -216,6 +217,9 @@ public int write(Column timeColumn, Column[] valueColumns, int startRowIndex, in throws WriteProcessException { int pointCount = 0; for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) { + if (timeColumn.isNull(rowIndex)) { + throw new WriteProcessException("All values in time column should not be null"); + } long time = timeColumn.getLong(rowIndex); checkIsHistoryData(time); for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) { From 8e48b36f5d35c1e89bcda2efc647d6ada036ee14 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 27 Mar 2026 09:45:38 +0800 Subject: [PATCH 3/3] rename ut --- .../tsfile/write/writer/TableTsBlock2TsFileWriterTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java index fac64cf43..80320d13d 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java @@ -57,7 +57,8 @@ public void tearDown() throws Exception { } @Test - public void test1() throws IOException, WriteProcessException { + public void testWriteWithExistingTimeColumnAndTagColumns() + throws IOException, WriteProcessException { TableSchema tableSchema = getTableSchema(); TsBlock tsBlock = getTsBlock(); TableTsBlock2TsFileWriter writer = @@ -95,7 +96,7 @@ public void test1() throws IOException, WriteProcessException { } @Test - public void test2() throws IOException, WriteProcessException { + public void testWriteWithExistingTagColumns() throws IOException, WriteProcessException { TableSchema tableSchema = getTableSchema(); TableTsBlock2TsFileWriter writer = new TableTsBlock2TsFileWriter( @@ -133,7 +134,7 @@ public void test2() throws IOException, WriteProcessException { } @Test - public void test3() throws IOException, WriteProcessException { + public void testWriteWithoutTimeColumnAndTagColumns() throws IOException, WriteProcessException { TableSchema tableSchema = getTableSchema(); TableTsBlock2TsFileWriter writer = new TableTsBlock2TsFileWriter(