2828import java .util .Collections ;
2929import java .util .HashSet ;
3030import java .util .List ;
31+ import java .util .Locale ;
3132import java .util .Map ;
3233import java .util .Set ;
34+ import java .util .TreeSet ;
3335import java .util .concurrent .ExecutionException ;
3436import java .util .concurrent .ExecutorService ;
3537import java .util .concurrent .Executors ;
7678import org .apache .iceberg .parquet .Parquet ;
7779import org .apache .iceberg .parquet .ParquetUtil ;
7880import org .apache .iceberg .rest .RESTCatalog ;
81+ import org .apache .parquet .hadoop .metadata .CompressionCodecName ;
7982import org .apache .parquet .hadoop .metadata .ParquetMetadata ;
8083import org .apache .parquet .schema .MessageType ;
8184import org .slf4j .Logger ;
@@ -108,6 +111,20 @@ public static Result run(
108111 return new Result (0 , 0 );
109112 }
110113
114+ if (options .compression () != null ) {
115+ // Validate against the list of parquet compression codecs supported by Iceberg.
116+ Set <String > validCompressionCodecs =
117+ Arrays .stream (CompressionCodecName .values ())
118+ .map (c -> c .name ().toLowerCase (Locale .ENGLISH ))
119+ .collect (Collectors .toCollection (HashSet ::new ));
120+ validCompressionCodecs .add ("as-source" );
121+ if (!validCompressionCodecs .contains (options .compression ().toLowerCase (Locale .ENGLISH ))) {
122+ String accepted = String .join (", " , new TreeSet <>(validCompressionCodecs ));
123+ throw new IllegalArgumentException (
124+ "Unknown --compression value: " + options .compression () + ". Accepted: " + accepted );
125+ }
126+ }
127+
111128 Table table = catalog .loadTable (nsTable );
112129
113130 // Create transaction and pass it to updatePartitionAndSortOrderMetadata
@@ -501,72 +518,98 @@ private static List<DataFile> processFile(
501518 .build ());
502519 dataFileSizeInBytes = inputFile .getLength ();
503520 dataFile = dstDataFile ;
504- } else if (partitionSpec .isPartitioned () && partitionKey == null ) {
505- return copyPartitionedAndSorted (
506- file ,
507- tableSchema ,
508- partitionSpec ,
509- sortOrder ,
510- metricsConfig ,
511- tableIO ,
512- inputFile ,
513- dstDataFileSource );
514- } else if (sortOrder .isSorted () && !sorted ) {
515- return Collections .singletonList (
516- copySorted (
517- file ,
518- dstDataFileSource .get (file ),
519- tableSchema ,
520- partitionSpec ,
521- sortOrder ,
522- metricsConfig ,
523- tableIO ,
524- inputFile ,
525- dataFileNamingStrategy ,
526- partitionKey ));
527521 } else {
528- // Table isn't partitioned or sorted. Copy as is.
529- String dstDataFile ;
530- if (partitionSpec .isPartitioned () && partitionKey != null ) {
531- // File has inferred partition, use partition path
532- dstDataFile = dstDataFileSource .get (partitionSpec , partitionKey , file );
533- } else {
534- dstDataFile = dstDataFileSource .get (file );
535- }
536- if (checkNotExists .apply (dstDataFile )) {
537- return Collections .emptyList ();
522+ // Copy path: compute compression override from CLI or as-source
523+ String compressionCodecOverride = null ;
524+ if (options .compression () != null ) {
525+ if ("as-source" .equalsIgnoreCase (options .compression ())) {
526+ var blocks = metadata .getBlocks ();
527+ if (!blocks .isEmpty ()) {
528+ compressionCodecOverride =
529+ blocks .get (0 ).getColumns ().get (0 ).getCodec ().name ().toLowerCase ();
530+ }
531+ } else {
532+ compressionCodecOverride = options .compression ().toLowerCase ();
533+ }
538534 }
539- OutputFile outputFile =
540- tableIO .newOutputFile (Strings .replacePrefix (dstDataFile , "s3://" , "s3a://" ));
541- // TODO: support transferTo below (note that compression, etc. might be different)
542- // try (var d = outputFile.create()) {
543- // try (var s = inputFile.newStream()) { s.transferTo(d); }
544- // }
545- Parquet .ReadBuilder readBuilder =
546- Parquet .read (inputFile )
547- .createReaderFunc (s -> GenericParquetReaders .buildReader (tableSchema , s ))
548- .project (tableSchema )
549- .reuseContainers ();
550535
551- Parquet .WriteBuilder writeBuilder =
552- Parquet .write (outputFile )
553- .overwrite (dataFileNamingStrategy == DataFileNamingStrategy .Name .PRESERVE_ORIGINAL )
554- .createWriterFunc (GenericParquetWriter ::buildWriter )
555- .metricsConfig (metricsConfig )
556- .schema (tableSchema );
536+ if (partitionSpec .isPartitioned () && partitionKey == null ) {
537+ return copyPartitionedAndSorted (
538+ file ,
539+ tableSchema ,
540+ partitionSpec ,
541+ sortOrder ,
542+ metricsConfig ,
543+ tableIO ,
544+ inputFile ,
545+ dstDataFileSource ,
546+ table .properties (),
547+ compressionCodecOverride );
548+ } else if (sortOrder .isSorted () && !sorted ) {
549+ return Collections .singletonList (
550+ copySorted (
551+ file ,
552+ dstDataFileSource .get (file ),
553+ tableSchema ,
554+ partitionSpec ,
555+ sortOrder ,
556+ metricsConfig ,
557+ tableIO ,
558+ inputFile ,
559+ dataFileNamingStrategy ,
560+ partitionKey ,
561+ table .properties (),
562+ compressionCodecOverride ));
563+ } else {
564+ // Table isn't partitioned or sorted. Copy as is.
565+ String dstDataFile ;
566+ if (partitionSpec .isPartitioned () && partitionKey != null ) {
567+ // File has inferred partition, use partition path
568+ dstDataFile = dstDataFileSource .get (partitionSpec , partitionKey , file );
569+ } else {
570+ dstDataFile = dstDataFileSource .get (file );
571+ }
572+ if (checkNotExists .apply (dstDataFile )) {
573+ return Collections .emptyList ();
574+ }
575+ OutputFile outputFile =
576+ tableIO .newOutputFile (Strings .replacePrefix (dstDataFile , "s3://" , "s3a://" ));
577+ // TODO: support transferTo below (note that compression, etc. might be different)
578+ // try (var d = outputFile.create()) {
579+ // try (var s = inputFile.newStream()) { s.transferTo(d); }
580+ // }
581+ Parquet .ReadBuilder readBuilder =
582+ Parquet .read (inputFile )
583+ .createReaderFunc (s -> GenericParquetReaders .buildReader (tableSchema , s ))
584+ .project (tableSchema )
585+ .reuseContainers ();
586+
587+ Parquet .WriteBuilder writeBuilder =
588+ Parquet .write (outputFile )
589+ .setAll (table .properties ())
590+ .overwrite (dataFileNamingStrategy == DataFileNamingStrategy .Name .PRESERVE_ORIGINAL )
591+ .createWriterFunc (GenericParquetWriter ::buildWriter )
592+ .metricsConfig (metricsConfig )
593+ .schema (tableSchema );
594+ if (compressionCodecOverride != null ) {
595+
596+ writeBuilder =
597+ writeBuilder .set (TableProperties .PARQUET_COMPRESSION , compressionCodecOverride );
598+ }
557599
558- logger .info ("{}: copying to {}" , file , dstDataFile );
600+ logger .info ("{}: copying to {}" , file , dstDataFile );
559601
560- try (CloseableIterable <Record > parquetReader = readBuilder .build ()) {
561- try (FileAppender <Record > writer = writeBuilder .build ()) {
562- writer .addAll (parquetReader );
563- writer .close (); // for write.length()
564- dataFileSizeInBytes = writer .length ();
565- metrics = writer .metrics ();
602+ try (CloseableIterable <Record > parquetReader = readBuilder .build ()) {
603+ try (FileAppender <Record > writer = writeBuilder .build ()) {
604+ writer .addAll (parquetReader );
605+ writer .close (); // for write.length()
606+ dataFileSizeInBytes = writer .length ();
607+ metrics = writer .metrics ();
608+ }
566609 }
567- }
568610
569- dataFile = dstDataFile ;
611+ dataFile = dstDataFile ;
612+ }
570613 }
571614 logger .info (
572615 "{}: adding data file (copy took {}s)" , file , (System .currentTimeMillis () - start ) / 1000 );
@@ -594,7 +637,9 @@ private static List<DataFile> copyPartitionedAndSorted(
594637 MetricsConfig metricsConfig ,
595638 FileIO tableIO ,
596639 InputFile inputFile ,
597- DataFileNamingStrategy dstDataFileSource )
640+ DataFileNamingStrategy dstDataFileSource ,
641+ Map <String , String > tableProperties ,
642+ @ Nullable String compressionCodecOverride )
598643 throws IOException {
599644 logger .info ("{}: partitioning{}" , file , sortOrder .isSorted () ? "+sorting" : "" );
600645
@@ -628,10 +673,15 @@ private static List<DataFile> copyPartitionedAndSorted(
628673
629674 Parquet .WriteBuilder writeBuilder =
630675 Parquet .write (outputFile )
676+ .setAll (tableProperties )
631677 .overwrite (true ) // FIXME
632678 .createWriterFunc (GenericParquetWriter ::buildWriter )
633679 .metricsConfig (metricsConfig )
634680 .schema (tableSchema );
681+ if (compressionCodecOverride != null ) {
682+ writeBuilder =
683+ writeBuilder .set (TableProperties .PARQUET_COMPRESSION , compressionCodecOverride );
684+ }
635685
636686 try (FileAppender <Record > writer = writeBuilder .build ()) {
637687 for (Record record : records ) {
@@ -674,7 +724,9 @@ private static DataFile copySorted(
674724 FileIO tableIO ,
675725 InputFile inputFile ,
676726 DataFileNamingStrategy .Name dataFileNamingStrategy ,
677- PartitionKey partitionKey )
727+ PartitionKey partitionKey ,
728+ Map <String , String > tableProperties ,
729+ @ Nullable String compressionCodecOverride )
678730 throws IOException {
679731 logger .info ("{}: copying (sorted) to {}" , file , dstDataFile );
680732
@@ -704,11 +756,16 @@ private static DataFile copySorted(
704756 // Write sorted records to outputFile
705757 Parquet .WriteBuilder writeBuilder =
706758 Parquet .write (outputFile )
759+ .setAll (tableProperties )
707760 .overwrite (
708761 dataFileNamingStrategy == DataFileNamingStrategy .Name .PRESERVE_ORIGINAL ) // FIXME
709762 .createWriterFunc (GenericParquetWriter ::buildWriter )
710763 .metricsConfig (metricsConfig )
711764 .schema (tableSchema );
765+ if (compressionCodecOverride != null ) {
766+ writeBuilder =
767+ writeBuilder .set (TableProperties .PARQUET_COMPRESSION , compressionCodecOverride );
768+ }
712769
713770 long fileSizeInBytes ;
714771 Metrics metrics ;
@@ -799,7 +856,8 @@ public record Options(
799856 @ Nullable String retryListFile ,
800857 @ Nullable List <Main .IcePartition > partitionList ,
801858 @ Nullable List <Main .IceSortOrder > sortOrderList ,
802- int threadCount ) {
859+ int threadCount ,
860+ @ Nullable String compression ) {
803861
804862 public static Builder builder () {
805863 return new Builder ();
@@ -822,6 +880,7 @@ public static final class Builder {
822880 private List <Main .IcePartition > partitionList = List .of ();
823881 private List <Main .IceSortOrder > sortOrderList = List .of ();
824882 private int threadCount = Runtime .getRuntime ().availableProcessors ();
883+ private String compression ;
825884
826885 private Builder () {}
827886
@@ -905,6 +964,11 @@ public Builder threadCount(int threadCount) {
905964 return this ;
906965 }
907966
/**
 * Sets the compression setting that {@code build()} passes to {@code Options} as its
 * {@code compression} component.
 *
 * <p>This setter performs no validation. {@code run(...)} checks the value only when it is
 * non-null, comparing it case-insensitively against the lower-cased names of Parquet's
 * {@code CompressionCodecName} constants plus the literal {@code "as-source"}, and throws
 * {@code IllegalArgumentException} for anything else.
 *
 * @param compression codec name or {@code "as-source"}; may be {@code null} (the field's
 *     default, and the record component is declared {@code @Nullable})
 * @return this builder, for call chaining
 */
967+ public Builder compression (String compression ) {
968+ this .compression = compression ;
969+ return this ;
970+ }
971+
908972 public Options build () {
909973 return new Options (
910974 dataFileNamingStrategy ,
@@ -922,7 +986,8 @@ public Options build() {
922986 retryListFile ,
923987 partitionList ,
924988 sortOrderList ,
925- threadCount );
989+ threadCount ,
990+ compression );
926991 }
927992 }
928993 }
0 commit comments