Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ commons-io:commons-io;2.17.0
net.java.dev.jna:jna-platform;5.17.0
net.oneandone.reflections8:reflections8;0.11.7
net.jonathangiles.tools:dependencyChecker-maven-plugin;1.0.6
org.apache.arrow:arrow-memory-unsafe;18.1.0
org.apache.arrow:arrow-vector;18.1.0
org.apache.commons:commons-collections4;4.4
org.apache.commons:commons-text;1.10.0
org.apache.maven:maven-plugin-api;3.9.11
Expand All @@ -186,6 +188,7 @@ org.apache.maven.plugins:maven-shade-plugin;3.6.0
org.apache.maven.plugins:maven-site-plugin;3.21.0
org.apache.maven.plugins:maven-source-plugin;3.3.1
org.apache.maven.plugins:maven-surefire-plugin;3.5.3
org.checkerframework:checker-qual;3.42.0
org.codehaus.mojo:animal-sniffer-maven-plugin;1.24
org.codehaus.mojo:build-helper-maven-plugin;3.6.1
org.codehaus.mojo:exec-maven-plugin;3.5.1
Expand Down
4 changes: 4 additions & 0 deletions sdk/parents/azure-client-sdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@

<!-- Used by many libraries to bring in annotations used by Reactor -->
<include>com.google.code.findbugs:jsr305:[3.0.2]</include> <!-- {x-include-update;com.google.code.findbugs:jsr305;external_dependency} -->

<include>org.apache.arrow:arrow-memory-unsafe:[18.1.0]</include> <!-- {x-include-update;org.apache.arrow:arrow-memory-unsafe;external_dependency} -->
<include>org.apache.arrow:arrow-vector:[18.1.0]</include> <!-- {x-include-update;org.apache.arrow:arrow-vector;external_dependency} -->
<include>org.checkerframework:checker-qual:[3.42.0]</include> <!-- {x-include-update;org.checkerframework:checker-qual;external_dependency} -->
</includes>
</bannedDependencies>

Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_47f4243e59"
"Tag": "java/storage/azure-storage-blob_08307061c5"
}
30 changes: 30 additions & 0 deletions sdk/storage/azure-storage-blob/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
--add-reads com.azure.storage.blob=com.azure.core.http.okhttp
--add-reads com.azure.storage.blob=com.azure.core.http.jdk.httpclient
--add-reads com.azure.storage.blob=com.azure.core.http.vertx
--add-opens java.base/java.nio=ALL-UNNAMED <!-- Arrow's BufferAllocator uses sun.misc.Unsafe to allocate direct byte buffers, which requires this open. -->
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED <!-- Arrow's BufferAllocator uses sun.misc.Unsafe to allocate direct byte buffers, which requires this open. -->
</javaModulesSurefireArgLine>
<parallelizeLiveTests>concurrent</parallelizeLiveTests>

Expand Down Expand Up @@ -138,8 +140,36 @@
<version>1.17.7</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
<scope>test</scope>
</dependency>

<!-- Arrow dependencies -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.42.0</version> <!-- {x-version-update;org.checkerframework:checker-qual;external_dependency} -->
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-bom</artifactId>
<version>18.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<profiles>
<profile>
<id>inject-sas-service-version</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.azure.storage.blob.implementation.models.EncryptionScope;
import com.azure.storage.blob.implementation.models.ListBlobsFlatSegmentResponse;
import com.azure.storage.blob.implementation.models.ListBlobsHierarchySegmentResponse;
import com.azure.storage.blob.implementation.util.ArrowBlobListDeserializer;
import com.azure.storage.blob.implementation.util.BlobConstants;
import com.azure.storage.blob.implementation.util.BlobSasImplUtil;
import com.azure.storage.blob.implementation.util.ModelHelper;
Expand Down Expand Up @@ -1125,11 +1126,18 @@ PagedFlux<BlobItem> listBlobsFlatWithOptionalTimeout(ListBlobsOptions options, S
finalOptions = new ListBlobsOptions().setMaxResultsPerPage(pageSize)
.setPrefix(options.getPrefix())
.setDetails(options.getDetails());
if (Boolean.TRUE.equals(options.getUseArrow())) {
finalOptions.setUseArrow(true).setEndBefore(options.getEndBefore());
}
}
} else {
finalOptions = options;
}

if (finalOptions != null && Boolean.TRUE.equals(finalOptions.getUseArrow())) {
return listBlobsFlatSegmentArrow(marker, finalOptions, timeout);
}

