-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathreader.rs
More file actions
183 lines (163 loc) · 5.45 KB
/
reader.rs
File metadata and controls
183 lines (163 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::try_join;
use once_cell::sync::OnceCell;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::MaskFuture;
use vortex_array::builtins::ArrayBuiltins;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldMask;
use vortex_array::expr::Expression;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_mask::Mask;
use vortex_session::VortexSession;
use crate::children::LayoutChildren;
use crate::segments::SegmentSource;
/// A shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
/// evaluation operations.
///
/// Readers must be `'static + Send + Sync` and can be shared through the [`LayoutReaderRef`]
/// alias.
///
/// The three evaluation methods (`pruning_evaluation`, `filter_evaluation` and
/// `projection_evaluation`) each take the `row_range` to evaluate over, the expression to
/// evaluate, and a mask. Each returns a `VortexResult` wrapping a future rather than the final
/// value, so the mask or array itself is only obtained by awaiting the returned future.
///
/// NOTE(review): the crate-internal `debug_assert_mask_matches_row_range` helper suggests that
/// masks are expected to have length `row_range.end - row_range.start` — confirm.
pub trait LayoutReader: 'static + Send + Sync {
/// Returns the name of the layout reader for debugging.
fn name(&self) -> &Arc<str>;
/// Returns the un-projected dtype of the layout reader.
fn dtype(&self) -> &DType;
/// Returns the number of rows in the layout.
fn row_count(&self) -> u64;
/// Register the splits of this layout reader.
///
/// Implementations add their split points for the requested `field_mask` and `row_range` to
/// `splits`.
/// NOTE(review): the entries are presumably row offsets; confirm against the implementations.
// TODO(ngates): this is a temporary API until we make layout readers stream based.
fn register_splits(
&self,
field_mask: &[FieldMask],
row_range: &Range<u64>,
splits: &mut BTreeSet<u64>,
) -> VortexResult<()>;
/// Returns a mask where all false values are proven to be false in the given expression.
///
/// Unlike [`LayoutReader::filter_evaluation`], the input `mask` is passed already resolved
/// (a [`Mask`], not a [`MaskFuture`]).
///
/// The returned mask **does not** need to have been intersected with the input mask.
fn pruning_evaluation(
&self,
row_range: &Range<u64>,
expr: &Expression,
mask: Mask,
) -> VortexResult<MaskFuture>;
/// Refines the given mask, returning a mask equal in length to the input mask.
///
/// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
/// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
/// as possible before it is used.
///
/// ## Post-conditions
///
/// The returned mask **MUST** have been intersected with the input mask.
fn filter_evaluation(
&self,
row_range: &Range<u64>,
expr: &Expression,
mask: MaskFuture,
) -> VortexResult<MaskFuture>;
/// Evaluates an expression against an array.
///
/// The result is delivered asynchronously as an [`ArrayFuture`].
///
/// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
/// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
/// as possible before it is used.
///
/// ## Post-conditions
///
/// The returned array **MUST** have length equal to the true count of the input mask.
fn projection_evaluation(
&self,
row_range: &Range<u64>,
expr: &Expression,
mask: MaskFuture,
) -> VortexResult<ArrayFuture>;
}
/// A boxed, `'static` future that resolves to a `VortexResult<ArrayRef>`.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
/// Debug-build check that a mask covers exactly the rows of `row_range`.
///
/// Computes `row_range.end - row_range.start`, converts it to `usize`, and asserts that it equals
/// `mask_len`. `context` is prepended to the assertion message so the failing call site can be
/// identified.
///
/// # Panics
///
/// Panics if `row_range.end < row_range.start`, if the range length does not fit in `usize`, or
/// if the lengths differ.
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_mask_matches_row_range(
    mask_len: usize,
    row_range: &Range<u64>,
    context: &str,
) {
    // Width of the range in rows; an inverted range is a caller bug.
    let span = row_range
        .end
        .checked_sub(row_range.start)
        .vortex_expect("row range underflow");
    // Masks are sized in `usize`, so the width has to be representable as one.
    let expected_len = usize::try_from(span).vortex_expect("row range length must fit usize");
    debug_assert_eq!(
        mask_len, expected_len,
        "{context}: mask length must match row range length"
    );
}
/// Release-build counterpart of the mask/row-range length check: a no-op.
///
/// Compiled only when `debug_assertions` is disabled. It has the same signature as the
/// debug-build function of the same name, so call sites need no `cfg` gating of their own; all
/// arguments are ignored.
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_mask_matches_row_range(
_mask_len: usize,
_row_range: &Range<u64>,
_context: &str,
) {
}
/// Extension trait adding mask-combining behaviour to array-producing futures.
///
/// Implemented in this file for [`ArrayFuture`].
pub trait ArrayFutureExt {
/// Consumes `self` and `mask`, returning a value of the same type whose output has had the
/// mask applied.
fn masked(self, mask: MaskFuture) -> Self;
}
impl ArrayFutureExt for ArrayFuture {
    /// Builds an `ArrayFuture` that resolves both `self` and `mask`, then applies the resolved
    /// mask to the resolved array.
    ///
    /// The two futures are awaited together via `try_join!`; if either fails, that error becomes
    /// the output of the returned future.
    fn masked(self, mask: MaskFuture) -> Self {
        let array_fut = self;
        Box::pin(async move {
            let (values, selection) = try_join!(array_fut, mask)?;
            // The array-level `mask` operation takes the mask in array form.
            let selection = selection.into_array();
            values.mask(selection)
        })
    }
}
/// Lazily constructed, cached [`LayoutReaderRef`]s for the children of a layout.
///
/// Holds everything needed to build a reader for any child on demand; a child's reader is only
/// created the first time it is requested through `get` and is then reused.
pub struct LazyReaderChildren {
/// Source of the child layouts, queried by index and dtype when a reader is built.
children: Arc<dyn LayoutChildren>,
/// Dtype passed to `children.child` for the child at the same index.
dtypes: Vec<DType>,
/// Name handed to the reader of the child at the same index.
names: Vec<Arc<str>>,
/// Segment source handed (as a clone of the `Arc`) to every child reader that is built.
segment_source: Arc<dyn SegmentSource>,
/// Session passed by reference to every child reader that is built.
session: VortexSession,
// TODO(ngates): we may want a hash map of some sort here?
/// One cell per child, filled on the first successful `get` for that index.
cache: Vec<OnceCell<LayoutReaderRef>>,
}
impl LazyReaderChildren {
    /// Creates a lazy collection of child readers with one empty cache slot per child.
    ///
    /// The number of slots is taken from `children.nchildren()`. `dtypes` and `names` are indexed
    /// by child position and are expected to hold one entry per child; a missing entry is
    /// reported as an error by [`Self::get`] for that child. No reader is constructed here.
    pub fn new(
        children: Arc<dyn LayoutChildren>,
        dtypes: Vec<DType>,
        names: Vec<Arc<str>>,
        segment_source: Arc<dyn SegmentSource>,
        session: VortexSession,
    ) -> Self {
        let nchildren = children.nchildren();
        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
        Self {
            children,
            dtypes,
            names,
            segment_source,
            session,
            cache,
        }
    }

    /// Returns the reader for child `idx`, constructing and caching it on first access.
    ///
    /// A failed construction leaves the cache slot empty, so a later call tries again.
    ///
    /// # Errors
    ///
    /// Returns an error if `idx` is not a valid child index, if no dtype or name was supplied
    /// for that child, or if fetching the child layout or building its reader fails.
    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
        let Some(cell) = self.cache.get(idx) else {
            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
        };
        cell.get_or_try_init(|| {
            // `dtypes` and `names` are supplied independently of the child count, so look them
            // up with checked access and surface a mismatch as an error rather than a panic.
            let Some(dtype) = self.dtypes.get(idx) else {
                vortex_bail!(
                    "Missing dtype for child {}: only {} dtypes provided",
                    idx,
                    self.dtypes.len()
                );
            };
            let Some(name) = self.names.get(idx) else {
                vortex_bail!(
                    "Missing name for child {}: only {} names provided",
                    idx,
                    self.names.len()
                );
            };
            let child = self.children.child(idx, dtype)?;
            child.new_reader(
                Arc::clone(name),
                Arc::clone(&self.segment_source),
                &self.session,
            )
        })
    }
}