diff --git a/src/main/java/org/phoebus/channelfinder/Application.java b/src/main/java/org/phoebus/channelfinder/Application.java index 28d4bee9..7f7e9799 100644 --- a/src/main/java/org/phoebus/channelfinder/Application.java +++ b/src/main/java/org/phoebus/channelfinder/Application.java @@ -31,9 +31,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.FileCopyUtils; @EnableAutoConfiguration @@ -113,19 +111,4 @@ public List channelProcessors() { }); return processors; } - - /** - * {@link TaskExecutor} used when calling {@link ChannelProcessor}s. - * - * @return A {@link TaskExecutor} - */ - @Bean("channelFinderTaskExecutor") - public TaskExecutor taskExecutor() { - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setCorePoolSize(3); - taskExecutor.setMaxPoolSize(10); - taskExecutor.setQueueCapacity(25); - - return taskExecutor; - } } diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 3da84200..2781b57b 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -1,18 +1,24 @@ package org.phoebus.channelfinder.configuration; +import jakarta.annotation.PostConstruct; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; @@ -22,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; import tools.jackson.core.JacksonException; /** @@ -40,6 +47,7 @@ public class AAChannelProcessor implements ChannelProcessor { private static final String PV_STATUS_PROPERTY_NAME = "pvStatus"; // Matches in recsync private static final String PV_STATUS_INACTIVE = "Inactive"; public static final String PV_STATUS_ACTIVE = "Active"; + private static final int SKIPPED_PV_LOG_LIMIT = 10; @Value("${aa.enabled:true}") private boolean aaEnabled; @@ -60,10 +68,19 @@ public class AAChannelProcessor implements ChannelProcessor { private String archiverPropertyName; @Value("${aa.auto_pause:}") - private List autoPauseOptions; + private volatile List autoPauseOptions; + + @Value("${aa.post_support:}") + private volatile List postSupportArchivers; @Autowired private ArchiverService archiverService; + @Value("${aa.policy_refresh_interval_seconds:3600}") + private long policyRefreshIntervalSeconds; + + private volatile Map> cachedPolicies = Map.of(); + private volatile Instant lastPolicyRefresh; + @Override public boolean enabled() { return aaEnabled; @@ -74,8 +91,24 @@ public void setEnabled(boolean enabled) { this.aaEnabled = enabled; } + @PostConstruct + public void initPolicyCache() { + refreshPolicies(); + } + + @Scheduled( + fixedDelayString = "${aa.policy_refresh_interval_seconds:3600}", + timeUnit = TimeUnit.SECONDS) + public void scheduledPolicyRefresh() { + refreshPolicies(); + } + @Override public ChannelProcessorInfo processorInfo() { + String policyCounts = + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().size()) + .collect(Collectors.joining(", ")); return new ChannelProcessorInfo( "AAChannelProcessor", aaEnabled, @@ -87,7 +120,15 @@ public ChannelProcessorInfo processorInfo() { "Archivers", aaURLs.keySet().toString(), "AutoPauseOn", - autoPauseOptions.toString())); + autoPauseOptions.toString(), + "PostSupportArchivers", + postSupportArchivers.toString(), + "PolicyRefreshIntervalSeconds", + String.valueOf(policyRefreshIntervalSeconds), + "LastPolicyRefresh", + lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), + "CachedPoliciesPerArchiver", + policyCounts.isEmpty() ? "none" : policyCounts)); } /** @@ -106,87 +147,128 @@ public long process(List channels) throws JacksonException { return 0; } - // Get Info (policy and version) of each archiver - Map archiversInfo = getArchiversInfo(aaURLs); + Map archiversInfo = getArchiversInfoFromCache(); if (archiversInfo.isEmpty()) { + logger.log( + Level.WARNING, + () -> + String.format( + "No reachable archivers configured; skipping %d channels.", channels.size())); return 0; } - Map> archiverAliasToArchivePVOptions = - new HashMap<>(); // AA identifier, ArchivePVOptions - for (String alias : archiversInfo.keySet()) { - archiverAliasToArchivePVOptions.put(alias, new ArrayList<>()); - } + logger.log(Level.INFO, () -> String.format("Processing %d channels.", channels.size())); + Map> pvsByArchiver = buildArchivePVMap(channels, archiversInfo); + + long count = submitToArchivers(pvsByArchiver, archiversInfo); + logger.log(Level.INFO, () -> String.format("Configured %d channels.", count)); + return count; + } + + private Map> buildArchivePVMap( + List channels, Map archiversInfo) { + Map> result = new HashMap<>(); + archiversInfo.keySet().forEach(alias -> result.put(alias, new ArrayList<>())); - logger.log(Level.INFO, "Get channelfinder properties for aa processor."); channels.forEach( channel -> { - Optional archiveProperty = - channel.getProperties().stream() - .filter( - xmlProperty -> archivePropertyName.equalsIgnoreCase(xmlProperty.getName())) - .findFirst(); + Optional archiveProperty = findProperty(channel, archivePropertyName); if (archiveProperty.isPresent()) { - channel.getProperties().stream() - .filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName())) - .findFirst() - .map( - xmlProperty -> { - String archiverValue = xmlProperty.getValue(); - // archiver property can be comma separated list of archivers - if (archiverValue != null && !archiverValue.isEmpty()) { - return Arrays.stream(archiverValue.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()); - } else { - return defaultArchivers.stream(); - } - }) - .orElse( - defaultArchivers - .stream()) // Use defaultArchivers list if no matching property found + resolveArchiverAliases(channel) .forEach( archiverAlias -> { try { addChannelChange( - channel, - archiverAliasToArchivePVOptions, - archiversInfo, - archiveProperty, - archiverAlias); + channel, result, archiversInfo, archiveProperty, archiverAlias); } catch (Exception e) { logger.log( - Level.WARNING, String.format("Failed to process %s", channel), e); + Level.WARNING, + () -> + String.format( + "Failed to add channel '%s' to archiver '%s': %s", + channel.getName(), archiverAlias, e.getMessage())); } }); } else if (autoPauseOptions.contains(archivePropertyName)) { - aaURLs + archiversInfo .keySet() .forEach( archiverAlias -> - archiverAliasToArchivePVOptions + result .get(archiverAlias) .add(createArchivePV(List.of(), channel, "", PV_STATUS_INACTIVE))); } }); + return result; + } + + private long submitToArchivers( + Map> pvsByArchiver, Map archiversInfo) + throws JacksonException { long count = 0; - for (Map.Entry> e : archiverAliasToArchivePVOptions.entrySet()) { + for (Map.Entry> e : pvsByArchiver.entrySet()) { ArchiverInfo archiverInfo = archiversInfo.get(e.getKey()); if (archiverInfo == null) { - logger.log(Level.WARNING, String.format("Failed to process %s", e.getKey())); + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver alias '%s' present in PV map but missing from archiver info; skipping.", + e.getKey())); continue; } - Map archivePVSList = - e.getValue().stream() - .collect( - Collectors.toMap(ArchivePVOptions::getPv, archivePVOptions -> archivePVOptions)); - Map> archiveActionArchivePVMap = - getArchiveActions(archivePVSList, archiverInfo); - count += archiverService.configureAA(archiveActionArchivePVMap, archiverInfo.url()); + Map pvMap = + e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); + Optional>> actions = + getArchiveActions(pvMap, archiverInfo); + if (actions.isEmpty()) { + warnSkippedActivePvs(e.getKey(), e.getValue()); + continue; + } + count += archiverService.configureAA(actions.get(), archiverInfo.url()); } - long finalCount = count; - logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); - return finalCount; + return count; + } + + private void warnSkippedActivePvs(String archiverAlias, List pvs) { + List activePvNames = + pvs.stream() + .filter(pv -> PV_STATUS_ACTIVE.equals(pv.getPvStatus())) + .map(ArchivePVOptions::getPv) + .toList(); + if (activePvNames.isEmpty()) { + return; + } + int total = activePvNames.size(); + String sample = + String.join(", ", activePvNames.subList(0, Math.min(SKIPPED_PV_LOG_LIMIT, total))); + String suffix = + total > SKIPPED_PV_LOG_LIMIT ? " ... (" + (total - SKIPPED_PV_LOG_LIMIT) + " more)" : ""; + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' unreachable: %d Active PV(s) may remain paused until the archiver recovers" + + " and another channel update arrives — RESUME was not sent: %s%s", + archiverAlias, total, sample, suffix)); + } + + private Stream resolveArchiverAliases(Channel channel) { + return findPropertyValue(channel, archiverPropertyName) + .map(v -> Arrays.stream(v.split(",")).map(String::trim).filter(s -> !s.isEmpty())) + .orElseGet(defaultArchivers::stream); + } + + private Optional findProperty(Channel channel, String propertyName) { + return channel.getProperties().stream() + .filter(p -> propertyName.equalsIgnoreCase(p.getName())) + .findFirst(); + } + + private Optional findPropertyValue(Channel channel, String propertyName) { + return findProperty(channel, propertyName) + .map(Property::getValue) + .filter(v -> v != null && !v.isEmpty()); } private void addChannelChange( @@ -196,11 +278,7 @@ private void addChannelChange( Optional archiveProperty, String archiverAlias) { String pvStatus = - channel.getProperties().stream() - .filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName())) - .findFirst() - .map(Property::getValue) - .orElse(PV_STATUS_INACTIVE); + findPropertyValue(channel, PV_STATUS_PROPERTY_NAME).orElse(PV_STATUS_INACTIVE); if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) { ArchivePVOptions newArchiverPV = createArchivePV( @@ -226,24 +304,43 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { return ArchiveAction.NONE; } - private Map> getArchiveActions( + private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { - if (archiverInfo == null) { - return Map.of(); - } - - logger.log(Level.INFO, () -> String.format("Get archiver status in archiver %s", archiverInfo)); + logger.log( + Level.FINE, + () -> + String.format( + "Querying status of %d PVs from archiver '%s'.", + archivePVS.size(), archiverInfo.alias())); Map> result = new EnumMap<>(ArchiveAction.class); Arrays.stream(ArchiveAction.values()) .forEach(archiveAction -> result.put(archiveAction, new ArrayList<>())); // Don't request to archive an empty list. if (archivePVS.isEmpty()) { - return result; + return Optional.of(result); } - List> statuses = - archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); - logger.log(Level.FINER, "Statuses {0}", statuses); + List> statuses; + List pvList = new ArrayList<>(archivePVS.keySet()); + try { + statuses = + postSupportArchivers.contains(archiverInfo.alias()) + ? archiverService.getStatusesViaPost(archiverInfo.url(), pvList) + : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); + } catch (ArchiverServiceException e) { + logger.log( + Level.WARNING, + () -> + String.format( + "Status fetch failed for archiver '%s'; skipping %d PVs to avoid spurious ARCHIVE submissions: %s", + archiverInfo.alias(), archivePVS.size(), e.getMessage())); + return Optional.empty(); + } + logger.log( + Level.FINER, + () -> + String.format( + "Status response from archiver '%s': %s", archiverInfo.alias(), statuses)); statuses.forEach( archivePVStatusJsonMap -> { String archiveStatus = archivePVStatusJsonMap.get("status"); @@ -252,14 +349,21 @@ private Map> getArchiveActions( if (archiveStatus == null || pvName == null) { logger.log( Level.WARNING, - "Missing status or pvName in archivePVStatusJsonMap: {0}", - archivePVStatusJsonMap); + () -> + String.format( + "Archiver '%s' returned entry with missing 'status' or 'pvName': %s", + archiverInfo.alias(), archivePVStatusJsonMap)); return; } ArchivePVOptions archivePVOptions = archivePVS.get(pvName); if (archivePVOptions == null) { - logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName); + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' returned status for unknown PV '%s'; ignoring.", + archiverInfo.alias(), pvName)); return; } @@ -269,11 +373,11 @@ private Map> getArchiveActions( List archivePVOptionsList = result.get(action); archivePVOptionsList.add(archivePVOptions); }); - return result; + return Optional.of(result); } private ArchivePVOptions createArchivePV( - List policyList, Channel channel, String archiveProperty, String pvStaus) { + List policyList, Channel channel, String archiveProperty, String pvStatus) { ArchivePVOptions newArchiverPV = new ArchivePVOptions(); if (aaPVA && !channel.getName().contains("://")) { newArchiverPV.setPv("pva://" + channel.getName()); @@ -281,20 +385,52 @@ private ArchivePVOptions createArchivePV( newArchiverPV.setPv(channel.getName()); } newArchiverPV.setSamplingParameters(archiveProperty, policyList); - newArchiverPV.setPvStatus(pvStaus); + newArchiverPV.setPvStatus(pvStatus); return newArchiverPV; } - private Map getArchiversInfo(Map aaURLs) { - Map result = new HashMap<>(); - for (Map.Entry aa : aaURLs.entrySet()) { - if (StringUtils.isEmpty(aa.getValue())) { - // Empty archiver tagged - continue; - } - List policies = archiverService.getAAPolicies(aa.getValue()); - result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), policies)); + private Map getArchiversInfoFromCache() { + Map> snapshot = cachedPolicies; + return aaURLs.entrySet().stream() + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .filter( + e -> { + if (!snapshot.containsKey(e.getKey())) { + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' has no cached policies (unreachable at last refresh); skipping.", + e.getKey())); + return false; + } + return true; + }) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> new ArchiverInfo(e.getKey(), e.getValue(), snapshot.get(e.getKey())))); + } + + private void refreshPolicies() { + if (aaURLs.isEmpty()) { + logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh."); + return; } - return result; + logger.log(Level.INFO, "Refreshing AA policy cache for {0} archivers.", aaURLs.size()); + Map> updated = + aaURLs.entrySet().stream() + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, e -> archiverService.getAAPolicies(e.getValue()))); + cachedPolicies = Collections.unmodifiableMap(updated); + lastPolicyRefresh = Instant.now(); + logger.log( + Level.INFO, + "AA policy cache refreshed: {0}", + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().size() + " policies") + .collect(Collectors.joining(", "))); } } diff --git a/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java new file mode 100644 index 00000000..dff1a099 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java @@ -0,0 +1,52 @@ +package org.phoebus.channelfinder.configuration; + +import java.util.logging.Level; +import java.util.logging.Logger; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +/** + * {@link ThreadPoolTaskExecutor} used when calling {@link ChannelProcessor}s. + * + *

Pool parameters are derived from {@code processors.max_concurrent_updates}. Individual values + * can be overridden via the {@code processors.task_executor.*} properties (values ≤ 0 mean "use the + * derived value"). + */ +@Component("channelFinderTaskExecutor") +public class ChannelFinderProcessorExecutor extends ThreadPoolTaskExecutor { + + private static final Logger logger = + Logger.getLogger(ChannelFinderProcessorExecutor.class.getName()); + + public ChannelFinderProcessorExecutor( + @Value("${processors.max_concurrent_updates:10}") int maxConcurrent, + @Value("${processors.task_executor.core_pool_size:-1}") int overrideCore, + @Value("${processors.task_executor.max_pool_size:-1}") int overrideMax, + @Value("${processors.task_executor.queue_capacity:-1}") int overrideQueue) { + + int core = overrideCore > 0 ? overrideCore : maxConcurrent; + int max = overrideMax > 0 ? overrideMax : maxConcurrent; + int queue = overrideQueue > 0 ? overrideQueue : Math.max(1, maxConcurrent / 4); + + setCorePoolSize(core); + setMaxPoolSize(max); + setQueueCapacity(queue); + setRejectedExecutionHandler( + (runnable, executor) -> { + if (!executor.isShutdown()) { + executor.getQueue().poll(); // evict oldest (stale) task to make room + executor.getQueue().offer(runnable); + logger.log( + Level.WARNING, + () -> + "ChannelFinderProcessorExecutor task queue full — evicted oldest task to admit fresher update" + + " (active=" + + executor.getActiveCount() + + ", queued=" + + executor.getQueue().size() + + ")"); + } + }); + } +} diff --git a/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java b/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java index 846b9684..8fd4144d 100644 --- a/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java +++ b/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java @@ -124,10 +124,11 @@ public void sendToProcessors(List channels) { } catch (Exception e) { logger.log( Level.WARNING, - "ChannelProcessor " - + channelProcessor.getClass().getName() - + " throws exception", - e); + () -> + "ChannelProcessor " + + channelProcessor.getClass().getName() + + " threw exception: " + + e.getMessage()); } })); } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 2da37b69..134ffa99 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -4,7 +4,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -57,14 +56,12 @@ String key() { } } - @Value("${aa.post_support:}") - private List postSupportArchivers; - @Autowired public ArchiverService( @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); factory.setReadTimeout(timeoutSeconds * 1000); + factory.setConnectTimeout(timeoutSeconds * 1000); this.client = builder.requestFactory(factory).build(); } @@ -72,58 +69,36 @@ public ArchiverService( this.client = builder.build(); } - private Stream> partitionSet(Set pvSet, int pageSize) { - List list = new ArrayList<>(pvSet); - return IntStream.range(0, (list.size() + pageSize - 1) / pageSize) - .mapToObj(i -> list.subList(i * pageSize, Math.min(pageSize * (i + 1), list.size()))); + private Stream> partition(List pvs, int pageSize) { + return IntStream.range(0, (pvs.size() + pageSize - 1) / pageSize) + .mapToObj(i -> pvs.subList(i * pageSize, Math.min(pageSize * (i + 1), pvs.size()))); } - public List> getStatuses( - Map archivePVS, String archiverURL, String archiverAlias) { - Set pvs = archivePVS.keySet(); - boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); - logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias); - logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers); - - if (postSupportOverride) { - logger.log(Level.INFO, "Post Support"); - return getStatusesFromPvListBody(archiverURL, pvs.stream().toList()); - } else { - logger.log(Level.INFO, "Query Support"); - Stream> stream = partitionSet(pvs, STATUS_BATCH_SIZE); - - return stream - .map(pvList -> getStatusesFromPvListQuery(archiverURL, pvList)) - .flatMap(List::stream) - .toList(); - } + public List> getStatusesViaGet(String archiverURL, List pvs) { + return partition(pvs, STATUS_BATCH_SIZE) + .map(batch -> getStatusesViaGetBatch(archiverURL, batch)) + .flatMap(List::stream) + .toList(); } - private List> getStatusesFromPvListQuery( - String archiverURL, List pvs) { + private List> getStatusesViaGetBatch(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; URI pvStatusURI = UriComponentsBuilder.fromUri(URI.create(uriString)) .queryParam(StatusResponseKey.PV.key(), String.join(",", pvs)) .build() .toUri(); - try { List> result = client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed GET status query to %s: %s", uriString, e.getMessage()), e); } } - private List> getStatusesFromPvListBody( - String archiverURL, List pvs) { + public List> getStatusesViaPost(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; try { List> result = @@ -136,12 +111,8 @@ private List> getStatusesFromPvListBody( .body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed POST status query to %s: %s", uriString, e.getMessage()), e); } } @@ -209,7 +180,11 @@ private static List validateSubmitActionResponse( public long configureAA(Map> archivePVS, String aaURL) { logger.log( - Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL)); + Level.FINE, + () -> + String.format( + "Configuring actions for %d PVs at archiver %s.", + archivePVS.values().stream().mapToLong(List::size).sum(), aaURL)); long count = 0; // Don't request to archive an empty list. if (archivePVS.isEmpty()) { @@ -240,7 +215,11 @@ private long processAction(ArchiveAction action, List options, } return successfulPvs.size(); } catch (ArchiverServiceException e) { - logger.log(Level.WARNING, "Failed to submit " + action.name().toLowerCase() + " request", e); + logger.log( + Level.WARNING, + () -> + String.format( + "Failed to submit %s request: %s", action.name().toLowerCase(), e.getMessage())); return 0; } } @@ -263,7 +242,9 @@ public List getAAPolicies(String aaURL) { return new ArrayList<>(policyMap.keySet()); } catch (Exception e) { // problem collecting policies from AA, so warn and return empty list - logger.log(Level.WARNING, "Could not get AA policies list from: " + aaURL, e); + logger.log( + Level.WARNING, + () -> "Could not get AA policies list from " + aaURL + ": " + e.getMessage()); return List.of(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1002887a..e9564e73 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -134,6 +134,26 @@ logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO ################ Processor ################################################## processors.chunking.size=10000 +# ChannelFinderProcessorExecutor: controls the thread pool used to dispatch +# channel processor tasks (e.g. archiver sync on IOC reconnect). +# +# How many batches can be processed concurrently. Each in-flight batch holds +# one thread for the duration of its archiver HTTP calls. Increase this if you +# have more archivers or expect large-scale maintenance windows with many IOC +# restarts happening at once. +# +# Pool parameters are derived from this value: +# corePoolSize = max_concurrent_updates +# maxPoolSize = max_concurrent_updates +# queueCapacity = max(1, max_concurrent_updates / 4) +# +# Advanced: uncomment the task_executor properties below to override +# individual values (values <= 0 mean "use the derived value"). +processors.max_concurrent_updates=10 +# processors.task_executor.core_pool_size=-1 +# processors.task_executor.max_pool_size=-1 +# processors.task_executor.queue_capacity=-1 + ################ Archiver Appliance Configuration Processor ################# aa.urls={'default': 'http://localhost:17665'} # Comma-separated list of archivers to use if archiver_property_name is null @@ -143,6 +163,8 @@ aa.pva=false aa.archive_property_name=archive aa.archiver_property_name=archiver aa.timeout_seconds=15 +# Interval in seconds between automatic AA policy cache refreshes (default: 1 hour) +aa.policy_refresh_interval_seconds=3600 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java new file mode 100644 index 00000000..8fead183 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java @@ -0,0 +1,83 @@ +package org.phoebus.channelfinder.processors; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.phoebus.channelfinder.configuration.ChannelFinderProcessorExecutor; + +class ChannelFinderProcessorExecutorTest { + + @Test + void testDefaultPoolSizing() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(4, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(4, ex.getCorePoolSize()); + Assertions.assertEquals(4, ex.getMaxPoolSize()); + Assertions.assertEquals(1, ex.getQueueCapacity()); // max(1, 4/4) = 1 + ex.shutdown(); + } + + @Test + void testMinQueueCapacity() throws Exception { + // max(1, 1/4) = max(1, 0) = 1 + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(1, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testOverridesTakePrecedence() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(10, 2, 6, 8); + ex.initialize(); + Assertions.assertEquals(2, ex.getCorePoolSize()); + Assertions.assertEquals(6, ex.getMaxPoolSize()); + Assertions.assertEquals(8, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testRejectionHandlerEvictsOldestAndAdmitsNew() throws Exception { + // 1 thread, queue=1 → third submit triggers the rejection handler + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, 1); + ex.initialize(); + + CountDownLatch blocker = new CountDownLatch(1); + CountDownLatch task1Ready = new CountDownLatch(1); + AtomicBoolean task2Ran = new AtomicBoolean(false); + AtomicBoolean task3Ran = new AtomicBoolean(false); + CountDownLatch task3Done = new CountDownLatch(1); + + // task1: blocks the sole thread + ex.execute( + () -> { + task1Ready.countDown(); + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + task1Ready.await(2, TimeUnit.SECONDS); + + // task2: sits in queue (the stale task to be evicted) + ex.execute(() -> task2Ran.set(true)); + + // task3: triggers rejection handler → evicts task2, admits task3 + ex.execute( + () -> { + task3Ran.set(true); + task3Done.countDown(); + }); + + blocker.countDown(); // release the blocked thread + Assertions.assertTrue(task3Done.await(2, TimeUnit.SECONDS), "task3 did not complete in time"); + + Assertions.assertTrue(task3Ran.get(), "Newer task must run after eviction"); + Assertions.assertFalse(task2Ran.get(), "Older queued task must be evicted"); + + ex.shutdown(); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index e957dfad..92c9a9fe 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -3,8 +3,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -12,6 +14,7 @@ import java.util.Map; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -23,6 +26,7 @@ import org.phoebus.channelfinder.configuration.ChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; @@ -44,6 +48,12 @@ class AAChannelProcessorIT { @MockitoBean ArchiverService archiverService; @Autowired AAChannelProcessor aaChannelProcessor; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + @NotNull private static Stream processSource() { return Stream.of( @@ -100,17 +110,13 @@ public static void paramableAAChannelProcessorTest( String archiveStatus, String archiverEndpoint) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - if (!archiveStatus.isEmpty()) { // Mock getStatuses List> archivePVStatuses = channels.stream() .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); } if (!archiverEndpoint.isEmpty()) { @@ -124,10 +130,8 @@ public static void paramableAAChannelProcessorTest( assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); // Verifications - verify(archiverService).getAAPolicies(anyString()); - if (!archiveStatus.isEmpty()) { - verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); } if (!archiverEndpoint.isEmpty()) { @@ -163,6 +167,19 @@ void testProcessNoPVs() throws JacksonException { // But since list is empty, process returns 0 early } + @Test + void testStatusFetchFailureSkipsConfigureAndReturnsZero() throws JacksonException { + Channel channel = + new Channel("PVPausedActive", "owner", List.of(archiveProperty, activeProperty), List.of()); + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenThrow(new ArchiverServiceException("Connection refused")); + + long count = aaChannelProcessor.process(List.of(channel)); + + assertEquals(0L, count); + verify(archiverService, never()).configureAA(anyMap(), anyString()); + } + @ParameterizedTest @MethodSource("processSource") void testProcessNotArchivedActive(Channel channel, String archiveStatus, String archiverEndpoint) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 48552c57..9001e3f2 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -1,8 +1,8 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; @@ -14,6 +14,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -38,6 +39,12 @@ class AAChannelProcessorMultiArchiverIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -98,28 +105,12 @@ void testProcessMultiArchivers( Map namesToStatuses, Map> actionsToNames) throws JacksonException { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); - - // Requests to archiver - actionsToNames.forEach( - (key, value) -> { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); - }); - - // Request to policies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Request to archiver status - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); // Requests to archiver actionsToNames.forEach( @@ -129,11 +120,10 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications - verify(archiverService, times(2)).getAAPolicies(anyString()); - + // Verifications: query archiver uses GET, post archiver uses POST (aa.post_support=post) if (!namesToStatuses.isEmpty()) { - verify(archiverService, times(2)).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaPost(anyString(), anyList()); } } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 3e919fac..98ae8573 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -15,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -41,6 +43,12 @@ class AAChannelProcessorMultiIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -103,16 +111,12 @@ void testProcessMulti( int expectedProcessedChannels) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); // Mock configureAA when(archiverService.configureAA(anyMap(), anyString())) @@ -121,8 +125,7 @@ void testProcessMulti( long count = aaChannelProcessor.process(channels); assertEquals(expectedProcessedChannels, count); - verify(archiverService).getAAPolicies(anyString()); - verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); ArgumentCaptor>> captor = ArgumentCaptor.forClass(Map.class); diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 38cdb1e4..7c3ff55e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -30,6 +33,12 @@ class AAChannelProcessorNoDefaultIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 9c6342bf..612b1a3e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorNoPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java new file mode 100644 index 00000000..a099b886 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -0,0 +1,89 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import tools.jackson.core.JacksonException; + +@WebMvcTest(AAChannelProcessor.class) +@ExtendWith(MockitoExtension.class) +@TestPropertySource(value = "classpath:application_aa_proc_test.properties") +class AAChannelProcessorPolicyCacheIT { + + @MockitoBean ArchiverService archiverService; + @Autowired AAChannelProcessor aaChannelProcessor; + + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + + @Test + void testProcessDoesNotCallGetAAPolicies() throws JacksonException { + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenReturn(List.of(Map.of("pvName", "PVNoneActive", "status", "Not being archived"))); + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(1L); + clearInvocations(archiverService); + + Channel channel = + new Channel( + "PVNoneActive", + "owner", + List.of( + new Property("archive", "owner", "default"), + new Property("pvStatus", "owner", "Active")), + List.of()); + aaChannelProcessor.process(List.of(channel)); + + verify(archiverService, never()).getAAPolicies(anyString()); + } + + @Test + void testProcessorInfoShowsCacheMetadata() { + ChannelProcessorInfo info = aaChannelProcessor.processorInfo(); + + assertNotEquals("never", info.properties().get("LastPolicyRefresh")); + assertTrue(info.properties().get("CachedPoliciesPerArchiver").contains("default=")); + assertEquals("3600", info.properties().get("PolicyRefreshIntervalSeconds")); + } + + @Test + void testScheduledRefreshUpdatesCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("p1", "p2", "p3")); + clearInvocations(archiverService); + + aaChannelProcessor.scheduledPolicyRefresh(); + + verify(archiverService).getAAPolicies(anyString()); + assertTrue( + aaChannelProcessor + .processorInfo() + .properties() + .get("CachedPoliciesPerArchiver") + .contains("default=3")); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index 86b4f49c..fd48434a 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorStatusPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index 1aeafff0..38f46057 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorTagPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 81d60512..4f3ff2bc 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -8,6 +8,8 @@ import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; +import java.io.IOException; +import java.net.ServerSocket; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -17,14 +19,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; -import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.web.client.MockRestServiceServer; import org.springframework.web.client.RestClient; import tools.jackson.core.JacksonException; @@ -45,8 +45,6 @@ void setUp() { archiverService = new ArchiverService(builder); objectMapper = new ObjectMapper(); - - ReflectionTestUtils.setField(archiverService, "postSupportArchivers", List.of("test-archiver")); } @AfterEach @@ -54,34 +52,43 @@ void tearDown() { mockServer.verify(); } - @ParameterizedTest - @ValueSource(strings = {"other-archiver", "test-archiver"}) - void testGetStatuses(String archiverAlias) throws JacksonException { + @Test + void testGetStatusGet() throws JacksonException { + List pvs = List.of("pv1"); + List> expectedResponse = + List.of(Map.of("pv", "pv1", "status", "Being archived")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) + .andExpect(method(HttpMethod.GET)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + + List> result = archiverService.getStatusesViaGet(ARCHIVER_URL, pvs); - Map pvs = Map.of("pv1", new ArchivePVOptions()); + assertEquals(1, result.size()); + assertEquals("pv1", result.getFirst().get("pv")); + assertEquals("Being archived", result.getFirst().get("status")); + } + + @Test + void testGetStatusPost() throws JacksonException { + + List pvs = List.of("pv1"); List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "Being archived")); - if ("test-archiver".equals(archiverAlias)) { - mockServer - .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) - .andExpect(method(HttpMethod.POST)) - .andExpect(content().json("[\"pv1\"]")) - .andRespond( - withSuccess( - objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - } else { - mockServer - .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) - .andExpect(method(HttpMethod.GET)) - .andRespond( - withSuccess( - objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - } + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"pv1\"]")) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - List> result = - archiverService.getStatuses(pvs, ARCHIVER_URL, archiverAlias); + List> result = archiverService.getStatusesViaPost(ARCHIVER_URL, pvs); assertEquals(1, result.size()); assertEquals("pv1", result.getFirst().get("pv")); @@ -89,19 +96,30 @@ void testGetStatuses(String archiverAlias) throws JacksonException { } @Test - void testGetStatusesInvalidResponse() { - - Map pvs = Map.of("pv1", new ArchivePVOptions()); + void testGetStatusesGetInvalidResponse() { + List pvs = List.of("pv1"); mockServer .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = - archiverService.getStatuses(pvs, ARCHIVER_URL, "other-archiver"); + assertThrows( + ArchiverServiceException.class, () -> archiverService.getStatusesViaGet(ARCHIVER_URL, pvs)); + } + + @Test + void testGetStatusesPostInvalidResponse() { + List pvs = List.of("pv1"); - assertTrue(result.isEmpty()); + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) + .andExpect(method(HttpMethod.POST)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + + assertThrows( + ArchiverServiceException.class, + () -> archiverService.getStatusesViaPost(ARCHIVER_URL, pvs)); } @Test @@ -351,4 +369,36 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException { assertEquals(1, successfulPvs.size()); assertTrue(successfulPvs.contains("PV1")); } + + @Test + void testRequestTimesOutAtConfiguredTimeout() throws IOException { + int timeoutSeconds = 1; + + // Accept the TCP connection but never send a response, simulating a hung archiver host. + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + Thread serverThread = + new Thread( + () -> { + try { + serverSocket.accept(); + } catch (IOException ignored) { + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = new ArchiverService(timeoutSeconds, builder); + + long start = System.currentTimeMillis(); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://localhost:" + port, List.of("pv1"))); + long elapsedMs = System.currentTimeMillis() - start; + + // Should timeout at ~1s — well short of the OS TCP default (~2 minutes) + assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms"); + } + } }