Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,23 @@ public static AbstractOptimizingPlanner createOptimizingPlanner(
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
Expression partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
: tableRuntime.getPendingInput().getPartitions().entrySet().stream()
.map(
entry ->
ExpressionUtil.convertPartitionDataToDataFilter(
table, entry.getKey(), entry.getValue()))
.reduce(Expressions::or)
.orElse(Expressions.alwaysTrue());
AbstractOptimizingEvaluator.PendingInput pendingInput = tableRuntime.getPendingInput();
Expression partitionFilter;
if (pendingInput == null) {
partitionFilter = Expressions.alwaysTrue();
} else {
// Rebuild the in-memory partitions map from serialized partition paths,
// since the partitions field is @JsonIgnore and lost during DB persistence.
pendingInput.rebuildPartitions(table);
partitionFilter =
pendingInput.getPartitions().entrySet().stream()
.map(
entry ->
ExpressionUtil.convertPartitionDataToDataFilter(
table, entry.getKey(), entry.getValue()))
.reduce(Expressions::or)
.orElse(Expressions.alwaysTrue());
}
long processId = snowflakeIdGenerator.generateId();
ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
OptimizingConfig config = tableRuntime.getOptimizingConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.TableSnapshot;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -122,6 +123,65 @@ public void testFragmentFiles() {
assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles));
}

@Test
public void testPendingInputSerializationRoundTrip() throws Exception {
  closeFullOptimizingInterval();
  updateBaseHashBucket(1);

  // Append two batches of base-store records so the evaluator has pending work.
  List<Record> records =
      OptimizingTestHelpers.generateRecord(tableTestHelper(), 1, 4, "2022-01-01T12:00:00");
  long txId = beginTransaction();
  OptimizingTestHelpers.appendBase(
      getMixedTable(), tableTestHelper().writeBaseStore(getMixedTable(), txId, records, false));

  records = OptimizingTestHelpers.generateRecord(tableTestHelper(), 5, 8, "2022-01-01T12:00:00");
  txId = beginTransaction();
  OptimizingTestHelpers.appendBase(
      getMixedTable(), tableTestHelper().writeBaseStore(getMixedTable(), txId, records, false));

  AbstractOptimizingEvaluator evaluator = buildOptimizingEvaluator();
  Assert.assertTrue(evaluator.isNecessary());
  AbstractOptimizingEvaluator.PendingInput original = evaluator.getOptimizingPendingInput();
  Assert.assertFalse(original.getPartitions().isEmpty());
  Assert.assertFalse(original.getPartitionPaths().isEmpty());

  // Build partition paths before serialization (simulating what happens before DB persistence)
  original.buildPartitionPaths(getMixedTable());

  // Serialize to JSON and deserialize back (simulating DB persistence round-trip)
  ObjectMapper mapper = new ObjectMapper();
  String json = mapper.writeValueAsString(original);
  AbstractOptimizingEvaluator.PendingInput restored =
      mapper.readValue(json, AbstractOptimizingEvaluator.PendingInput.class);

  // The @JsonIgnore partitions map is dropped by the round-trip ...
  Assert.assertTrue(restored.getPartitions().isEmpty());
  // ... while the serializable partitionPaths survive it.
  Assert.assertEquals(original.getPartitionPaths(), restored.getPartitionPaths());

  // Rebuilding from the paths must reproduce an equivalent partitions map.
  restored.rebuildPartitions(getMixedTable());
  Assert.assertFalse(restored.getPartitions().isEmpty());
  Assert.assertEquals(original.getPartitions().size(), restored.getPartitions().size());
  // Every specId must come back with the same number of partitions.
  for (Map.Entry<Integer, Set<StructLike>> entry : original.getPartitions().entrySet()) {
    Assert.assertTrue(restored.getPartitions().containsKey(entry.getKey()));
    Assert.assertEquals(
        entry.getValue().size(), restored.getPartitions().get(entry.getKey()).size());
  }

  // The plain scalar fields must also survive the round-trip.
  Assert.assertEquals(original.getDataFileCount(), restored.getDataFileCount());
  Assert.assertEquals(original.getDataFileSize(), restored.getDataFileSize());
  Assert.assertEquals(
      original.getEqualityDeleteFileCount(), restored.getEqualityDeleteFileCount());
  Assert.assertEquals(
      original.getPositionalDeleteFileCount(), restored.getPositionalDeleteFileCount());
  Assert.assertEquals(original.getHealthScore(), restored.getHealthScore());
}

