Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ public SortedPosDeleteWriter<Record> buildBasePosDeleteWriter(
org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX
+ MetadataColumns.DELETE_FILE_POS.name(),
MetricsModes.Full.get().toString());
double heapUsageRatioThreshold =
PropertyUtil.propertyAsDouble(
table.properties(),
TableProperties.POS_DELETE_FLUSH_HEAP_RATIO,
TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT);
long recordsNumThreshold =
PropertyUtil.propertyAsLong(
table.properties(),
TableProperties.POS_DELETE_FLUSH_RECORDS,
TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT);
int heapFlushMinRecords =
PropertyUtil.propertyAsInt(
table.properties(),
TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS,
TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT);
return new SortedPosDeleteWriter<>(
appenderFactory,
new CommonOutputFileFactory(
Expand All @@ -185,7 +200,10 @@ public SortedPosDeleteWriter<Record> buildBasePosDeleteWriter(
fileFormat,
mask,
index,
partitionKey);
partitionKey,
recordsNumThreshold,
heapUsageRatioThreshold,
heapFlushMinRecords);
}

public GenericChangeTaskWriter buildChangeWriter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.amoro.data.DataTreeNode;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableProperties;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -30,9 +31,11 @@
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.util.PropertyUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -62,6 +65,9 @@ public class MixedTreeNodePosDeleteWriter<T>
private final String location;

private final PartitionSpec spec;
private final double heapUsageRatioThreshold;
private final long recordsNumThreshold;
private final int heapFlushMinRecords;

public MixedTreeNodePosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
Expand All @@ -71,7 +77,9 @@ public MixedTreeNodePosDeleteWriter(
EncryptionManager encryptionManager,
Long transactionId,
String location,
PartitionSpec spec) {
PartitionSpec spec,
Map<String, String> properties) {
Map<String, String> safeProperties = properties == null ? Collections.emptyMap() : properties;
this.appenderFactory = appenderFactory;
this.format = format;
this.partition = partition;
Expand All @@ -80,6 +88,42 @@ public MixedTreeNodePosDeleteWriter(
this.transactionId = transactionId;
this.location = location;
this.spec = spec;
this.heapUsageRatioThreshold =
PropertyUtil.propertyAsDouble(
safeProperties,
TableProperties.POS_DELETE_FLUSH_HEAP_RATIO,
TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT);
this.recordsNumThreshold =
PropertyUtil.propertyAsLong(
safeProperties,
TableProperties.POS_DELETE_FLUSH_RECORDS,
TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT);
this.heapFlushMinRecords =
PropertyUtil.propertyAsInt(
safeProperties,
TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS,
TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT);
}

/**
 * Convenience constructor that uses default flush thresholds.
 *
 * <p>Delegates to the primary constructor with an empty properties map, so the
 * pos-delete flush settings fall back to their {@code TableProperties} defaults.
 */
public MixedTreeNodePosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
FileFormat format,
StructLike partition,
AuthenticatedFileIO fileIO,
EncryptionManager encryptionManager,
Long transactionId,
String location,
PartitionSpec spec) {
this(
appenderFactory,
format,
partition,
fileIO,
encryptionManager,
transactionId,
location,
spec,
// No table properties supplied: the delegate resolves every threshold to its default.
Collections.emptyMap());
}

@Override
Expand Down Expand Up @@ -109,7 +153,10 @@ private SortedPosDeleteWriter<T> generatePosDelete(DataTreeNode treeNode) {
format,
treeNode.mask(),
treeNode.index(),
partition);
partition,
recordsNumThreshold,
heapUsageRatioThreshold,
heapFlushMinRecords);
}

