Skip to content

Commit 95230ad

Browse files
committed
Allow run-end encoding of TimestampWithOffset
1 parent 7156421 commit 95230ad

2 files changed

Lines changed: 130 additions & 5 deletions

File tree

arrow/extensions/timestamp_with_offset.go

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package extensions
1919
import (
2020
"errors"
2121
"fmt"
22+
"math"
2223
"reflect"
2324
"strings"
2425
"time"
@@ -41,6 +42,13 @@ func isOffsetTypeOk(offsetType arrow.DataType) bool {
4142
return true
4243
case *arrow.DictionaryType:
4344
return arrow.IsInteger(offsetType.IndexType.ID()) && arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
45+
case *arrow.RunEndEncodedType:
46+
return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
47+
arrow.TypeEqual(offsetType.Encoded(), arrow.PrimitiveTypes.Int16)
48+
// FIXME: Technically this should be non-nullable, but Arrow IPC does not deserialize
49+
// ValueNullable properly, so enforcing this here would always fail when reading from an IPC
50+
// stream
51+
// !offsetType.ValueNullable
4452
default:
4553
return false
4654
}
@@ -140,6 +148,21 @@ func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index arro
140148
return NewTimestampWithOffsetType(unit, &offsetType)
141149
}
142150

151+
// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to
152+
// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, Int16)), where T is any TimeUnit and E is a
153+
// valid run-ends type.
154+
//
155+
// The error will be populated if runEnds is not a valid run-end encoding run-ends type.
156+
func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds arrow.DataType) (*TimestampWithOffsetType, error) {
157+
offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
158+
if !offsetType.ValidRunEndsType(runEnds) {
159+
return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", runEnds))
160+
}
161+
162+
return NewTimestampWithOffsetType(unit, offsetType)
163+
}
164+
165+
143166
// ArrayType returns the reflect.Type of the TimestampWithOffsetArray struct.
func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
	// Derive the struct type from a typed nil pointer instead of building a zero value.
	var arr *TimestampWithOffsetArray
	return reflect.TypeOf(arr).Elem()
}
@@ -231,6 +254,9 @@ func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, int
231254
// Get the raw arrow values at the given index
232255
//
233256
// SAFETY: the value at i must not be nil
257+
//
258+
// NOTE: the REE implementation of this calls `LogicalValuesArray()`, which can be slow
259+
// for large arrays.
234260
func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16, arrow.TimeUnit) {
235261
structs := a.Storage().(*array.Struct)
236262

@@ -247,6 +273,10 @@ func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16
247273
offsetMinutes = offsets.Value(i)
248274
case *array.Dictionary:
249275
offsetMinutes = offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
276+
case *array.RunEndEncoded:
277+
logicalValues := offsets.LogicalValuesArray().(*array.Int16)
278+
defer logicalValues.Release()
279+
offsetMinutes = logicalValues.Value(i)
250280
}
251281

252282
return utcTimestamp, offsetMinutes, timeUnit
@@ -262,6 +292,7 @@ func (a *TimestampWithOffsetArray) Value(i int) time.Time {
262292

263293
func (a *TimestampWithOffsetArray) Values() []time.Time {
264294
values := make([]time.Time, a.Len())
295+
// TODO: optimize for run-end encoding
265296
for i := range a.Len() {
266297
val := a.Value(i)
267298
values[i] = val
@@ -307,6 +338,8 @@ type TimestampWithOffsetBuilder struct {
307338
Layout string
308339
unit arrow.TimeUnit
309340
offsetType arrow.DataType
341+
// lastOffset is only used to determine when to start new runs with run-end encoded offsets
342+
lastOffset int16
310343
}
311344

312345
// NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface
@@ -320,38 +353,60 @@ func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, of
320353
return &TimestampWithOffsetBuilder{
321354
unit: unit,
322355
offsetType: offsetType,
356+
lastOffset: math.MaxInt16,
323357
Layout: time.RFC3339,
324358
ExtensionBuilder: array.NewExtensionBuilder(mem, dataType),
325359
}, nil
326360
}
327361

328362
func (b *TimestampWithOffsetBuilder) Append(v time.Time) {
329363
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
364+
offsetMinutes16 := int16(offsetMinutes)
330365
structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder)
331366

332367
structBuilder.Append(true)
333368
structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp)
334369

335370
switch offsets := structBuilder.FieldBuilder(1).(type) {
336371
case *array.Int16Builder:
337-
offsets.Append(int16(offsetMinutes))
372+
offsets.Append(offsetMinutes16)
338373
case *array.Int16DictionaryBuilder:
339-
offsets.Append(int16(offsetMinutes))
374+
offsets.Append(offsetMinutes16)
375+
case *array.RunEndEncodedBuilder:
376+
if offsetMinutes != b.lastOffset {
377+
offsets.Append(1)
378+
offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16)
379+
} else {
380+
offsets.ContinueRun(1)
381+
}
382+
383+
b.lastOffset = offsetMinutes16
340384
}
385+
341386
}
342387

