Skip to content

Commit 57717e6

Browse files
authored
Fix SessionDataSet: row count, async dispose, and null measurement issues (#53, #54, #55) (#56)
* Fix SessionDataSet: row count, async dispose, and null measurement issues (#53, #54, #55) 修复 SessionDataSet 行数统计、异步释放及空值测点问题 - Fix CurrentBatchRowCount() always returning 0 by eagerly constructing the first TsBlock in RpcDataSet constructor when initial data is available 修复 CurrentBatchRowCount() 始终返回 0 的问题,在构造函数中预先反序列化首个 TsBlock - Add IAsyncDisposable to SessionDataSet and RpcDataSet, providing DisposeAsync() that properly awaits Close() to avoid sync-over-async deadlocks 为 SessionDataSet 和 RpcDataSet 添加 IAsyncDisposable 接口,支持 await using 语法 - Fix GetRow() including null-valued columns in RowRecord by using IsNull() check before calling type-specific getters, instead of relying on value type null checks which always pass for int/bool/float/etc. 修复 GetRow() 中值类型默认值绕过 null 检查导致空值列被错误包含的问题 * Fix SessionDataSet.Close() idempotency and add regression tests 修复 SessionDataSet.Close() 幂等性问题并添加回归测试 - Set _isClosed = true in Close() finally block to prevent NullReferenceException on repeated Close/Dispose/DisposeAsync calls 在 Close() 的 finally 块中设置 _isClosed = true,防止重复调用导致空引用异常 - Add RpcDataSetTests covering: - CurrentBatchRowCount returns correct size before first row read - CurrentBatchRowCount returns 0 when no data - GetRow excludes null-valued measurements for value types (BOOLEAN, INT32, DOUBLE) - DataTypes/Measurements/Values lists stay consistent 添加 RpcDataSet 回归测试覆盖行数统计和空值测点排除
1 parent 9fd7a9c commit 57717e6

3 files changed

Lines changed: 238 additions & 10 deletions

File tree

src/Apache.IoTDB/DataStructure/RpcDataSet.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
namespace Apache.IoTDB.DataStructure
2828
{
29-
public class RpcDataSet : System.IDisposable
29+
public class RpcDataSet : System.IDisposable, System.IAsyncDisposable
3030
{
3131
private const string TimestampColumnName = "Time";
3232
private const string DefaultTimeFormat = "yyyy-MM-dd HH:mm:ss.fff";
@@ -140,6 +140,11 @@ public RpcDataSet(string sql, List<string> columnNameList, List<string> columnTy
140140
_tsBlockSize = 0;
141141
_tsBlockIndex = -1;
142142

143+
if (HasCachedByteBuffer())
144+
{
145+
ConstructOneTsBlock();
146+
}
147+
143148
_zoneId = FindTimeZoneSafe(zoneId);
144149

145150
if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count)
@@ -172,6 +177,22 @@ public void Dispose()
172177
GC.SuppressFinalize(this);
173178
}
174179

180+
public async ValueTask DisposeAsync()
181+
{
182+
if (!disposedValue)
183+
{
184+
try
185+
{
186+
await Close().ConfigureAwait(false);
187+
}
188+
catch
189+
{
190+
}
191+
disposedValue = true;
192+
}
193+
GC.SuppressFinalize(this);
194+
}
195+
175196
public async Task Close()
176197
{
177198
if (_isClosed) return;
@@ -634,11 +655,9 @@ public RowRecord GetRow()
634655
long timestamp = 0;
635656
foreach (string columnName in columns)
636657
{
637-
object localfield;
638658
string typeStr = _columnTypeList[i];
639659
TSDataType dataType = Client.GetDataTypeByStr(typeStr);
640660

641-
// Identify the real time column by tsBlock index, not by data type
642661
int tsBlockColumnIndex = GetTsBlockColumnIndexForColumnName(columnName);
643662
if (tsBlockColumnIndex == -1)
644663
{
@@ -647,6 +666,13 @@ public RowRecord GetRow()
647666
continue;
648667
}
649668

669+
if (IsNull(tsBlockColumnIndex, _tsBlockIndex))
670+
{
671+
i += 1;
672+
continue;
673+
}
674+
675+
object localfield;
650676
switch (dataType)
651677
{
652678
case TSDataType.BOOLEAN:
@@ -682,12 +708,9 @@ public RowRecord GetRow()
682708
string err_msg = "value format not supported";
683709
throw new TException(err_msg, null);
684710
}
685-
if (localfield != null)
686-
{
687-
fieldList.Add(localfield);
688-
measurementList.Add(columnName);
689-
dataTypeList.Add(dataType);
690-
}
711+
fieldList.Add(localfield);
712+
measurementList.Add(columnName);
713+
dataTypeList.Add(dataType);
691714
i += 1;
692715
}
693716
return new RowRecord(timestamp, fieldList, measurementList, dataTypeList);

src/Apache.IoTDB/DataStructure/SessionDataSet.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
namespace Apache.IoTDB.DataStructure
2727
{
28-
public class SessionDataSet : System.IDisposable
28+
public class SessionDataSet : System.IDisposable, System.IAsyncDisposable
2929
{
3030
private readonly long _queryId;
3131
private readonly long _statementId;
@@ -154,6 +154,7 @@ public async Task Close()
154154
}
155155
finally
156156
{
157+
_isClosed = true;
157158
await _rpcDataSet.Close();
158159
_clientQueue.Add(_client);
159160
_client = null;
@@ -184,5 +185,21 @@ public void Dispose()
184185
Dispose(disposing: true);
185186
GC.SuppressFinalize(this);
186187
}
188+
189+
public async ValueTask DisposeAsync()
190+
{
191+
if (!disposedValue)
192+
{
193+
try
194+
{
195+
await this.Close().ConfigureAwait(false);
196+
}
197+
catch
198+
{
199+
}
200+
disposedValue = true;
201+
}
202+
GC.SuppressFinalize(this);
203+
}
187204
}
188205
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System;
21+
using System.Collections.Generic;
22+
using System.Linq;
23+
using Apache.IoTDB.DataStructure;
24+
using NUnit.Framework;
25+
26+
namespace Apache.IoTDB.Tests
27+
{
28+
[TestFixture]
29+
public class RpcDataSetTests
30+
{
31+
/// <summary>
32+
/// Builds a serialized TsBlock with 2 rows and 3 value columns (INT32, BOOLEAN, DOUBLE).
33+
/// Row 0: time=1000, int32=42, boolean=null, double=3.14
34+
/// Row 1: time=2000, int32=null, boolean=true, double=null
35+
/// </summary>
36+
private static byte[] BuildTestTsBlockBytes()
37+
{
38+
var buf = new ByteBuffer(256);
39+
40+
// 1. value column count
41+
buf.AddInt(3);
42+
43+
// 2. value column data types: INT32(1), BOOLEAN(0), DOUBLE(4)
44+
buf.AddByte((byte)TSDataType.INT32);
45+
buf.AddByte((byte)TSDataType.BOOLEAN);
46+
buf.AddByte((byte)TSDataType.DOUBLE);
47+
48+
// 3. position count
49+
buf.AddInt(2);
50+
51+
// 4. column encodings: time=Int64Array(2), int32=Int32Array(1), boolean=ByteArray(0), double=Int64Array(2)
52+
buf.AddByte((byte)ColumnEncoding.Int64Array);
53+
buf.AddByte((byte)ColumnEncoding.Int32Array);
54+
buf.AddByte((byte)ColumnEncoding.ByteArray);
55+
buf.AddByte((byte)ColumnEncoding.Int64Array);
56+
57+
// 5. Time column (Int64Array): no nulls, 2 values
58+
buf.AddByte(0); // mayHaveNull = false
59+
buf.AddLong(1000L);
60+
buf.AddLong(2000L);
61+
62+
// 6. INT32 column (Int32Array): row1 is null
63+
buf.AddByte(1); // mayHaveNull = true
64+
// null indicators packed: [false, true] → bit7=0, bit6=1 → 0x40
65+
buf.AddByte(0x40);
66+
// only non-null value (row 0)
67+
buf.AddInt(42);
68+
69+
// 7. BOOLEAN column (ByteArray): row0 is null
70+
buf.AddByte(1); // mayHaveNull = true
71+
// null indicators packed: [true, false] → bit7=1, bit6=0 → 0x80
72+
buf.AddByte(0x80);
73+
// boolean values packed (all positions): [false, true] → bit7=0, bit6=1 → 0x40
74+
buf.AddByte(0x40);
75+
76+
// 8. DOUBLE column (Int64Array): row1 is null
77+
buf.AddByte(1); // mayHaveNull = true
78+
// null indicators packed: [false, true] → 0x40
79+
buf.AddByte(0x40);
80+
// only non-null value (row 0)
81+
buf.AddDouble(3.14);
82+
83+
return buf.GetBuffer();
84+
}
85+
86+
private RpcDataSet CreateTestDataSet(List<byte[]> queryResult = null)
87+
{
88+
var columnNames = new List<string> { "s_int32", "s_boolean", "s_double" };
89+
var columnTypes = new List<string> { "INT32", "BOOLEAN", "DOUBLE" };
90+
var columnNameIndex = new Dictionary<string, int>
91+
{
92+
{ "s_int32", 0 },
93+
{ "s_boolean", 1 },
94+
{ "s_double", 2 }
95+
};
96+
var columnIndex2TsBlockColumnIndexList = new List<int> { 0, 1, 2 };
97+
98+
queryResult ??= new List<byte[]> { BuildTestTsBlockBytes() };
99+
100+
return new RpcDataSet(
101+
sql: "SELECT * FROM root.test",
102+
columnNameList: columnNames,
103+
columnTypeList: columnTypes,
104+
columnNameIndex: columnNameIndex,
105+
ignoreTimestamp: false,
106+
moreData: false,
107+
queryId: 1,
108+
statementId: 1,
109+
client: null,
110+
sessionId: 1,
111+
queryResult: queryResult,
112+
fetchSize: 1024,
113+
timeout: 10000,
114+
zoneId: "UTC",
115+
columnIndex2TsBlockColumnIndexList: columnIndex2TsBlockColumnIndexList
116+
);
117+
}
118+
119+
[Test]
120+
public void CurrentBatchRowCount_ReturnsCorrectSize_BeforeFirstNext()
121+
{
122+
var dataSet = CreateTestDataSet();
123+
124+
Assert.That(dataSet._tsBlockSize, Is.EqualTo(2),
125+
"CurrentBatchRowCount should return the TsBlock row count immediately after construction.");
126+
}
127+
128+
[Test]
129+
public void CurrentBatchRowCount_ReturnsZero_WhenNoData()
130+
{
131+
var dataSet = CreateTestDataSet(queryResult: new List<byte[]>());
132+
133+
Assert.That(dataSet._tsBlockSize, Is.EqualTo(0),
134+
"CurrentBatchRowCount should return 0 when no query results are provided.");
135+
}
136+
137+
[Test]
138+
public void GetRow_ExcludesNullValuedMeasurements_ForValueTypes()
139+
{
140+
var dataSet = CreateTestDataSet();
141+
142+
// Advance to row 0: int32=42, boolean=null, double=3.14
143+
dataSet.Next();
144+
var row0 = dataSet.GetRow();
145+
146+
Assert.That(row0.Timestamps, Is.EqualTo(1000L));
147+
Assert.That(row0.Measurements, Does.Contain("s_int32"));
148+
Assert.That(row0.Measurements, Does.Not.Contain("s_boolean"),
149+
"Null BOOLEAN measurement should be excluded from row.");
150+
Assert.That(row0.Measurements, Does.Contain("s_double"));
151+
Assert.That(row0.Values.Count, Is.EqualTo(2), "Row 0 should have 2 non-null values.");
152+
}
153+
154+
[Test]
155+
public void GetRow_ExcludesNullValuedMeasurements_ForInt32AndDouble()
156+
{
157+
var dataSet = CreateTestDataSet();
158+
159+
// Advance to row 0 then row 1
160+
dataSet.Next();
161+
dataSet.Next();
162+
var row1 = dataSet.GetRow();
163+
164+
Assert.That(row1.Timestamps, Is.EqualTo(2000L));
165+
Assert.That(row1.Measurements, Does.Not.Contain("s_int32"),
166+
"Null INT32 measurement should be excluded from row.");
167+
Assert.That(row1.Measurements, Does.Contain("s_boolean"));
168+
Assert.That(row1.Measurements, Does.Not.Contain("s_double"),
169+
"Null DOUBLE measurement should be excluded from row.");
170+
Assert.That(row1.Values.Count, Is.EqualTo(1), "Row 1 should have 1 non-null value.");
171+
Assert.That(row1.Values[0], Is.EqualTo(true));
172+
}
173+
174+
[Test]
175+
public void GetRow_DataTypesMatchMeasurements()
176+
{
177+
var dataSet = CreateTestDataSet();
178+
179+
dataSet.Next();
180+
var row0 = dataSet.GetRow();
181+
182+
Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Measurements.Count),
183+
"DataTypes count should match Measurements count.");
184+
Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Values.Count),
185+
"DataTypes count should match Values count.");
186+
}
187+
}
188+
}

0 commit comments

Comments
 (0)