From b2a99c17175ee037f8d26aae7356e9d6a6664bea Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:32:45 +0200 Subject: [PATCH 01/13] =?UTF-8?q?fix(aa):=20rename=20parameter=20pvStaus?= =?UTF-8?q?=20=E2=86=92=20pvStatus=20in=20createArchivePV?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- .../channelfinder/configuration/AAChannelProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 3da84200..92813463 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -273,7 +273,7 @@ private Map> getArchiveActions( } private ArchivePVOptions createArchivePV( - List policyList, Channel channel, String archiveProperty, String pvStaus) { + List policyList, Channel channel, String archiveProperty, String pvStatus) { ArchivePVOptions newArchiverPV = new ArchivePVOptions(); if (aaPVA && !channel.getName().contains("://")) { newArchiverPV.setPv("pva://" + channel.getName()); @@ -281,7 +281,7 @@ private ArchivePVOptions createArchivePV( newArchiverPV.setPv(channel.getName()); } newArchiverPV.setSamplingParameters(archiveProperty, policyList); - newArchiverPV.setPvStatus(pvStaus); + newArchiverPV.setPvStatus(pvStatus); return newArchiverPV; } From ad9625291817264327d4296c79086b0508bd680f Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:33:37 +0200 Subject: [PATCH 02/13] refactor(aa): extract findProperty/findPropertyValue helpers Eliminate three duplicate inline stream-filter-findFirst patterns for channel property lookups. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 92813463..a675d1aa 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -121,11 +121,7 @@ public long process(List channels) throws JacksonException { logger.log(Level.INFO, "Get channelfinder properties for aa processor."); channels.forEach( channel -> { - Optional archiveProperty = - channel.getProperties().stream() - .filter( - xmlProperty -> archivePropertyName.equalsIgnoreCase(xmlProperty.getName())) - .findFirst(); + Optional archiveProperty = findProperty(channel, archivePropertyName); if (archiveProperty.isPresent()) { channel.getProperties().stream() .filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName())) @@ -189,18 +185,25 @@ public long process(List channels) throws JacksonException { return finalCount; } + private Optional findProperty(Channel channel, String propertyName) { + return channel.getProperties().stream() + .filter(p -> propertyName.equalsIgnoreCase(p.getName())) + .findFirst(); + } + + private Optional findPropertyValue(Channel channel, String propertyName) { + return findProperty(channel, propertyName) + .map(Property::getValue) + .filter(v -> v != null && !v.isEmpty()); + } + private void addChannelChange( Channel channel, Map> aaArchivePVS, Map archiversInfo, Optional archiveProperty, String archiverAlias) { - String pvStatus = - channel.getProperties().stream() - .filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName())) - .findFirst() - .map(Property::getValue) - .orElse(PV_STATUS_INACTIVE); + String pvStatus = findPropertyValue(channel, PV_STATUS_PROPERTY_NAME).orElse(PV_STATUS_INACTIVE); if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) { ArchivePVOptions newArchiverPV = createArchivePV( From e73880211bf73c69a8a9558521ce59a5eb600465 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:36:17 +0200 Subject: [PATCH 03/13] refactor(aa): extract resolveArchiverAliases to flatten nested lambda MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace a 15-line Optional.map(→Stream).orElse(stream) chain with a named helper method, making the intent immediately clear at the call site. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index a675d1aa..aeea0ace 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -10,6 +10,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; @@ -123,24 +124,7 @@ public long process(List channels) throws JacksonException { channel -> { Optional archiveProperty = findProperty(channel, archivePropertyName); if (archiveProperty.isPresent()) { - channel.getProperties().stream() - .filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName())) - .findFirst() - .map( - xmlProperty -> { - String archiverValue = xmlProperty.getValue(); - // archiver property can be comma separated list of archivers - if (archiverValue != null && !archiverValue.isEmpty()) { - return Arrays.stream(archiverValue.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()); - } else { - return defaultArchivers.stream(); - } - }) - .orElse( - defaultArchivers - .stream()) // Use defaultArchivers list if no matching property found + resolveArchiverAliases(channel) .forEach( archiverAlias -> { try { @@ -185,6 +169,12 @@ public long process(List channels) throws JacksonException { return finalCount; } + private Stream resolveArchiverAliases(Channel channel) { + return findPropertyValue(channel, archiverPropertyName) + .map(v -> Arrays.stream(v.split(",")).map(String::trim).filter(s -> !s.isEmpty())) + .orElseGet(defaultArchivers::stream); + } + private Optional findProperty(Channel channel, String propertyName) { return channel.getProperties().stream() .filter(p -> propertyName.equalsIgnoreCase(p.getName())) @@ -203,7 +193,8 @@ private void addChannelChange( Map archiversInfo, Optional archiveProperty, String archiverAlias) { - String pvStatus = findPropertyValue(channel, PV_STATUS_PROPERTY_NAME).orElse(PV_STATUS_INACTIVE); + String pvStatus = + findPropertyValue(channel, PV_STATUS_PROPERTY_NAME).orElse(PV_STATUS_INACTIVE); if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) { ArchivePVOptions newArchiverPV = createArchivePV( From 45d60a8129e96d3c63e628aded70c37925fac270 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:38:17 +0200 Subject: [PATCH 04/13] refactor(aa): split process() into buildArchivePVMap + submitToArchivers process() was doing three things: guard clauses, mapping channels to per-archiver PV options, and submitting actions. Extract the latter two into named methods so process() reads as a three-step narrative. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 52 ++++++++++--------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index aeea0ace..38158f48 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -107,19 +107,24 @@ public long process(List channels) throws JacksonException { return 0; } - // Get Info (policy and version) of each archiver Map archiversInfo = getArchiversInfo(aaURLs); if (archiversInfo.isEmpty()) { return 0; } - Map> archiverAliasToArchivePVOptions = - new HashMap<>(); // AA identifier, ArchivePVOptions - for (String alias : archiversInfo.keySet()) { - archiverAliasToArchivePVOptions.put(alias, new ArrayList<>()); - } - logger.log(Level.INFO, "Get channelfinder properties for aa processor."); + Map> pvsByArchiver = buildArchivePVMap(channels, archiversInfo); + + long count = submitToArchivers(pvsByArchiver, archiversInfo); + logger.log(Level.INFO, () -> String.format("Configured %s channels.", count)); + return count; + } + + private Map> buildArchivePVMap( + List channels, Map archiversInfo) { + Map> result = new HashMap<>(); + archiversInfo.keySet().forEach(alias -> result.put(alias, new ArrayList<>())); + channels.forEach( channel -> { Optional archiveProperty = findProperty(channel, archivePropertyName); @@ -129,44 +134,41 @@ public long process(List channels) throws JacksonException { archiverAlias -> { try { addChannelChange( - channel, - archiverAliasToArchivePVOptions, - archiversInfo, - archiveProperty, - archiverAlias); + channel, result, archiversInfo, archiveProperty, archiverAlias); } catch (Exception e) { logger.log( Level.WARNING, String.format("Failed to process %s", channel), e); } }); } else if (autoPauseOptions.contains(archivePropertyName)) { - aaURLs + archiversInfo .keySet() .forEach( archiverAlias -> - archiverAliasToArchivePVOptions + result .get(archiverAlias) .add(createArchivePV(List.of(), channel, "", PV_STATUS_INACTIVE))); } }); + return result; + } + + private long submitToArchivers( + Map> pvsByArchiver, Map archiversInfo) + throws JacksonException { long count = 0; - for (Map.Entry> e : archiverAliasToArchivePVOptions.entrySet()) { + for (Map.Entry> e : pvsByArchiver.entrySet()) { ArchiverInfo archiverInfo = archiversInfo.get(e.getKey()); if (archiverInfo == null) { logger.log(Level.WARNING, String.format("Failed to process %s", e.getKey())); continue; } - Map archivePVSList = - e.getValue().stream() - .collect( - Collectors.toMap(ArchivePVOptions::getPv, archivePVOptions -> archivePVOptions)); - Map> archiveActionArchivePVMap = - getArchiveActions(archivePVSList, archiverInfo); - count += archiverService.configureAA(archiveActionArchivePVMap, archiverInfo.url()); + Map pvMap = + e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); + count += + archiverService.configureAA(getArchiveActions(pvMap, archiverInfo), archiverInfo.url()); } - long finalCount = count; - logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); - return finalCount; + return count; } private Stream resolveArchiverAliases(Channel channel) { From 29e7a187a2338b0f330e68a11000b3870590cbe6 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:40:21 +0200 Subject: [PATCH 05/13] refactor(aa): streamify getArchiversInfo with filter+collect Replace the imperative for-loop (with continue guard) with a stream pipeline. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 38158f48..13f0ef97 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -282,15 +282,13 @@ private ArchivePVOptions createArchivePV( } private Map getArchiversInfo(Map aaURLs) { - Map result = new HashMap<>(); - for (Map.Entry aa : aaURLs.entrySet()) { - if (StringUtils.isEmpty(aa.getValue())) { - // Empty archiver tagged - continue; - } - List policies = archiverService.getAAPolicies(aa.getValue()); - result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), policies)); - } - return result; + return aaURLs.entrySet().stream() + .filter(aa -> !StringUtils.isEmpty(aa.getValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + aa -> + new ArchiverInfo( + aa.getKey(), aa.getValue(), archiverService.getAAPolicies(aa.getValue())))); } } From fdbcca9fc42ba787e8e4aaa7115822b18397c25f Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 13:49:55 +0200 Subject: [PATCH 06/13] =?UTF-8?q?refactor(aa):=20improve=20log=20messages?= =?UTF-8?q?=20=E2=80=94=20levels,=20content,=20and=20lazy=20evaluation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - INFO: "Processing N channels." / "Configured N channels." frame each run - WARNING (new): "No reachable archivers configured" when archiversInfo is empty - WARNING: per-channel failure now names the channel and archiver alias - WARNING: archiver response anomalies now identify which archiver and what was wrong - FINE (was INFO): per-archiver status query — too chatty for production INFO - FINER: raw status response now includes archiver alias for correlation - All String.format calls wrapped in lambdas for lazy evaluation Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 48 +++++++++++++++---- .../service/ChannelProcessorService.java | 9 ++-- .../service/external/ArchiverService.java | 10 +++- 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 13f0ef97..9c032b80 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -109,14 +109,19 @@ public long process(List channels) throws JacksonException { Map archiversInfo = getArchiversInfo(aaURLs); if (archiversInfo.isEmpty()) { + logger.log( + Level.WARNING, + () -> + String.format( + "No reachable archivers configured; skipping %d channels.", channels.size())); return 0; } - logger.log(Level.INFO, "Get channelfinder properties for aa processor."); + logger.log(Level.INFO, () -> String.format("Processing %d channels.", channels.size())); Map> pvsByArchiver = buildArchivePVMap(channels, archiversInfo); long count = submitToArchivers(pvsByArchiver, archiversInfo); - logger.log(Level.INFO, () -> String.format("Configured %s channels.", count)); + logger.log(Level.INFO, () -> String.format("Configured %d channels.", count)); return count; } @@ -137,7 +142,11 @@ private Map> buildArchivePVMap( channel, result, archiversInfo, archiveProperty, archiverAlias); } catch (Exception e) { logger.log( - Level.WARNING, String.format("Failed to process %s", channel), e); + Level.WARNING, + () -> + String.format( + "Failed to add channel '%s' to archiver '%s': %s", + channel.getName(), archiverAlias, e.getMessage())); } }); } else if (autoPauseOptions.contains(archivePropertyName)) { @@ -160,7 +169,12 @@ private long submitToArchivers( for (Map.Entry> e : pvsByArchiver.entrySet()) { ArchiverInfo archiverInfo = archiversInfo.get(e.getKey()); if (archiverInfo == null) { - logger.log(Level.WARNING, String.format("Failed to process %s", e.getKey())); + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver alias '%s' present in PV map but missing from archiver info; skipping.", + e.getKey())); continue; } Map pvMap = @@ -228,7 +242,12 @@ private Map> getArchiveActions( return Map.of(); } - logger.log(Level.INFO, () -> String.format("Get archiver status in archiver %s", archiverInfo)); + logger.log( + Level.FINE, + () -> + String.format( + "Querying status of %d PVs from archiver '%s'.", + archivePVS.size(), archiverInfo.alias())); Map> result = new EnumMap<>(ArchiveAction.class); Arrays.stream(ArchiveAction.values()) @@ -239,7 +258,11 @@ private Map> getArchiveActions( } List> statuses = archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); - logger.log(Level.FINER, "Statuses {0}", statuses); + logger.log( + Level.FINER, + () -> + String.format( + "Status response from archiver '%s': %s", archiverInfo.alias(), statuses)); statuses.forEach( archivePVStatusJsonMap -> { String archiveStatus = archivePVStatusJsonMap.get("status"); @@ -248,14 +271,21 @@ private Map> getArchiveActions( if (archiveStatus == null || pvName == null) { logger.log( Level.WARNING, - "Missing status or pvName in archivePVStatusJsonMap: {0}", - archivePVStatusJsonMap); + () -> + String.format( + "Archiver '%s' returned entry with missing 'status' or 'pvName': %s", + archiverInfo.alias(), archivePVStatusJsonMap)); return; } ArchivePVOptions archivePVOptions = archivePVS.get(pvName); if (archivePVOptions == null) { - logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName); + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' returned status for unknown PV '%s'; ignoring.", + archiverInfo.alias(), pvName)); return; } diff --git a/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java b/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java index 846b9684..8fd4144d 100644 --- a/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java +++ b/src/main/java/org/phoebus/channelfinder/service/ChannelProcessorService.java @@ -124,10 +124,11 @@ public void sendToProcessors(List channels) { } catch (Exception e) { logger.log( Level.WARNING, - "ChannelProcessor " - + channelProcessor.getClass().getName() - + " throws exception", - e); + () -> + "ChannelProcessor " + + channelProcessor.getClass().getName() + + " threw exception: " + + e.getMessage()); } })); } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 2da37b69..45462aed 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -240,7 +240,11 @@ private long processAction(ArchiveAction action, List options, } return successfulPvs.size(); } catch (ArchiverServiceException e) { - logger.log(Level.WARNING, "Failed to submit " + action.name().toLowerCase() + " request", e); + logger.log( + Level.WARNING, + () -> + String.format( + "Failed to submit %s request: %s", action.name().toLowerCase(), e.getMessage())); return 0; } } @@ -263,7 +267,9 @@ public List getAAPolicies(String aaURL) { return new ArrayList<>(policyMap.keySet()); } catch (Exception e) { // problem collecting policies from AA, so warn and return empty list - logger.log(Level.WARNING, "Could not get AA policies list from: " + aaURL, e); + logger.log( + Level.WARNING, + () -> "Could not get AA policies list from " + aaURL + ": " + e.getMessage()); return List.of(); } } From db2c390392a699559e9eb730051d64214b146dc6 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 14:54:07 +0200 Subject: [PATCH 07/13] fix(aa): propagate status fetch failures; skip archiver to prevent spurious ARCHIVE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, a failed HTTP status batch silently returned an empty list. Those PVs were then treated as "not archived" and submitted for archiving on every subsequent run, spamming the archiver with duplicate requests. ArchiverService now throws ArchiverServiceException on status fetch failure. AAChannelProcessor catches it in getArchiveActions() and returns null, causing submitToArchivers() to skip that archiver entirely for the run. Also renames getStatusesFromPvListQuery/Body → getStatusesViaGet/Post to reflect their transport semantics Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 29 ++++++++++---- .../service/external/ArchiverService.java | 39 +++++-------------- .../service/external/ArchiverServiceTest.java | 7 ++-- 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 9c032b80..586aea4b 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; @@ -179,8 +180,10 @@ private long submitToArchivers( } Map pvMap = e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); - count += - archiverService.configureAA(getArchiveActions(pvMap, archiverInfo), archiverInfo.url()); + Optional>> actions = + getArchiveActions(pvMap, archiverInfo); + if (actions.isEmpty()) continue; + count += archiverService.configureAA(actions.get(), archiverInfo.url()); } return count; } @@ -236,10 +239,10 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { return ArchiveAction.NONE; } - private Map> getArchiveActions( + private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { if (archiverInfo == null) { - return Map.of(); + return Optional.of(Map.of()); } logger.log( @@ -254,10 +257,20 @@ private Map> getArchiveActions( .forEach(archiveAction -> result.put(archiveAction, new ArrayList<>())); // Don't request to archive an empty list. if (archivePVS.isEmpty()) { - return result; + return Optional.of(result); + } + List> statuses; + try { + statuses = archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); + } catch (ArchiverServiceException e) { + logger.log( + Level.WARNING, + () -> + String.format( + "Status fetch failed for archiver '%s'; skipping %d PVs to avoid spurious ARCHIVE submissions: %s", + archiverInfo.alias(), archivePVS.size(), e.getMessage())); + return Optional.empty(); } - List> statuses = - archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); logger.log( Level.FINER, () -> @@ -295,7 +308,7 @@ private Map> getArchiveActions( List archivePVOptionsList = result.get(action); archivePVOptionsList.add(archivePVOptions); }); - return result; + return Optional.of(result); } private ArchivePVOptions createArchivePV( diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 45462aed..b23c33fd 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -81,49 +81,34 @@ private Stream> partitionSet(Set pvSet, int pageSize) { public List> getStatuses( Map archivePVS, String archiverURL, String archiverAlias) { Set pvs = archivePVS.keySet(); - boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); - logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias); - logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers); - - if (postSupportOverride) { - logger.log(Level.INFO, "Post Support"); - return getStatusesFromPvListBody(archiverURL, pvs.stream().toList()); + if (postSupportArchivers.contains(archiverAlias)) { + return getStatusesViaPost(archiverURL, pvs.stream().toList()); } else { - logger.log(Level.INFO, "Query Support"); - Stream> stream = partitionSet(pvs, STATUS_BATCH_SIZE); - - return stream - .map(pvList -> getStatusesFromPvListQuery(archiverURL, pvList)) + return partitionSet(pvs, STATUS_BATCH_SIZE) + .map(pvList -> getStatusesViaGet(archiverURL, pvList)) .flatMap(List::stream) .toList(); } } - private List> getStatusesFromPvListQuery( - String archiverURL, List pvs) { + List> getStatusesViaGet(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; URI pvStatusURI = UriComponentsBuilder.fromUri(URI.create(uriString)) .queryParam(StatusResponseKey.PV.key(), String.join(",", pvs)) .build() .toUri(); - try { List> result = client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed GET status query to %s: %s", uriString, e.getMessage()), e); } } - private List> getStatusesFromPvListBody( - String archiverURL, List pvs) { + List> getStatusesViaPost(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; try { List> result = @@ -136,12 +121,8 @@ private List> getStatusesFromPvListBody( .body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed POST status query to %s: %s", uriString, e.getMessage()), e); } } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 81d60512..18afd590 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -98,10 +98,9 @@ void testGetStatusesInvalidResponse() { .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = - archiverService.getStatuses(pvs, ARCHIVER_URL, "other-archiver"); - - assertTrue(result.isEmpty()); + assertThrows( + ArchiverServiceException.class, + () -> archiverService.getStatuses(pvs, ARCHIVER_URL, "other-archiver")); } @Test From 1a191ffeba727ac0c5994bd31c201670dc731c21 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 14:57:28 +0200 Subject: [PATCH 08/13] refactor(aa): move postSupportArchivers to AAChannelProcessor; expose transport methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ArchiverService was holding archiver-alias config (postSupportArchivers) and making the POST-vs-GET routing decision — a concern that belongs in AAChannelProcessor alongside all other per-archiver configuration. ArchiverService now exposes two public transport methods: getStatusesViaGet(url, pvs) — batches internally (URL length limit) getStatusesViaPost(url, pvs) — sends in a single POST body AAChannelProcessor holds @Value("${aa.post_support:}") and selects the correct transport in getArchiveActions(). The field is volatile so the runtime setter added later is thread-safe. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 11 ++- .../service/external/ArchiverService.java | 31 +++----- .../processors/aa/AAChannelProcessorIT.java | 6 +- .../aa/AAChannelProcessorMultiArchiverIT.java | 9 +-- .../aa/AAChannelProcessorMultiIT.java | 6 +- .../service/external/ArchiverServiceTest.java | 77 +++++++++++-------- 6 files changed, 76 insertions(+), 64 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 586aea4b..13bc6610 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -62,7 +62,10 @@ public class AAChannelProcessor implements ChannelProcessor { private String archiverPropertyName; @Value("${aa.auto_pause:}") - private List autoPauseOptions; + private volatile List autoPauseOptions; + + @Value("${aa.post_support:}") + private volatile List postSupportArchivers; @Autowired private ArchiverService archiverService; @@ -260,8 +263,12 @@ private Optional>> getArchiveActions( return Optional.of(result); } List> statuses; + List pvList = new ArrayList<>(archivePVS.keySet()); try { - statuses = archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); + statuses = + postSupportArchivers.contains(archiverInfo.alias()) + ? archiverService.getStatusesViaPost(archiverInfo.url(), pvList) + : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); } catch (ArchiverServiceException e) { logger.log( Level.WARNING, diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index b23c33fd..4fad9945 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -4,7 +4,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -57,9 +56,6 @@ String key() { } } - @Value("${aa.post_support:}") - private List postSupportArchivers; - @Autowired public ArchiverService( @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { @@ -72,26 +68,19 @@ public ArchiverService( this.client = builder.build(); } - private Stream> partitionSet(Set pvSet, int pageSize) { - List list = new ArrayList<>(pvSet); - return IntStream.range(0, (list.size() + pageSize - 1) / pageSize) - .mapToObj(i -> list.subList(i * pageSize, Math.min(pageSize * (i + 1), list.size()))); + private Stream> partition(List pvs, int pageSize) { + return IntStream.range(0, (pvs.size() + pageSize - 1) / pageSize) + .mapToObj(i -> pvs.subList(i * pageSize, Math.min(pageSize * (i + 1), pvs.size()))); } - public List> getStatuses( - Map archivePVS, String archiverURL, String archiverAlias) { - Set pvs = archivePVS.keySet(); - if (postSupportArchivers.contains(archiverAlias)) { - return getStatusesViaPost(archiverURL, pvs.stream().toList()); - } else { - return partitionSet(pvs, STATUS_BATCH_SIZE) - .map(pvList -> getStatusesViaGet(archiverURL, pvList)) - .flatMap(List::stream) - .toList(); - } + public List> getStatusesViaGet(String archiverURL, List pvs) { + return partition(pvs, STATUS_BATCH_SIZE) + .map(batch -> getStatusesViaGetBatch(archiverURL, batch)) + .flatMap(List::stream) + .toList(); } - List> getStatusesViaGet(String archiverURL, List pvs) { + private List> getStatusesViaGetBatch(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; URI pvStatusURI = UriComponentsBuilder.fromUri(URI.create(uriString)) @@ -108,7 +97,7 @@ List> getStatusesViaGet(String archiverURL, List pvs } } - List> getStatusesViaPost(String archiverURL, List pvs) { + public List> getStatusesViaPost(String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; try { List> result = diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index e957dfad..d490d9eb 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -109,8 +110,7 @@ public static void paramableAAChannelProcessorTest( channels.stream() .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); } if (!archiverEndpoint.isEmpty()) { @@ -127,7 +127,7 @@ public static void paramableAAChannelProcessorTest( verify(archiverService).getAAPolicies(anyString()); if (!archiveStatus.isEmpty()) { - verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); } if (!archiverEndpoint.isEmpty()) { diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 48552c57..ea694aa0 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -1,5 +1,6 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; @@ -105,8 +106,7 @@ void testProcessMultiArchivers( namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); // Requests to archiver actionsToNames.forEach( @@ -118,8 +118,7 @@ void testProcessMultiArchivers( when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); // Request to archiver status - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); // Requests to archiver actionsToNames.forEach( @@ -133,7 +132,7 @@ void testProcessMultiArchivers( verify(archiverService, times(2)).getAAPolicies(anyString()); if (!namesToStatuses.isEmpty()) { - verify(archiverService, times(2)).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService, times(2)).getStatusesViaGet(anyString(), anyList()); } } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 3e919fac..68ac2ec9 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -111,8 +112,7 @@ void testProcessMulti( namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - when(archiverService.getStatuses(anyMap(), anyString(), anyString())) - .thenReturn(archivePVStatuses); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); // Mock configureAA when(archiverService.configureAA(anyMap(), anyString())) @@ -122,7 +122,7 @@ void testProcessMulti( assertEquals(expectedProcessedChannels, count); verify(archiverService).getAAPolicies(anyString()); - verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); ArgumentCaptor>> captor = ArgumentCaptor.forClass(Map.class); diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 18afd590..82bba5f6 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -17,14 +17,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; -import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.web.client.MockRestServiceServer; import org.springframework.web.client.RestClient; import tools.jackson.core.JacksonException; @@ -45,8 +43,6 @@ void setUp() { archiverService = new ArchiverService(builder); objectMapper = new ObjectMapper(); - - ReflectionTestUtils.setField(archiverService, "postSupportArchivers", List.of("test-archiver")); } @AfterEach @@ -54,34 +50,43 @@ void tearDown() { mockServer.verify(); } - @ParameterizedTest - @ValueSource(strings = {"other-archiver", "test-archiver"}) - void testGetStatuses(String archiverAlias) throws JacksonException { + @Test + void testGetStatusGet() throws JacksonException { + List pvs = List.of("pv1"); + List> expectedResponse = + List.of(Map.of("pv", "pv1", "status", "Being archived")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) + .andExpect(method(HttpMethod.GET)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - Map pvs = Map.of("pv1", new ArchivePVOptions()); + List> result = archiverService.getStatusesViaGet(ARCHIVER_URL, pvs); + + assertEquals(1, result.size()); + assertEquals("pv1", result.getFirst().get("pv")); + assertEquals("Being archived", result.getFirst().get("status")); + } + + @Test + void testGetStatusPost() throws JacksonException { + + List pvs = List.of("pv1"); List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "Being archived")); - if ("test-archiver".equals(archiverAlias)) { - mockServer - .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) - .andExpect(method(HttpMethod.POST)) - .andExpect(content().json("[\"pv1\"]")) - .andRespond( - withSuccess( - objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - } else { - mockServer - .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) - .andExpect(method(HttpMethod.GET)) - .andRespond( - withSuccess( - objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - } + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"pv1\"]")) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); - List> result = - archiverService.getStatuses(pvs, ARCHIVER_URL, archiverAlias); + List> result = archiverService.getStatusesViaPost(ARCHIVER_URL, pvs); assertEquals(1, result.size()); assertEquals("pv1", result.getFirst().get("pv")); @@ -89,18 +94,30 @@ void testGetStatuses(String archiverAlias) throws JacksonException { } @Test - void testGetStatusesInvalidResponse() { - - Map pvs = Map.of("pv1", new ArchivePVOptions()); + void testGetStatusesGetInvalidResponse() { + List pvs = List.of("pv1"); mockServer .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + assertThrows( + ArchiverServiceException.class, () -> archiverService.getStatusesViaGet(ARCHIVER_URL, pvs)); + } + + @Test + void testGetStatusesPostInvalidResponse() { + List pvs = List.of("pv1"); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) + .andExpect(method(HttpMethod.POST)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + assertThrows( ArchiverServiceException.class, - () -> archiverService.getStatuses(pvs, ARCHIVER_URL, "other-archiver")); + () -> archiverService.getStatusesViaPost(ARCHIVER_URL, pvs)); } @Test From 84d18af6ac2e1099cfbbbcc8d3f3bc7acf19fdbf Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 14:58:56 +0200 Subject: [PATCH 09/13] refactor(aa): demote configureAA log from INFO to FINE; show counts not PV map Logging the full PV map at INFO generates enormous output in production. Replace with a count summary at FINE. Co-Authored-By: Claude Sonnet 4.6 --- .../channelfinder/service/external/ArchiverService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 4fad9945..0960d664 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -178,8 +178,8 @@ private static List validateSubmitActionResponse( } public long configureAA(Map> archivePVS, String aaURL) { - logger.log( - Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL)); + logger.log(Level.FINE, () -> String.format("Configuring actions for %d PVs at archiver %s.", + archivePVS.values().stream().mapToLong(List::size).sum(), aaURL)); long count = 0; // Don't request to archive an empty list. if (archivePVS.isEmpty()) { From 01aba2dc0d15a4bd33abdf6af26c5ba7d8b9ecfa Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Fri, 22 May 2026 14:41:47 +0200 Subject: [PATCH 10/13] feat(aa): cache AA policies with scheduled background refresh Policies are fetched once at startup (@PostConstruct) and refreshed on a configurable fixed delay (aa.policy_refresh_interval_seconds, default 1 h) rather than on every process() call. process() now reads a volatile snapshot, so policy fetches can no longer block or fail mid-run. processorInfo() exposes LastPolicyRefresh and per-archiver policy counts for observability. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 87 +++++++++++++++--- .../service/external/ArchiverService.java | 8 +- src/main/resources/application.properties | 2 + .../processors/aa/AAChannelProcessorIT.java | 12 +-- .../aa/AAChannelProcessorMultiArchiverIT.java | 29 +++--- .../aa/AAChannelProcessorMultiIT.java | 11 ++- .../aa/AAChannelProcessorNoDefaultIT.java | 9 ++ .../aa/AAChannelProcessorNoPauseIT.java | 9 ++ .../aa/AAChannelProcessorPolicyCacheIT.java | 89 +++++++++++++++++++ .../aa/AAChannelProcessorStatusPauseIT.java | 9 ++ .../aa/AAChannelProcessorTagPauseIT.java | 9 ++ 11 files changed, 233 insertions(+), 41 deletions(-) create mode 100644 src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 13bc6610..c5d71730 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -1,12 +1,16 @@ package org.phoebus.channelfinder.configuration; +import jakarta.annotation.PostConstruct; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -24,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; import tools.jackson.core.JacksonException; /** @@ -69,6 +74,12 @@ public class AAChannelProcessor implements ChannelProcessor { @Autowired private ArchiverService archiverService; + @Value("${aa.policy_refresh_interval_seconds:3600}") + private long policyRefreshIntervalSeconds; + + private volatile Map> cachedPolicies = Map.of(); + private volatile Instant lastPolicyRefresh; + @Override public boolean enabled() { return aaEnabled; @@ -79,8 +90,24 @@ public void setEnabled(boolean enabled) { this.aaEnabled = enabled; } + @PostConstruct + public void initPolicyCache() { + refreshPolicies(); + } + + @Scheduled( + fixedDelayString = "${aa.policy_refresh_interval_seconds:3600}", + timeUnit = TimeUnit.SECONDS) + public void scheduledPolicyRefresh() { + refreshPolicies(); + } + @Override public ChannelProcessorInfo processorInfo() { + String policyCounts = + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().size()) + .collect(Collectors.joining(", ")); return new ChannelProcessorInfo( "AAChannelProcessor", aaEnabled, @@ -92,7 +119,15 @@ public ChannelProcessorInfo processorInfo() { "Archivers", aaURLs.keySet().toString(), "AutoPauseOn", - autoPauseOptions.toString())); + autoPauseOptions.toString(), + "PostSupportArchivers", + postSupportArchivers.toString(), + "PolicyRefreshIntervalSeconds", + String.valueOf(policyRefreshIntervalSeconds), + "LastPolicyRefresh", + lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), + "CachedPoliciesPerArchiver", + policyCounts.isEmpty() ? "none" : policyCounts)); } /** @@ -111,7 +146,7 @@ public long process(List channels) throws JacksonException { return 0; } - Map archiversInfo = getArchiversInfo(aaURLs); + Map archiversInfo = getArchiversInfoFromCache(); if (archiversInfo.isEmpty()) { logger.log( Level.WARNING, @@ -244,10 +279,6 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { - if (archiverInfo == null) { - return Optional.of(Map.of()); - } - logger.log( Level.FINE, () -> @@ -331,14 +362,48 @@ private ArchivePVOptions createArchivePV( return newArchiverPV; } - private Map getArchiversInfo(Map aaURLs) { + private Map getArchiversInfoFromCache() { + Map> snapshot = cachedPolicies; return aaURLs.entrySet().stream() - .filter(aa -> !StringUtils.isEmpty(aa.getValue())) + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .filter( + e -> { + if (!snapshot.containsKey(e.getKey())) { + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' has no cached policies (unreachable at last refresh); skipping.", + e.getKey())); + return false; + } + return true; + }) .collect( Collectors.toMap( Map.Entry::getKey, - aa -> - new ArchiverInfo( - aa.getKey(), aa.getValue(), archiverService.getAAPolicies(aa.getValue())))); + e -> new ArchiverInfo(e.getKey(), e.getValue(), snapshot.get(e.getKey())))); + } + + private void refreshPolicies() { + if (aaURLs.isEmpty()) { + logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh."); + return; + } + logger.log(Level.INFO, "Refreshing AA policy cache for {0} archivers.", aaURLs.size()); + Map> updated = + aaURLs.entrySet().stream() + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, e -> archiverService.getAAPolicies(e.getValue()))); + cachedPolicies = Collections.unmodifiableMap(updated); + lastPolicyRefresh = Instant.now(); + logger.log( + Level.INFO, + "AA policy cache refreshed: {0}", + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().size() + " policies") + .collect(Collectors.joining(", "))); } } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 0960d664..5da69fd9 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -178,8 +178,12 @@ private static List validateSubmitActionResponse( } public long configureAA(Map> archivePVS, String aaURL) { - logger.log(Level.FINE, () -> String.format("Configuring actions for %d PVs at archiver %s.", - archivePVS.values().stream().mapToLong(List::size).sum(), aaURL)); + logger.log( + Level.FINE, + () -> + String.format( + "Configuring actions for %d PVs at archiver %s.", + archivePVS.values().stream().mapToLong(List::size).sum(), aaURL)); long count = 0; // Don't request to archive an empty list. if (archivePVS.isEmpty()) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1002887a..9e2c0d25 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -143,6 +143,8 @@ aa.pva=false aa.archive_property_name=archive aa.archiver_property_name=archiver aa.timeout_seconds=15 +# Interval in seconds between automatic AA policy cache refreshes (default: 1 hour) +aa.policy_refresh_interval_seconds=3600 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index d490d9eb..d7c521f1 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,12 @@ class AAChannelProcessorIT { @MockitoBean ArchiverService archiverService; @Autowired AAChannelProcessor aaChannelProcessor; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + @NotNull private static Stream processSource() { return Stream.of( @@ -101,9 +108,6 @@ public static void paramableAAChannelProcessorTest( String archiveStatus, String archiverEndpoint) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - if (!archiveStatus.isEmpty()) { // Mock getStatuses List> archivePVStatuses = @@ -124,8 +128,6 @@ public static void paramableAAChannelProcessorTest( assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); // Verifications - verify(archiverService).getAAPolicies(anyString()); - if (!archiveStatus.isEmpty()) { verify(archiverService).getStatusesViaGet(anyString(), anyList()); } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index ea694aa0..9001e3f2 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -3,7 +3,6 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; @@ -15,6 +14,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -39,6 +39,12 @@ class AAChannelProcessorMultiArchiverIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -99,8 +105,6 @@ void testProcessMultiArchivers( Map namesToStatuses, Map> actionsToNames) throws JacksonException { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() @@ -108,18 +112,6 @@ void testProcessMultiArchivers( .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver - actionsToNames.forEach( - (key, value) -> { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); - }); - - // Request to policies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Request to archiver status - when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver actionsToNames.forEach( (key, value) -> { @@ -128,11 +120,10 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications - verify(archiverService, times(2)).getAAPolicies(anyString()); - + // Verifications: query archiver uses GET, post archiver uses POST (aa.post_support=post) if (!namesToStatuses.isEmpty()) { - verify(archiverService, times(2)).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaPost(anyString(), anyList()); } } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 68ac2ec9..98ae8573 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -42,6 +43,12 @@ class AAChannelProcessorMultiIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -104,9 +111,6 @@ void testProcessMulti( int expectedProcessedChannels) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() @@ -121,7 +125,6 @@ void testProcessMulti( long count = aaChannelProcessor.process(channels); assertEquals(expectedProcessedChannels, count); - verify(archiverService).getAAPolicies(anyString()); verify(archiverService).getStatusesViaGet(anyString(), anyList()); ArgumentCaptor>> captor = diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 38cdb1e4..7c3ff55e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -30,6 +33,12 @@ class AAChannelProcessorNoDefaultIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 9c6342bf..612b1a3e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorNoPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java new file mode 100644 index 00000000..a099b886 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -0,0 +1,89 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import tools.jackson.core.JacksonException; + +@WebMvcTest(AAChannelProcessor.class) +@ExtendWith(MockitoExtension.class) +@TestPropertySource(value = "classpath:application_aa_proc_test.properties") +class AAChannelProcessorPolicyCacheIT { + + @MockitoBean ArchiverService archiverService; + @Autowired AAChannelProcessor aaChannelProcessor; + + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + + @Test + void testProcessDoesNotCallGetAAPolicies() throws JacksonException { + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenReturn(List.of(Map.of("pvName", "PVNoneActive", "status", "Not being archived"))); + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(1L); + clearInvocations(archiverService); + + Channel channel = + new Channel( + "PVNoneActive", + "owner", + List.of( + new Property("archive", "owner", "default"), + new Property("pvStatus", "owner", "Active")), + List.of()); + aaChannelProcessor.process(List.of(channel)); + + verify(archiverService, never()).getAAPolicies(anyString()); + } + + @Test + void testProcessorInfoShowsCacheMetadata() { + ChannelProcessorInfo info = aaChannelProcessor.processorInfo(); + + assertNotEquals("never", info.properties().get("LastPolicyRefresh")); + assertTrue(info.properties().get("CachedPoliciesPerArchiver").contains("default=")); + assertEquals("3600", info.properties().get("PolicyRefreshIntervalSeconds")); + } + + @Test + void testScheduledRefreshUpdatesCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("p1", "p2", "p3")); + clearInvocations(archiverService); + + aaChannelProcessor.scheduledPolicyRefresh(); + + verify(archiverService).getAAPolicies(anyString()); + assertTrue( + aaChannelProcessor + .processorInfo() + .properties() + .get("CachedPoliciesPerArchiver") + .contains("default=3")); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index 86b4f49c..fd48434a 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorStatusPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index 1aeafff0..38f46057 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorTagPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( From 8b5e689f16591f23ed8c5f19af28334e2f3ec252 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 25 May 2026 09:19:55 +0200 Subject: [PATCH 11/13] fix(aa): add connect timeout to ArchiverService so unreachable hosts fail fast Without a connect timeout, TCP SYN drops (firewall, no route) stall for the OS default (~2 min) before returning. Setting connect timeout equal to the configured read timeout ensures both slow and unreachable archivers fail at the same bounded deadline. Co-Authored-By: Claude Sonnet 4.6 --- .../service/external/ArchiverService.java | 1 + .../service/external/ArchiverServiceTest.java | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 5da69fd9..134ffa99 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -61,6 +61,7 @@ public ArchiverService( @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); factory.setReadTimeout(timeoutSeconds * 1000); + factory.setConnectTimeout(timeoutSeconds * 1000); this.client = builder.requestFactory(factory).build(); } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 82bba5f6..4f3ff2bc 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -8,6 +8,8 @@ import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; +import java.io.IOException; +import java.net.ServerSocket; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -367,4 +369,36 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException { assertEquals(1, successfulPvs.size()); assertTrue(successfulPvs.contains("PV1")); } + + @Test + void testRequestTimesOutAtConfiguredTimeout() throws IOException { + int timeoutSeconds = 1; + + // Accept the TCP connection but never send a response, simulating a hung archiver host. + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + Thread serverThread = + new Thread( + () -> { + try { + serverSocket.accept(); + } catch (IOException ignored) { + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = new ArchiverService(timeoutSeconds, builder); + + long start = System.currentTimeMillis(); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://localhost:" + port, List.of("pv1"))); + long elapsedMs = System.currentTimeMillis() - start; + + // Should timeout at ~1s — well short of the OS TCP default (~2 minutes) + assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms"); + } + } } From 29bc5c8edc929be8ab01f6b5a52aa9935bf5d072 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 25 May 2026 14:37:49 +0200 Subject: [PATCH 12/13] feat(processors): give channel processors a tunable, named thread pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel processors (e.g. archiver sync on IOC reconnect) are dispatched asynchronously on every channel write. Previously the pool was anonymous, sized by a single hard-coded property, and used AbortPolicy — meaning a saturated queue would propagate an exception to the HTTP caller and cause receivers to retry, increasing load further. ChannelFinderProcessorExecutor replaces it with a named @Component whose size is driven by processors.max_concurrent_updates (default 10). Core and max are equal to avoid ramp-up lag during bursts of channel updates. The queue is deliberately shallow (N/4) with a discard-oldest rejection policy: when the pool is saturated, the stalest pending batch is evicted in favour of the fresher update, since a newer channel snapshot always supersedes an older one. Individual pool parameters can be overridden via processors.task_executor.* properties without touching the shared default. Tests cover pool-size derivation from defaults and overrides, and confirm the eviction behaviour under a saturated queue. Co-Authored-By: Claude Sonnet 4.6 --- .../phoebus/channelfinder/Application.java | 17 ---- .../ChannelFinderProcessorExecutor.java | 52 ++++++++++++ src/main/resources/application.properties | 20 +++++ .../ChannelFinderProcessorExecutorTest.java | 83 +++++++++++++++++++ 4 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java create mode 100644 src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java diff --git a/src/main/java/org/phoebus/channelfinder/Application.java b/src/main/java/org/phoebus/channelfinder/Application.java index 28d4bee9..7f7e9799 100644 --- a/src/main/java/org/phoebus/channelfinder/Application.java +++ b/src/main/java/org/phoebus/channelfinder/Application.java @@ -31,9 +31,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.FileCopyUtils; @EnableAutoConfiguration @@ -113,19 +111,4 @@ public List channelProcessors() { }); return processors; } - - /** - * {@link TaskExecutor} used when calling {@link ChannelProcessor}s. - * - * @return A {@link TaskExecutor} - */ - @Bean("channelFinderTaskExecutor") - public TaskExecutor taskExecutor() { - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setCorePoolSize(3); - taskExecutor.setMaxPoolSize(10); - taskExecutor.setQueueCapacity(25); - - return taskExecutor; - } } diff --git a/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java new file mode 100644 index 00000000..dff1a099 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java @@ -0,0 +1,52 @@ +package org.phoebus.channelfinder.configuration; + +import java.util.logging.Level; +import java.util.logging.Logger; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +/** + * {@link ThreadPoolTaskExecutor} used when calling {@link ChannelProcessor}s. + * + *

Pool parameters are derived from {@code processors.max_concurrent_updates}. Individual values + * can be overridden via the {@code processors.task_executor.*} properties (values ≤ 0 mean "use the + * derived value"). + */ +@Component("channelFinderTaskExecutor") +public class ChannelFinderProcessorExecutor extends ThreadPoolTaskExecutor { + + private static final Logger logger = + Logger.getLogger(ChannelFinderProcessorExecutor.class.getName()); + + public ChannelFinderProcessorExecutor( + @Value("${processors.max_concurrent_updates:10}") int maxConcurrent, + @Value("${processors.task_executor.core_pool_size:-1}") int overrideCore, + @Value("${processors.task_executor.max_pool_size:-1}") int overrideMax, + @Value("${processors.task_executor.queue_capacity:-1}") int overrideQueue) { + + int core = overrideCore > 0 ? overrideCore : maxConcurrent; + int max = overrideMax > 0 ? overrideMax : maxConcurrent; + int queue = overrideQueue > 0 ? overrideQueue : Math.max(1, maxConcurrent / 4); + + setCorePoolSize(core); + setMaxPoolSize(max); + setQueueCapacity(queue); + setRejectedExecutionHandler( + (runnable, executor) -> { + if (!executor.isShutdown()) { + executor.getQueue().poll(); // evict oldest (stale) task to make room + executor.getQueue().offer(runnable); + logger.log( + Level.WARNING, + () -> + "ChannelFinderProcessorExecutor task queue full — evicted oldest task to admit fresher update" + + " (active=" + + executor.getActiveCount() + + ", queued=" + + executor.getQueue().size() + + ")"); + } + }); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 9e2c0d25..e9564e73 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -134,6 +134,26 @@ logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO ################ Processor ################################################## processors.chunking.size=10000 +# ChannelFinderProcessorExecutor: controls the thread pool used to dispatch +# channel processor tasks (e.g. archiver sync on IOC reconnect). +# +# How many batches can be processed concurrently. Each in-flight batch holds +# one thread for the duration of its archiver HTTP calls. Increase this if you +# have more archivers or expect large-scale maintenance windows with many IOC +# restarts happening at once. +# +# Pool parameters are derived from this value: +# corePoolSize = max_concurrent_updates +# maxPoolSize = max_concurrent_updates +# queueCapacity = max(1, max_concurrent_updates / 4) +# +# Advanced: uncomment the task_executor properties below to override +# individual values (values <= 0 mean "use the derived value"). +processors.max_concurrent_updates=10 +# processors.task_executor.core_pool_size=-1 +# processors.task_executor.max_pool_size=-1 +# processors.task_executor.queue_capacity=-1 + ################ Archiver Appliance Configuration Processor ################# aa.urls={'default': 'http://localhost:17665'} # Comma-separated list of archivers to use if archiver_property_name is null diff --git a/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java new file mode 100644 index 00000000..8fead183 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java @@ -0,0 +1,83 @@ +package org.phoebus.channelfinder.processors; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.phoebus.channelfinder.configuration.ChannelFinderProcessorExecutor; + +class ChannelFinderProcessorExecutorTest { + + @Test + void testDefaultPoolSizing() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(4, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(4, ex.getCorePoolSize()); + Assertions.assertEquals(4, ex.getMaxPoolSize()); + Assertions.assertEquals(1, ex.getQueueCapacity()); // max(1, 4/4) = 1 + ex.shutdown(); + } + + @Test + void testMinQueueCapacity() throws Exception { + // max(1, 1/4) = max(1, 0) = 1 + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(1, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testOverridesTakePrecedence() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(10, 2, 6, 8); + ex.initialize(); + Assertions.assertEquals(2, ex.getCorePoolSize()); + Assertions.assertEquals(6, ex.getMaxPoolSize()); + Assertions.assertEquals(8, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testRejectionHandlerEvictsOldestAndAdmitsNew() throws Exception { + // 1 thread, queue=1 → third submit triggers the rejection handler + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, 1); + ex.initialize(); + + CountDownLatch blocker = new CountDownLatch(1); + CountDownLatch task1Ready = new CountDownLatch(1); + AtomicBoolean task2Ran = new AtomicBoolean(false); + AtomicBoolean task3Ran = new AtomicBoolean(false); + CountDownLatch task3Done = new CountDownLatch(1); + + // task1: blocks the sole thread + ex.execute( + () -> { + task1Ready.countDown(); + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + task1Ready.await(2, TimeUnit.SECONDS); + + // task2: sits in queue (the stale task to be evicted) + ex.execute(() -> task2Ran.set(true)); + + // task3: triggers rejection handler → evicts task2, admits task3 + ex.execute( + () -> { + task3Ran.set(true); + task3Done.countDown(); + }); + + blocker.countDown(); // release the blocked thread + Assertions.assertTrue(task3Done.await(2, TimeUnit.SECONDS), "task3 did not complete in time"); + + Assertions.assertTrue(task3Ran.get(), "Newer task must run after eviction"); + Assertions.assertFalse(task2Ran.get(), "Older queued task must be evicted"); + + ex.shutdown(); + } +} From f5a6c0b6514caa2bc161de70be7e77d9d44126d9 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 25 May 2026 15:05:00 +0200 Subject: [PATCH 13/13] fix(aa): warn when Active PVs are skipped because their archiver is down MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the status fetch for an archiver fails (host unreachable), the processor already skips that archiver to avoid spurious ARCHIVE submissions. However, channels with pvStatus=Active that are currently paused in the archiver will remain paused indefinitely — until the archiver recovers and another channel update happens to arrive. Add warnSkippedActivePvs() to make that risk visible: after a failed status fetch, filter the skipped batch to Active channels and log them at WARNING with a count and up to 10 names, so operators can see which PVs may need a manual RESUME if the archiver was down for an extended period. Test: AAChannelProcessorIT.testStatusFetchFailureSkipsConfigureAndReturnsZero verifies that when getStatusesViaGet throws, process() returns 0 and configureAA is never called. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 29 ++++++++++++++++++- .../processors/aa/AAChannelProcessorIT.java | 15 ++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index c5d71730..2781b57b 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -47,6 +47,7 @@ public class AAChannelProcessor implements ChannelProcessor { private static final String PV_STATUS_PROPERTY_NAME = "pvStatus"; // Matches in recsync private static final String PV_STATUS_INACTIVE = "Inactive"; public static final String PV_STATUS_ACTIVE = "Active"; + private static final int SKIPPED_PV_LOG_LIMIT = 10; @Value("${aa.enabled:true}") private boolean aaEnabled; @@ -220,12 +221,38 @@ private long submitToArchivers( e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); Optional>> actions = getArchiveActions(pvMap, archiverInfo); - if (actions.isEmpty()) continue; + if (actions.isEmpty()) { + warnSkippedActivePvs(e.getKey(), e.getValue()); + continue; + } count += archiverService.configureAA(actions.get(), archiverInfo.url()); } return count; } + private void warnSkippedActivePvs(String archiverAlias, List pvs) { + List activePvNames = + pvs.stream() + .filter(pv -> PV_STATUS_ACTIVE.equals(pv.getPvStatus())) + .map(ArchivePVOptions::getPv) + .toList(); + if (activePvNames.isEmpty()) { + return; + } + int total = activePvNames.size(); + String sample = + String.join(", ", activePvNames.subList(0, Math.min(SKIPPED_PV_LOG_LIMIT, total))); + String suffix = + total > SKIPPED_PV_LOG_LIMIT ? " ... (" + (total - SKIPPED_PV_LOG_LIMIT) + " more)" : ""; + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' unreachable: %d Active PV(s) may remain paused until the archiver recovers" + + " and another channel update arrives — RESUME was not sent: %s%s", + archiverAlias, total, sample, suffix)); + } + private Stream resolveArchiverAliases(Channel channel) { return findPropertyValue(channel, archiverPropertyName) .map(v -> Arrays.stream(v.split(",")).map(String::trim).filter(s -> !s.isEmpty())) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index d7c521f1..92c9a9fe 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -6,6 +6,7 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,6 +26,7 @@ import org.phoebus.channelfinder.configuration.ChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; @@ -165,6 +167,19 @@ void testProcessNoPVs() throws JacksonException { // But since list is empty, process returns 0 early } + @Test + void testStatusFetchFailureSkipsConfigureAndReturnsZero() throws JacksonException { + Channel channel = + new Channel("PVPausedActive", "owner", List.of(archiveProperty, activeProperty), List.of()); + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenThrow(new ArchiverServiceException("Connection refused")); + + long count = aaChannelProcessor.process(List.of(channel)); + + assertEquals(0L, count); + verify(archiverService, never()).configureAA(anyMap(), anyString()); + } + @ParameterizedTest @MethodSource("processSource") void testProcessNotArchivedActive(Channel channel, String archiveStatus, String archiverEndpoint)