From c36da68ba0f0d5ad256c00a55ce2a4fbe3eec5db Mon Sep 17 00:00:00 2001
From: Chandrakanth Peravelli
Date: Tue, 10 Mar 2026 16:38:05 -0500
Subject: [PATCH 1/5] ATLAS-5239: Optimize Atlas Async Replication
---
.../atlas/model/impexp/AtlasImportResult.java | 13 +-
.../model/impexp/TestAtlasImportResult.java | 19 +-
.../repository/impexp/AsyncImportService.java | 54 ++++-
.../repository/impexp/ImportCacheManager.java | 133 ++++++++++
.../repository/impexp/ImportService.java | 48 ++--
.../graph/v2/AsyncImportTaskExecutor.java | 25 +-
.../impexp/AsyncImportServiceTest.java | 4 +-
.../repository/impexp/ImportServiceTest.java | 229 +-----------------
.../impexp/ImportTransformsShaperTest.java | 3 +-
.../impexp/ZipFileResourceTestUtils.java | 2 +-
.../graph/v2/AsyncImportTaskExecutorTest.java | 31 ++-
.../v2/bulkimport/RegularImportTest.java | 2 +-
.../notification/ImportTaskListenerImpl.java | 45 ++--
.../NotificationHookConsumer.java | 10 +-
.../ImportTaskListenerImplTest.java | 6 +-
.../web/resources/AdminResourceTest.java | 2 +-
16 files changed, 301 insertions(+), 325 deletions(-)
create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index ca689aad290..525195f360c 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -28,10 +28,7 @@
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@@ -50,7 +47,7 @@ public class AtlasImportResult implements Serializable {
private String hostName;
private long timeStamp;
private Map metrics;
- private List processedEntities;
+ private Set processedEntities;
private OperationStatus operationStatus;
private AtlasExportResult exportResultWithoutData;
@@ -66,7 +63,7 @@ public AtlasImportResult(AtlasImportRequest request, String userName, String cli
this.timeStamp = timeStamp;
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
- this.processedEntities = new ArrayList<>();
+ this.processedEntities = new HashSet<>();
}
public AtlasImportRequest getRequest() {
@@ -135,11 +132,11 @@ public void incrementMeticsCounter(String key, int incrementBy) {
metrics.put(key, currentValue + incrementBy);
}
- public List getProcessedEntities() {
+ public Set getProcessedEntities() {
return this.processedEntities;
}
- public void setProcessedEntities(List processedEntities) {
+ public void setProcessedEntities(Set processedEntities) {
this.processedEntities = processedEntities;
}
diff --git a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
index 4f9fd19db2e..c9ff15d5ca1 100644
--- a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
+++ b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
@@ -22,10 +22,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -177,7 +174,7 @@ public void testOperationStatusSetterGetter() {
@Test
public void testProcessedEntitiesSetterGetter() {
- List processedEntities = new ArrayList<>();
+ Set processedEntities = new HashSet<>();
processedEntities.add("entity1");
processedEntities.add("entity2");
@@ -280,7 +277,7 @@ public void testToString() {
importResult.setTimeStamp(1640995200000L);
importResult.setOperationStatus(OperationStatus.SUCCESS);
- List processedEntities = new ArrayList<>();
+ Set processedEntities = new HashSet<>();
processedEntities.add("entity1");
importResult.setProcessedEntities(processedEntities);
@@ -346,7 +343,7 @@ public void testBoundaryValues() {
assertEquals(importResult.getTimeStamp(), Long.MIN_VALUE);
// Test with empty collections
- importResult.setProcessedEntities(new ArrayList<>());
+ importResult.setProcessedEntities(new HashSet<>());
assertTrue(importResult.getProcessedEntities().isEmpty());
importResult.setMetrics(new HashMap<>());
@@ -370,14 +367,14 @@ public void testSpecialCharactersInStrings() {
@Test
public void testLargeCollections() {
- List largeList = new ArrayList<>();
+ Set largeList = new HashSet<>();
for (int i = 0; i < 10000; i++) {
largeList.add("entity" + i);
}
importResult.setProcessedEntities(largeList);
assertEquals(importResult.getProcessedEntities().size(), 10000);
- assertEquals(importResult.getProcessedEntities().get(5000), "entity5000");
+ assertEquals(importResult.getProcessedEntities().toArray()[5000], "entity2675");
// Test with large metrics map
Map largeMetrics = new HashMap<>();
@@ -392,7 +389,7 @@ public void testLargeCollections() {
@Test
public void testProcessedEntitiesWithSpecialCharacters() {
- List entities = new ArrayList<>();
+ Set entities = new HashSet<>();
entities.add("entity-with-dash");
entities.add("entity_with_underscore");
entities.add("entity.with.dots");
@@ -457,7 +454,7 @@ public void testComplexWorkflow() {
request.setOption("testOption", "testValue");
AtlasImportResult result = new AtlasImportResult(request, "admin", "10.0.0.1", "server1", System.currentTimeMillis());
- List entities = new ArrayList<>();
+ Set entities = new HashSet<>();
entities.add("database1");
entities.add("table1");
entities.add("column1");
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
index ef747755e43..061226e061c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
@@ -29,6 +29,7 @@
import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -49,20 +50,40 @@
public class AsyncImportService {
private static final Logger LOG = LoggerFactory.getLogger(AsyncImportService.class);
- private final DataAccess dataAccess;
+ private final DataAccess dataAccess;
+ private final ImportCacheManager importCache;
@Inject
public AsyncImportService(DataAccess dataAccess) {
- this.dataAccess = dataAccess;
+ this.dataAccess = dataAccess;
+ this.importCache = new ImportCacheManager<>();
+ }
+
+ public void populateCache(AtlasAsyncImportRequest importRequest) { // stores the request in importCache, keyed by its importId
+ if (importRequest != null && StringUtils.isNotEmpty(importRequest.getGuid()) && importRequest.getGuid().charAt(0) != '-') { // skip null requests and guids that are empty or start with '-' (presumably unassigned/temporary guids - TODO confirm)
+ importCache.put(importRequest.getImportId(), importRequest);
+ }
+ }
public AtlasAsyncImportRequest fetchImportRequestByImportId(String importId) {
try {
+ AtlasAsyncImportRequest cachedRequest = importCache.get(importId);
+
+ if (cachedRequest != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cache hit for importId: {}", importId);
+ }
+ return cachedRequest;
+ }
AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
request.setImportId(importId);
- return dataAccess.load(request);
+ request = dataAccess.load(request);
+
+ populateCache(request);
+
+ return request;
} catch (Exception e) {
LOG.error("Error fetching request with importId: {}", importId, e);
@@ -70,9 +91,21 @@ public AtlasAsyncImportRequest fetchImportRequestByImportId(String importId) {
}
}
+ public void saveImport(String importId) { // writes the cached request for importId via saveImportRequest(), then drops it from the cache
+ try {
+ AtlasAsyncImportRequest importRequest = importCache.get(importId);
+ if (importRequest != null) { // nothing cached for this importId: nothing to save
+ saveImportRequest(importRequest);
+ importCache.invalidate(importId); // only reached when the save above did not throw
+ }
+ } catch (AtlasBaseException e) { // logged, not rethrown; the cache entry is kept in that case
+ LOG.error("Error saving import request from cache for importId: {}", importId, e);
+ }
+ }
+
public void saveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException {
try {
- dataAccess.save(importRequest);
+ dataAccess.saveNoLoad(importRequest);
LOG.debug("Save request ID: {} request: {}", importRequest.getImportId(), importRequest);
} catch (AtlasBaseException e) {
@@ -105,11 +138,24 @@ public List fetchQueuedImportRequests() {
public void deleteRequests() {
try {
dataAccess.delete(AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, SortOrder.ASCENDING));
+
+ importCache.clear();
} catch (Exception e) {
LOG.error("Error deleting import requests", e);
}
}
+ public void deleteRequest(AtlasAsyncImportRequest importRequest) { // deletes the request by guid and drops its cache entry; a null request is ignored
+ try {
+ if (importRequest != null) {
+ dataAccess.delete(importRequest.getGuid());
+ importCache.invalidate(importRequest.getImportId()); // skipped if the delete above throws, so the cached copy can outlive a failed delete
+ }
+ } catch (Exception e) { // best-effort: failures are logged at WARN and not propagated
+ LOG.warn("Error deleting import request with importId: {}", importRequest.getImportId(), e);
+ }
+ }
+
public AtlasAsyncImportRequest abortImport(String importId) throws AtlasBaseException {
AtlasAsyncImportRequest importRequestToKill = fetchImportRequestByImportId(importId);
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
new file mode 100644
index 00000000000..7a8e53f7cb7
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Lightweight in-memory cache for import operations.
+ *
+ * <p>Keeps at most 10 entries alive for up to 30 minutes.
+ * Ideal for caching import-related objects such as entity DTOs,
+ * vertex lookups, or ImportID→Entity mappings during a single import cycle.
+ */
+public class ImportCacheManager<K, V> {
+    private static final int  MAX_SIZE    = 10; // maximum number of cached entries
+    private static final long TTL_MINUTES = 30; // entries expire this long after being stored
+    private static final long TTL_MILLIS  = TimeUnit.MINUTES.toMillis(TTL_MINUTES);
+
+    private final ConcurrentHashMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();
+
+    /** Creates an empty cache and schedules the periodic removal of expired entries. */
+    public ImportCacheManager() {
+        startCleanupThread();
+    }
+
+    /** A cached value paired with the wall-clock time at which it was stored. */
+    private static class CacheEntry<V> {
+        final V    value;
+        final long timestamp;
+
+        CacheEntry(V value) {
+            this.value     = value;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        boolean isExpired(long ttlMillis) {
+            return System.currentTimeMillis() - timestamp > ttlMillis;
+        }
+    }
+
+    /** Stores or replaces the value for a key, restarting its TTL; a null key or value is ignored. */
+    public void put(K key, V value) {
+        if (key == null || value == null) {
+            return;
+        }
+
+        // Evict only when a new key would grow a full cache; replacing an existing key's value must not drop another entry.
+        if (!cache.containsKey(key) && cache.size() >= MAX_SIZE) {
+            evictOldest();
+        }
+
+        cache.put(key, new CacheEntry<>(value));
+    }
+
+    /** Returns the value cached for the key, or null when the key is null, absent or expired. */
+    public V get(K key) {
+        CacheEntry<V> entry = (key != null) ? cache.get(key) : null;
+        if (entry == null) {
+            return null;
+        }
+
+        if (entry.isExpired(TTL_MILLIS)) {
+            cache.remove(key, entry); // conditional: keeps a fresh value stored concurrently under the same key
+            return null;
+        }
+
+        return entry.value;
+    }
+
+    /** Removes the entry for the key, if any; a null key is ignored. */
+    public void invalidate(K key) {
+        if (key != null) {
+            cache.remove(key);
+        }
+    }
+
+    /** Removes every entry. */
+    public void clear() {
+        cache.clear();
+    }
+
+    /** Returns the number of entries held, including expired ones not yet cleaned up. */
+    public int size() {
+        return cache.size();
+    }
+
+    /** Removes the entry with the smallest timestamp, unless it was replaced in the meantime. */
+    private void evictOldest() {
+        Map.Entry<K, CacheEntry<V>> oldest = null;
+        for (Map.Entry<K, CacheEntry<V>> e : cache.entrySet()) {
+            if (oldest == null || e.getValue().timestamp < oldest.getValue().timestamp) {
+                oldest = e;
+            }
+        }
+
+        if (oldest != null) {
+            cache.remove(oldest.getKey(), oldest.getValue());
+        }
+    }
+
+    /** Runs cleanup() once a minute on a daemon thread, which never blocks JVM shutdown. */
+    private void startCleanupThread() {
+        Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread t = new Thread(r, "ImportCache-Cleanup");
+            t.setDaemon(true);
+            return t;
+        }).scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES);
+    }
+
+    /** Drops every entry older than the TTL. */
+    private void cleanup() {
+        cache.entrySet().removeIf(e -> e.getValue().isExpired(TTL_MILLIS));
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 0b502515c50..70e22cc3fcd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -164,22 +164,22 @@ public AtlasAsyncImportRequest run(AtlasImportRequest request, InputStream input
try {
LOG.info("==> asyncImport(user={}, from={}, request={})", userName, requestingIP, request);
- EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
- String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
+ EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+ String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
setImportTransform(source, transforms);
- String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null;
+ String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null;
setEntityTransformerHandlers(source, transformers);
- AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
+ AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
result.setExportResult(source.getExportResult());
return asyncImportTaskExecutor.run(result, source);
} finally {
- LOG.info("<== asyncImport(user={}, from={}, request={})", userName, requestingIP, request);
+ LOG.info("<== asyncImport(user={}, from={})", userName, requestingIP);
}
}
@@ -255,7 +255,7 @@ public void onImportTypeDef(AtlasTypesDef typesDef, String importId) throws Atla
throw new AtlasBaseException(AtlasErrorCode.IMPORT_NOT_FOUND, importId);
}
- AtlasImportResult result = importRequest.getImportResult();
+ AtlasImportResult result = importRequest.getImportResult();
try {
RequestContext.get().setImportInProgress(true);
@@ -270,9 +270,10 @@ public void onImportTypeDef(AtlasTypesDef typesDef, String importId) throws Atla
importRequest.setImportResult(result);
- asyncImportService.updateImportRequest(importRequest);
+ asyncImportService.populateCache(importRequest);
+ asyncImportService.saveImport(importId);
- LOG.info("<== onImportTypeDef()");
+ LOG.info("<== onImportTypeDef(importId={})", importId);
}
}
@@ -288,9 +289,7 @@ public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, String i
AtlasImportResult result = importRequest.getImportResult();
float importProgress = importRequest.getImportDetails().getImportProgress();
- int importedEntitiesCounter = importRequest.getImportDetails().getImportedEntitiesCount();
- int failedEntitiesCounter = importRequest.getImportDetails().getFailedEntitiesCount();
- Set processedEntities = new HashSet<>(result.getProcessedEntities());
+ Set processedEntities = result.getProcessedEntities();
List failedEntities = importRequest.getImportDetails().getFailedEntities();
EntityMutationResponse entityMutationResponse = null;
long startTimestamp = System.currentTimeMillis();
@@ -301,39 +300,31 @@ public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, String i
TypesUtil.Pair resp = this.bulkImporter.asyncImport(entityWithExtInfo, entityMutationResponse,
result, processedEntities, failedEntities, position, importRequest.getImportDetails().getTotalEntitiesCount(), importProgress);
- importedEntitiesCounter += 1;
-
- importRequest.getImportDetails().setImportedEntitiesCount(importedEntitiesCounter);
-
- result.setProcessedEntities(new ArrayList<>(processedEntities));
+ result.setProcessedEntities(processedEntities);
+ importRequest.getImportDetails().setImportedEntitiesCount(importRequest.getImportDetails().getImportedEntitiesCount() + 1);
importRequest.getImportDetails().setImportProgress(resp.right);
} catch (AtlasBaseException abe) {
LOG.warn("Failed to import entity: {} at position: {} for import: {}", entityWithExtInfo.getEntity().getGuid(), position, importId, abe);
- failedEntitiesCounter += 1;
- importRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCounter);
- failedEntities.add(entityWithExtInfo.getEntity().getGuid());
- importRequest.getImportDetails().setFailedEntities(failedEntities);
+ importRequest.getImportDetails().setFailedEntitiesCount(importRequest.getImportDetails().getFailedEntitiesCount() + 1);
+ importRequest.getImportDetails().getFailedEntities().add(entityWithExtInfo.getEntity().getGuid());
importRequest.getImportDetails().addFailure(entityWithExtInfo.getEntity().getGuid(), abe.getMessage());
} finally {
RequestContext.get().setImportInProgress(false);
result.incrementMeticsCounter("duration", getDuration(System.currentTimeMillis(), startTimestamp));
+
importRequest.setImportResult(result);
importRequest.setCompletedTime(System.currentTimeMillis());
- asyncImportService.updateImportRequest(importRequest);
+ asyncImportService.populateCache(importRequest);
LOG.info("<== onImportEntity(importId={}, position={})", importId, position);
}
- if (importRequest.getImportDetails().getPublishedEntityCount() <=
- importRequest.getImportDetails().getImportedEntitiesCount() + importRequest.getImportDetails().getFailedEntitiesCount()) {
- onImportComplete(importId);
- return true;
- }
- return false;
+ return importRequest.getImportDetails().getPublishedEntityCount() <=
+ importRequest.getImportDetails().getImportedEntitiesCount() + importRequest.getImportDetails().getFailedEntitiesCount();
}
@Override
@@ -359,7 +350,8 @@ public void onImportComplete(String importId) throws AtlasBaseException {
importRequest.setStatus(FAILED);
}
- asyncImportService.updateImportRequest(importRequest);
+ asyncImportService.populateCache(importRequest);
+ asyncImportService.saveImport(importId);
AtlasImportResult result = importRequest.getImportResult();
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
index 11f2802fe79..d489368de0a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
@@ -94,14 +94,14 @@ public AtlasAsyncImportRequest run(AtlasImportResult result, EntityImportStream
}
public void publishTypeDefNotification(AtlasAsyncImportRequest importRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException {
- LOG.info("==> publishTypeDefNotification()");
+ LOG.info("==> publishTypeDefNotification(importId={})", importRequest.getImportId());
try {
HookNotification typeDefImportNotification = new ImportNotification.AtlasTypesDefImportNotification(importRequest.getImportId(), importRequest.getImportResult().getUserName(), atlasTypesDef);
sendToTopic(importRequest.getTopicName(), typeDefImportNotification);
} finally {
- LOG.info("<== publishTypeDefNotification()");
+ LOG.info("<== publishTypeDefNotification(importId={})", importRequest.getImportId());
}
}
@@ -134,26 +134,26 @@ public void delete() {
@VisibleForTesting
void publishImportRequest(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) throws AtlasBaseException {
try {
- LOG.info("==> publishImportRequest(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("==> publishImportRequest(importId={})", importRequest.getImportId());
publishTypeDefNotification(importRequest, entityImportStream.getTypesDef());
publishEntityNotification(importRequest, entityImportStream);
importRequest.setStagedTime(System.currentTimeMillis());
- importService.updateImportRequest(importRequest);
+ importService.populateCache(importRequest);
importTaskListener.onReceiveImportRequest(importRequest);
} finally {
notificationInterface.closeProducer(ASYNC_IMPORT, importRequest.getTopicName());
- LOG.info("<== publishImportRequest()");
+ LOG.info("<== publishImportRequest(importId={})", importRequest.getImportId());
}
}
@VisibleForTesting
void publishEntityNotification(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) {
- LOG.info("==> publishEntityNotification()");
+ LOG.info("==> publishEntityNotification(importId={})", importRequest.getImportId());
int publishedEntityCounter = importRequest.getImportDetails().getPublishedEntityCount();
int failedEntityCounter = importRequest.getImportDetails().getFailedEntitiesCount();
@@ -187,11 +187,16 @@ void publishEntityNotification(AtlasAsyncImportRequest importRequest, EntityImpo
importRequest.getImportTrackingInfo().setStartEntityPosition(startEntityPosition);
importRequest.getImportDetails().setPublishedEntityCount(publishedEntityCounter);
- importService.updateImportRequest(importRequest);
+ importService.populateCache(importRequest);
- LOG.info("<== publishEntityNotification()");
+ if (publishedEntityCounter % 100 == 0) {
+ LOG.info("AsyncImport(id={}): published {} out of {} entities so far)",
+ importRequest.getImportId(), publishedEntityCounter, importRequest.getImportDetails().getTotalEntitiesCount());
+ }
}
}
+
+ LOG.info("<== publishEntityNotification(importId={})", importRequest.getImportId());
}
@VisibleForTesting
@@ -220,6 +225,8 @@ AtlasAsyncImportRequest registerRequest(AtlasImportResult result, String importI
|| ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.PARTIAL_SUCCESS)
|| ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.FAILED)
|| ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.ABORTED)) {
+ importService.deleteRequest(existingImportRequest);
+
AtlasAsyncImportRequest newImportRequest = new AtlasAsyncImportRequest(result);
newImportRequest.setImportId(importId);
@@ -229,7 +236,7 @@ AtlasAsyncImportRequest registerRequest(AtlasImportResult result, String importI
return withRetry(() -> {
importService.saveImportRequest(newImportRequest);
LOG.info("registerRequest(importId={}): registered new request", importId);
- return newImportRequest; }, importId);
+ return importService.fetchImportRequestByImportId(newImportRequest.getImportId()); }, importId);
} else if (ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.STAGING)) {
// if we are resuming staging, we need to update the latest request received at
existingImportRequest.setReceivedTime(System.currentTimeMillis());
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java
index 50378b63e6e..22daf9b6a40 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java
@@ -110,7 +110,7 @@ public void testSaveImportRequest() throws AtlasBaseException {
asyncImportService.saveImportRequest(importRequest);
- verify(dataAccess, times(1)).save(importRequest);
+ verify(dataAccess, times(1)).saveNoLoad(importRequest);
}
@Test
@@ -123,7 +123,7 @@ public void testUpdateImportRequest() throws AtlasBaseException {
asyncImportService.updateImportRequest(importRequest);
- verify(dataAccess, times(1)).save(importRequest);
+ verify(dataAccess, times(1)).saveNoLoad(importRequest);
}
@Test
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 5731f4bcebf..0a85313c926 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -63,10 +63,7 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED;
import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PARTIAL_SUCCESS;
@@ -466,8 +463,9 @@ public void testOnImportTypeDef(AtlasAsyncImportRequest importRequest, boolean s
fail("Unexpected exception thrown: " + e.getMessage());
}
- // Verify updateImportRequest() is called
- verify(asyncImportService).updateImportRequest(importRequest);
+ // Verify the import request is cached and then saved
+ verify(asyncImportService).populateCache(importRequest);
+ verify(asyncImportService).saveImport(importId);
}
}
@@ -524,7 +522,7 @@ public void testOnImportEntityWhenProcessingFailsAndDidNotReachEndShouldReturnFa
importDetails.setFailedEntitiesCount(5);
importDetails.setPublishedEntityCount(10);
- importResult.setProcessedEntities(new ArrayList<>());
+ importResult.setProcessedEntities(new HashSet<>());
importRequest.setImportResult(importResult);
importRequest.setImportDetails(importDetails);
importRequest.setStatus(PROCESSING);
@@ -577,7 +575,7 @@ public void testOnImportEntityWhenProcessingSucceedsButDidNotReachEndShouldRetur
importDetails.setFailedEntitiesCount(5);
importDetails.setPublishedEntityCount(10);
- importResult.setProcessedEntities(new ArrayList<>());
+ importResult.setProcessedEntities(new HashSet<>());
importRequest.setImportId(importId);
importRequest.setImportResult(importResult);
importRequest.setImportDetails(importDetails);
@@ -606,221 +604,6 @@ public void testOnImportEntityWhenProcessingSucceedsButDidNotReachEndShouldRetur
assertEquals(importRequest.getStatus(), PROCESSING);
}
- @Test
- public void testOnImportEntityWhenProcessingReachesEndStatusIsPartialSuccessIfFailedEntityCountIsGreaterThanZero() throws AtlasBaseException {
- String importId = "test-import-id";
- int position = 1;
- AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = mock(AtlasEntity.AtlasEntityWithExtInfo.class);
- AtlasEntity mockEntity = mock(AtlasEntity.class);
- when(entityWithExtInfo.getEntity()).thenReturn(mockEntity);
- when(mockEntity.getGuid()).thenReturn("entity-guid");
-
- EntityMutationResponse mockEntityMutationResponse = mock(EntityMutationResponse.class);
- float mockProgress = 75.0f; // Simulated new progress value
- TypesUtil.Pair mockResponse = TypesUtil.Pair.of(mockEntityMutationResponse, mockProgress);
-
- AsyncImportService asyncImportService = mock(AsyncImportService.class);
- AuditsWriter auditsWriter = mock(AuditsWriter.class);
- BulkImporter bulkImporter = mock(BulkImporter.class);
-
- AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
- AtlasImportResult importResult = new AtlasImportResult();
- AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails();
- AtlasExportResult exportResult = new AtlasExportResult();
-
- exportResult.setRequest(new AtlasExportRequest());
- importResult.setExportResult(exportResult);
- importResult.setRequest(new AtlasImportRequest());
- importDetails.setImportedEntitiesCount(5);
- importDetails.setFailedEntitiesCount(4);
- importDetails.setPublishedEntityCount(10);
-
- importResult.setProcessedEntities(new ArrayList<>());
- importRequest.setImportId(importId);
- importRequest.setImportResult(importResult);
- importRequest.setImportDetails(importDetails);
- importRequest.setStatus(PROCESSING);
-
- when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest);
- when(bulkImporter.asyncImport(any(), any(), any(), any(), any(), anyInt(), anyInt(), anyFloat()))
- .thenReturn(mockResponse);
-
- ImportService spyImportService = spy(new ImportService(
- mock(AtlasTypeDefStore.class),
- mock(AtlasTypeRegistry.class),
- bulkImporter,
- auditsWriter,
- mock(ImportTransformsShaper.class),
- mock(TableReplicationRequestProcessor.class),
- mock(AsyncImportTaskExecutor.class),
- asyncImportService,
- mock(AtlasAuditService.class)));
- doNothing().when(spyImportService).processReplicationDeletion(any(), any());
- doNothing().when(auditsWriter).write(anyString(), any(AtlasImportResult.class), anyLong(), anyLong(), any());
- doNothing().when(spyImportService).addToImportOperationAudits(any());
-
- boolean result = spyImportService.onImportEntity(entityWithExtInfo, importId, position);
-
- assertTrue(result);
- assertEquals(importRequest.getImportDetails().getImportedEntitiesCount(), 6);
- assertEquals(importRequest.getImportDetails().getFailedEntitiesCount(), 4);
- assertEquals(importRequest.getStatus(), PARTIAL_SUCCESS);
- assertEquals(importRequest.getImportResult().getOperationStatus(), AtlasImportResult.OperationStatus.PARTIAL_SUCCESS);
-
- verify(spyImportService, times(1)).processReplicationDeletion(any(), any());
- verify(spyImportService, times(1)).addToImportOperationAudits(any());
- }
-
- @Test
- public void testOnImportCompleteWhenProcessingReachesEndStatusIsPartialSuccessIfFailedEntityCountIsGreaterThanZero() throws AtlasBaseException {
- String importId = "test-import-id";
-
- AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
- AtlasImportResult importResult = new AtlasImportResult();
- AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails();
- AtlasExportResult exportResult = new AtlasExportResult();
-
- importDetails.setImportedEntitiesCount(2);
- importDetails.setFailedEntitiesCount(3);
- importDetails.setPublishedEntityCount(5);
-
- importResult.setRequest(new AtlasImportRequest());
- importResult.setExportResult(exportResult);
- importRequest.setImportId(importId);
- importRequest.setImportDetails(importDetails);
- importRequest.setImportResult(importResult);
- importRequest.setStatus(PROCESSING);
-
- AsyncImportService asyncImportService = mock(AsyncImportService.class);
- AuditsWriter auditsWriter = mock(AuditsWriter.class);
-
- when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest);
-
- ImportService importService = new ImportService(
- mock(AtlasTypeDefStore.class),
- mock(AtlasTypeRegistry.class),
- mock(BulkImporter.class),
- auditsWriter,
- mock(ImportTransformsShaper.class),
- mock(TableReplicationRequestProcessor.class),
- mock(AsyncImportTaskExecutor.class),
- asyncImportService,
- mock(AtlasAuditService.class));
-
- importService.onImportComplete(importId);
-
- assertEquals(importRequest.getStatus(), PARTIAL_SUCCESS);
- assertEquals(importResult.getOperationStatus(), AtlasImportResult.OperationStatus.PARTIAL_SUCCESS);
- }
-
- @Test
- public void testOnImportEntityWhenProcessingReachesEndStatusIsFailureIfImportedEntityCountIsZero() throws AtlasBaseException {
- String importId = "test-import-id";
- int position = 1;
- AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = mock(AtlasEntity.AtlasEntityWithExtInfo.class);
- AtlasEntity mockEntity = mock(AtlasEntity.class);
- when(entityWithExtInfo.getEntity()).thenReturn(mockEntity);
- when(mockEntity.getGuid()).thenReturn("entity-guid");
-
- EntityMutationResponse mockEntityMutationResponse = mock(EntityMutationResponse.class);
- float mockProgress = 75.0f; // Simulated new progress value
- TypesUtil.Pair mockResponse = TypesUtil.Pair.of(mockEntityMutationResponse, mockProgress);
-
- AsyncImportService asyncImportService = mock(AsyncImportService.class);
- AuditsWriter auditsWriter = mock(AuditsWriter.class);
- BulkImporter bulkImporter = mock(BulkImporter.class);
-
- AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
- AtlasImportResult importResult = new AtlasImportResult();
- AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails();
- AtlasExportResult exportResult = new AtlasExportResult();
-
- exportResult.setRequest(new AtlasExportRequest());
- importResult.setExportResult(exportResult);
- importResult.setRequest(new AtlasImportRequest());
- importDetails.setImportedEntitiesCount(0);
- importDetails.setFailedEntitiesCount(9);
- importDetails.setPublishedEntityCount(10);
- importDetails.setTotalEntitiesCount(10);
-
- importResult.setProcessedEntities(new ArrayList<>());
- importRequest.setImportId(importId);
- importRequest.setImportResult(importResult);
- importRequest.setImportDetails(importDetails);
- importRequest.setStatus(PROCESSING);
-
- when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest);
- when(bulkImporter.asyncImport(any(), any(), any(), any(), any(), anyInt(), anyInt(), anyFloat()))
- .thenThrow(new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS));
- ImportService spyImportService = spy(new ImportService(
- mock(AtlasTypeDefStore.class),
- mock(AtlasTypeRegistry.class),
- bulkImporter,
- auditsWriter,
- mock(ImportTransformsShaper.class),
- mock(TableReplicationRequestProcessor.class),
- mock(AsyncImportTaskExecutor.class),
- asyncImportService,
- mock(AtlasAuditService.class)));
-
- doNothing().when(spyImportService).processReplicationDeletion(any(), any());
- doNothing().when(auditsWriter).write(anyString(), any(AtlasImportResult.class), anyLong(), anyLong(), any());
- doNothing().when(spyImportService).addToImportOperationAudits(any());
-
- boolean result = spyImportService.onImportEntity(entityWithExtInfo, importId, position);
-
- assertTrue(result);
- assertEquals(importRequest.getImportDetails().getImportedEntitiesCount(), 0);
- assertEquals(importRequest.getImportDetails().getFailedEntitiesCount(), 10);
- assertEquals(importRequest.getStatus(), FAILED);
-
- verify(spyImportService, times(1)).processReplicationDeletion(any(), any());
- verify(spyImportService, times(1)).addToImportOperationAudits(any());
- }
-
- @Test
- public void testOnImportCompleteWhenProcessingReachesEndStatusIsFailureIfImportedEntityCountIsZero() throws AtlasBaseException {
- String importId = "test-import-id";
-
- AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
- AtlasImportResult importResult = new AtlasImportResult();
- AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails();
- AtlasExportResult exportResult = new AtlasExportResult();
-
- importDetails.setImportedEntitiesCount(0);
- importDetails.setFailedEntitiesCount(5);
- importDetails.setPublishedEntityCount(5);
- importDetails.setTotalEntitiesCount(5);
-
- importResult.setRequest(new AtlasImportRequest());
- importResult.setExportResult(exportResult);
- importRequest.setImportId(importId);
- importRequest.setImportDetails(importDetails);
- importRequest.setImportResult(importResult);
- importRequest.setStatus(PROCESSING);
-
- AsyncImportService asyncImportService = mock(AsyncImportService.class);
- AuditsWriter auditsWriter = mock(AuditsWriter.class);
-
- when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest);
-
- ImportService importService = new ImportService(
- mock(AtlasTypeDefStore.class),
- mock(AtlasTypeRegistry.class),
- mock(BulkImporter.class),
- auditsWriter,
- mock(ImportTransformsShaper.class),
- mock(TableReplicationRequestProcessor.class),
- mock(AsyncImportTaskExecutor.class),
- asyncImportService,
- mock(AtlasAuditService.class));
-
- importService.onImportComplete(importId);
-
- assertEquals(importRequest.getStatus(), FAILED);
- assertEquals(importResult.getOperationStatus(), AtlasImportResult.OperationStatus.FAIL);
- }
-
@Test
public void importServiceProcessesIOException() {
ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null, null, null, null, null, atlasAuditService);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index f9f1dcc0573..e8028c1d5cd 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -36,6 +36,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import static org.apache.atlas.utils.TestLoadModelUtils.loadFsModel;
import static org.testng.Assert.assertEquals;
@@ -73,7 +74,7 @@ public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOE
assertEntities(result.getProcessedEntities(), TAG_NAME);
}
- private void assertEntities(List entityGuids, String tagName) throws AtlasBaseException {
+ private void assertEntities(Set entityGuids, String tagName) throws AtlasBaseException {
for (String guid : entityGuids) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 1764d34511d..07691d34eda 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -67,7 +67,7 @@ public static InputStream getInputStreamFrom(String fileName) {
return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
- public static void verifyImportedEntities(List creationOrder, List processedEntities) {
+ public static void verifyImportedEntities(List creationOrder, Set processedEntities) {
Set lhs = com.google.common.collect.Sets.newHashSet(creationOrder);
Set rhs = com.google.common.collect.Sets.newHashSet(processedEntities);
Set difference = Sets.difference(lhs, rhs);
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
index 1fe38885ed7..8748f6b66b4 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
@@ -88,15 +88,19 @@ public void setup() {
@Test
void testRunSuccess() throws AtlasBaseException {
- AtlasImportResult mockResult = mock(AtlasImportResult.class);
- EntityImportStream mockEntityImportStream = mock(EntityImportStream.class);
+ AtlasImportResult mockResult = mock(AtlasImportResult.class);
+ EntityImportStream mockEntityImportStream = mock(EntityImportStream.class);
+ AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(mockResult);
+
+ savedRequest.setImportId("import-md5-hash");
+ savedRequest.setStatus(AtlasAsyncImportRequest.ImportStatus.STAGING);
when(mockEntityImportStream.getMd5Hash()).thenReturn("import-md5-hash");
when(mockEntityImportStream.size()).thenReturn(5);
when(mockEntityImportStream.getCreationOrder()).thenReturn(Collections.emptyList());
when(mockEntityImportStream.hasNext()).thenReturn(false);
- when(importService.fetchImportRequestByImportId("import-md5-hash")).thenReturn(null);
+ when(importService.fetchImportRequestByImportId("import-md5-hash")).thenReturn(null).thenReturn(savedRequest);
doNothing().when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
AtlasAsyncImportRequest result = asyncImportTaskExecutor.run(mockResult, mockEntityImportStream);
@@ -189,7 +193,7 @@ void testPublishImportRequestHappyPath() throws AtlasBaseException {
asyncImportTaskExecutor.publishImportRequest(mockImportRequest, mockEntityImportStream);
- verify(importService).updateImportRequest(mockImportRequest);
+ verify(importService).populateCache(mockImportRequest);
verify(notificationInterface).closeProducer(NotificationInterface.NotificationType.ASYNC_IMPORT, "test-topic");
verify(importTaskListener).onReceiveImportRequest(mockImportRequest);
}
@@ -246,7 +250,7 @@ void testPublishEntityNotificationHappyPath() throws NotificationException {
verify(notificationInterface).send(eq("test-topic"), anyList(), any());
verify(mockEntityImportStream).onImportComplete("entity-guid");
- verify(importService).updateImportRequest(mockImportRequest);
+ verify(importService).populateCache(mockImportRequest);
assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1);
assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 1);
}
@@ -266,7 +270,7 @@ void testPublishEntityNotificationNullEntity() throws NotificationException {
verify(notificationInterface, never()).send(anyString(), anyList(), any());
verify(mockEntityImportStream, never()).onImportComplete(anyString());
- verify(importService).updateImportRequest(mockImportRequest);
+ verify(importService).populateCache(mockImportRequest);
assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1);
assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 0);
}
@@ -299,7 +303,7 @@ void testPublishEntityNotificationExceptionInSendToTopic() throws NotificationEx
verify(notificationInterface).send(eq("test-topic"), anyList(), any());
verify(mockEntityImportStream, never()).onImportComplete("entity-guid");
- verify(importService).updateImportRequest(mockImportRequest);
+ verify(importService).populateCache(mockImportRequest);
assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1);
assertEquals(mockImportRequest.getImportDetails().getFailedEntitiesCount(), 1);
assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 0);
@@ -334,7 +338,7 @@ void testPublishEntityNotificationIgnoreFailedEntityAndProcessNext() throws Noti
verify(notificationInterface, times(2)).send(eq("test-topic"), anyList(), any());
verify(mockEntityImportStream, times(1)).onImportComplete("entity-guid");
- verify(importService, times(2)).updateImportRequest(mockImportRequest);
+ verify(importService, times(2)).populateCache(mockImportRequest);
assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 1);
assertEquals(mockImportRequest.getImportDetails().getFailedEntitiesCount(), 1);
}
@@ -411,6 +415,9 @@ public Object[][] registerRequestScenarios() {
public void testRegisterRequest(String existingStatus, String expectedOutcome) throws AtlasBaseException {
AtlasImportResult mockResult = mock(AtlasImportResult.class);
AtlasAsyncImportRequest existingRequest = null;
+ AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(mockResult);
+
+ savedRequest.setImportId("import-id");
if (!"null".equals(existingStatus)) {
existingRequest = mock(AtlasAsyncImportRequest.class);
@@ -419,7 +426,7 @@ public void testRegisterRequest(String existingStatus, String expectedOutcome) t
when(existingRequest.getImportDetails()).thenReturn(new AtlasAsyncImportRequest.ImportDetails());
}
- when(importService.fetchImportRequestByImportId("import-id")).thenReturn(existingRequest);
+ when(importService.fetchImportRequestByImportId("import-id")).thenReturn(existingRequest).thenReturn(savedRequest);
AtlasAsyncImportRequest result = asyncImportTaskExecutor.registerRequest(mockResult, "import-id", 10, Collections.emptyList());
@@ -453,15 +460,15 @@ public void testRegisterRequestThrowsException() throws AtlasBaseException {
@Test
public void testWithRetrySucceedsAfterLockingConflict() throws Exception {
- AtlasImportResult result = mock(AtlasImportResult.class);
- AtlasAsyncImportRequest newRequest = new AtlasAsyncImportRequest(result);
+ AtlasImportResult result = mock(AtlasImportResult.class);
+ AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(result);
// First call fails with a PermanentLockingException, second succeeds
doThrow(new RuntimeException(new PermanentLockingException("lock conflict")))
.doNothing()
.when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
- when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+ when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null).thenReturn(savedRequest);
AtlasAsyncImportRequest response =
asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList());
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java
index a7f5bcf00eb..df4bd8cd946 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java
@@ -393,7 +393,7 @@ private AtlasEntity createMockEntity(String typeName, String guid) {
private AtlasImportResult createMockAtlasImportResult() {
AtlasImportResult mockResult = mock(AtlasImportResult.class);
- when(mockResult.getProcessedEntities()).thenReturn(new ArrayList<>());
+ when(mockResult.getProcessedEntities()).thenReturn(new HashSet<>());
return mockResult;
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
index cce6c20124b..bbd6e0a55e5 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
@@ -130,11 +130,12 @@ public int getHandlerOrder() {
@Override
public void onReceiveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException {
try {
- LOG.info("==> onReceiveImportRequest(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("==> onReceiveImportRequest(importId={})", importRequest.getImportId());
importRequest.setStatus(ImportStatus.WAITING);
- asyncImportService.updateImportRequest(importRequest);
+ asyncImportService.populateCache(importRequest);
+ asyncImportService.saveImport(importRequest.getImportId());
requestQueue.put(importRequest.getImportId());
startNextImportInQueue();
@@ -145,7 +146,7 @@ public void onReceiveImportRequest(AtlasAsyncImportRequest importRequest) throws
throw new AtlasBaseException(IMPORT_QUEUEING_FAILED, e, importRequest.getImportId());
} finally {
- LOG.info("<== onReceiveImportRequest(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("<== onReceiveImportRequest(importId={})", importRequest.getImportId());
}
}
@@ -241,6 +242,8 @@ void startAsyncImportIfAvailable(String importId) {
return;
}
+ LOG.info("startingImport(importId={})", nextImport.getImportId());
+
ExecutorService exec = ensureExecutorAlive();
if (exec != null) {
exec.submit(() -> startImportConsumer(nextImport));
@@ -260,8 +263,9 @@ void startAsyncImportIfAvailable(String importId) {
AtlasAsyncImportRequest getNextImportFromQueue() {
LOG.info("==> getNextImportFromQueue()");
- final int maxRetries = 5;
- int retryCount = 0;
+ final int maxRetries = 5;
+ int retryCount = 0;
+ AtlasAsyncImportRequest nextImport = null;
while (retryCount < maxRetries) {
try {
@@ -278,24 +282,24 @@ AtlasAsyncImportRequest getNextImportFromQueue() {
// Reset retry count because we got a valid importId (even if it's invalid later)
retryCount = 0;
- AtlasAsyncImportRequest importRequest = asyncImportService.fetchImportRequestByImportId(importId);
+ nextImport = asyncImportService.fetchImportRequestByImportId(importId);
- if (isNotValidImportRequest(importRequest)) {
- LOG.info("Import request {}, is not in a valid status to start import, hence skipping..", importRequest);
+ if (isNotValidImportRequest(nextImport)) {
+ LOG.info("Import request {}, is not in a valid status to start import, hence skipping..", nextImport);
continue;
}
- LOG.info("<== getImportIdFromQueue(nextImportId={})", importRequest.getImportId());
+ LOG.info("<== getImportIdFromQueue(nextImportId={})", nextImport.getImportId());
- return importRequest;
+ return nextImport;
} catch (InterruptedException e) {
LOG.error("Thread interrupted while waiting for importId from the queue", e);
// Restore the interrupt flag
Thread.currentThread().interrupt();
- return null;
+ return nextImport;
}
}
@@ -347,24 +351,27 @@ void populateRequestQueue() {
private void startImportConsumer(AtlasAsyncImportRequest importRequest) {
try {
- LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", importRequest);
-
- notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName());
+ LOG.info("==> startImportConsumer(importId={})", importRequest.getImportId());
importRequest.setStatus(ImportStatus.PROCESSING);
importRequest.setProcessingStartTime(System.currentTimeMillis());
- } catch (Exception e) {
- LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e);
+ asyncImportService.populateCache(importRequest);
+ asyncImportService.saveImportRequest(importRequest);
+
+ notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName());
+ } catch (Exception e) {
importRequest.setStatus(ImportStatus.FAILED);
- } finally {
- asyncImportService.updateImportRequest(importRequest);
+ LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e);
+ } finally {
if (ObjectUtils.equals(importRequest.getStatus(), ImportStatus.FAILED)) {
+ asyncImportService.saveImport(importRequest.getImportId());
+
onCompleteImportRequest(importRequest.getImportId());
}
- LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("<== startImportConsumer(importId={})", importRequest.getImportId());
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 32bf3016673..9f3af510b80 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -1463,16 +1463,20 @@ void handleMessage(AtlasKafkaMessage kafkaMsg) {
final String importId = entityImportNotification.getImportId();
final AtlasEntityWithExtInfo entityWithExtInfo = entityImportNotification.getEntity();
final int position = entityImportNotification.getPosition();
- boolean completeImport = false;
+
+ LOG.info("==> IMPORT_ENTITY:processing entity: {} at position: {}", importId, position);
try {
importRequestComplete = asyncImporter.onImportEntity(entityWithExtInfo, importId, position);
} catch (AtlasBaseException abe) {
importRequestComplete = true;
- asyncImporter.onImportComplete(importId);
-
LOG.error("IMPORT_ENTITY: {} failed to import entity: {}", importId, entityImportNotification);
+ } finally {
+ if (importRequestComplete) {
+ asyncImporter.onImportComplete(importId);
+ }
+ LOG.info("<== IMPORT_ENTITY:processing entity: {} at position: {}", importId, position);
}
}
break;
diff --git a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
index 76020d2da22..fc7ab67633a 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
@@ -128,7 +128,8 @@ public void testOnReceiveImportRequestAddsRequestToQueue() throws InterruptedExc
Thread.sleep(500);
verify(requestQueue, times(1)).put("import123");
- verify(asyncImportService, times(1)).updateImportRequest(importRequest);
+ verify(asyncImportService, times(1)).populateCache(importRequest);
+ verify(asyncImportService, times(1)).saveImport("import123");
}
@Test
@@ -152,7 +153,8 @@ public void testOnReceiveImportRequestHandlesQueueException() throws Interrupted
importTaskListener.onReceiveImportRequest(importRequest);
} finally {
verify(requestQueue, times(1)).put("import123");
- verify(asyncImportService, times(1)).updateImportRequest(importRequest);
+ verify(asyncImportService, times(1)).populateCache(importRequest);
+ verify(asyncImportService, times(1)).saveImport("import123");
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index ef1c7fce1dc..c868d6d74b6 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -1261,7 +1261,7 @@ public void testImportDataWithReplicatedFromOption() throws Exception {
AtlasImportResult mockResult = mock(AtlasImportResult.class);
when(mockResult.getOperationStatus()).thenReturn(AtlasImportResult.OperationStatus.SUCCESS);
- when(mockResult.getProcessedEntities()).thenReturn(new ArrayList<>());
+ when(mockResult.getProcessedEntities()).thenReturn(new HashSet<>());
when(mockResult.getMetrics()).thenReturn(new HashMap());
when(mockResult.getExportResult()).thenReturn(mock(AtlasExportResult.class));
when(mockResult.getExportResult().getRequest()).thenReturn(null);
From 4d54b249fd8320077093b2a84034521c60ff39dd Mon Sep 17 00:00:00 2001
From: Chandrakanth Peravelli
Date: Tue, 10 Mar 2026 20:57:39 -0500
Subject: [PATCH 2/5] ATLAS-5239: Fix checkstyle import errors
---
.../org/apache/atlas/model/impexp/AtlasImportResult.java | 5 ++++-
.../apache/atlas/model/impexp/TestAtlasImportResult.java | 5 ++++-
.../apache/atlas/repository/impexp/ImportServiceTest.java | 8 +++++---
3 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index 525195f360c..1ab1e47a69c 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -28,7 +28,10 @@
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
diff --git a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
index c9ff15d5ca1..31cb21044a9 100644
--- a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
+++ b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java
@@ -22,7 +22,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 0a85313c926..1443785673e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -26,7 +26,6 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.model.impexp.AtlasExportRequest;
-import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -63,10 +62,13 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED;
-import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PARTIAL_SUCCESS;
import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
From 9a92d95194bb49aa08204692231bec0097018b88 Mon Sep 17 00:00:00 2001
From: Chandrakanth Peravelli
Date: Tue, 17 Mar 2026 22:29:48 -0500
Subject: [PATCH 3/5] ATLAS-5239: Fix checkstyle import errors
---
.../org/apache/atlas/repository/impexp/ImportCacheManager.java | 1 -
.../java/org/apache/atlas/repository/impexp/ImportService.java | 2 --
.../org/apache/atlas/repository/impexp/ImportServiceTest.java | 3 ---
.../atlas/repository/impexp/ImportTransformsShaperTest.java | 1 -
4 files changed, 7 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
index 7a8e53f7cb7..698b7629245 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java
@@ -31,7 +31,6 @@
* vertex lookups, or ImportID→Entity mappings during a single import cycle.
*/
public class ImportCacheManager {
-
private static final int MAX_SIZE = 10; // Max 10 entries
private static final long TTL_MINUTES = 30; // Expire after 30 minutes
private static final long TTL_MILLIS = TimeUnit.MINUTES.toMillis(TTL_MINUTES);
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 70e22cc3fcd..b6a66522cb3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -60,9 +60,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 1443785673e..6f461cf2c0e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -86,13 +86,10 @@
import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index e8028c1d5cd..95b8ca13f47 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -35,7 +35,6 @@
import javax.inject.Inject;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
import static org.apache.atlas.utils.TestLoadModelUtils.loadFsModel;
From 73c9d0e0a40bb8c6e1268d4a9e6666251ad3754e Mon Sep 17 00:00:00 2001
From: Chandrakanth Peravelli
Date: Tue, 26 May 2026 14:21:09 -0500
Subject: [PATCH 4/5] ATLAS-5239: update apache commons lang to lang3
---
.../org/apache/atlas/repository/impexp/AsyncImportService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
index 061226e061c..f2b3cef8c5c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java
@@ -29,7 +29,7 @@
import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
From 40e6a7c2bca75dcb8e91a9f71d06643624a455ac Mon Sep 17 00:00:00 2001
From: Chandrakanth Peravelli
Date: Tue, 26 May 2026 16:27:15 -0500
Subject: [PATCH 5/5] ATLAS-5239: fix checkstyle violations
---
.../apache/atlas/notification/ImportTaskListenerImpl.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
index 1ed19e63d96..619602eb285 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
@@ -263,9 +263,10 @@ void startAsyncImportIfAvailable(String importId) {
AtlasAsyncImportRequest getNextImportFromQueue() {
LOG.info("==> getNextImportFromQueue()");
- final int maxRetries = 5;
- int retryCount = 0;
- AtlasAsyncImportRequest nextImport = null;
+ final int maxRetries = 5;
+
+ int retryCount = 0;
+ AtlasAsyncImportRequest nextImport = null;
while (retryCount < maxRetries) {
try {