Add tsblock tsfile writer #753
Conversation
Pull request overview
Adds a V4 writer to convert table-model TsBlock data into a TsFile, with accompanying tests, plus some refactoring to enable subclass access in aligned chunk-group writers.
Changes:
- Introduce `TableTsBlock2TsFileWriter` to split a table-model `TsBlock` by device and write per-device aligned chunks (with optional synthetic time generation).
- Add unit tests covering multiple mappings (existing time column vs. generated time; tag-driven device vs. single-device).
- Extend the chunk writer implementations (a new `TableChunkGroupWriterImpl.write(...)`, plus making some `AlignedChunkGroupWriterImpl` members `protected`) to support the new writer flow.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java | Adds tests validating the new TsBlock→TsFile writing behavior across different column mappings. |
| java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java | New writer that partitions TsBlock rows by device and writes aligned chunk groups, optionally generating per-device timestamps. |
| java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java | Adds a convenience write(...) API for table-style writes (currently contains correctness issues). |
| java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java | Changes access modifiers to protected to enable reuse/extension by table-model writers. |
```java
segments[i + 1] =
    tsBlock
        .getValueColumns()[tagColumnIndexInTsBlock[i]]
        .getBinary(rowIdx)
        .getStringValue(TSFileConfig.STRING_CHARSET);
```
`getDeviceId` assumes every tag column is a non-null STRING/BINARY and unconditionally calls `getBinary(rowIdx).getStringValue(...)`. If a tag value is null (or the tag column type isn't binary-compatible), this will throw at runtime. Consider following the `Tablet#getDeviceID` approach: check `isNull(rowIdx)` and use `getObject(rowIdx)`/`toString()` (or otherwise handle non-string tag types) so device ID generation is robust to nulls and supported tag types.
Suggested change:

```diff
-segments[i + 1] =
-    tsBlock
-        .getValueColumns()[tagColumnIndexInTsBlock[i]]
-        .getBinary(rowIdx)
-        .getStringValue(TSFileConfig.STRING_CHARSET);
+Column tagColumn = tsBlock.getValueColumns()[tagColumnIndexInTsBlock[i]];
+if (tagColumn.isNull(rowIdx)) {
+  segments[i + 1] = null;
+} else {
+  Object value = tagColumn.getObject(rowIdx);
+  segments[i + 1] = value == null ? null : value.toString();
+}
```
```java
@Test
public void test1() throws IOException, WriteProcessException {
```
There was a problem hiding this comment.
The test method names `test1`, `test2`, `test3` don't communicate what scenarios are being validated (e.g., existing time column vs. generated time column, with/without tag columns). Renaming them to reflect the specific behavior under test will make failures much easier to triage.
Suggested change:

```diff
-public void test1() throws IOException, WriteProcessException {
+public void testWriteWithExistingTimeColumnAndTagColumns() throws IOException, WriteProcessException {
```
```java
ValueChunkWriter[] valueChunkWriters = new ValueChunkWriter[valueColumns.length];
for (int i = 0; i < measurementSchemas.size(); i++) {
  valueChunkWriters[i] = tryToAddSeriesWriterInternal(measurementSchemas.get(i));
}
for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
  long time = timeColumn.getLong(rowIndex);
  for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) {
    Column valueColumn = valueColumns[valueColumnIndex];
    IMeasurementSchema measurementSchema = measurementSchemas.get(valueColumnIndex);
    ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex];
    boolean isNull = valueColumn.isNull(rowIndex);
```
There was a problem hiding this comment.
In `write(...)`, the `ValueChunkWriter` is indexed by `rowIndex` (`valueChunkWriters[rowIndex]`) instead of by `valueColumnIndex`. This throws `ArrayIndexOutOfBoundsException` as soon as `endRowIndex` exceeds the number of columns, and even when it does not, data is written into the wrong series. Use the value-column index when selecting the writer, and consider validating that `measurementSchemas.size()` matches `valueColumns.length` before initializing or iterating, to avoid out-of-bounds access or null writers.

Also, this method currently skips the out-of-order check (`checkIsHistoryData(time)`) that the other chunk writers perform, so out-of-order writes may silently corrupt state; it should apply the same check (and adjust the `throws` signature accordingly).
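To make the indexing fix and the ordering check concrete, here is a minimal, self-contained sketch of the corrected loop shape. The class and method names (`DemoValueChunkWriter`, `run`) are simplified stand-ins invented for this illustration, not the real TsFile API; the strictly-increasing-time guard stands in for `checkIsHistoryData(time)`.

```java
// Hypothetical stand-in for TsFile's ValueChunkWriter, for illustration only.
final class DemoValueChunkWriter {
  final String series;
  int points = 0;

  DemoValueChunkWriter(String series) {
    this.series = series;
  }

  void write(long time, long value) {
    points++;
  }
}

public class WriterIndexingDemo {
  // Writes rowCount rows into one writer per value column and reports
  // how many points each series received.
  public static String run(int columnCount, int rowCount) {
    DemoValueChunkWriter[] writers = new DemoValueChunkWriter[columnCount];
    for (int i = 0; i < columnCount; i++) {
      writers[i] = new DemoValueChunkWriter("s" + (i + 1));
    }
    long lastTime = Long.MIN_VALUE;
    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
      long time = rowIndex; // stand-in for timeColumn.getLong(rowIndex)
      if (time <= lastTime) { // stand-in for the checkIsHistoryData(time) guard
        throw new IllegalArgumentException("out-of-order time: " + time);
      }
      lastTime = time;
      for (int col = 0; col < columnCount; col++) {
        // Key fix: select the writer by the COLUMN index, not the row index.
        // writers[rowIndex] would overflow once rowCount > columnCount.
        writers[col].write(time, rowIndex * 10L + col);
      }
    }
    StringBuilder sb = new StringBuilder();
    for (DemoValueChunkWriter w : writers) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(w.series).append('=').append(w.points);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // 2 columns, 5 rows: every series receives all 5 points.
    System.out.println(run(2, 5)); // prints s1=5,s2=5
  }
}
```

With row-indexed writers, `run(2, 5)` would fail at row 2; column-indexed writers handle any row count.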
```java
    throws WriteProcessException {
  int pointCount = 0;
  for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
    long time = timeColumn.getLong(rowIndex);
```
There was a problem hiding this comment.
It may be better to check `timeColumn.isNull(rowIndex)` here and throw an exception if the time value is null, rather than calling `getLong(rowIndex)` unconditionally.
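A hedged sketch of what that guard could look like. `DemoTimeColumn` and `readTime` are hypothetical minimal stand-ins invented here; the real TsFile `Column` interface differs.

```java
// Hypothetical minimal stand-in for the time column, for illustration only.
interface DemoTimeColumn {
  boolean isNull(int rowIndex);

  long getLong(int rowIndex);
}

public class NullTimeCheckDemo {
  // Read the timestamp for a row, rejecting null time slots up front
  // (mirrors the reviewer's suggestion for the write(...) loop).
  public static long readTime(DemoTimeColumn timeColumn, int rowIndex) {
    if (timeColumn.isNull(rowIndex)) {
      throw new IllegalArgumentException("time column is null at row " + rowIndex);
    }
    return timeColumn.getLong(rowIndex);
  }

  public static void main(String[] args) {
    // Toy column where row 1 has a null time slot.
    DemoTimeColumn col = new DemoTimeColumn() {
      public boolean isNull(int rowIndex) {
        return rowIndex == 1;
      }

      public long getLong(int rowIndex) {
        return 100L + rowIndex;
      }
    };
    System.out.println(readTime(col, 0)); // prints 100
    try {
      readTime(col, 1);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing fast here surfaces bad input at the offending row instead of letting an undefined long value silently enter the file.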