diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java index 5fd9dde7d..87464239f 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java @@ -51,20 +51,20 @@ import java.util.stream.Collectors; public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { - private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class); + protected static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class); - private final IDeviceID deviceId; + protected final IDeviceID deviceId; // measurementID -> ValueChunkWriter - private final Map valueChunkWriterMap = new LinkedHashMap<>(); + protected final Map valueChunkWriterMap = new LinkedHashMap<>(); - private final TimeChunkWriter timeChunkWriter; + protected final TimeChunkWriter timeChunkWriter; - private final EncryptParameter encryprParam; + protected final EncryptParameter encryprParam; - private long lastTime = Long.MIN_VALUE; - private boolean isInitLastTime = false; - private boolean convertColumnNameToLowerCase = false; + protected long lastTime = Long.MIN_VALUE; + protected boolean isInitLastTime = false; + protected boolean convertColumnNameToLowerCase = false; public AlignedChunkGroupWriterImpl(IDeviceID deviceId) { this.deviceId = deviceId; @@ -392,7 +392,7 @@ private void writeEmptyDataInOneRow(List valueChunkWriterList) * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it * to pageBuffer */ - private boolean checkPageSizeAndMayOpenANewPage() { + protected boolean checkPageSizeAndMayOpenANewPage() { if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) { return true; } @@ -404,21 +404,21 @@ private boolean checkPageSizeAndMayOpenANewPage() { return false; } - private void writePageToPageBuffer() { + protected void writePageToPageBuffer() { timeChunkWriter.writePageToPageBuffer(); for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) { valueChunkWriter.writePageToPageBuffer(); } } - private void sealAllChunks() { + protected void sealAllChunks() { timeChunkWriter.sealCurrentPage(); for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) { valueChunkWriter.sealCurrentPage(); } } - private void checkIsHistoryData(long time) throws WriteProcessException { + protected void checkIsHistoryData(long time) throws WriteProcessException { if (isInitLastTime && time <= lastTime) { throw new WriteProcessException( "Not allowed to write out-of-order data in timeseries " diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java new file mode 100644 index 000000000..17c610e8d --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl; +import org.apache.tsfile.write.chunk.IChunkGroupWriter; +import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl; +import org.apache.tsfile.write.chunk.ValueChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Convert TsBlock (table model) into TsFile format. Core responsibilities: 1. Split TsBlock by + * device (based on tag columns) 2. Optionally generate a new time column per device 3. Dispatch + * rows to corresponding ChunkGroupWriter + */ +public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter { + + private final boolean generateNewTimeColumn; + private final int timeColumnIndexInTsBlock; + private final int[] tagColumnIndexInTsBlock; + private final int[] fieldColumnIndexInTsBlock; + private final IMeasurementSchema[] fieldColumnSchemas; + + private final String tableName; + private final Map deviceRowCountMap; + + private int rowCount = 0; + + public TableTsBlock2TsFileWriter( + File file, + TableSchema tableSchema, + long memoryThreshold, + boolean generateNewTimeColumn, + int timeColumnIndexInTsBlock, + int[] tagColumnIndexesInTsBlock, + int[] fieldColumnIndexesInTsBlock, + IMeasurementSchema[] fieldColumnSchemas) + throws IOException { + super(file, tableSchema, memoryThreshold); + this.tableName = tableSchema.getTableName(); + this.generateNewTimeColumn = generateNewTimeColumn; + this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock; + this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock; + this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock; + this.deviceRowCountMap = generateNewTimeColumn ? new HashMap<>() : null; + this.fieldColumnSchemas = fieldColumnSchemas; + } + + public void write(TsBlock tsBlock) throws IOException, WriteProcessException { + if (tsBlock == null || tsBlock.isEmpty()) { + return; + } + // Split TsBlock into device partitions and prepare time column + Pair>> timeColumnAndDeviceIdEndIndexPairs = + splitTsBlockByDeviceAndGetTimeColumn(tsBlock); + Column timeColumn = timeColumnAndDeviceIdEndIndexPairs.left; + // Extract value columns according to schema mapping + Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length]; + for (int i = 0; i < valueColumns.length; i++) { + valueColumns[i] = tsBlock.getColumn(fieldColumnIndexInTsBlock[i]); + } + List> deviceIdEndIndexPairs = timeColumnAndDeviceIdEndIndexPairs.right; + int startIndex = 0; + + // Iterate each device segment and write data into its ChunkGroup + for (Pair pair : deviceIdEndIndexPairs) { + TableTsBlockChunkGroupWriterImpl chunkGroupWriter = + (TableTsBlockChunkGroupWriterImpl) tryToInitialGroupWriter(pair.left, true, true); + int writeCount = chunkGroupWriter.write(timeColumn, valueColumns, startIndex, pair.right); + rowCount += writeCount; + recordCount += writeCount; + startIndex = pair.right; + } + + this.checkMemorySizeAndMayFlushChunks(); + } + + /** + * Split TsBlock by device boundary. If generateNewTimeColumn is true, generate a monotonically + * increasing time column per device using deviceRowCountMap. + * + * @return Pair of (time column, device -> end index list) + */ + private Pair>> splitTsBlockByDeviceAndGetTimeColumn( + TsBlock tsBlock) { + long[] timestamps = null; + if (generateNewTimeColumn) { + timestamps = new long[tsBlock.getPositionCount()]; + } + List> deviceSplitResult = new ArrayList<>(); + IDeviceID lastDeviceID = null; + long lastDeviceCount = 0; + + // Iterate rows and detect device boundary changes + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + IDeviceID currDeviceID = getDeviceId(tsBlock, i); + // Device changed, flush previous segment + if (!currDeviceID.equals(lastDeviceID)) { + if (lastDeviceID != null) { + deviceSplitResult.add(new Pair(lastDeviceID, i)); + if (generateNewTimeColumn) { + deviceRowCountMap.put(lastDeviceID, lastDeviceCount); + } + } + lastDeviceID = currDeviceID; + if (generateNewTimeColumn) { + lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L); + } + } + // Generate synthetic time if required + if (generateNewTimeColumn) { + timestamps[i] = lastDeviceCount++; + } + } + + deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount())); + if (generateNewTimeColumn) { + deviceRowCountMap.put(lastDeviceID, lastDeviceCount); + return new Pair<>(new TimeColumn(timestamps.length, timestamps), deviceSplitResult); + } else { + return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock), deviceSplitResult); + } + } + + private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) { + String[] segments = new String[tagColumnIndexInTsBlock.length + 1]; + segments[0] = tableName; + for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) { + Column tagColumn = tsBlock.getColumn(tagColumnIndexInTsBlock[i]); + if (tagColumn.isNull(rowIdx)) { + segments[i + 1] = null; + } else { + segments[i + 1] = tagColumn.getBinary(rowIdx).getStringValue(TSFileConfig.STRING_CHARSET); + } + } + return new StringArrayDeviceID(segments); + } + + @Override + protected IChunkGroupWriter tryToInitialGroupWriter( + IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException { + IChunkGroupWriter groupWriter = groupWriters.get(deviceId); + if (groupWriter == null) { + groupWriter = new TableTsBlockChunkGroupWriterImpl(deviceId); + ((AlignedChunkGroupWriterImpl) groupWriter) + .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); + groupWriters.put(deviceId, groupWriter); + } + return groupWriter; + } + + public int getDeviceCount() { + return alignedDeviceLastTimeMap.size(); + } + + public int getRowCount() { + return rowCount; + } + + /** + * ChunkGroup writer for a single device. Responsible for writing time column and multiple value + * columns. + */ + private class TableTsBlockChunkGroupWriterImpl extends TableChunkGroupWriterImpl { + private final ValueChunkWriter[] valueChunkWriters; + + public TableTsBlockChunkGroupWriterImpl(IDeviceID deviceId) throws IOException { + super(deviceId); + // Initialize ValueChunkWriter for each measurement + this.valueChunkWriters = new ValueChunkWriter[fieldColumnSchemas.length]; + for (int i = 0; i < fieldColumnSchemas.length; i++) { + valueChunkWriters[i] = tryToAddSeriesWriterInternal(fieldColumnSchemas[i]); + } + } + + /** + * Write a range of rows into chunk group. + * + * @param startRowIndex inclusive + * @param endRowIndex exclusive + */ + public int write(Column timeColumn, Column[] valueColumns, int startRowIndex, int endRowIndex) + throws WriteProcessException { + int pointCount = 0; + for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) { + if (timeColumn.isNull(rowIndex)) { + throw new WriteProcessException("All values in time column should not be null"); + } + long time = timeColumn.getLong(rowIndex); + checkIsHistoryData(time); + for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) { + Column valueColumn = valueColumns[valueColumnIndex]; + ValueChunkWriter valueChunkWriter = valueChunkWriters[valueColumnIndex]; + boolean isNull = valueColumn.isNull(rowIndex); + switch (valueChunkWriter.getDataType()) { + case BOOLEAN: + valueChunkWriter.write( + time, isNull ? false : valueColumn.getBoolean(rowIndex), isNull); + break; + case INT32: + case DATE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getInt(rowIndex), isNull); + break; + case INT64: + case TIMESTAMP: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getLong(rowIndex), isNull); + break; + case FLOAT: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getFloat(rowIndex), isNull); + break; + case DOUBLE: + valueChunkWriter.write(time, isNull ? 0 : valueColumn.getDouble(rowIndex), isNull); + break; + case TEXT: + case BLOB: + case STRING: + valueChunkWriter.write(time, isNull ? null : valueColumn.getBinary(rowIndex), isNull); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Data type %s is not supported.", valueChunkWriter.getDataType().getType())); + } + } + timeChunkWriter.write(time); + lastTime = time; + isInitLastTime = true; + if (checkPageSizeAndMayOpenANewPage()) { + writePageToPageBuffer(); + } + pointCount++; + } + return pointCount; + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java new file mode 100644 index 000000000..80320d13d --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.writer; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.TsFileGeneratorForTest; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class TableTsBlock2TsFileWriterTest { + + private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0); + + @After + public void tearDown() throws Exception { + Files.deleteIfExists(Paths.get(filePath)); + } + + @Test + public void testWriteWithExistingTimeColumnAndTagColumns() + throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TsBlock tsBlock = getTsBlock(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + false, + 0, + new int[] {1}, + new int[] {2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(tsBlock); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(10, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(10, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(4, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(9, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + @Test + public void testWriteWithExistingTagColumns() throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + true, + -1, + new int[] {1}, + new int[] {0, 2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("t", TSDataType.TIMESTAMP), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(getTsBlock()); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(10, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(10, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(5, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(9, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + @Test + public void testWriteWithoutTimeColumnAndTagColumns() throws IOException, WriteProcessException { + TableSchema tableSchema = getTableSchema(); + TableTsBlock2TsFileWriter writer = + new TableTsBlock2TsFileWriter( + new File(filePath), + tableSchema, + 32 * 1024 * 1024, + true, + -1, + new int[0], + new int[] {0, 1, 2, 3, 4}, + new IMeasurementSchema[] { + new MeasurementSchema("t", TSDataType.TIMESTAMP), + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32), + }); + writer.write(getTsBlock()); + writer.close(); + + Assert.assertEquals(100, writer.getRowCount()); + Assert.assertEquals(1, writer.getDeviceCount()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1")); + Map> deviceTimeseriesMetadataMap = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(1, deviceTimeseriesMetadataMap.size()); + for (Map.Entry> entry : + deviceTimeseriesMetadataMap.entrySet()) { + Assert.assertEquals(6, entry.getValue().size()); + Assert.assertEquals(0, entry.getValue().get(0).getStatistics().getStartTime()); + Assert.assertEquals(99, entry.getValue().get(1).getStatistics().getEndTime()); + } + } + } + + private TableSchema getTableSchema() { + return new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)), + Arrays.asList( + ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD, ColumnCategory.FIELD)); + } + + private TsBlock getTsBlock() { + TsBlockBuilder tsBlockBuilder = + new TsBlockBuilder( + Arrays.asList( + TSDataType.TIMESTAMP, + TSDataType.STRING, + TSDataType.INT32, + TSDataType.INT32, + TSDataType.INT32)); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + tsBlockBuilder.getTimeColumnBuilder().writeLong(0); + tsBlockBuilder.getValueColumnBuilders()[0].writeLong(j); + tsBlockBuilder.getValueColumnBuilders()[1].writeBinary( + new Binary("device" + i, TSFileConfig.STRING_CHARSET)); + tsBlockBuilder.getValueColumnBuilders()[2].writeInt(j); + tsBlockBuilder.getValueColumnBuilders()[3].writeInt(j); + tsBlockBuilder.getValueColumnBuilders()[4].writeInt(j); + tsBlockBuilder.declarePosition(); + } + } + return tsBlockBuilder.build(); + } +}