-
Notifications
You must be signed in to change notification settings - Fork 381
[AMORO-4166] [Improvement]: Implement heap-based flush mechanism for SortedPosDeleteWriter to prevent OOM. #4167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,27 @@ | |
| */ | ||
| public class SortedPosDeleteWriter<T> implements Closeable { | ||
| private static final long DEFAULT_RECORDS_NUM_THRESHOLD = Long.MAX_VALUE; | ||
| private static final Runtime RUNTIME = Runtime.getRuntime(); | ||
| // Note: totalMemory/freeMemory only reflect the currently allocated heap, not max heap. | ||
| // When the heap has not fully expanded, usedMemory() can be underestimated and delay flushes. | ||
| private static final HeapUsageProvider RUNTIME_HEAP_USAGE = | ||
| new HeapUsageProvider() { | ||
| @Override | ||
| public long maxMemory() { | ||
| return RUNTIME.maxMemory(); | ||
| } | ||
|
|
||
| @Override | ||
| public long usedMemory() { | ||
| return RUNTIME.totalMemory() - RUNTIME.freeMemory(); | ||
| } | ||
| }; | ||
|
|
||
| interface HeapUsageProvider { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Nitpick] 当前 不过当前设计作为 package-private 是合理的——限制了 API 表面积,测试通过同一包访问。这只是一个提醒。
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Intentionally package-private. This keeps the API surface minimal. If future use cases require cross-package customization (e.g., Spark module), we can promote it to public at that time. Current design is appropriate for the immediate needs.
保持 package-private 可以限制 API 暴露面。如果将来出现跨包使用场景(如 Spark 模块自定义实现),那个时候再改为 public 应该就可以了。 |
||
| long maxMemory(); | ||
|
|
||
| long usedMemory(); | ||
| } | ||
|
|
||
| private final Map<CharSequenceWrapper, List<PosRow<T>>> posDeletes = Maps.newHashMap(); | ||
| private final List<DeleteFile> completedFiles = Lists.newArrayList(); | ||
|
|
@@ -61,6 +82,9 @@ public class SortedPosDeleteWriter<T> implements Closeable { | |
| private final FileFormat format; | ||
| private final TaskWriterKey writerKey; | ||
| private final long recordsNumThreshold; | ||
| private final double heapUsageRatioThreshold; | ||
| private final int heapFlushMinRecords; | ||
| private final HeapUsageProvider heapUsageProvider; | ||
|
|
||
| private int records = 0; | ||
|
|
||
|
|
@@ -72,14 +96,67 @@ public SortedPosDeleteWriter( | |
| long mask, | ||
| long index, | ||
| StructLike partitionKey, | ||
| long recordsNumThreshold) { | ||
| long recordsNumThreshold, | ||
| double heapUsageRatioThreshold, | ||
| int heapFlushMinRecords) { | ||
| this( | ||
| appenderFactory, | ||
| fileFactory, | ||
| io, | ||
| format, | ||
| mask, | ||
| index, | ||
| partitionKey, | ||
| recordsNumThreshold, | ||
| heapUsageRatioThreshold, | ||
| heapFlushMinRecords, | ||
| RUNTIME_HEAP_USAGE); | ||
| } | ||
|
|
||
| SortedPosDeleteWriter( | ||
| FileAppenderFactory<T> appenderFactory, | ||
| OutputFileFactory fileFactory, | ||
| AuthenticatedFileIO io, | ||
| FileFormat format, | ||
| long mask, | ||
| long index, | ||
| StructLike partitionKey, | ||
| long recordsNumThreshold, | ||
| double heapUsageRatioThreshold, | ||
| int heapFlushMinRecords, | ||
| HeapUsageProvider heapUsageProvider) { | ||
| this.appenderFactory = appenderFactory; | ||
| this.fileFactory = fileFactory; | ||
| this.io = io; | ||
| this.format = format; | ||
| this.writerKey = | ||
| new TaskWriterKey(partitionKey, DataTreeNode.of(mask, index), DataFileType.POS_DELETE_FILE); | ||
| this.recordsNumThreshold = recordsNumThreshold; | ||
| this.heapUsageRatioThreshold = heapUsageRatioThreshold; | ||
| this.heapFlushMinRecords = heapFlushMinRecords; | ||
| this.heapUsageProvider = heapUsageProvider; | ||
| } | ||
|
|
||
| public SortedPosDeleteWriter( | ||
| FileAppenderFactory<T> appenderFactory, | ||
| OutputFileFactory fileFactory, | ||
| AuthenticatedFileIO io, | ||
| FileFormat format, | ||
| long mask, | ||
| long index, | ||
| StructLike partitionKey, | ||
| long recordsNumThreshold) { | ||
| this( | ||
| appenderFactory, | ||
| fileFactory, | ||
| io, | ||
| format, | ||
| mask, | ||
| index, | ||
| partitionKey, | ||
| recordsNumThreshold, | ||
| 0d, | ||
| 0); | ||
| } | ||
|
|
||
| public SortedPosDeleteWriter( | ||
|
|
@@ -98,7 +175,9 @@ public SortedPosDeleteWriter( | |
| mask, | ||
| index, | ||
| partitionKey, | ||
| DEFAULT_RECORDS_NUM_THRESHOLD); | ||
| DEFAULT_RECORDS_NUM_THRESHOLD, | ||
| 0d, | ||
| 0); | ||
| } | ||
|
|
||
| public SortedPosDeleteWriter( | ||
|
|
@@ -115,7 +194,31 @@ public SortedPosDeleteWriter( | |
| 0, | ||
| 0, | ||
| partitionKey, | ||
| DEFAULT_RECORDS_NUM_THRESHOLD); | ||
| DEFAULT_RECORDS_NUM_THRESHOLD, | ||
| 0d, | ||
| 0); | ||
| } | ||
|
|
||
| public SortedPosDeleteWriter( | ||
| FileAppenderFactory<T> appenderFactory, | ||
| OutputFileFactory fileFactory, | ||
| AuthenticatedFileIO io, | ||
| FileFormat format, | ||
| StructLike partitionKey, | ||
| long recordsNumThreshold, | ||
| double heapUsageRatioThreshold, | ||
| int heapFlushMinRecords) { | ||
| this( | ||
| appenderFactory, | ||
| fileFactory, | ||
| io, | ||
| format, | ||
| 0, | ||
| 0, | ||
| partitionKey, | ||
| recordsNumThreshold, | ||
| heapUsageRatioThreshold, | ||
| heapFlushMinRecords); | ||
| } | ||
|
|
||
| public void delete(CharSequence path, long pos) { | ||
|
|
@@ -132,9 +235,17 @@ public void delete(CharSequence path, long pos, T row) { | |
|
|
||
| records += 1; | ||
|
|
||
| // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the | ||
| // threshold. | ||
| if (records >= recordsNumThreshold) { | ||
| // Avoid querying JVM memory on every delete (may be costly on some JDKs). | ||
| // Sample heap usage every heapFlushMinRecords records at most. | ||
| boolean flushByHeap = false; | ||
| if (heapUsageRatioThreshold > 0 && heapUsageRatioThreshold < 1) { | ||
| int checkInterval = Math.max(1, heapFlushMinRecords); | ||
| if (records % checkInterval == 0) { | ||
| flushByHeap = shouldFlushByHeap(); | ||
| } | ||
| } | ||
|
|
||
| if (records >= recordsNumThreshold || flushByHeap) { | ||
| flushDeletes(); | ||
| } | ||
| } | ||
|
|
@@ -208,6 +319,24 @@ private void flushDeletes() { | |
| completedFiles.add(writer.toDeleteFile()); | ||
| } | ||
|
|
||
| private boolean shouldFlushByHeap() { | ||
| // Guard: allow disabling heap-based flushing by setting an invalid ratio. | ||
| if (heapUsageRatioThreshold <= 0 || heapUsageRatioThreshold >= 1) { | ||
| return false; | ||
| } | ||
| // Guard: avoid flushing too frequently on small buffers. | ||
| if (records < Math.max(1, heapFlushMinRecords)) { | ||
| return false; | ||
| } | ||
| long max = heapUsageProvider.maxMemory(); | ||
| if (max <= 0) { | ||
| return false; | ||
| } | ||
| // Approximate current heap usage; do not force GC here. | ||
| long used = heapUsageProvider.usedMemory(); | ||
| return used >= (long) (max * heapUsageRatioThreshold); | ||
| } | ||
|
|
||
| private static class PosRow<R> { | ||
| private final long pos; | ||
| private final R row; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Improvement]
usedMemory()计算方式的局限性Runtime.totalMemory() - Runtime.freeMemory()是 JVM 堆使用量的粗略估算:totalMemory()是 JVM 当前已分配的堆大小(非最大堆),JVM 启动初期远小于maxMemory()freeMemory()是已分配堆中的空闲部分这意味着在 JVM 堆尚未完全扩展时,
usedMemory()返回值偏小,heap flush 不会及时触发——而这恰恰是最需要保护的时刻。建议: 在注释中明确说明此计算方式的局限性,或考虑使用
MemoryPoolMXBean获取更精确的 Eden/Old 区使用数据。There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledged. This limitation is documented in the code comments (L53-54).
The current approach prioritizes low overhead and simplicity while ensuring accuracy when it matters most: when heap approaches maxMemory (the actual OOM risk window), totalMemory ≈ maxMemory and the calculation is accurate.
More precise alternatives like MemoryPoolMXBean can be added as an optional strategy in future enhancements, but the current simple implementation provides the right balance between overhead and OOM protection.
Could we consider implementing this feature in the future?
已经确认这个问题,在L53-54添加了代码注释。
当前方案优先考虑低开销和简洁性,同时确保在最关键的时刻(即堆内存接近 maxMemory,实际的 OOM 风险窗口)的准确性:当堆内存接近 maxMemory 时,总内存 ≈ maxMemory,此时计算结果准确。
更精确的替代方案(例如 MemoryPoolMXBean)可以作为未来增强功能中的可选策略,当前简单的实现方案在开销和OOM更平衡一点。
我们可以考虑在未来实现这个功能吗?