Skip to content

Commit 0dc3382

Browse files
committed
Adding tests for Azure tags and metadata
1 parent 50f7b78 commit 0dc3382

3 files changed

Lines changed: 848 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.processors.azure.storage;
18+
19+
import com.azure.storage.blob.BlobClient;
20+
import com.azure.storage.blob.BlobContainerClient;
21+
import com.azure.storage.blob.BlobServiceClient;
22+
import com.azure.storage.blob.models.BlobErrorCode;
23+
import com.azure.storage.blob.models.BlobProperties;
24+
import com.azure.storage.blob.models.BlobStorageException;
25+
import org.apache.nifi.components.PropertyDescriptor;
26+
import org.apache.nifi.context.PropertyContext;
27+
import org.apache.nifi.flowfile.FlowFile;
28+
import org.apache.nifi.provenance.ProvenanceEventRecord;
29+
import org.apache.nifi.provenance.ProvenanceEventType;
30+
import org.apache.nifi.util.MockFlowFile;
31+
import org.apache.nifi.util.TestRunner;
32+
import org.apache.nifi.util.TestRunners;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
40+
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
41+
import static org.junit.jupiter.api.Assertions.assertEquals;
42+
import static org.junit.jupiter.api.Assertions.assertFalse;
43+
import static org.junit.jupiter.api.Assertions.assertNotNull;
44+
import static org.junit.jupiter.api.Assertions.assertNull;
45+
import static org.junit.jupiter.api.Assertions.assertTrue;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
48+
49+
public class TestGetAzureBlobStorageMetadata_v12 {
50+
51+
private static final String CONTAINER_NAME = "test-container";
52+
private static final String BLOB_NAME = "test-blob";
53+
54+
private TestRunner runner;
55+
private BlobServiceClient storageClient;
56+
private BlobClient blobClient;
57+
private BlobProperties blobProperties;
58+
59+
@BeforeEach
60+
void setUp() {
61+
storageClient = mock(BlobServiceClient.class);
62+
final BlobContainerClient containerClient = mock(BlobContainerClient.class);
63+
blobClient = mock(BlobClient.class);
64+
blobProperties = mock(BlobProperties.class);
65+
66+
when(storageClient.getBlobContainerClient(CONTAINER_NAME)).thenReturn(containerClient);
67+
when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient);
68+
when(blobClient.getProperties()).thenReturn(blobProperties);
69+
70+
final GetAzureBlobStorageMetadata_v12 processor = new GetAzureBlobStorageMetadata_v12() {
71+
@Override
72+
protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
73+
return storageClient;
74+
}
75+
76+
@Override
77+
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
78+
return super.getSupportedPropertyDescriptors().stream()
79+
.filter(pd -> !pd.equals(BLOB_STORAGE_CREDENTIALS_SERVICE))
80+
.toList();
81+
}
82+
};
83+
84+
runner = TestRunners.newTestRunner(processor);
85+
runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER, CONTAINER_NAME);
86+
runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME, BLOB_NAME);
87+
}
88+
89+
@Test
90+
void testSuccessfulMetadataRetrieval() {
91+
final Map<String, String> metadata = Map.of(
92+
"author", "jane-doe",
93+
"source-system", "erp",
94+
"processing-date", "2024-01-15"
95+
);
96+
when(blobProperties.getMetadata()).thenReturn(metadata);
97+
98+
runner.enqueue("");
99+
runner.run();
100+
101+
runner.assertAllFlowFilesTransferred(
102+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
103+
104+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
105+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
106+
107+
assertEquals("jane-doe", flowFile.getAttribute("azure.user.metadata.author"));
108+
assertEquals("erp", flowFile.getAttribute("azure.user.metadata.source-system"));
109+
assertEquals("2024-01-15", flowFile.getAttribute("azure.user.metadata.processing-date"));
110+
}
111+
112+
@Test
113+
void testEmptyMetadataRetrieval() {
114+
when(blobProperties.getMetadata()).thenReturn(Map.of());
115+
116+
runner.enqueue("");
117+
runner.run();
118+
119+
runner.assertAllFlowFilesTransferred(
120+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
121+
122+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
123+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
124+
125+
flowFile.getAttributes().forEach((key, value) ->
126+
assertFalse(key.startsWith("azure.user.metadata."),
127+
"No metadata attributes should exist when blob has none, found: " + key)
128+
);
129+
}
130+
131+
@Test
132+
void testBlobNotFound() {
133+
BlobStorageException exception = mockBlobStorageException(BlobErrorCode.BLOB_NOT_FOUND);
134+
when(blobClient.getProperties()).thenThrow(exception);
135+
136+
runner.enqueue("");
137+
runner.run();
138+
139+
runner.assertAllFlowFilesTransferred(
140+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_NOT_FOUND, 1);
141+
}
142+
143+
@Test
144+
void testOtherBlobStorageExceptionRoutesToFailure() {
145+
BlobStorageException exception = mockBlobStorageException(BlobErrorCode.AUTHORIZATION_FAILURE);
146+
when(blobClient.getProperties()).thenThrow(exception);
147+
148+
runner.enqueue("");
149+
runner.run();
150+
151+
runner.assertAllFlowFilesTransferred(
152+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE, 1);
153+
154+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
155+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE).getFirst();
156+
assertTrue(flowFile.isPenalized(), "FlowFile should be penalized on failure");
157+
}
158+
159+
@Test
160+
void testContainerAndBlobNameFromFlowFileAttributes() {
161+
runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER,
162+
"${azure.container}");
163+
runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME,
164+
"${azure.blobname}");
165+
166+
final String dynamicContainer = "other-container";
167+
final String dynamicBlob = "other-blob";
168+
169+
final BlobContainerClient otherContainerClient = mock(BlobContainerClient.class);
170+
final BlobClient otherBlobClient = mock(BlobClient.class);
171+
final BlobProperties otherBlobProperties = mock(BlobProperties.class);
172+
173+
when(storageClient.getBlobContainerClient(dynamicContainer))
174+
.thenReturn(otherContainerClient);
175+
when(otherContainerClient.getBlobClient(dynamicBlob))
176+
.thenReturn(otherBlobClient);
177+
when(otherBlobClient.getProperties()).thenReturn(otherBlobProperties);
178+
when(otherBlobProperties.getMetadata()).thenReturn(Map.of("origin", "external"));
179+
180+
runner.enqueue("", Map.of(
181+
"azure.container", dynamicContainer,
182+
"azure.blobname", dynamicBlob
183+
));
184+
runner.run();
185+
186+
runner.assertAllFlowFilesTransferred(
187+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
188+
189+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
190+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
191+
assertEquals("external", flowFile.getAttribute("azure.user.metadata.origin"));
192+
}
193+
194+
@Test
195+
void testProvenanceEventOnFound() {
196+
when(blobProperties.getMetadata()).thenReturn(Map.of("key", "value"));
197+
198+
runner.enqueue("");
199+
runner.run();
200+
201+
runner.assertAllFlowFilesTransferred(
202+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
203+
204+
final ProvenanceEventRecord modifyEvent = runner.getProvenanceEvents().stream()
205+
.filter(e -> e.getEventType() == ProvenanceEventType.ATTRIBUTES_MODIFIED)
206+
.findFirst()
207+
.orElse(null);
208+
assertNotNull(modifyEvent, "Should have an ATTRIBUTES_MODIFIED provenance event");
209+
}
210+
211+
@Test
212+
void testMetadataAttributePrefix() {
213+
when(blobProperties.getMetadata()).thenReturn(Map.of("customKey", "customValue"));
214+
215+
runner.enqueue("");
216+
runner.run();
217+
218+
runner.assertAllFlowFilesTransferred(
219+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
220+
221+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
222+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
223+
assertEquals("customValue", flowFile.getAttribute("azure.user.metadata.customKey"));
224+
assertNull(flowFile.getAttribute("customKey"),
225+
"Raw key should not appear without prefix");
226+
}
227+
228+
@Test
229+
void testMultipleFlowFiles() {
230+
when(blobProperties.getMetadata())
231+
.thenReturn(Map.of("seq", "1"))
232+
.thenReturn(Map.of("seq", "2"));
233+
234+
runner.enqueue("");
235+
runner.enqueue("");
236+
runner.run(2);
237+
238+
assertEquals(2, runner.getFlowFilesForRelationship(
239+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).size());
240+
}
241+
242+
@Test
243+
void testMetadataWithManyEntries() {
244+
final Map<String, String> largeMetadata = new HashMap<>();
245+
for (int i = 0; i < 20; i++) {
246+
largeMetadata.put("key" + i, "value" + i);
247+
}
248+
when(blobProperties.getMetadata()).thenReturn(largeMetadata);
249+
250+
runner.enqueue("");
251+
runner.run();
252+
253+
runner.assertAllFlowFilesTransferred(
254+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
255+
256+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
257+
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
258+
259+
for (int i = 0; i < 20; i++) {
260+
assertEquals("value" + i,
261+
flowFile.getAttribute("azure.user.metadata.key" + i));
262+
}
263+
}
264+
265+
private static BlobStorageException mockBlobStorageException(BlobErrorCode errorCode) {
266+
final BlobStorageException exception = mock(BlobStorageException.class);
267+
when(exception.getErrorCode()).thenReturn(errorCode);
268+
when(exception.getMessage()).thenReturn("Mocked: " + errorCode);
269+
return exception;
270+
}
271+
}

0 commit comments

Comments
 (0)