Skip to content

Commit 42abc6b

Browse files
Add DataLakeFileSystemClient constructor in ADLSFileIO (#14966)
Squashed changes: spotless; add tests/cleanup; remove redundant null check; remove redundant comment; add caching just for supplier method; cache ADLS clients and call initialize in constructor; use host for key caching.
1 parent 48e4944 commit 42abc6b

2 files changed

Lines changed: 333 additions & 0 deletions

File tree

azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSFileIO.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.iceberg.io.OutputFile;
3939
import org.apache.iceberg.metrics.MetricsContext;
4040
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
41+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
42+
import org.apache.iceberg.util.SerializableFunction;
4143
import org.apache.iceberg.util.SerializableMap;
4244
import org.apache.iceberg.util.Tasks;
4345
import org.apache.iceberg.util.ThreadPools;
@@ -57,6 +59,8 @@ public class ADLSFileIO implements DelegateFileIO {
5759
private MetricsContext metrics = MetricsContext.nullMetrics();
5860
private SerializableMap<String, String> properties;
5961
private VendedAdlsCredentialProvider vendedAdlsCredentialProvider;
62+
// Optional caller-provided factory for file system clients. When it is non-null, buildClient()
// returns its result instead of configuring a DataLakeFileSystemClientBuilder.
private SerializableFunction<ADLSLocation, DataLakeFileSystemClient> clientSupplier;
63+
// Cache of clients keyed by "<host>/<container>", created lazily on the first client() call.
// Declared transient, so the cached clients are not part of the serialized form of this FileIO;
// volatile because client() reads the field outside the synchronized block.
private transient volatile Map<String, DataLakeFileSystemClient> clientCache;
6064

6165
/**
6266
* No-arg constructor to load the FileIO dynamically.
@@ -70,6 +74,23 @@ public ADLSFileIO() {}
7074
this.azureProperties = azureProperties;
7175
}
7276

77+
/**
78+
* Constructor with custom DataLakeFileSystemClient function.
79+
*
80+
* <p>Unlike the no-arg constructor, this constructor initializes properties and azureProperties
81+
* immediately, allowing immediate use without calling {@link ADLSFileIO#initialize(Map)}.
82+
*
83+
* <p>The function receives an {@link ADLSLocation} and should return an appropriate {@link
84+
* DataLakeFileSystemClient} for that location. Clients are cached per storage account and
85+
* container combination.
86+
*
87+
* @param clientSupplier function that creates a client for a given location
88+
*/
89+
public ADLSFileIO(SerializableFunction<ADLSLocation, DataLakeFileSystemClient> clientSupplier) {
90+
this.clientSupplier = clientSupplier;
91+
initialize(Maps.newHashMap());
92+
}
93+
7394
@Override
7495
public InputFile newInputFile(String path) {
7596
return new ADLSInputFile(path, fileClient(path), azureProperties, metrics);
@@ -109,6 +130,22 @@ public DataLakeFileSystemClient client(String path) {
109130

110131
@VisibleForTesting
111132
DataLakeFileSystemClient client(ADLSLocation location) {
133+
if (clientCache == null) {
134+
synchronized (this) {
135+
if (clientCache == null) {
136+
clientCache = Maps.newConcurrentMap();
137+
}
138+
}
139+
}
140+
String cacheKey = location.host() + "/" + location.container().orElse("");
141+
return clientCache.computeIfAbsent(cacheKey, k -> buildClient(location));
142+
}
143+
144+
private DataLakeFileSystemClient buildClient(ADLSLocation location) {
145+
if (clientSupplier != null) {
146+
return clientSupplier.apply(location);
147+
}
148+
112149
DataLakeFileSystemClientBuilder clientBuilder =
113150
new DataLakeFileSystemClientBuilder().httpClient(HTTP);
114151

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.azure.adlsv2;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import com.azure.storage.file.datalake.DataLakeFileClient;
26+
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
27+
import java.io.IOException;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import org.apache.iceberg.TestHelpers;
30+
import org.apache.iceberg.io.FileIO;
31+
import org.apache.iceberg.io.InputFile;
32+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
33+
import org.apache.iceberg.util.SerializableFunction;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.params.ParameterizedTest;
36+
import org.junit.jupiter.params.provider.MethodSource;
37+
38+
public class TestADLSFileIO {
39+
40+
@Test
41+
public void testConstructorWithClientSupplier() {
42+
DataLakeFileSystemClient mockClient = mock(DataLakeFileSystemClient.class);
43+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier = location -> mockClient;
44+
45+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
46+
47+
// Verify properties are initialized (should not throw NPE)
48+
assertThat(fileIO.properties()).isNotNull();
49+
assertThat(fileIO.properties()).isEmpty();
50+
}
51+
52+
@Test
53+
public void testConstructorWithClientSupplierAndInitialize() {
54+
DataLakeFileSystemClient mockClient = mock(DataLakeFileSystemClient.class);
55+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier = location -> mockClient;
56+
57+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
58+
fileIO.initialize(ImmutableMap.of("key1", "value1"));
59+
60+
// Verify properties from initialize are used
61+
assertThat(fileIO.properties()).containsEntry("key1", "value1");
62+
}
63+
64+
@Test
65+
public void testClientSupplierIsUsed() {
66+
DataLakeFileSystemClient mockClient = mock(DataLakeFileSystemClient.class);
67+
68+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier = location -> mockClient;
69+
70+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
71+
fileIO.initialize(ImmutableMap.of());
72+
73+
// Call client method to verify supplier is invoked
74+
DataLakeFileSystemClient client =
75+
fileIO.client("abfs://container@account.dfs.core.windows.net/path/to/file");
76+
77+
assertThat(client).isEqualTo(mockClient);
78+
}
79+
80+
@Test
81+
public void testClientSupplierWithoutInitialize() {
82+
DataLakeFileSystemClient mockClient = mock(DataLakeFileSystemClient.class);
83+
DataLakeFileClient mockFileClient = mock(DataLakeFileClient.class);
84+
85+
when(mockClient.getFileClient("path/to/file")).thenReturn(mockFileClient);
86+
87+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier = location -> mockClient;
88+
89+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
90+
91+
// Should work without calling initialize()
92+
// This verifies azureProperties is initialized in constructor
93+
assertThat(fileIO.properties()).isNotNull();
94+
assertThat(fileIO.properties()).isEmpty();
95+
96+
// Should be able to create files without NPE
97+
InputFile inputFile =
98+
fileIO.newInputFile("abfs://container@account.dfs.core.windows.net/path/to/file");
99+
assertThat(inputFile).isNotNull();
100+
}
101+
102+
@Test
103+
public void testNoArgConstructor() {
104+
ADLSFileIO fileIO = new ADLSFileIO();
105+
106+
// Properties should be null before initialization
107+
// Initialize with empty map to avoid NPE
108+
fileIO.initialize(ImmutableMap.of());
109+
110+
assertThat(fileIO.properties()).isNotNull();
111+
assertThat(fileIO.properties()).isEmpty();
112+
}
113+
114+
@ParameterizedTest
115+
@MethodSource("org.apache.iceberg.TestHelpers#serializers")
116+
public void testSerializationWithClientSupplier(
117+
TestHelpers.RoundTripSerializer<FileIO> roundTripSerializer)
118+
throws IOException, ClassNotFoundException {
119+
// Use an AtomicInteger to track supplier invocations across serialization
120+
AtomicInteger supplierInvocationCount = new AtomicInteger(0);
121+
122+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier =
123+
location -> {
124+
supplierInvocationCount.incrementAndGet();
125+
// Return null - we're only testing serialization, not actual client usage
126+
return null;
127+
};
128+
129+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
130+
fileIO.initialize(ImmutableMap.of("key1", "value1", "key2", "value2"));
131+
132+
// Verify original FileIO works
133+
assertThat(fileIO.properties()).containsEntry("key1", "value1");
134+
assertThat(fileIO.properties()).containsEntry("key2", "value2");
135+
136+
// Serialize and deserialize
137+
FileIO deserializedFileIO = roundTripSerializer.apply(fileIO);
138+
139+
// Verify properties are preserved after deserialization
140+
assertThat(deserializedFileIO.properties()).isEqualTo(fileIO.properties());
141+
142+
// Verify the deserialized FileIO is an ADLSFileIO and can call client()
143+
assertThat(deserializedFileIO).isInstanceOf(ADLSFileIO.class);
144+
ADLSFileIO deserializedADLSFileIO = (ADLSFileIO) deserializedFileIO;
145+
146+
// Call client() to verify the supplier was serialized and can be invoked
147+
DataLakeFileSystemClient client =
148+
deserializedADLSFileIO.client("abfs://container@account.dfs.core.windows.net/path");
149+
// The supplier returns null, so client should be null
150+
assertThat(client).isNull();
151+
}
152+
153+
@ParameterizedTest
154+
@MethodSource("org.apache.iceberg.TestHelpers#serializers")
155+
public void testSerializationWithNoArgConstructor(
156+
TestHelpers.RoundTripSerializer<FileIO> roundTripSerializer)
157+
throws IOException, ClassNotFoundException {
158+
ADLSFileIO fileIO = new ADLSFileIO();
159+
fileIO.initialize(ImmutableMap.of("key1", "value1", "key2", "value2"));
160+
161+
// Serialize and deserialize
162+
FileIO deserializedFileIO = roundTripSerializer.apply(fileIO);
163+
164+
// Verify properties are preserved after deserialization
165+
assertThat(deserializedFileIO.properties()).isEqualTo(fileIO.properties());
166+
}
167+
168+
@Test
169+
public void testClientSupplierIsCachedPerContainer() {
170+
DataLakeFileSystemClient mockClient1 = mock(DataLakeFileSystemClient.class);
171+
DataLakeFileSystemClient mockClient2 = mock(DataLakeFileSystemClient.class);
172+
AtomicInteger supplierInvocationCount = new AtomicInteger(0);
173+
174+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier =
175+
location -> {
176+
supplierInvocationCount.incrementAndGet();
177+
// Return different clients for different containers
178+
return location.container().orElse("").equals("container1") ? mockClient1 : mockClient2;
179+
};
180+
181+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
182+
183+
// Same container - should cache
184+
DataLakeFileSystemClient client1 =
185+
fileIO.client("abfs://container1@account.dfs.core.windows.net/path1");
186+
DataLakeFileSystemClient client2 =
187+
fileIO.client("abfs://container1@account.dfs.core.windows.net/path2");
188+
189+
assertThat(supplierInvocationCount.get()).isEqualTo(1);
190+
assertThat(client1).isSameAs(client2);
191+
192+
// Different container - should call supplier again
193+
DataLakeFileSystemClient client3 =
194+
fileIO.client("abfs://container2@account.dfs.core.windows.net/path3");
195+
196+
assertThat(supplierInvocationCount.get()).isEqualTo(2);
197+
assertThat(client3).isSameAs(mockClient2);
198+
}
199+
200+
@Test
201+
public void testClientCachedPerStorageAccountAndContainer() {
202+
DataLakeFileSystemClient mockClient1 = mock(DataLakeFileSystemClient.class);
203+
DataLakeFileSystemClient mockClient2 = mock(DataLakeFileSystemClient.class);
204+
DataLakeFileSystemClient mockClient3 = mock(DataLakeFileSystemClient.class);
205+
AtomicInteger supplierInvocationCount = new AtomicInteger(0);
206+
207+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier =
208+
location -> {
209+
supplierInvocationCount.incrementAndGet();
210+
String host = location.host();
211+
String container = location.container().orElse("");
212+
if (host.equals("account1.dfs.core.windows.net") && container.equals("container")) {
213+
return mockClient1;
214+
} else if (host.equals("account2.dfs.core.windows.net")
215+
&& container.equals("container")) {
216+
return mockClient2;
217+
} else {
218+
return mockClient3;
219+
}
220+
};
221+
222+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
223+
224+
// Same account, same container - should cache
225+
DataLakeFileSystemClient client1 =
226+
fileIO.client("abfs://container@account1.dfs.core.windows.net/path1");
227+
DataLakeFileSystemClient client2 =
228+
fileIO.client("abfs://container@account1.dfs.core.windows.net/path2");
229+
230+
assertThat(supplierInvocationCount.get()).isEqualTo(1);
231+
assertThat(client1).isSameAs(client2);
232+
assertThat(client1).isSameAs(mockClient1);
233+
234+
// Different account, same container - should call supplier again
235+
DataLakeFileSystemClient client3 =
236+
fileIO.client("abfs://container@account2.dfs.core.windows.net/path3");
237+
238+
assertThat(supplierInvocationCount.get()).isEqualTo(2);
239+
assertThat(client3).isSameAs(mockClient2);
240+
assertThat(client3).isNotSameAs(client1);
241+
242+
// Same account as first, different container - should call supplier again
243+
DataLakeFileSystemClient client4 =
244+
fileIO.client("abfs://other@account1.dfs.core.windows.net/path4");
245+
246+
assertThat(supplierInvocationCount.get()).isEqualTo(3);
247+
assertThat(client4).isSameAs(mockClient3);
248+
}
249+
250+
@Test
251+
public void testClientSupplierCachingIsThreadSafe() throws Exception {
252+
DataLakeFileSystemClient mockClient = mock(DataLakeFileSystemClient.class);
253+
AtomicInteger supplierInvocationCount = new AtomicInteger(0);
254+
255+
SerializableFunction<ADLSLocation, DataLakeFileSystemClient> supplier =
256+
location -> {
257+
supplierInvocationCount.incrementAndGet();
258+
return mockClient;
259+
};
260+
261+
ADLSFileIO fileIO = new ADLSFileIO(supplier);
262+
263+
// Run multiple threads concurrently calling client() with same container
264+
int numThreads = 10;
265+
Thread[] threads = new Thread[numThreads];
266+
DataLakeFileSystemClient[] results = new DataLakeFileSystemClient[numThreads];
267+
268+
for (int i = 0; i < numThreads; i++) {
269+
final int index = i;
270+
threads[i] =
271+
new Thread(
272+
() -> {
273+
results[index] =
274+
fileIO.client("abfs://container@account.dfs.core.windows.net/path/" + index);
275+
});
276+
}
277+
278+
// Start all threads
279+
for (Thread thread : threads) {
280+
thread.start();
281+
}
282+
283+
// Wait for all threads to complete
284+
for (Thread thread : threads) {
285+
thread.join();
286+
}
287+
288+
// Verify supplier was only called once even with concurrent access (same container)
289+
assertThat(supplierInvocationCount.get()).isEqualTo(1);
290+
291+
// Verify all threads got the same client
292+
for (DataLakeFileSystemClient result : results) {
293+
assertThat(result).isSameAs(mockClient);
294+
}
295+
}
296+
}

0 commit comments

Comments
 (0)