From c36da68ba0f0d5ad256c00a55ce2a4fbe3eec5db Mon Sep 17 00:00:00 2001 From: Chandrakanth Peravelli Date: Tue, 10 Mar 2026 16:38:05 -0500 Subject: [PATCH 1/5] ATLAS-5239: Optimize Atlas Async Replication --- .../atlas/model/impexp/AtlasImportResult.java | 13 +- .../model/impexp/TestAtlasImportResult.java | 19 +- .../repository/impexp/AsyncImportService.java | 54 ++++- .../repository/impexp/ImportCacheManager.java | 133 ++++++++++ .../repository/impexp/ImportService.java | 48 ++-- .../graph/v2/AsyncImportTaskExecutor.java | 25 +- .../impexp/AsyncImportServiceTest.java | 4 +- .../repository/impexp/ImportServiceTest.java | 229 +----------------- .../impexp/ImportTransformsShaperTest.java | 3 +- .../impexp/ZipFileResourceTestUtils.java | 2 +- .../graph/v2/AsyncImportTaskExecutorTest.java | 31 ++- .../v2/bulkimport/RegularImportTest.java | 2 +- .../notification/ImportTaskListenerImpl.java | 45 ++-- .../NotificationHookConsumer.java | 10 +- .../ImportTaskListenerImplTest.java | 6 +- .../web/resources/AdminResourceTest.java | 2 +- 16 files changed, 301 insertions(+), 325 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index ca689aad290..525195f360c 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -28,10 +28,7 @@ import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -50,7 +47,7 @@ public class AtlasImportResult implements Serializable { private String hostName; private long timeStamp; private Map metrics; - private List processedEntities; + private Set processedEntities; private OperationStatus operationStatus; private AtlasExportResult exportResultWithoutData; @@ -66,7 +63,7 @@ public AtlasImportResult(AtlasImportRequest request, String userName, String cli this.timeStamp = timeStamp; this.metrics = new HashMap<>(); this.operationStatus = OperationStatus.FAIL; - this.processedEntities = new ArrayList<>(); + this.processedEntities = new HashSet<>(); } public AtlasImportRequest getRequest() { @@ -135,11 +132,11 @@ public void incrementMeticsCounter(String key, int incrementBy) { metrics.put(key, currentValue + incrementBy); } - public List getProcessedEntities() { + public Set getProcessedEntities() { return this.processedEntities; } - public void setProcessedEntities(List processedEntities) { + public void setProcessedEntities(Set processedEntities) { this.processedEntities = processedEntities; } diff --git a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java index 4f9fd19db2e..c9ff15d5ca1 100644 --- a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java +++ b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java @@ -22,10 +22,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -177,7 +174,7 @@ public void testOperationStatusSetterGetter() { @Test public void testProcessedEntitiesSetterGetter() { - List processedEntities = new ArrayList<>(); + Set processedEntities = new HashSet<>(); processedEntities.add("entity1"); processedEntities.add("entity2"); @@ -280,7 +277,7 @@ public void testToString() { importResult.setTimeStamp(1640995200000L); importResult.setOperationStatus(OperationStatus.SUCCESS); - List processedEntities = new ArrayList<>(); + Set processedEntities = new HashSet<>(); processedEntities.add("entity1"); importResult.setProcessedEntities(processedEntities); @@ -346,7 +343,7 @@ public void testBoundaryValues() { assertEquals(importResult.getTimeStamp(), Long.MIN_VALUE); // Test with empty collections - importResult.setProcessedEntities(new ArrayList<>()); + importResult.setProcessedEntities(new HashSet<>()); assertTrue(importResult.getProcessedEntities().isEmpty()); importResult.setMetrics(new HashMap<>()); @@ -370,14 +367,14 @@ public void testSpecialCharactersInStrings() { @Test public void testLargeCollections() { - List largeList = new ArrayList<>(); + Set largeList = new HashSet<>(); for (int i = 0; i < 10000; i++) { largeList.add("entity" + i); } importResult.setProcessedEntities(largeList); assertEquals(importResult.getProcessedEntities().size(), 10000); - assertEquals(importResult.getProcessedEntities().get(5000), "entity5000"); + assertEquals(importResult.getProcessedEntities().toArray()[5000], "entity2675"); // Test with large metrics map Map largeMetrics = new HashMap<>(); @@ -392,7 +389,7 @@ public void testLargeCollections() { @Test public void testProcessedEntitiesWithSpecialCharacters() { - List entities = new ArrayList<>(); + Set entities = new HashSet<>(); entities.add("entity-with-dash"); entities.add("entity_with_underscore"); entities.add("entity.with.dots"); @@ -457,7 +454,7 @@ public void testComplexWorkflow() { request.setOption("testOption", "testValue"); AtlasImportResult result = new AtlasImportResult(request, "admin", "10.0.0.1", "server1", System.currentTimeMillis()); - List entities = new ArrayList<>(); + Set entities = new HashSet<>(); entities.add("database1"); entities.add("table1"); entities.add("column1"); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java index ef747755e43..061226e061c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java @@ -29,6 +29,7 @@ import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -49,20 +50,40 @@ public class AsyncImportService { private static final Logger LOG = LoggerFactory.getLogger(AsyncImportService.class); - private final DataAccess dataAccess; + private final DataAccess dataAccess; + private final ImportCacheManager importCache; @Inject public AsyncImportService(DataAccess dataAccess) { - this.dataAccess = dataAccess; + this.dataAccess = dataAccess; + this.importCache = new ImportCacheManager<>(); + } + + public void populateCache(AtlasAsyncImportRequest importRequest) { + if (importRequest != null && StringUtils.isNotEmpty(importRequest.getGuid()) && importRequest.getGuid().charAt(0) != '-') { + importCache.put(importRequest.getImportId(), importRequest); + } } public AtlasAsyncImportRequest fetchImportRequestByImportId(String importId) { try { + AtlasAsyncImportRequest cachedRequest = importCache.get(importId); + + if (cachedRequest != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cache hit for importId: {}", importId); + } + return cachedRequest; + } AtlasAsyncImportRequest request = new AtlasAsyncImportRequest(); request.setImportId(importId); - return dataAccess.load(request); + request = dataAccess.load(request); + + populateCache(request); + + return request; } catch (Exception e) { LOG.error("Error fetching request with importId: {}", importId, e); @@ -70,9 +91,21 @@ public AtlasAsyncImportRequest fetchImportRequestByImportId(String importId) { } } + public void saveImport(String importId) { + try { + AtlasAsyncImportRequest importRequest = importCache.get(importId); + if (importRequest != null) { + saveImportRequest(importRequest); + importCache.invalidate(importId); + } + } catch (AtlasBaseException e) { + LOG.error("Error saving import request from cache for importId: {}", importId, e); + } + } + public void saveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException { try { - dataAccess.save(importRequest); + dataAccess.saveNoLoad(importRequest); LOG.debug("Save request ID: {} request: {}", importRequest.getImportId(), importRequest); } catch (AtlasBaseException e) { @@ -105,11 +138,24 @@ public List fetchQueuedImportRequests() { public void deleteRequests() { try { dataAccess.delete(AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, SortOrder.ASCENDING)); + + importCache.clear(); } catch (Exception e) { LOG.error("Error deleting import requests", e); } } + public void deleteRequest(AtlasAsyncImportRequest importRequest) { + try { + if (importRequest != null) { + dataAccess.delete(importRequest.getGuid()); + importCache.invalidate(importRequest.getImportId()); + } + } catch (Exception e) { + LOG.warn("Error deleting import request with importId: {}", importRequest.getImportId(), e); + } + } + public AtlasAsyncImportRequest abortImport(String importId) throws AtlasBaseException { AtlasAsyncImportRequest importRequestToKill = fetchImportRequestByImportId(importId); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java new file mode 100644 index 00000000000..7a8e53f7cb7 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Lightweight in-memory cache for import operations. + * + *

Keeps at most 10 entries alive for up to 30 minutes. + * Ideal for caching import-related objects such as entity DTOs, + * vertex lookups, or ImportID→Entity mappings during a single import cycle.

+ */ +public class ImportCacheManager { + + private static final int MAX_SIZE = 10; // Max 10 entries + private static final long TTL_MINUTES = 30; // Expire after 30 minutes + private static final long TTL_MILLIS = TimeUnit.MINUTES.toMillis(TTL_MINUTES); + + private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + + public ImportCacheManager() { + startCleanupThread(); + } + + private static class CacheEntry { + final V value; + final long timestamp; + + CacheEntry(V value) { + this.value = value; + this.timestamp = System.currentTimeMillis(); + } + + boolean isExpired(long ttlMillis) { + return System.currentTimeMillis() - timestamp > ttlMillis; + } + } + + /** Store or update a value in the cache */ + public void put(K key, V value) { + if (key == null || value == null) { + return; + } + + // Evict oldest if max size exceeded + if (cache.size() >= MAX_SIZE) { + evictOldest(); + } + + cache.put(key, new CacheEntry<>(value)); + } + + /** Retrieve a value if still valid */ + public V get(K key) { + CacheEntry entry = cache.get(key); + if (entry == null) { + return null; + } + + if (entry.isExpired(TTL_MILLIS)) { + cache.remove(key); + return null; + } + + return entry.value; + } + + /** Manually remove one entry */ + public void invalidate(K key) { + cache.remove(key); + } + + /** Clear entire cache */ + public void clear() { + cache.clear(); + } + + /** Returns current cache size */ + public int size() { + return cache.size(); + } + + /** Evicts the oldest entry based on timestamp */ + private void evictOldest() { + K oldestKey = null; + long oldestTime = Long.MAX_VALUE; + + for (Map.Entry> e : cache.entrySet()) { + if (e.getValue().timestamp < oldestTime) { + oldestKey = e.getKey(); + oldestTime = e.getValue().timestamp; + } + } + + if (oldestKey != null) { + cache.remove(oldestKey); + } + } + + /** Periodic cleanup for expired entries */ + private void startCleanupThread() { + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "ImportCache-Cleanup"); + t.setDaemon(true); + return t; + }).scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES); + } + + private void cleanup() { + long now = System.currentTimeMillis(); + cache.entrySet().removeIf(e -> (now - e.getValue().timestamp) > TTL_MILLIS); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 0b502515c50..70e22cc3fcd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -164,22 +164,22 @@ public AtlasAsyncImportRequest run(AtlasImportRequest request, InputStream input try { LOG.info("==> asyncImport(user={}, from={}, request={})", userName, requestingIP, request); - EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); - String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null; + EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); + String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null; setImportTransform(source, transforms); - String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null; + String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null; setEntityTransformerHandlers(source, transformers); - AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); + AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); result.setExportResult(source.getExportResult()); return asyncImportTaskExecutor.run(result, source); } finally { - LOG.info("<== asyncImport(user={}, from={}, request={})", userName, requestingIP, request); + LOG.info("<== asyncImport(user={}, from={})", userName, requestingIP); } } @@ -255,7 +255,7 @@ public void onImportTypeDef(AtlasTypesDef typesDef, String importId) throws Atla throw new AtlasBaseException(AtlasErrorCode.IMPORT_NOT_FOUND, importId); } - AtlasImportResult result = importRequest.getImportResult(); + AtlasImportResult result = importRequest.getImportResult(); try { RequestContext.get().setImportInProgress(true); @@ -270,9 +270,10 @@ public void onImportTypeDef(AtlasTypesDef typesDef, String importId) throws Atla importRequest.setImportResult(result); - asyncImportService.updateImportRequest(importRequest); + asyncImportService.populateCache(importRequest); + asyncImportService.saveImport(importId); - LOG.info("<== onImportTypeDef()"); + LOG.info("<== onImportTypeDef(importId={})", importId); } } @@ -288,9 +289,7 @@ public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, String i AtlasImportResult result = importRequest.getImportResult(); float importProgress = importRequest.getImportDetails().getImportProgress(); - int importedEntitiesCounter = importRequest.getImportDetails().getImportedEntitiesCount(); - int failedEntitiesCounter = importRequest.getImportDetails().getFailedEntitiesCount(); - Set processedEntities = new HashSet<>(result.getProcessedEntities()); + Set processedEntities = result.getProcessedEntities(); List failedEntities = importRequest.getImportDetails().getFailedEntities(); EntityMutationResponse entityMutationResponse = null; long startTimestamp = System.currentTimeMillis(); @@ -301,39 +300,31 @@ public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, String i TypesUtil.Pair resp = this.bulkImporter.asyncImport(entityWithExtInfo, entityMutationResponse, result, processedEntities, failedEntities, position, importRequest.getImportDetails().getTotalEntitiesCount(), importProgress); - importedEntitiesCounter += 1; - - importRequest.getImportDetails().setImportedEntitiesCount(importedEntitiesCounter); - - result.setProcessedEntities(new ArrayList<>(processedEntities)); + result.setProcessedEntities(processedEntities); + importRequest.getImportDetails().setImportedEntitiesCount(importRequest.getImportDetails().getImportedEntitiesCount() + 1); importRequest.getImportDetails().setImportProgress(resp.right); } catch (AtlasBaseException abe) { LOG.warn("Failed to import entity: {} at position: {} for import: {}", entityWithExtInfo.getEntity().getGuid(), position, importId, abe); - failedEntitiesCounter += 1; - importRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCounter); - failedEntities.add(entityWithExtInfo.getEntity().getGuid()); - importRequest.getImportDetails().setFailedEntities(failedEntities); + importRequest.getImportDetails().setFailedEntitiesCount(importRequest.getImportDetails().getFailedEntitiesCount() + 1); + importRequest.getImportDetails().getFailedEntities().add(entityWithExtInfo.getEntity().getGuid()); importRequest.getImportDetails().addFailure(entityWithExtInfo.getEntity().getGuid(), abe.getMessage()); } finally { RequestContext.get().setImportInProgress(false); result.incrementMeticsCounter("duration", getDuration(System.currentTimeMillis(), startTimestamp)); + importRequest.setImportResult(result); importRequest.setCompletedTime(System.currentTimeMillis()); - asyncImportService.updateImportRequest(importRequest); + asyncImportService.populateCache(importRequest); LOG.info("<== onImportEntity(importId={}, position={})", importId, position); } - if (importRequest.getImportDetails().getPublishedEntityCount() <= - importRequest.getImportDetails().getImportedEntitiesCount() + importRequest.getImportDetails().getFailedEntitiesCount()) { - onImportComplete(importId); - return true; - } - return false; + return importRequest.getImportDetails().getPublishedEntityCount() <= + importRequest.getImportDetails().getImportedEntitiesCount() + importRequest.getImportDetails().getFailedEntitiesCount(); } @Override @@ -359,7 +350,8 @@ public void onImportComplete(String importId) throws AtlasBaseException { importRequest.setStatus(FAILED); } - asyncImportService.updateImportRequest(importRequest); + asyncImportService.populateCache(importRequest); + asyncImportService.saveImport(importId); AtlasImportResult result = importRequest.getImportResult(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java index 11f2802fe79..d489368de0a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java @@ -94,14 +94,14 @@ public AtlasAsyncImportRequest run(AtlasImportResult result, EntityImportStream } public void publishTypeDefNotification(AtlasAsyncImportRequest importRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException { - LOG.info("==> publishTypeDefNotification()"); + LOG.info("==> publishTypeDefNotification(importId={})", importRequest.getImportId()); try { HookNotification typeDefImportNotification = new ImportNotification.AtlasTypesDefImportNotification(importRequest.getImportId(), importRequest.getImportResult().getUserName(), atlasTypesDef); sendToTopic(importRequest.getTopicName(), typeDefImportNotification); } finally { - LOG.info("<== publishTypeDefNotification()"); + LOG.info("<== publishTypeDefNotification(importId={})", importRequest.getImportId()); } } @@ -134,26 +134,26 @@ public void delete() { @VisibleForTesting void publishImportRequest(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) throws AtlasBaseException { try { - LOG.info("==> publishImportRequest(atlasAsyncImportRequest={})", importRequest); + LOG.info("==> publishImportRequest(importId={})", importRequest.getImportId()); publishTypeDefNotification(importRequest, entityImportStream.getTypesDef()); publishEntityNotification(importRequest, entityImportStream); importRequest.setStagedTime(System.currentTimeMillis()); - importService.updateImportRequest(importRequest); + importService.populateCache(importRequest); importTaskListener.onReceiveImportRequest(importRequest); } finally { notificationInterface.closeProducer(ASYNC_IMPORT, importRequest.getTopicName()); - LOG.info("<== publishImportRequest()"); + LOG.info("<== publishImportRequest(importId={})", importRequest.getImportId()); } } @VisibleForTesting void publishEntityNotification(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) { - LOG.info("==> publishEntityNotification()"); + LOG.info("==> publishEntityNotification(importId={})", importRequest.getImportId()); int publishedEntityCounter = importRequest.getImportDetails().getPublishedEntityCount(); int failedEntityCounter = importRequest.getImportDetails().getFailedEntitiesCount(); @@ -187,11 +187,16 @@ void publishEntityNotification(AtlasAsyncImportRequest importRequest, EntityImpo importRequest.getImportTrackingInfo().setStartEntityPosition(startEntityPosition); importRequest.getImportDetails().setPublishedEntityCount(publishedEntityCounter); - importService.updateImportRequest(importRequest); + importService.populateCache(importRequest); - LOG.info("<== publishEntityNotification()"); + if (publishedEntityCounter % 100 == 0) { + LOG.info("AsyncImport(id={}): published {} out of {} entities so far)", + importRequest.getImportId(), publishedEntityCounter, importRequest.getImportDetails().getTotalEntitiesCount()); + } } } + + LOG.info("<== publishEntityNotification(importId={})", importRequest.getImportId()); } @VisibleForTesting @@ -220,6 +225,8 @@ AtlasAsyncImportRequest registerRequest(AtlasImportResult result, String importI || ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.PARTIAL_SUCCESS) || ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.FAILED) || ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.ABORTED)) { + importService.deleteRequest(existingImportRequest); + AtlasAsyncImportRequest newImportRequest = new AtlasAsyncImportRequest(result); newImportRequest.setImportId(importId); @@ -229,7 +236,7 @@ AtlasAsyncImportRequest registerRequest(AtlasImportResult result, String importI return withRetry(() -> { importService.saveImportRequest(newImportRequest); LOG.info("registerRequest(importId={}): registered new request", importId); - return newImportRequest; }, importId); + return importService.fetchImportRequestByImportId(newImportRequest.getImportId()); }, importId); } else if (ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.STAGING)) { // if we are resuming staging, we need to update the latest request received at existingImportRequest.setReceivedTime(System.currentTimeMillis()); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java index 50378b63e6e..22daf9b6a40 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/AsyncImportServiceTest.java @@ -110,7 +110,7 @@ public void testSaveImportRequest() throws AtlasBaseException { asyncImportService.saveImportRequest(importRequest); - verify(dataAccess, times(1)).save(importRequest); + verify(dataAccess, times(1)).saveNoLoad(importRequest); } @Test @@ -123,7 +123,7 @@ public void testUpdateImportRequest() throws AtlasBaseException { asyncImportService.updateImportRequest(importRequest); - verify(dataAccess, times(1)).save(importRequest); + verify(dataAccess, times(1)).saveNoLoad(importRequest); } @Test diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 5731f4bcebf..0a85313c926 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -63,10 +63,7 @@ import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PARTIAL_SUCCESS; @@ -466,8 +463,9 @@ public void testOnImportTypeDef(AtlasAsyncImportRequest importRequest, boolean s fail("Unexpected exception thrown: " + e.getMessage()); } - // Verify updateImportRequest() is called - verify(asyncImportService).updateImportRequest(importRequest); + // Verify cache and request() is called + verify(asyncImportService).populateCache(importRequest); + verify(asyncImportService).saveImport(importId); } } @@ -524,7 +522,7 @@ public void testOnImportEntityWhenProcessingFailsAndDidNotReachEndShouldReturnFa importDetails.setFailedEntitiesCount(5); importDetails.setPublishedEntityCount(10); - importResult.setProcessedEntities(new ArrayList<>()); + importResult.setProcessedEntities(new HashSet<>()); importRequest.setImportResult(importResult); importRequest.setImportDetails(importDetails); importRequest.setStatus(PROCESSING); @@ -577,7 +575,7 @@ public void testOnImportEntityWhenProcessingSucceedsButDidNotReachEndShouldRetur importDetails.setFailedEntitiesCount(5); importDetails.setPublishedEntityCount(10); - importResult.setProcessedEntities(new ArrayList<>()); + importResult.setProcessedEntities(new HashSet<>()); importRequest.setImportId(importId); importRequest.setImportResult(importResult); importRequest.setImportDetails(importDetails); @@ -606,221 +604,6 @@ public void testOnImportEntityWhenProcessingSucceedsButDidNotReachEndShouldRetur assertEquals(importRequest.getStatus(), PROCESSING); } - @Test - public void testOnImportEntityWhenProcessingReachesEndStatusIsPartialSuccessIfFailedEntityCountIsGreaterThanZero() throws AtlasBaseException { - String importId = "test-import-id"; - int position = 1; - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = mock(AtlasEntity.AtlasEntityWithExtInfo.class); - AtlasEntity mockEntity = mock(AtlasEntity.class); - when(entityWithExtInfo.getEntity()).thenReturn(mockEntity); - when(mockEntity.getGuid()).thenReturn("entity-guid"); - - EntityMutationResponse mockEntityMutationResponse = mock(EntityMutationResponse.class); - float mockProgress = 75.0f; // Simulated new progress value - TypesUtil.Pair mockResponse = TypesUtil.Pair.of(mockEntityMutationResponse, mockProgress); - - AsyncImportService asyncImportService = mock(AsyncImportService.class); - AuditsWriter auditsWriter = mock(AuditsWriter.class); - BulkImporter bulkImporter = mock(BulkImporter.class); - - AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest(); - AtlasImportResult importResult = new AtlasImportResult(); - AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails(); - AtlasExportResult exportResult = new AtlasExportResult(); - - exportResult.setRequest(new AtlasExportRequest()); - importResult.setExportResult(exportResult); - importResult.setRequest(new AtlasImportRequest()); - importDetails.setImportedEntitiesCount(5); - importDetails.setFailedEntitiesCount(4); - importDetails.setPublishedEntityCount(10); - - importResult.setProcessedEntities(new ArrayList<>()); - importRequest.setImportId(importId); - importRequest.setImportResult(importResult); - importRequest.setImportDetails(importDetails); - importRequest.setStatus(PROCESSING); - - when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest); - when(bulkImporter.asyncImport(any(), any(), any(), any(), any(), anyInt(), anyInt(), anyFloat())) - .thenReturn(mockResponse); - - ImportService spyImportService = spy(new ImportService( - mock(AtlasTypeDefStore.class), - mock(AtlasTypeRegistry.class), - bulkImporter, - auditsWriter, - mock(ImportTransformsShaper.class), - mock(TableReplicationRequestProcessor.class), - mock(AsyncImportTaskExecutor.class), - asyncImportService, - mock(AtlasAuditService.class))); - doNothing().when(spyImportService).processReplicationDeletion(any(), any()); - doNothing().when(auditsWriter).write(anyString(), any(AtlasImportResult.class), anyLong(), anyLong(), any()); - doNothing().when(spyImportService).addToImportOperationAudits(any()); - - boolean result = spyImportService.onImportEntity(entityWithExtInfo, importId, position); - - assertTrue(result); - assertEquals(importRequest.getImportDetails().getImportedEntitiesCount(), 6); - assertEquals(importRequest.getImportDetails().getFailedEntitiesCount(), 4); - assertEquals(importRequest.getStatus(), PARTIAL_SUCCESS); - assertEquals(importRequest.getImportResult().getOperationStatus(), AtlasImportResult.OperationStatus.PARTIAL_SUCCESS); - - verify(spyImportService, times(1)).processReplicationDeletion(any(), any()); - verify(spyImportService, times(1)).addToImportOperationAudits(any()); - } - - @Test - public void testOnImportCompleteWhenProcessingReachesEndStatusIsPartialSuccessIfFailedEntityCountIsGreaterThanZero() throws AtlasBaseException { - String importId = "test-import-id"; - - AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest(); - AtlasImportResult importResult = new AtlasImportResult(); - AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails(); - AtlasExportResult exportResult = new AtlasExportResult(); - - importDetails.setImportedEntitiesCount(2); - importDetails.setFailedEntitiesCount(3); - importDetails.setPublishedEntityCount(5); - - importResult.setRequest(new AtlasImportRequest()); - importResult.setExportResult(exportResult); - importRequest.setImportId(importId); - importRequest.setImportDetails(importDetails); - importRequest.setImportResult(importResult); - importRequest.setStatus(PROCESSING); - - AsyncImportService asyncImportService = mock(AsyncImportService.class); - AuditsWriter auditsWriter = mock(AuditsWriter.class); - - when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest); - - ImportService importService = new ImportService( - mock(AtlasTypeDefStore.class), - mock(AtlasTypeRegistry.class), - mock(BulkImporter.class), - auditsWriter, - mock(ImportTransformsShaper.class), - mock(TableReplicationRequestProcessor.class), - mock(AsyncImportTaskExecutor.class), - asyncImportService, - mock(AtlasAuditService.class)); - - importService.onImportComplete(importId); - - assertEquals(importRequest.getStatus(), PARTIAL_SUCCESS); - assertEquals(importResult.getOperationStatus(), AtlasImportResult.OperationStatus.PARTIAL_SUCCESS); - } - - @Test - public void testOnImportEntityWhenProcessingReachesEndStatusIsFailureIfImportedEntityCountIsZero() throws AtlasBaseException { - String importId = "test-import-id"; - int position = 1; - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = mock(AtlasEntity.AtlasEntityWithExtInfo.class); - AtlasEntity mockEntity = mock(AtlasEntity.class); - when(entityWithExtInfo.getEntity()).thenReturn(mockEntity); - when(mockEntity.getGuid()).thenReturn("entity-guid"); - - EntityMutationResponse mockEntityMutationResponse = mock(EntityMutationResponse.class); - float mockProgress = 75.0f; // Simulated new progress value - TypesUtil.Pair mockResponse = TypesUtil.Pair.of(mockEntityMutationResponse, mockProgress); - - AsyncImportService asyncImportService = mock(AsyncImportService.class); - AuditsWriter auditsWriter = mock(AuditsWriter.class); - BulkImporter bulkImporter = mock(BulkImporter.class); - - AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest(); - AtlasImportResult importResult = new AtlasImportResult(); - AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails(); - AtlasExportResult exportResult = new AtlasExportResult(); - - exportResult.setRequest(new AtlasExportRequest()); - importResult.setExportResult(exportResult); - importResult.setRequest(new AtlasImportRequest()); - importDetails.setImportedEntitiesCount(0); - importDetails.setFailedEntitiesCount(9); - importDetails.setPublishedEntityCount(10); - importDetails.setTotalEntitiesCount(10); - - importResult.setProcessedEntities(new ArrayList<>()); - importRequest.setImportId(importId); - importRequest.setImportResult(importResult); - importRequest.setImportDetails(importDetails); - importRequest.setStatus(PROCESSING); - - when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest); - when(bulkImporter.asyncImport(any(), any(), any(), any(), any(), anyInt(), anyInt(), anyFloat())) - .thenThrow(new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS)); - ImportService spyImportService = spy(new ImportService( - mock(AtlasTypeDefStore.class), - mock(AtlasTypeRegistry.class), - bulkImporter, - auditsWriter, - mock(ImportTransformsShaper.class), - mock(TableReplicationRequestProcessor.class), - mock(AsyncImportTaskExecutor.class), - asyncImportService, - mock(AtlasAuditService.class))); - - doNothing().when(spyImportService).processReplicationDeletion(any(), any()); - doNothing().when(auditsWriter).write(anyString(), any(AtlasImportResult.class), anyLong(), anyLong(), any()); - doNothing().when(spyImportService).addToImportOperationAudits(any()); - - boolean result = spyImportService.onImportEntity(entityWithExtInfo, importId, position); - - assertTrue(result); - assertEquals(importRequest.getImportDetails().getImportedEntitiesCount(), 0); - assertEquals(importRequest.getImportDetails().getFailedEntitiesCount(), 10); - assertEquals(importRequest.getStatus(), FAILED); - - verify(spyImportService, times(1)).processReplicationDeletion(any(), any()); - verify(spyImportService, times(1)).addToImportOperationAudits(any()); - } - - @Test - public void testOnImportCompleteWhenProcessingReachesEndStatusIsFailureIfImportedEntityCountIsZero() throws AtlasBaseException { - String importId = "test-import-id"; - - AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest(); - AtlasImportResult importResult = new AtlasImportResult(); - AtlasAsyncImportRequest.ImportDetails importDetails = new AtlasAsyncImportRequest.ImportDetails(); - AtlasExportResult exportResult = new AtlasExportResult(); - - importDetails.setImportedEntitiesCount(0); - importDetails.setFailedEntitiesCount(5); - importDetails.setPublishedEntityCount(5); - importDetails.setTotalEntitiesCount(5); - - importResult.setRequest(new AtlasImportRequest()); - importResult.setExportResult(exportResult); - importRequest.setImportId(importId); - importRequest.setImportDetails(importDetails); - importRequest.setImportResult(importResult); - importRequest.setStatus(PROCESSING); - - AsyncImportService asyncImportService = mock(AsyncImportService.class); - AuditsWriter auditsWriter = mock(AuditsWriter.class); - - when(asyncImportService.fetchImportRequestByImportId(importId)).thenReturn(importRequest); - - ImportService importService = new ImportService( - mock(AtlasTypeDefStore.class), - mock(AtlasTypeRegistry.class), - mock(BulkImporter.class), - auditsWriter, - mock(ImportTransformsShaper.class), - mock(TableReplicationRequestProcessor.class), - mock(AsyncImportTaskExecutor.class), - asyncImportService, - mock(AtlasAuditService.class)); - - importService.onImportComplete(importId); - - assertEquals(importRequest.getStatus(), FAILED); - assertEquals(importResult.getOperationStatus(), AtlasImportResult.OperationStatus.FAIL); - } - @Test public void importServiceProcessesIOException() { ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null, null, null, null, null, atlasAuditService); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java index f9f1dcc0573..e8028c1d5cd 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import static org.apache.atlas.utils.TestLoadModelUtils.loadFsModel; import static org.testng.Assert.assertEquals; @@ -73,7 +74,7 @@ public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOE assertEntities(result.getProcessedEntities(), TAG_NAME); } - private void assertEntities(List entityGuids, String tagName) throws AtlasBaseException { + private void assertEntities(Set entityGuids, String tagName) throws AtlasBaseException { for (String guid : entityGuids) { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java index 1764d34511d..07691d34eda 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java @@ -67,7 +67,7 @@ public static InputStream getInputStreamFrom(String fileName) { return ZipFileResourceTestUtils.getFileInputStream(fileName); } - public static void verifyImportedEntities(List creationOrder, List processedEntities) { + public static void verifyImportedEntities(List creationOrder, Set processedEntities) { Set lhs = com.google.common.collect.Sets.newHashSet(creationOrder); Set rhs = com.google.common.collect.Sets.newHashSet(processedEntities); Set difference = Sets.difference(lhs, rhs); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java index 1fe38885ed7..8748f6b66b4 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java @@ -88,15 +88,19 @@ public void setup() { @Test void testRunSuccess() throws AtlasBaseException { - AtlasImportResult mockResult = mock(AtlasImportResult.class); - EntityImportStream mockEntityImportStream = mock(EntityImportStream.class); + AtlasImportResult mockResult = mock(AtlasImportResult.class); + EntityImportStream mockEntityImportStream = mock(EntityImportStream.class); + AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(mockResult); + + savedRequest.setImportId("import-md5-hash"); + savedRequest.setStatus(AtlasAsyncImportRequest.ImportStatus.STAGING); when(mockEntityImportStream.getMd5Hash()).thenReturn("import-md5-hash"); when(mockEntityImportStream.size()).thenReturn(5); when(mockEntityImportStream.getCreationOrder()).thenReturn(Collections.emptyList()); when(mockEntityImportStream.hasNext()).thenReturn(false); - when(importService.fetchImportRequestByImportId("import-md5-hash")).thenReturn(null); + when(importService.fetchImportRequestByImportId("import-md5-hash")).thenReturn(null).thenReturn(savedRequest); doNothing().when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class)); AtlasAsyncImportRequest result = asyncImportTaskExecutor.run(mockResult, mockEntityImportStream); @@ -189,7 +193,7 @@ void testPublishImportRequestHappyPath() throws AtlasBaseException { asyncImportTaskExecutor.publishImportRequest(mockImportRequest, mockEntityImportStream); - verify(importService).updateImportRequest(mockImportRequest); + verify(importService).populateCache(mockImportRequest); verify(notificationInterface).closeProducer(NotificationInterface.NotificationType.ASYNC_IMPORT, "test-topic"); verify(importTaskListener).onReceiveImportRequest(mockImportRequest); } @@ -246,7 +250,7 @@ void testPublishEntityNotificationHappyPath() throws NotificationException { verify(notificationInterface).send(eq("test-topic"), anyList(), any()); verify(mockEntityImportStream).onImportComplete("entity-guid"); - verify(importService).updateImportRequest(mockImportRequest); + verify(importService).populateCache(mockImportRequest); assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1); assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 1); } @@ -266,7 +270,7 @@ void testPublishEntityNotificationNullEntity() throws NotificationException { verify(notificationInterface, never()).send(anyString(), anyList(), any()); verify(mockEntityImportStream, never()).onImportComplete(anyString()); - verify(importService).updateImportRequest(mockImportRequest); + verify(importService).populateCache(mockImportRequest); assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1); assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 0); } @@ -299,7 +303,7 @@ void testPublishEntityNotificationExceptionInSendToTopic() throws NotificationEx verify(notificationInterface).send(eq("test-topic"), anyList(), any()); verify(mockEntityImportStream, never()).onImportComplete("entity-guid"); - verify(importService).updateImportRequest(mockImportRequest); + verify(importService).populateCache(mockImportRequest); assertEquals(mockImportRequest.getImportTrackingInfo().getStartEntityPosition(), 1); assertEquals(mockImportRequest.getImportDetails().getFailedEntitiesCount(), 1); assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 0); @@ -334,7 +338,7 @@ void testPublishEntityNotificationIgnoreFailedEntityAndProcessNext() throws Noti verify(notificationInterface, times(2)).send(eq("test-topic"), anyList(), any()); verify(mockEntityImportStream, times(1)).onImportComplete("entity-guid"); - verify(importService, times(2)).updateImportRequest(mockImportRequest); + verify(importService, times(2)).populateCache(mockImportRequest); assertEquals(mockImportRequest.getImportDetails().getPublishedEntityCount(), 1); assertEquals(mockImportRequest.getImportDetails().getFailedEntitiesCount(), 1); } @@ -411,6 +415,9 @@ public Object[][] registerRequestScenarios() { public void testRegisterRequest(String existingStatus, String expectedOutcome) throws AtlasBaseException { AtlasImportResult mockResult = mock(AtlasImportResult.class); AtlasAsyncImportRequest existingRequest = null; + AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(mockResult); + + savedRequest.setImportId("import-id"); if (!"null".equals(existingStatus)) { existingRequest = mock(AtlasAsyncImportRequest.class); @@ -419,7 +426,7 @@ public void testRegisterRequest(String existingStatus, String expectedOutcome) t when(existingRequest.getImportDetails()).thenReturn(new AtlasAsyncImportRequest.ImportDetails()); } - when(importService.fetchImportRequestByImportId("import-id")).thenReturn(existingRequest); + when(importService.fetchImportRequestByImportId("import-id")).thenReturn(existingRequest).thenReturn(savedRequest); AtlasAsyncImportRequest result = asyncImportTaskExecutor.registerRequest(mockResult, "import-id", 10, Collections.emptyList()); @@ -453,15 +460,15 @@ public void testRegisterRequestThrowsException() throws AtlasBaseException { @Test public void testWithRetrySucceedsAfterLockingConflict() throws Exception { - AtlasImportResult result = mock(AtlasImportResult.class); - AtlasAsyncImportRequest newRequest = new AtlasAsyncImportRequest(result); + AtlasImportResult result = mock(AtlasImportResult.class); + AtlasAsyncImportRequest savedRequest = new AtlasAsyncImportRequest(result); // First call fails with a PermanentLockingException, second succeeds doThrow(new RuntimeException(new PermanentLockingException("lock conflict"))) .doNothing() .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class)); - when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null); + when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null).thenReturn(savedRequest); AtlasAsyncImportRequest response = asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList()); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java index a7f5bcf00eb..df4bd8cd946 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImportTest.java @@ -393,7 +393,7 @@ private AtlasEntity createMockEntity(String typeName, String guid) { private AtlasImportResult createMockAtlasImportResult() { AtlasImportResult mockResult = mock(AtlasImportResult.class); - when(mockResult.getProcessedEntities()).thenReturn(new ArrayList<>()); + when(mockResult.getProcessedEntities()).thenReturn(new HashSet<>()); return mockResult; } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java index cce6c20124b..bbd6e0a55e5 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java +++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java @@ -130,11 +130,12 @@ public int getHandlerOrder() { @Override public void onReceiveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException { try { - LOG.info("==> onReceiveImportRequest(atlasAsyncImportRequest={})", importRequest); + LOG.info("==> onReceiveImportRequest(importId={})", importRequest.getImportId()); importRequest.setStatus(ImportStatus.WAITING); - asyncImportService.updateImportRequest(importRequest); + asyncImportService.populateCache(importRequest); + asyncImportService.saveImport(importRequest.getImportId()); requestQueue.put(importRequest.getImportId()); startNextImportInQueue(); @@ -145,7 +146,7 @@ public void onReceiveImportRequest(AtlasAsyncImportRequest importRequest) throws throw new AtlasBaseException(IMPORT_QUEUEING_FAILED, e, importRequest.getImportId()); } finally { - LOG.info("<== onReceiveImportRequest(atlasAsyncImportRequest={})", importRequest); + LOG.info("<== onReceiveImportRequest(importId={})", importRequest.getImportId()); } } @@ -241,6 +242,8 @@ void startAsyncImportIfAvailable(String importId) { return; } + LOG.info("startingImport(importId={})", nextImport.getImportId()); + ExecutorService exec = ensureExecutorAlive(); if (exec != null) { exec.submit(() -> startImportConsumer(nextImport)); @@ -260,8 +263,9 @@ void startAsyncImportIfAvailable(String importId) { AtlasAsyncImportRequest getNextImportFromQueue() { LOG.info("==> getNextImportFromQueue()"); - final int maxRetries = 5; - int retryCount = 0; + final int maxRetries = 5; + int retryCount = 0; + AtlasAsyncImportRequest nextImport = null; while (retryCount < maxRetries) { try { @@ -278,24 +282,24 @@ AtlasAsyncImportRequest getNextImportFromQueue() { // Reset retry count because we got a valid importId (even if it's invalid later) retryCount = 0; - AtlasAsyncImportRequest importRequest = asyncImportService.fetchImportRequestByImportId(importId); + nextImport = asyncImportService.fetchImportRequestByImportId(importId); - if (isNotValidImportRequest(importRequest)) { - LOG.info("Import request {}, is not in a valid status to start import, hence skipping..", importRequest); + if (isNotValidImportRequest(nextImport)) { + LOG.info("Import request {}, is not in a valid status to start import, hence skipping..", nextImport); continue; } - LOG.info("<== getImportIdFromQueue(nextImportId={})", importRequest.getImportId()); + LOG.info("<== getImportIdFromQueue(nextImportId={})", nextImport.getImportId()); - return importRequest; + return nextImport; } catch (InterruptedException e) { LOG.error("Thread interrupted while waiting for importId from the queue", e); // Restore the interrupt flag Thread.currentThread().interrupt(); - return null; + return nextImport; } } @@ -347,24 +351,27 @@ void populateRequestQueue() { private void startImportConsumer(AtlasAsyncImportRequest importRequest) { try { - LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", importRequest); - - notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName()); + LOG.info("==> startImportConsumer(importId={})", importRequest.getImportId()); importRequest.setStatus(ImportStatus.PROCESSING); importRequest.setProcessingStartTime(System.currentTimeMillis()); - } catch (Exception e) { - LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e); + asyncImportService.populateCache(importRequest); + asyncImportService.saveImportRequest(importRequest); + + notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName()); + } catch (Exception e) { importRequest.setStatus(ImportStatus.FAILED); - } finally { - asyncImportService.updateImportRequest(importRequest); + LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e); + } finally { if (ObjectUtils.equals(importRequest.getStatus(), ImportStatus.FAILED)) { + asyncImportService.saveImport(importRequest.getImportId()); + onCompleteImportRequest(importRequest.getImportId()); } - LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", importRequest); + LOG.info("<== startImportConsumer(importId={})", importRequest.getImportId()); } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 32bf3016673..9f3af510b80 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -1463,16 +1463,20 @@ void handleMessage(AtlasKafkaMessage kafkaMsg) { final String importId = entityImportNotification.getImportId(); final AtlasEntityWithExtInfo entityWithExtInfo = entityImportNotification.getEntity(); final int position = entityImportNotification.getPosition(); - boolean completeImport = false; + + LOG.info("==> IMPORT_ENTITY:processing entity: {} at position: {}", importId, position); try { importRequestComplete = asyncImporter.onImportEntity(entityWithExtInfo, importId, position); } catch (AtlasBaseException abe) { importRequestComplete = true; - asyncImporter.onImportComplete(importId); - LOG.error("IMPORT_ENTITY: {} failed to import entity: {}", importId, entityImportNotification); + } finally { + if (importRequestComplete) { + asyncImporter.onImportComplete(importId); + } + LOG.info("<== IMPORT_ENTITY:processing entity: {} at position: {}", importId, position); } } break; diff --git a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java index 76020d2da22..fc7ab67633a 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java @@ -128,7 +128,8 @@ public void testOnReceiveImportRequestAddsRequestToQueue() throws InterruptedExc Thread.sleep(500); verify(requestQueue, times(1)).put("import123"); - verify(asyncImportService, times(1)).updateImportRequest(importRequest); + verify(asyncImportService, times(1)).populateCache(importRequest); + verify(asyncImportService, times(1)).saveImport("import123"); } @Test @@ -152,7 +153,8 @@ public void testOnReceiveImportRequestHandlesQueueException() throws Interrupted importTaskListener.onReceiveImportRequest(importRequest); } finally { verify(requestQueue, times(1)).put("import123"); - verify(asyncImportService, times(1)).updateImportRequest(importRequest); + verify(asyncImportService, times(1)).populateCache(importRequest); + verify(asyncImportService, times(1)).saveImport("import123"); } } diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index ef1c7fce1dc..c868d6d74b6 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -1261,7 +1261,7 @@ public void testImportDataWithReplicatedFromOption() throws Exception { AtlasImportResult mockResult = mock(AtlasImportResult.class); when(mockResult.getOperationStatus()).thenReturn(AtlasImportResult.OperationStatus.SUCCESS); - when(mockResult.getProcessedEntities()).thenReturn(new ArrayList<>()); + when(mockResult.getProcessedEntities()).thenReturn(new HashSet<>()); when(mockResult.getMetrics()).thenReturn(new HashMap()); when(mockResult.getExportResult()).thenReturn(mock(AtlasExportResult.class)); when(mockResult.getExportResult().getRequest()).thenReturn(null); From 4d54b249fd8320077093b2a84034521c60ff39dd Mon Sep 17 00:00:00 2001 From: Chandrakanth Peravelli Date: Tue, 10 Mar 2026 20:57:39 -0500 Subject: [PATCH 2/5] ATLAS-5239: Fix checkstyle import errors --- .../org/apache/atlas/model/impexp/AtlasImportResult.java | 5 ++++- .../apache/atlas/model/impexp/TestAtlasImportResult.java | 5 ++++- .../apache/atlas/repository/impexp/ImportServiceTest.java | 8 +++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index 525195f360c..1ab1e47a69c 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -28,7 +28,10 @@ import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; diff --git a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java index c9ff15d5ca1..31cb21044a9 100644 --- a/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java +++ b/intg/src/test/java/org/apache/atlas/model/impexp/TestAtlasImportResult.java @@ -22,7 +22,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 0a85313c926..1443785673e 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -26,7 +26,6 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasAsyncImportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest; -import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; @@ -63,10 +62,13 @@ import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED; -import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PARTIAL_SUCCESS; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL; From 9a92d95194bb49aa08204692231bec0097018b88 Mon Sep 17 00:00:00 2001 From: Chandrakanth Peravelli Date: Tue, 17 Mar 2026 22:29:48 -0500 Subject: [PATCH 3/5] ATLAS-5239: Fix checkstyle import errors --- .../org/apache/atlas/repository/impexp/ImportCacheManager.java | 1 - .../java/org/apache/atlas/repository/impexp/ImportService.java | 2 -- .../org/apache/atlas/repository/impexp/ImportServiceTest.java | 3 --- .../atlas/repository/impexp/ImportTransformsShaperTest.java | 1 - 4 files changed, 7 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java index 7a8e53f7cb7..698b7629245 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportCacheManager.java @@ -31,7 +31,6 @@ * vertex lookups, or ImportID→Entity mappings during a single import cycle.

*/ public class ImportCacheManager { - private static final int MAX_SIZE = 10; // Max 10 entries private static final long TTL_MINUTES = 30; // Expire after 30 minutes private static final long TTL_MILLIS = TimeUnit.MINUTES.toMillis(TTL_MINUTES); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 70e22cc3fcd..b6a66522cb3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -60,9 +60,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 1443785673e..6f461cf2c0e 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -86,13 +86,10 @@ import static org.mockito.ArgumentMatchers.anyFloat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java index e8028c1d5cd..95b8ca13f47 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java @@ -35,7 +35,6 @@ import javax.inject.Inject; import java.io.IOException; -import java.util.List; import java.util.Set; import static org.apache.atlas.utils.TestLoadModelUtils.loadFsModel; From 73c9d0e0a40bb8c6e1268d4a9e6666251ad3754e Mon Sep 17 00:00:00 2001 From: Chandrakanth Peravelli Date: Tue, 26 May 2026 14:21:09 -0500 Subject: [PATCH 4/5] ATLAS-5239: update apache commons lang to lang3 --- .../org/apache/atlas/repository/impexp/AsyncImportService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java index 061226e061c..f2b3cef8c5c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java @@ -29,7 +29,7 @@ import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; From 40e6a7c2bca75dcb8e91a9f71d06643624a455ac Mon Sep 17 00:00:00 2001 From: Chandrakanth Peravelli Date: Tue, 26 May 2026 16:27:15 -0500 Subject: [PATCH 5/5] ATLAS-5239: fix checkstyle violations --- .../apache/atlas/notification/ImportTaskListenerImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java index 1ed19e63d96..619602eb285 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java +++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java @@ -263,9 +263,10 @@ void startAsyncImportIfAvailable(String importId) { AtlasAsyncImportRequest getNextImportFromQueue() { LOG.info("==> getNextImportFromQueue()"); - final int maxRetries = 5; - int retryCount = 0; - AtlasAsyncImportRequest nextImport = null; + final int maxRetries = 5; + + int retryCount = 0; + AtlasAsyncImportRequest nextImport = null; while (retryCount < maxRetries) { try {