343388
func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) {
344389
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
390+
offsetMinutes16 := int16(offsetMinutes)
345391
structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder)
346392

347393
structBuilder.Append(true)
348394
structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp)
349395

350396
switch offsets := structBuilder.FieldBuilder(1).(type) {
351397
case *array.Int16Builder:
352-
offsets.UnsafeAppend(int16(offsetMinutes))
398+
offsets.UnsafeAppend(offsetMinutes16)
353399
case *array.Int16DictionaryBuilder:
354-
offsets.Append(int16(offsetMinutes))
400+
offsets.Append(offsetMinutes16)
401+
case *array.RunEndEncodedBuilder:
402+
if offsetMinutes != b.lastOffset {
403+
offsets.Append(1)
404+
offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16)
405+
} else {
406+
offsets.ContinueRun(1)
407+
}
408+
409+
b.lastOffset = offsetMinutes16
355410
}
356411
}
357412

@@ -399,13 +454,31 @@ func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []b
399454
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
400455
if valids[i] {
401456
timestamps.UnsafeAppend(timestamp)
402-
// TODO: I was here, this needs to be equivalent to UnsafeAppend
403457
offsets.Append(offsetMinutes)
404458
} else {
405459
timestamps.UnsafeAppendBoolToBitmap(false)
406460
offsets.UnsafeAppendBoolToBitmap(false)
407461
}
408462
}
463+
case *array.RunEndEncodedBuilder:
464+
offsetValuesBuilder := offsets.ValueBuilder().(*array.Int16Builder)
465+
for i, v := range values {
466+
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
467+
if valids[i] {
468+
timestamps.UnsafeAppend(timestamp)
469+
offsetMinutes16 := int16(offsetMinutes)
470+
if offsetMinutes != b.lastOffset {
471+
offsets.Append(1)
472+
offsetValuesBuilder.Append(offsetMinutes16)
473+
} else {
474+
offsets.ContinueRun(1)
475+
}
476+
b.lastOffset = offsetMinutes16
477+
} else {
478+
timestamps.UnsafeAppendBoolToBitmap(false)
479+
offsets.UnsafeAppendBoolToBitmap(false)
480+
}
481+
}
409482
}
410483
}
411484

arrow/extensions/timestamp_with_offset_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ func dict(index arrow.DataType) arrow.DataType {
5050
}
5151
}
5252

53+
func ree(runEnds arrow.DataType) arrow.DataType {
54+
v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
55+
v.ValueNullable = false
56+
return v
57+
}
58+
5359
// All tests use this in a for loop to make sure everything works for every possible
5460
// encoding of offsets (primitive, dictionary, run-end)
5561
var allAllowedOffsetTypes = []arrow.DataType{
@@ -65,6 +71,11 @@ var allAllowedOffsetTypes = []arrow.DataType{
6571
dict(arrow.PrimitiveTypes.Int16),
6672
dict(arrow.PrimitiveTypes.Int32),
6773
dict(arrow.PrimitiveTypes.Int64),
74+
75+
// run-end encoded offsetType
76+
ree(arrow.PrimitiveTypes.Int16),
77+
ree(arrow.PrimitiveTypes.Int32),
78+
ree(arrow.PrimitiveTypes.Int64),
6879
}
6980

7081
func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) {
@@ -141,6 +152,47 @@ func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) {
141152
}
142153
}
143154

155+
func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) {
156+
invalidRunEndsType := arrow.PrimitiveTypes.Float32
157+
_, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, invalidRunEndsType)
158+
assert.True(t, err != nil, "Err should not be nil if run ends type is invalid")
159+
160+
runEndsTypes := []arrow.DataType{
161+
arrow.PrimitiveTypes.Int16,
162+
arrow.PrimitiveTypes.Int32,
163+
arrow.PrimitiveTypes.Int64,
164+
};
165+
166+
for _, indexType := range runEndsTypes {
167+
typ, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, indexType)
168+
assert.True(t, err == nil, "Err should be nil")
169+
170+
assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
171+
assert.True(t, typ.ExtensionEquals(typ))
172+
173+
assert.True(t, arrow.TypeEqual(typ, typ))
174+
assert.True(t, arrow.TypeEqual(
175+
arrow.StructOf(
176+
arrow.Field{
177+
Name: "timestamp",
178+
Type: &arrow.TimestampType{
179+
Unit: testTimeUnit,
180+
TimeZone: "UTC",
181+
},
182+
Nullable: false,
183+
},
184+
arrow.Field{
185+
Name: "offset_minutes",
186+
Type: ree(indexType),
187+
Nullable: false,
188+
},
189+
),
190+
typ.StorageType()))
191+
192+
assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
193+
}
194+
}
195+
144196
func TestTimestampWithOffsetExtensionBuilder(t *testing.T) {
145197
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
146198
defer mem.AssertSize(t, 0)

0 commit comments

Comments
 (0)