public List<DeleteFile> complete() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@
*/
public class SortedPosDeleteWriter<T> implements Closeable {
private static final long DEFAULT_RECORDS_NUM_THRESHOLD = Long.MAX_VALUE;
private static final Runtime RUNTIME = Runtime.getRuntime();
// Note: totalMemory/freeMemory only reflect the currently allocated heap, not max heap.
// When the heap has not fully expanded, usedMemory() can be underestimated and delay flushes.
private static final HeapUsageProvider RUNTIME_HEAP_USAGE =
new HeapUsageProvider() {
@Override
public long maxMemory() {
return RUNTIME.maxMemory();
}

@Override
public long usedMemory() {
return RUNTIME.totalMemory() - RUNTIME.freeMemory();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Improvement] usedMemory() 计算方式的局限性

Runtime.totalMemory() - Runtime.freeMemory() 是 JVM 堆使用量的粗略估算:

  • totalMemory() 是 JVM 当前已分配的堆大小(非最大堆),JVM 启动初期远小于 maxMemory()
  • freeMemory() 是已分配堆中的空闲部分

这意味着在 JVM 堆尚未完全扩展时,usedMemory() 返回值偏小,heap flush 不会及时触发——而这恰恰是最需要保护的时刻。

建议: 在注释中明确说明此计算方式的局限性,或考虑使用 MemoryPoolMXBean 获取更精确的 Eden/Old 区使用数据。

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English Version

Acknowledged. This limitation is documented in the code comments (L53-54).

The current approach prioritizes low overhead and simplicity while ensuring accuracy when it matters most: when heap approaches maxMemory (the actual OOM risk window), totalMemory ≈ maxMemory and the calculation is accurate.

More precise alternatives like MemoryPoolMXBean can be added as an optional strategy in future enhancements, but the current simple implementation provides the right balance between overhead and OOM protection.

Could we consider implementing this feature in the future?

Chinese Version

已经确认这个问题,在L53-54添加了代码注释。

当前方案优先考虑低开销和简洁性,同时确保在最关键的时刻(即堆内存接近 maxMemory,实际的 OOM 风险窗口)的准确性:当堆内存接近 maxMemory 时,总内存 ≈ maxMemory,此时计算结果准确。

更精确的替代方案(例如 MemoryPoolMXBean)可以作为未来增强功能中的可选策略;当前的简单实现在开销与 OOM 防护之间取得了较好的平衡。

我们可以考虑在未来实现这个功能吗?

}
};

interface HeapUsageProvider {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nitpick] HeapUsageProvider 接口可见性

当前 HeapUsageProvider 是 package-private(无修饰符)。如果未来需要在其他包中使用(如 Spark 模块自定义实现),可能需要改为 public

不过当前设计作为 package-private 是合理的——限制了 API 表面积,测试通过同一包访问。这只是一个提醒。

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English Version

Intentionally package-private. This keeps the API surface minimal. If future use cases require cross-package customization (e.g., Spark module), we can promote it to public at that time. Current design is appropriate for the immediate needs.

Chinese Version

保持 package-private 可以限制 API 暴露面。如果将来出现跨包使用场景(如 Spark 模块自定义实现),那个时候再改为 public 应该就可以了。

long maxMemory();

long usedMemory();
}

private final Map<CharSequenceWrapper, List<PosRow<T>>> posDeletes = Maps.newHashMap();
private final List<DeleteFile> completedFiles = Lists.newArrayList();
Expand All @@ -61,6 +82,9 @@ public class SortedPosDeleteWriter<T> implements Closeable {
private final FileFormat format;
private final TaskWriterKey writerKey;
private final long recordsNumThreshold;
private final double heapUsageRatioThreshold;
private final int heapFlushMinRecords;
private final HeapUsageProvider heapUsageProvider;

private int records = 0;

Expand All @@ -72,14 +96,67 @@ public SortedPosDeleteWriter(
long mask,
long index,
StructLike partitionKey,
long recordsNumThreshold) {
long recordsNumThreshold,
double heapUsageRatioThreshold,
int heapFlushMinRecords) {
this(
appenderFactory,
fileFactory,
io,
format,
mask,
index,
partitionKey,
recordsNumThreshold,
heapUsageRatioThreshold,
heapFlushMinRecords,
RUNTIME_HEAP_USAGE);
}

/**
 * Package-private constructor that additionally accepts a {@link HeapUsageProvider},
 * allowing the heap-usage source to be swapped out — presumably for same-package
 * tests (NOTE(review): confirm intended usage).
 *
 * @param recordsNumThreshold flush once the buffered record count reaches this value
 * @param heapUsageRatioThreshold heap used/max ratio that triggers a flush; values
 *     outside (0, 1) disable heap-based flushing
 * @param heapFlushMinRecords minimum buffered records before a heap-triggered flush
 * @param heapUsageProvider source of max/used heap figures for the flush decision
 */
SortedPosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
AuthenticatedFileIO io,
FileFormat format,
long mask,
long index,
StructLike partitionKey,
long recordsNumThreshold,
double heapUsageRatioThreshold,
int heapFlushMinRecords,
HeapUsageProvider heapUsageProvider) {
this.appenderFactory = appenderFactory;
this.fileFactory = fileFactory;
this.io = io;
this.format = format;
// The writer key pins this writer to one partition/tree-node for POS_DELETE files.
this.writerKey =
new TaskWriterKey(partitionKey, DataTreeNode.of(mask, index), DataFileType.POS_DELETE_FILE);
this.recordsNumThreshold = recordsNumThreshold;
this.heapUsageRatioThreshold = heapUsageRatioThreshold;
this.heapFlushMinRecords = heapFlushMinRecords;
this.heapUsageProvider = heapUsageProvider;
}