@Test
public void testFragmentFilesWithPartitionFilterTimeStamp() {
getMixedTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.amoro.table.KeyedTableSnapshot;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableSnapshot;
import org.apache.amoro.utils.ExpressionUtil;
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.amoro.utils.TablePropertyUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
Expand Down Expand Up @@ -166,22 +170,29 @@ public PendingInput getPendingInput() {
// to be inconsistent with the snapshot summary of iceberg
if (TableFormat.ICEBERG == mixedTable.format()) {
Snapshot snapshot = mixedTable.asUnkeyedTable().snapshot(currentSnapshot.snapshotId());
return new PendingInput(partitionPlanMap.values(), snapshot);
PendingInput pendingInput = new PendingInput(partitionPlanMap.values(), snapshot);
pendingInput.buildPartitionPaths(mixedTable);
return pendingInput;
}

return new PendingInput(partitionPlanMap.values());
PendingInput pendingInput = new PendingInput(partitionPlanMap.values());
pendingInput.buildPartitionPaths(mixedTable);
return pendingInput;
}

/**
 * Returns the pending input describing what currently needs optimizing, lazily initializing the
 * evaluator on first use. Partition paths are materialized so the result can be persisted.
 */
public PendingInput getOptimizingPendingInput() {
  if (!isInitialized) {
    initEvaluator();
  }
  PendingInput result = new PendingInput(needOptimizingPlanMap.values());
  // Materialize serializable partition paths so the @JsonIgnore partitions map can be
  // reconstructed after a JSON/DB round-trip.
  result.buildPartitionPaths(mixedTable);
  return result;
}

public static class PendingInput {

@JsonIgnore private final Map<Integer, Set<StructLike>> partitions = Maps.newHashMap();
private Set<String> partitionPaths = Sets.newHashSet();

private int totalFileCount = 0;
private long totalFileSize = 0L;
Expand Down Expand Up @@ -275,6 +286,72 @@ public Map<Integer, Set<StructLike>> getPartitions() {
return partitions;
}

/**
 * Returns the serializable partition paths used for JSON persistence. Each entry is encoded as
 * {@code "specId:partitionPath"}, e.g. {@code "1:dt=2024-01-01/hour=00"}. Unlike the
 * {@code partitions} map (which is {@code @JsonIgnore}), this set survives a serialization
 * round-trip and is the input for {@link #rebuildPartitions(MixedTable)}.
 */
@JsonProperty("partitionPaths")
public Set<String> getPartitionPaths() {
  return partitionPaths;
}

/**
 * Jackson setter for the serialized partition paths; a {@code null} input is normalized to an
 * empty mutable set so later rebuild/build calls never hit a null field.
 */
@JsonProperty("partitionPaths")
public void setPartitionPaths(Set<String> partitionPaths) {
  this.partitionPaths = (partitionPaths == null) ? Sets.newHashSet() : partitionPaths;
}

/**
* Rebuild the in-memory partitions map from serializable partition paths. Must be called before
* {@link #getPartitions()} when PendingInput is deserialized from JSON.
*
* @param table the table to resolve partition specs
*/
public void rebuildPartitions(MixedTable table) {
if (!partitions.isEmpty() || partitionPaths.isEmpty()) {
return;
}
for (String path : partitionPaths) {
try {
int colonIdx = path.indexOf(':');
if (colonIdx <= 0) {
continue;
}
int specId = Integer.parseInt(path.substring(0, colonIdx));
String partitionPath = path.substring(colonIdx + 1);
PartitionSpec spec = MixedTableUtil.getMixedTablePartitionSpecById(table, specId);
if (spec != null) {
StructLike struct;
if (spec.isUnpartitioned() || partitionPath.isEmpty()) {
// For unpartitioned tables, use an empty record since there's no path to parse
struct = TablePropertyUtil.EMPTY_STRUCT;
} else {
struct = MixedDataFiles.data(spec, partitionPath);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using org.apache.iceberg.DataFiles.data() here will break partition rebuild for Mixed-format tables. Iceberg's native DataFiles.data() delegates to Conversions.fromPartitionString(), which only handles standard Iceberg type conversions. It does not support Amoro's custom partition transforms:

  • month transform: Mixed-format tables serialize month partitions as yyyy-MM (e.g., month=2024-01). Iceberg's parser expects an integer month offset, not a date string, and will throw NumberFormatException.
  • hour transform: Similarly serialized as yyyy-MM-dd-HH, which Iceberg cannot parse.
  • Hive null sentinel: __HIVE_DEFAULT_PARTITION__ is not handled by Iceberg's parser.

Amoro already provides MixedDataFiles.data(PartitionSpec, String) in org.apache.amoro.utils.MixedDataFiles (line 110) that correctly handles all these cases via its own fromPartitionString().

The catch(Exception) block below will silently swallow these parse failures, logging only a warning while leaving partitions empty — producing the same Expressions.alwaysTrue() fallback as the original bug. This effectively makes the fix a no-op for Mixed-format tables with custom transforms.

Suggested fix:

import org.apache.amoro.utils.MixedDataFiles;
// ...
struct = MixedDataFiles.data(spec, partitionPath);

Also, please consider adding test cases for:

  1. Mixed-format tables with month/hour partition transforms (round-trip serialization)
  2. Unpartitioned tables (ensure EMPTY_STRUCT path works correctly)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

partitions.computeIfAbsent(specId, k -> Sets.newHashSet()).add(struct);
}
} catch (Exception e) {
LOG.warn("Failed to rebuild partition from path: {}", path, e);
}
}
}

/**
 * Populates the serializable partition paths from the in-memory partitions map; invoked before
 * persisting. No-op when the paths have already been built.
 */
public void buildPartitionPaths(MixedTable table) {
  if (!partitionPaths.isEmpty()) {
    return;
  }
  partitions.forEach(
      (specId, structs) -> {
        PartitionSpec spec = MixedTableUtil.getMixedTablePartitionSpecById(table, specId);
        if (spec == null) {
          // Spec no longer resolvable for this table; skip its partitions.
          return;
        }
        for (StructLike struct : structs) {
          partitionPaths.add(specId + ":" + spec.partitionToPath(struct));
        }
      });
}

/** Returns the data file count recorded for this pending input. */
public int getDataFileCount() {
  return dataFileCount;
}
Expand Down
Loading
Loading