File tree Expand file tree Collapse file tree
datafusion/datasource/src Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -213,7 +213,9 @@ impl FileGroupPartitioner {
213213 . iter ( )
214214 . map ( |f| f. effective_size ( ) )
215215 . sum :: < u64 > ( ) ;
216- if total_size < ( repartition_file_min_size as u64 ) || total_size == 0 {
216+ if ( total_size < ( repartition_file_min_size as u64 )
217+ && target_partitions >= file_groups. len ( ) )
218+ || total_size == 0 {
217219 return None ;
218220 }
219221
@@ -228,6 +230,16 @@ impl FileGroupPartitioner {
228230 . scan (
229231 ( current_partition_index, current_partition_size) ,
230232 |( current_partition_index, current_partition_size) , source_file| {
233+ if source_file. object_meta . size > 0
234+ && source_file. object_meta . size < repartition_file_min_size as u64 {
235+ * current_partition_size += source_file. object_meta . size ;
236+ if * current_partition_size > target_partition_size {
237+ * current_partition_index += 1 ;
238+ * current_partition_size = 0 ;
239+ }
240+ let small_file = ( * current_partition_index, source_file. clone ( ) ) ;
241+ return Some ( vec ! [ small_file] ) ;
242+ }
231243 let mut produced_files = vec ! [ ] ;
232244 let ( mut range_start, file_end) = source_file. range ( ) ;
233245 while range_start < file_end {
You can’t perform that action at this time.
0 commit comments