return listBlobsFlatSegment(marker, finalOptions, timeout).map(response -> {
List<BlobItem> value = response.getValue().getSegment() == null
? Collections.emptyList()
Expand Down Expand Up @@ -1177,6 +1185,62 @@ PagedFlux<BlobItem> listBlobsFlatWithOptionalTimeout(ListBlobsOptions options, S
timeout);
}

/**
 * Fetches a single page of a flat blob listing through the Apache Arrow list operation.
 * <p>
 * The response body is buffered fully in memory and then decoded according to the response
 * {@code Content-Type}: an Arrow stream is handed to {@link ArrowBlobListDeserializer}, anything else is parsed
 * as the XML list-blobs payload.
 *
 * @param marker Continuation marker forwarded to the service call.
 * @param options Listing options; a default {@link ListBlobsOptions} is used when {@code null}.
 * @param timeout Optional timeout applied to the whole operation, including reading and decoding the body.
 * @return A {@link Mono} emitting the page of {@link BlobItem}s together with the next continuation marker.
 * @throws RuntimeException (through the returned {@link Mono}) if the XML fallback payload cannot be parsed.
 */
private Mono<PagedResponse<BlobItem>> listBlobsFlatSegmentArrow(String marker, ListBlobsOptions options,
    Duration timeout) {
    final ListBlobsOptions finalOptions = options == null ? new ListBlobsOptions() : options;

    // Build the detail list once; an empty list is passed to the service call as null.
    ArrayList<ListBlobsIncludeItem> details = finalOptions.getDetails().toList();
    ArrayList<ListBlobsIncludeItem> include = details.isEmpty() ? null : details;

    Mono<PagedResponse<BlobItem>> page = this.azureBlobStorage.getContainers()
        .listBlobFlatSegmentApacheArrowWithResponseAsync(containerName, finalOptions.getPrefix(), marker,
            finalOptions.getMaxResultsPerPage(), include, null, finalOptions.getStartFrom(),
            finalOptions.getEndBefore(), null, Context.NONE)
        .flatMap(response -> {
            String contentType = response.getHeaders().getValue(com.azure.core.http.HttpHeaderName.CONTENT_TYPE);

            // Media types are case-insensitive and may carry parameters (e.g. "; charset=..."), so compare only
            // the type/subtype portion, ignoring case.
            String mediaType = contentType == null ? null : contentType.split(";", 2)[0].trim();
            boolean isArrow = "application/vnd.apache.arrow.stream".equalsIgnoreCase(mediaType);

            return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(bytes -> {
                java.io.ByteArrayInputStream body = new java.io.ByteArrayInputStream(bytes);
                List<BlobItem> value;
                String nextMarker;

                if (isArrow) {
                    ArrowBlobListDeserializer.ArrowListBlobsResult arrowResult
                        = ArrowBlobListDeserializer.deserialize(body);

                    value = arrowResult.getBlobItems()
                        .stream()
                        .map(ModelHelper::populateBlobItem)
                        .collect(Collectors.toList());
                    nextMarker = arrowResult.getNextMarker();
                } else {
                    // XML fallback. try-with-resources closes the reader on every path.
                    try (com.azure.xml.XmlReader xmlReader = com.azure.xml.XmlReader.fromStream(body)) {
                        ListBlobsFlatSegmentResponse xmlResponse = ListBlobsFlatSegmentResponse.fromXml(xmlReader);

                        value = xmlResponse.getSegment() == null
                            ? Collections.emptyList()
                            : xmlResponse.getSegment()
                                .getBlobItems()
                                .stream()
                                .map(ModelHelper::populateBlobItem)
                                .collect(Collectors.toList());
                        nextMarker = xmlResponse.getNextMarker();
                    } catch (javax.xml.stream.XMLStreamException e) {
                        throw LOGGER
                            .logExceptionAsError(new RuntimeException("Failed to parse XML fallback response", e));
                    }
                }

                // Both branches surface the same deserialized headers from the underlying response.
                return (PagedResponse<BlobItem>) new PagedResponseBase<>(response.getRequest(),
                    response.getStatusCode(), response.getHeaders(), value, nextMarker,
                    response.getDeserializedHeaders());
            });
        });

    // Apply the timeout to the complete pipeline so that buffering the streamed body is covered too, not just
    // the arrival of the initial response.
    return StorageImplUtils.applyOptionalTimeout(page, timeout);
}

/**
* Returns a reactive Publisher emitting all the blobs and directories (prefixes) under the given directory
* (prefix). Directories will have {@link BlobItem#isPrefix()} set to true.
Expand Down Expand Up @@ -1302,10 +1366,18 @@ PagedFlux<BlobItem> listBlobsHierarchyWithOptionalTimeout(String delimiter, List
.setPrefix(options.getPrefix())
.setDetails(options.getDetails())
.setStartFrom(options.getStartFrom());
if (Boolean.TRUE.equals(options.getUseArrow())) {
finalOptions.setUseArrow(true).setEndBefore(options.getEndBefore());
}
}
} else {
finalOptions = options;
}

if (finalOptions != null && Boolean.TRUE.equals(finalOptions.getUseArrow())) {
return listBlobsHierarchySegmentArrow(marker, delimiter, finalOptions, timeout);
}

return listBlobsHierarchySegment(marker, delimiter, finalOptions, timeout).map(response -> {
BlobHierarchyListSegment segment = response.getValue().getSegment();
List<BlobItem> value;
Expand Down Expand Up @@ -1344,6 +1416,71 @@ PagedFlux<BlobItem> listBlobsHierarchyWithOptionalTimeout(String delimiter, List
timeout);
}

/**
 * Fetches a single page of a hierarchical blob listing through the Apache Arrow list operation.
 * <p>
 * The response body is buffered fully in memory and then decoded according to the response
 * {@code Content-Type}: an Arrow stream is handed to {@link ArrowBlobListDeserializer}, anything else is parsed
 * as the XML list-blobs payload, whose blob prefixes are emitted as {@link BlobItem}s flagged as prefixes.
 *
 * @param marker Continuation marker forwarded to the service call.
 * @param delimiter Delimiter forwarded to the service call.
 * @param options Listing options; a default {@link ListBlobsOptions} is used when {@code null}.
 * @param timeout Optional timeout applied to the whole operation, including reading and decoding the body.
 * @return A {@link Mono} emitting the page of {@link BlobItem}s together with the next continuation marker.
 * @throws UnsupportedOperationException If the options request snapshots to be included.
 * @throws RuntimeException (through the returned {@link Mono}) if the XML fallback payload cannot be parsed.
 */
private Mono<PagedResponse<BlobItem>> listBlobsHierarchySegmentArrow(String marker, String delimiter,
    ListBlobsOptions options, Duration timeout) {
    final ListBlobsOptions finalOptions = options == null ? new ListBlobsOptions() : options;
    if (finalOptions.getDetails().getRetrieveSnapshots()) {
        throw LOGGER.logExceptionAsError(
            new UnsupportedOperationException("Including snapshots in a hierarchical listing is not supported."));
    }

    // Build the detail list once; an empty list is passed to the service call as null.
    ArrayList<ListBlobsIncludeItem> details = finalOptions.getDetails().toList();
    ArrayList<ListBlobsIncludeItem> include = details.isEmpty() ? null : details;

    Mono<PagedResponse<BlobItem>> page = this.azureBlobStorage.getContainers()
        .listBlobHierarchySegmentApacheArrowWithResponseAsync(containerName, delimiter, finalOptions.getPrefix(),
            marker, finalOptions.getMaxResultsPerPage(), include, null, finalOptions.getStartFrom(),
            finalOptions.getEndBefore(), null, Context.NONE)
        .flatMap(response -> {
            String contentType = response.getHeaders().getValue(com.azure.core.http.HttpHeaderName.CONTENT_TYPE);

            // Media types are case-insensitive and may carry parameters (e.g. "; charset=..."), so compare only
            // the type/subtype portion, ignoring case.
            String mediaType = contentType == null ? null : contentType.split(";", 2)[0].trim();
            boolean isArrow = "application/vnd.apache.arrow.stream".equalsIgnoreCase(mediaType);

            return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(bytes -> {
                java.io.ByteArrayInputStream body = new java.io.ByteArrayInputStream(bytes);
                List<BlobItem> value;
                String nextMarker;

                if (isArrow) {
                    ArrowBlobListDeserializer.ArrowListBlobsResult arrowResult
                        = ArrowBlobListDeserializer.deserialize(body);

                    value = arrowResult.getBlobItems()
                        .stream()
                        .map(ModelHelper::populateBlobItem)
                        .collect(Collectors.toList());
                    nextMarker = arrowResult.getNextMarker();
                } else {
                    // XML fallback. try-with-resources closes the reader on every path.
                    try (com.azure.xml.XmlReader xmlReader = com.azure.xml.XmlReader.fromStream(body)) {
                        ListBlobsHierarchySegmentResponse xmlResponse
                            = ListBlobsHierarchySegmentResponse.fromXml(xmlReader);

                        BlobHierarchyListSegment segment = xmlResponse.getSegment();
                        List<BlobItem> collected = new ArrayList<>();
                        if (segment != null) {
                            // Blobs first, then prefixes, matching the order in which they are appended.
                            segment.getBlobItems()
                                .forEach(item -> collected.add(BlobItemConstructorProxy.create(item)));
                            segment.getBlobPrefixes()
                                .forEach(prefix -> collected
                                    .add(new BlobItem().setName(ModelHelper.toBlobNameString(prefix.getName()))
                                        .setIsPrefix(true)));
                        }
                        value = collected;
                        nextMarker = xmlResponse.getNextMarker();
                    } catch (javax.xml.stream.XMLStreamException e) {
                        throw LOGGER
                            .logExceptionAsError(new RuntimeException("Failed to parse XML fallback response", e));
                    }
                }

                // Both branches surface the same deserialized headers from the underlying response.
                return (PagedResponse<BlobItem>) new PagedResponseBase<>(response.getRequest(),
                    response.getStatusCode(), response.getHeaders(), value, nextMarker,
                    response.getDeserializedHeaders());
            });
        });

    // Apply the timeout to the complete pipeline so that buffering the streamed body is covered too, not just
    // the arrival of the initial response.
    return StorageImplUtils.applyOptionalTimeout(page, timeout);
}

/**
* Returns a reactive Publisher emitting the blobs in this container whose tags match the query expression. For more
* information, including information on the query syntax, see the <a href="https://docs.microsoft.com/rest/api/storageservices/find-blobs-by-tags">Azure Docs</a>.
Expand Down
Loading