/**
 * Constructor kept for backward compatibility: record-count flushing only.
 *
 * <p>Passes {@code 0d} / {@code 0} for the heap settings, which disables
 * heap-based flushing (ratios outside (0, 1) are treated as "off").
 */
public SortedPosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
AuthenticatedFileIO io,
FileFormat format,
long mask,
long index,
StructLike partitionKey,
long recordsNumThreshold) {
this(
appenderFactory,
fileFactory,
io,
format,
mask,
index,
partitionKey,
recordsNumThreshold,
// Heap-based flushing disabled for legacy callers.
0d,
0);
}

public SortedPosDeleteWriter(
Expand All @@ -98,7 +175,9 @@ public SortedPosDeleteWriter(
mask,
index,
partitionKey,
DEFAULT_RECORDS_NUM_THRESHOLD);
DEFAULT_RECORDS_NUM_THRESHOLD,
0d,
0);
}

public SortedPosDeleteWriter(
Expand All @@ -115,7 +194,31 @@ public SortedPosDeleteWriter(
0,
0,
partitionKey,
DEFAULT_RECORDS_NUM_THRESHOLD);
DEFAULT_RECORDS_NUM_THRESHOLD,
0d,
0);
}

/**
 * Convenience constructor for writers that do not use a data tree node:
 * delegates with {@code mask = 0} and {@code index = 0} while keeping all
 * flush-threshold settings configurable.
 */
public SortedPosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
AuthenticatedFileIO io,
FileFormat format,
StructLike partitionKey,
long recordsNumThreshold,
double heapUsageRatioThreshold,
int heapFlushMinRecords) {
this(
appenderFactory,
fileFactory,
io,
format,
// Default tree node: mask 0, index 0.
0,
0,
partitionKey,
recordsNumThreshold,
heapUsageRatioThreshold,
heapFlushMinRecords);
}

public void delete(CharSequence path, long pos) {
Expand All @@ -132,9 +235,17 @@ public void delete(CharSequence path, long pos, T row) {

records += 1;

// TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the
// threshold.
if (records >= recordsNumThreshold) {
// Avoid querying JVM memory on every delete (may be costly on some JDKs).
// Sample heap usage every heapFlushMinRecords records at most.
boolean flushByHeap = false;
if (heapUsageRatioThreshold > 0 && heapUsageRatioThreshold < 1) {
int checkInterval = Math.max(1, heapFlushMinRecords);
if (records % checkInterval == 0) {
flushByHeap = shouldFlushByHeap();
}
}

if (records >= recordsNumThreshold || flushByHeap) {
flushDeletes();
}
}
Expand Down Expand Up @@ -208,6 +319,24 @@ private void flushDeletes() {
completedFiles.add(writer.toDeleteFile());
}

/**
 * Decides whether the buffered deletes should be flushed because heap usage
 * crossed the configured ratio. Returns {@code false} when heap-based flushing
 * is disabled, the buffer is still small, or the provider reports no max heap.
 */
private boolean shouldFlushByHeap() {
// A ratio outside (0, 1) means heap-based flushing is switched off.
boolean ratioEnabled = heapUsageRatioThreshold > 0 && heapUsageRatioThreshold < 1;
if (!ratioEnabled) {
return false;
}
// Don't flush tiny buffers just because the heap happens to be busy.
int minRecords = Math.max(1, heapFlushMinRecords);
if (records < minRecords) {
return false;
}
long maxHeap = heapUsageProvider.maxMemory();
if (maxHeap <= 0) {
return false;
}
// Approximate current usage against the threshold; no GC is forced here.
return heapUsageProvider.usedMemory() >= (long) (maxHeap * heapUsageRatioThreshold);
}

private static class PosRow<R> {
private final long pos;
private final R row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ protected FileWriter<PositionDelete<Record>, DeleteWriteResult> posWriter() {
encryptionManager(),
getTransactionId(input.rePosDeletedDataFilesForMixed()),
baseLocation(),
table.spec());
table.spec(),
table.properties());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ private TableProperties() {}
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 134217728; // 128 MB

public static final String POS_DELETE_FLUSH_HEAP_RATIO = "pos-delete.flush.heap.ratio";
public static final double POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT = 0.8d;

public static final String POS_DELETE_FLUSH_RECORDS = "pos-delete.flush.records";
public static final long POS_DELETE_FLUSH_RECORDS_DEFAULT = Long.MAX_VALUE;

public static final String POS_DELETE_FLUSH_HEAP_MIN_RECORDS =
"pos-delete.flush.heap.min-records";
public static final int POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT = 1000;

public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void test() throws IOException {
table.encryption(),
1L,
table.location(),
table.spec());
table.spec(),
table.properties());

writer.setTreeNode(DataTreeNode.ofId(4));
writer.delete("a", 0);
Expand Down
Loading
Loading