Fix ADLS Gen2 plugin to support Azure Blob Soft Delete (#17806)
Pull request overview
Adds Azure Blob API uploads to the ADLS Gen2 PinotFS implementation to support storage accounts with Blob Soft Delete enabled, while retaining DFS for reads/metadata and as a fallback.
Changes:
- Introduces `BlobContainerClient` initialization (blob endpoint) alongside the existing DFS client.
- Routes uploads through the Blob API, with DFS upload as a fallback when the Blob client isn't available.
- Adds unit tests covering Blob upload success and Blob upload failure paths.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java | Initializes Blob SDK client and uses it for uploads; retains DFS upload fallback. |
| pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java | Adds tests validating Blob upload path and exception wrapping. |
| pinot-plugins/pinot-file-system/pinot-adls/pom.xml | Adds azure-storage-blob dependency to support Blob API uploads. |
Comments suppressed due to low confidence (1)
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:1
- In the DEFAULT auth case, `defaultAzureCredentialBuilder.build()` is invoked twice, creating two separate credential instances. Build once into a local variable (e.g., `TokenCredential credential = defaultAzureCredentialBuilder.build()`) and pass the same instance to both builders to avoid duplicate initialization work and potential differences in internal caching behavior.
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##             master   #17806      +/-  ##
============================================
- Coverage     63.04%   63.03%   -0.02%
  Complexity     1617     1617
============================================
  Files          3202     3202
  Lines        194718   194718
  Branches      30047    30047
============================================
- Hits         122760   122740      -20
- Misses        62233    62246      +13
- Partials       9725     9732       +7
```
```java
private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5, long contentLength)
    throws IOException {
  String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);

  if (_blobContainerClient != null) {
    return copyInputStreamToDstViaBlob(inputStream, dstUri, path, contentMd5);
  }
  return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5);
}
```
`contentLength` is passed into `copyInputStreamToDst(...)` but never used. Either remove it to avoid dead parameters, or use it to select/parameterize the Blob upload call (e.g., provide the known length to the SDK upload options).
```java
int bytesRead;
int blockIdCounter = 0;
byte[] buffer = new byte[BUFFER_SIZE];
List<String> blockIds = new ArrayList<>();

try {
  MessageDigest md5Block = MessageDigest.getInstance("MD5");
  while ((bytesRead = inputStream.read(buffer)) != -1) {
    md5Block.reset();
    md5Block.update(buffer, 0, bytesRead);
    byte[] md5BlockHash = md5Block.digest();

    String blockId = Base64.getEncoder()
        .encodeToString(String.format("%08d", blockIdCounter).getBytes(StandardCharsets.UTF_8));
    blockIdCounter++;
    blockIds.add(blockId);

    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(buffer, 0, bytesRead)) {
      blockBlobClient.stageBlockWithResponse(blockId, byteArrayInputStream, bytesRead, md5BlockHash, null, null,
          Context.NONE);
    }
  }
  if (blobHttpHeaders != null) {
    blockBlobClient.commitBlockListWithResponse(blockIds, blobHttpHeaders, null, null, null, null, Context.NONE);
  } else {
    blockBlobClient.commitBlockList(blockIds);
  }
```
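The staging loop above derives each block ID by Base64-encoding a zero-padded counter. Azure requires every block ID within a blob to be valid Base64 and the same length, which the fixed-width padding guarantees. A minimal standalone sketch of that scheme (the `blockId` helper name is illustrative, not from the PR):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BlockIdDemo {
  // Mirrors the PR's scheme: zero-pad the counter to 8 digits, then
  // Base64-encode, so every block ID has the same length.
  static String blockId(int counter) {
    return Base64.getEncoder()
        .encodeToString(String.format("%08d", counter).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      System.out.println(blockId(i));
    }
  }
}
```

Because the padded string is always 8 bytes, every encoded ID comes out 12 characters long, satisfying Azure's equal-length requirement regardless of how many blocks are staged.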
`uploadWithBlockLevelChecksum(...)` reuses the DFS buffer size (4 MB) for Blob block staging. For Block Blobs, small blocks can dramatically increase block count and risk hitting Azure's max-blocks-per-blob limit (which effectively caps max upload size at ~4 MB × maxBlocks). Consider using a larger block size (or making it configurable) when staging blocks via the Blob API.
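To make this sizing concern concrete: a block blob can hold at most 50,000 committed blocks (Azure's documented limit), so the staging block size directly caps the largest uploadable blob. A back-of-the-envelope sketch (class and method names are illustrative, not part of the plugin):

```java
public class BlobSizeCap {
  // Azure's documented maximum number of committed blocks per block blob.
  static final long MAX_BLOCKS = 50_000L;

  // Largest blob reachable when every staged block uses blockSizeBytes.
  static long maxBlobBytes(long blockSizeBytes) {
    return blockSizeBytes * MAX_BLOCKS;
  }

  public static void main(String[] args) {
    long fourMiB = 4L * 1024 * 1024;
    // Prints the effective cap, in GiB, when staging 4 MiB blocks.
    System.out.println(maxBlobBytes(fourMiB) / (1024L * 1024 * 1024) + " GiB");
  }
}
```

With 4 MiB blocks the cap lands around 195 GiB; staging larger blocks raises the ceiling proportionally, which is why making the Blob staging block size configurable is attractive for large segments.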
The DFS (Data Lake) API endpoint does not support Azure Blob Soft Delete, causing 409 `EndpointUnsupportedAccountFeatures` errors when uploading segments to storage accounts with org-mandated Soft Delete policies. This change adds a `BlobContainerClient` alongside the existing `DataLakeFileSystemClient`, using the Blob API (`*.blob.core.windows.net`) for file uploads, which is fully compatible with Soft Delete. The DFS API is retained for read/metadata operations (list, exists, open, etc.) and as a write fallback when the `BlobContainerClient` is not initialized.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use `BlobParallelUploadOptions` with `uploadWithResponse` to ensure consistent overwrite behavior across all non-checksum upload paths. Remove the unused `contentLength` parameter from `copyInputStreamToDstViaBlob`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>