Skip to content

Commit 6949ed5

Browse files
aditanaseadragomir
authored and committed
[HSTACK] - consolidate partitions if we have many small files
1 parent 0c8ea6d commit 6949ed5

1 file changed

Lines changed: 43 additions & 21 deletions

File tree

datafusion/datasource/src/file_groups.rs

Lines changed: 43 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -213,7 +213,13 @@ impl FileGroupPartitioner {
213213
.iter()
214214
.map(|f| f.object_meta.size as i64)
215215
.sum::<i64>();
216-
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
216+
217+
// bail if we are asked to *split* a set of files that are already too small
218+
// if we are being asked to consolidate, we proceed
219+
if (total_size < (repartition_file_min_size as i64)
220+
&& target_partitions >= file_groups.len())
221+
|| total_size == 0
222+
{
217223
return None;
218224
}
219225

@@ -228,30 +234,46 @@ impl FileGroupPartitioner {
228234
.scan(
229235
(current_partition_index, current_partition_size),
230236
|state, source_file| {
231-
let mut produced_files = vec![];
232-
let mut range_start = 0;
233-
while range_start < source_file.object_meta.size {
234-
let range_end = min(
235-
range_start + (target_partition_size - state.1),
236-
source_file.object_meta.size,
237-
);
238-
239-
let mut produced_file = source_file.clone();
240-
produced_file.range = Some(FileRange {
241-
start: range_start as i64,
242-
end: range_end as i64,
243-
});
244-
produced_files.push((state.0, produced_file));
245-
246-
if state.1 + (range_end - range_start) >= target_partition_size {
237+
// Skip splitting files smaller than repartition_file_min_size
238+
// This may result in a few more partitions than requested (maybe 1 more)
239+
if source_file.object_meta.size > 0
240+
&& source_file.object_meta.size < repartition_file_min_size
241+
{
242+
state.1 += source_file.object_meta.size;
243+
if state.1 > target_partition_size {
247244
state.0 += 1;
248245
state.1 = 0;
249-
} else {
250-
state.1 += range_end - range_start;
251246
}
252-
range_start = range_end;
247+
let small_file = (state.0, source_file.clone());
248+
Some(vec![small_file])
249+
} else {
250+
let mut produced_files = vec![];
251+
let mut range_start = 0;
252+
while range_start < source_file.object_meta.size {
253+
let range_end = min(
254+
range_start + (target_partition_size - state.1),
255+
source_file.object_meta.size,
256+
);
257+
258+
let mut produced_file = source_file.clone();
259+
produced_file.range = Some(FileRange {
260+
start: range_start as i64,
261+
end: range_end as i64,
262+
});
263+
produced_files.push((state.0, produced_file));
264+
265+
if state.1 + (range_end - range_start)
266+
>= target_partition_size
267+
{
268+
state.0 += 1;
269+
state.1 = 0;
270+
} else {
271+
state.1 += range_end - range_start;
272+
}
273+
range_start = range_end;
274+
}
275+
Some(produced_files)
253276
}
254-
Some(produced_files)
255277
},
256278
)
257279
.flatten()

0 commit comments

Comments
 (0)