Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import javax.xml.bind.annotation.XmlRootElement;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand All @@ -50,7 +50,7 @@ public class AtlasImportResult implements Serializable {
private String hostName;
private long timeStamp;
private Map<String, Integer> metrics;
private List<String> processedEntities;
private Set<String> processedEntities;
private OperationStatus operationStatus;
private AtlasExportResult exportResultWithoutData;

Expand All @@ -66,7 +66,7 @@ public AtlasImportResult(AtlasImportRequest request, String userName, String cli
this.timeStamp = timeStamp;
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
this.processedEntities = new ArrayList<>();
this.processedEntities = new HashSet<>();
}

public AtlasImportRequest getRequest() {
Expand Down Expand Up @@ -135,11 +135,11 @@ public void incrementMeticsCounter(String key, int incrementBy) {
metrics.put(key, currentValue + incrementBy);
}

public List<String> getProcessedEntities() {
public Set<String> getProcessedEntities() {
return this.processedEntities;
}

public void setProcessedEntities(List<String> processedEntities) {
public void setProcessedEntities(Set<String> processedEntities) {
this.processedEntities = processedEntities;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testOperationStatusSetterGetter() {

@Test
public void testProcessedEntitiesSetterGetter() {
List<String> processedEntities = new ArrayList<>();
Set<String> processedEntities = new HashSet<>();
processedEntities.add("entity1");
processedEntities.add("entity2");

Expand Down Expand Up @@ -280,7 +280,7 @@ public void testToString() {
importResult.setTimeStamp(1640995200000L);
importResult.setOperationStatus(OperationStatus.SUCCESS);

List<String> processedEntities = new ArrayList<>();
Set<String> processedEntities = new HashSet<>();
processedEntities.add("entity1");
importResult.setProcessedEntities(processedEntities);

Expand Down Expand Up @@ -346,7 +346,7 @@ public void testBoundaryValues() {
assertEquals(importResult.getTimeStamp(), Long.MIN_VALUE);

// Test with empty collections
importResult.setProcessedEntities(new ArrayList<>());
importResult.setProcessedEntities(new HashSet<>());
assertTrue(importResult.getProcessedEntities().isEmpty());

importResult.setMetrics(new HashMap<>());
Expand All @@ -370,14 +370,14 @@ public void testSpecialCharactersInStrings() {

@Test
public void testLargeCollections() {
List<String> largeList = new ArrayList<>();
Set<String> largeList = new HashSet<>();
for (int i = 0; i < 10000; i++) {
largeList.add("entity" + i);
}
importResult.setProcessedEntities(largeList);

assertEquals(importResult.getProcessedEntities().size(), 10000);
assertEquals(importResult.getProcessedEntities().get(5000), "entity5000");
assertEquals(importResult.getProcessedEntities().toArray()[5000], "entity2675");

// Test with large metrics map
Map<String, Integer> largeMetrics = new HashMap<>();
Expand All @@ -392,7 +392,7 @@ public void testLargeCollections() {

@Test
public void testProcessedEntitiesWithSpecialCharacters() {
List<String> entities = new ArrayList<>();
Set<String> entities = new HashSet<>();
entities.add("entity-with-dash");
entities.add("entity_with_underscore");
entities.add("entity.with.dots");
Expand Down Expand Up @@ -457,7 +457,7 @@ public void testComplexWorkflow() {
request.setOption("testOption", "testValue");

AtlasImportResult result = new AtlasImportResult(request, "admin", "10.0.0.1", "server1", System.currentTimeMillis());
List<String> entities = new ArrayList<>();
Set<String> entities = new HashSet<>();
entities.add("database1");
entities.add("table1");
entities.add("column1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand All @@ -49,30 +50,62 @@
public class AsyncImportService {
private static final Logger LOG = LoggerFactory.getLogger(AsyncImportService.class);

private final DataAccess dataAccess;
private final DataAccess dataAccess;
private final ImportCacheManager<String, AtlasAsyncImportRequest> importCache;

@Inject
public AsyncImportService(DataAccess dataAccess) {
this.dataAccess = dataAccess;
this.dataAccess = dataAccess;
this.importCache = new ImportCacheManager<>();
}

public void populateCache(AtlasAsyncImportRequest importRequest) {
    if (importRequest == null) {
        return;
    }

    String guid = importRequest.getGuid();

    // Cache only requests that already carry a real guid; a leading '-'
    // presumably marks an unassigned/temporary guid -- TODO confirm.
    if (StringUtils.isNotEmpty(guid) && guid.charAt(0) != '-') {
        importCache.put(importRequest.getImportId(), importRequest);
    }
}

// Looks up an async import request: serves from the in-memory cache on a hit,
// otherwise loads from the persistent store and back-fills the cache.
// Returns null (never throws) when the lookup fails.
public AtlasAsyncImportRequest fetchImportRequestByImportId(String importId) {
try {
AtlasAsyncImportRequest cachedRequest = importCache.get(importId);

if (cachedRequest != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cache hit for importId: {}", importId);
}
return cachedRequest;
}
AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();

request.setImportId(importId);

// NOTE(review): the next line appears to be a stale pre-change statement left
// over from the diff rendering; it makes the load/populate/return sequence
// below unreachable. Only one of the two load paths should remain.
return dataAccess.load(request);
request = dataAccess.load(request);

// back-fill the cache so subsequent lookups avoid the store
populateCache(request);

return request;
} catch (Exception e) {
// best-effort: errors are logged and the caller receives null
LOG.error("Error fetching request with importId: {}", importId, e);

return null;
}
}

// Flushes the cached request for the given importId to the persistent store
// and drops it from the cache. A cache miss is a silent no-op; persistence
// failures are logged, not propagated.
public void saveImport(String importId) {
try {
AtlasAsyncImportRequest importRequest = importCache.get(importId);
if (importRequest != null) {
saveImportRequest(importRequest);
// entry is now persisted; invalidate so later reads go to the store
importCache.invalidate(importId);
}
} catch (AtlasBaseException e) {
LOG.error("Error saving import request from cache for importId: {}", importId, e);
}
}

public void saveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException {
try {
dataAccess.save(importRequest);
dataAccess.saveNoLoad(importRequest);

LOG.debug("Save request ID: {} request: {}", importRequest.getImportId(), importRequest);
} catch (AtlasBaseException e) {
Expand Down Expand Up @@ -105,11 +138,24 @@ public List<String> fetchQueuedImportRequests() {
// Bulk-deletes every persisted async-import request entity and then empties
// the cache so no stale entries outlive the purge. Best-effort: failures are
// logged, not propagated.
public void deleteRequests() {
try {
dataAccess.delete(AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, SortOrder.ASCENDING));

importCache.clear();
} catch (Exception e) {
LOG.error("Error deleting import requests", e);
}
}

public void deleteRequest(AtlasAsyncImportRequest importRequest) {
    // Removes a single request from both the store and the cache; a null
    // argument is a no-op. Failures are logged and swallowed.
    if (importRequest == null) {
        return;
    }

    try {
        dataAccess.delete(importRequest.getGuid());
        importCache.invalidate(importRequest.getImportId());
    } catch (Exception e) {
        LOG.warn("Error deleting import request with importId: {}", importRequest.getImportId(), e);
    }
}

public AtlasAsyncImportRequest abortImport(String importId) throws AtlasBaseException {
AtlasAsyncImportRequest importRequestToKill = fetchImportRequestByImportId(importId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.atlas.repository.impexp;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Lightweight in-memory TTL cache for import operations.
 *
 * <p>Holds at most 10 entries; each entry expires 30 minutes after it was
 * written. Intended for short-lived import-scoped lookups such as
 * importId-to-request mappings. Reads and writes are thread-safe, but the
 * size-based eviction is best-effort: under heavy concurrent insertion the
 * cache may briefly exceed the maximum until the next eviction/cleanup.</p>
 */
public class ImportCacheManager<K, V> {
    private static final int  MAX_SIZE    = 10; // hard cap on live entries
    private static final long TTL_MINUTES = 30; // entry lifetime
    private static final long TTL_MILLIS  = TimeUnit.MINUTES.toMillis(TTL_MINUTES);

    private final ConcurrentHashMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();

    public ImportCacheManager() {
        startCleanupThread();
    }

    /** Immutable value holder stamped with its insertion time. */
    private static class CacheEntry<V> {
        final V    value;
        final long timestamp;

        CacheEntry(V value) {
            this.value     = value;
            this.timestamp = System.currentTimeMillis();
        }

        boolean isExpired(long ttlMillis) {
            return System.currentTimeMillis() - timestamp > ttlMillis;
        }
    }

    /**
     * Stores or updates a value. Null keys or values are silently ignored.
     * Inserting a brand-new key at capacity evicts the oldest entry first;
     * updating an existing key never evicts an unrelated entry.
     */
    public void put(K key, V value) {
        if (key == null || value == null) {
            return;
        }

        // Evict only for genuinely new keys: re-putting an existing key just
        // refreshes it and must not push another entry out of the cache.
        if (!cache.containsKey(key) && cache.size() >= MAX_SIZE) {
            evictOldest();
        }

        cache.put(key, new CacheEntry<>(value));
    }

    /** Returns the cached value, or null when the key is absent or expired. */
    public V get(K key) {
        CacheEntry<V> entry = cache.get(key);
        if (entry == null) {
            return null;
        }

        if (entry.isExpired(TTL_MILLIS)) {
            // remove(key, entry) rather than remove(key): if another thread
            // replaced the entry concurrently, the fresh value survives.
            cache.remove(key, entry);
            return null;
        }

        return entry.value;
    }

    /** Manually removes one entry; an absent key is a no-op. */
    public void invalidate(K key) {
        cache.remove(key);
    }

    /** Removes every entry. */
    public void clear() {
        cache.clear();
    }

    /** Current entry count (entries expired but not yet cleaned still count). */
    public int size() {
        return cache.size();
    }

    /**
     * Evicts the entry with the smallest insertion timestamp. Best-effort and
     * not atomic with respect to concurrent writers, which is acceptable for
     * this cache's advisory size bound.
     */
    private void evictOldest() {
        K    oldestKey  = null;
        long oldestTime = Long.MAX_VALUE;

        for (Map.Entry<K, CacheEntry<V>> e : cache.entrySet()) {
            if (e.getValue().timestamp < oldestTime) {
                oldestKey  = e.getKey();
                oldestTime = e.getValue().timestamp;
            }
        }

        if (oldestKey != null) {
            cache.remove(oldestKey);
        }
    }

    /** Starts a daemon thread that purges expired entries once a minute. */
    private void startCleanupThread() {
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ImportCache-Cleanup");
            t.setDaemon(true); // never keeps the JVM alive
            return t;
        }).scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES);
    }

    /** Drops every entry older than the TTL. */
    private void cleanup() {
        long now = System.currentTimeMillis();
        cache.entrySet().removeIf(e -> (now - e.getValue().timestamp) > TTL_MILLIS);
    }
}
Loading