Skip to content

Commit 9542517

Browse files
committed
[HSTACK] - consolidate partitions if we have many small files
1 parent eab0b90 commit 9542517

1 file changed

Lines changed: 41 additions & 21 deletions

File tree

datafusion/datasource/src/file_groups.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,13 @@ impl FileGroupPartitioner {
213213
.iter()
214214
.map(|f| f.object_meta.size as i64)
215215
.sum::<i64>();
216-
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
216+
217+
// bail if we are asked to *split* a set of files that are already too small
218+
// if we are being asked to consolidate, we proceed
219+
if (total_size < (repartition_file_min_size as i64)
220+
&& target_partitions >= file_groups.len())
221+
|| total_size == 0
222+
{
217223
return None;
218224
}
219225

@@ -228,30 +234,44 @@ impl FileGroupPartitioner {
228234
.scan(
229235
(current_partition_index, current_partition_size),
230236
|state, source_file| {
231-
let mut produced_files = vec![];
232-
let mut range_start = 0;
233-
while range_start < source_file.object_meta.size {
234-
let range_end = min(
235-
range_start + (target_partition_size - state.1),
236-
source_file.object_meta.size,
237-
);
238-
239-
let mut produced_file = source_file.clone();
240-
produced_file.range = Some(FileRange {
241-
start: range_start as i64,
242-
end: range_end as i64,
243-
});
244-
produced_files.push((state.0, produced_file));
245-
246-
if state.1 + (range_end - range_start) >= target_partition_size {
237+
// Skip splitting files smaller than repartition_file_min_size
238+
// This may result in a few more partitions than requested (maybe 1 more)
239+
if source_file.object_meta.size < repartition_file_min_size {
240+
state.1 += source_file.object_meta.size;
241+
if state.1 > target_partition_size {
247242
state.0 += 1;
248243
state.1 = 0;
249-
} else {
250-
state.1 += range_end - range_start;
251244
}
252-
range_start = range_end;
245+
let small_file = (state.0, source_file.clone());
246+
Some(vec![small_file])
247+
} else {
248+
let mut produced_files = vec![];
249+
let mut range_start = 0;
250+
while range_start < source_file.object_meta.size {
251+
let range_end = min(
252+
range_start + (target_partition_size - state.1),
253+
source_file.object_meta.size,
254+
);
255+
256+
let mut produced_file = source_file.clone();
257+
produced_file.range = Some(FileRange {
258+
start: range_start as i64,
259+
end: range_end as i64,
260+
});
261+
produced_files.push((state.0, produced_file));
262+
263+
if state.1 + (range_end - range_start)
264+
>= target_partition_size
265+
{
266+
state.0 += 1;
267+
state.1 = 0;
268+
} else {
269+
state.1 += range_end - range_start;
270+
}
271+
range_start = range_end;
272+
}
273+
Some(produced_files)
253274
}
254-
Some(produced_files)
255275
},
256276
)
257277
.flatten()

0 commit comments

Comments
 (0)