From c57d4d716e97490d18e1cbe112d6606431507bd5 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 15 May 2026 13:52:23 +0200 Subject: [PATCH 1/5] fix(server): abort chain always continues after commit failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously abort() returned the error on CancelledError and raised a new one on other failures, leaving the chain errored so any queued commit (notably the disconnect transaction) was never executed — IOCs could stay Active in CF after an upload failure. Co-authored-by: Sky Brewer --- server/recceiver/recast.py | 8 +++--- server/tests/unit/test_recast.py | 42 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 6cf56897..bfe69c52 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -270,9 +270,8 @@ def __init__(self, proto, endpoint): def close(self): log.info("Close session from %s", self.ep) - # Do not cancel self._commit_chain here. Any data commit that is still queued - # behind the global lock must be allowed to complete so that channels - # are registered as active in CF before the disconnect is processed. + # Do not cancel self._commit_chain here. Any pending data commit must complete + # so that channels are registered as active in CF before the disconnect is processed. # The disconnect transaction is chained after self._commit_chain and will execute # once all preceding commits have finished. self.transaction = Transaction(self.ep, id(self)) @@ -297,11 +296,10 @@ def commit(_ignored): def abort(err): if err.check(defer.CancelledError): log.info("Commit cancelled: %s", transaction) - return err else: log.error("Commit failure: %s", err) self.proto.transport.loseConnection() - raise defer.CancelledError() + return None # always continue chain so disconnect commit runs self._commit_chain.addCallback(commit).addErrback(abort) diff --git a/server/tests/unit/test_recast.py b/server/tests/unit/test_recast.py index 026a323b..19c63452 100644 --- a/server/tests/unit/test_recast.py +++ b/server/tests/unit/test_recast.py @@ -1,5 +1,6 @@ from unittest.mock import MagicMock +from twisted.internet import defer from twisted.internet.address import IPv4Address from recceiver.recast import CollectionSession @@ -13,6 +14,47 @@ def _make_session() -> CollectionSession: return session +class TestCollectionSessionAbort: + def test_disconnect_commit_runs_after_data_commit_cancels(self): + session = _make_session() + committed = [] + + def mock_commit(t): + committed.append(t) + if t.connected: + return defer.fail(defer.CancelledError("CF unavailable")) + return None + + session.factory.commit.side_effect = mock_commit + + session.ioc_info("IOCNAME", "IOC1") + session.flush() + session.close() + + assert len(committed) == 2 + assert committed[-1].connected is False + + def test_disconnect_commit_runs_after_unexpected_commit_error(self): + session = _make_session() + committed = [] + + def mock_commit(t): + committed.append(t) + if t.connected: + return defer.fail(RuntimeError("unexpected")) + return None + + session.factory.commit.side_effect = mock_commit + + session.ioc_info("IOCNAME", "IOC1") + session.flush() + session.close() + + assert len(committed) == 2 + assert committed[-1].connected is False + session.proto.transport.loseConnection.assert_called_once() + + class TestCollectionSessionFlush: def test_client_infos_carried_forward_after_intermediate_flush(self): session = _make_session() From 7c2e6a3f29abae710c54f4949f48245043793988 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 15 May 2026 13:54:49 +0200 Subject: [PATCH 2/5] feat(server): replace global CF lock with per-IOC locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The single global DeferredLock serialised all IOC commits behind one queue. Under load, the lock depth determined how long IOCs waited to be marked active or inactive, and one stuck commit blocked every other. Per-IOC locks let different IOCs commit in parallel while keeping same-IOC transactions serialised. _state_lock (threading.Lock) guards the shared iocs/channel_ioc_ids dicts against concurrent thread writes. The CF push receives a targeted snapshot of the channels relevant to this commit (records being deleted plus the IOC's current channel set) so snapshot cost scales with the commit rather than total channel count. _ioc_channels tracks which channels belong to each IOC for O(1) disconnect cost; registration deduplicates to prevent double-counting. stopService drains all in-flight per-IOC locks before running the clean_on_stop sweep, preventing Active pushes from racing the sweep. The commit path moves to @inlineCallbacks so retry waits use task.deferLater — no sleeping worker threads between attempts. Also introduces CFUpdateAbortedError to distinguish exhausted CF retries from cancellation, and fixes chain_error to surface unexpected exceptions rather than swallowing them. Co-authored-by: Sky Brewer --- server/recceiver/cf/processor.py | 281 +++++++++++++++---------- server/tests/unit/cf/test_processor.py | 2 +- 2 files changed, 176 insertions(+), 107 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index ec49445e..ffe1e453 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -1,5 +1,6 @@ import datetime import logging +import threading import time from collections import defaultdict from typing import Callable, Dict, List, Optional, Set @@ -29,6 +30,10 @@ log = logging.getLogger(__name__) +class CFUpdateAbortedError(Exception): + """Raised when a CF push is abandoned after exhausting all retries.""" + + @implementer(interfaces.IProcessor) class CFProcessor(service.Service): """IProcessor plugin that synchronises IOC record data to Channelfinder. @@ -44,7 +49,11 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter): self.iocs: Dict[str, IOCInfo] = {} self.client: Optional[ChannelFinderAdapter] = None self.current_time: Callable[[Optional[str]], str] = get_current_time - self.lock: DeferredLock = DeferredLock() + self.lock: DeferredLock = DeferredLock() # lifecycle lock: serialises start/stop + self._ioc_locks: Dict[str, DeferredLock] = {} + self._ioc_channels: Dict[str, Set[str]] = defaultdict(set) # iocid → set of channel names + self._state_lock: threading.Lock = threading.Lock() + self._cancelled: Dict[str, bool] = {} self._statusLoop = None def startService(self): @@ -163,57 +172,67 @@ def _stop_service_with_lock(self): """ log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: - return deferToThread(self.clean_service) + drains = [lock.run(lambda: None) for lock in list(self._ioc_locks.values())] + d = defer.DeferredList(drains, consumeErrors=True) + d.addCallback(lambda _: deferToThread(self.clean_service)) + return d def _start_background_clean(self): log.info("CF Clean: background startup sweep beginning") deferToThread(self.clean_service).addErrback(lambda err: log.error("CF Clean background sweep failed: %s", err)) - # @defer.inlineCallbacks # Twisted v16 does not support cancellation! - def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: - """Commit a transaction to Channelfinder.""" - return self.lock.run(self._commit_with_lock, transaction_record) - - def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: - self.cancelled = False - - t = deferToThread(self._commit_with_thread, transaction) - - def cancel_commit(d: defer.Deferred): - self.cancelled = True - d.callback(None) + def _get_ioc_lock(self, iocid: str) -> DeferredLock: + """Return the per-IOC DeferredLock, creating it on first use. - d: defer.Deferred = defer.Deferred(cancel_commit) - - def wait_for_thread(_ignored): - if self.cancelled: - return t - - d.addCallback(wait_for_thread) + Must only be called from the reactor thread. + """ + if iocid not in self._ioc_locks: + self._ioc_locks[iocid] = DeferredLock() + return self._ioc_locks[iocid] - def chain_error(err): - """Handle errors from the commit thread. + def _prune_ioc_state(self, result, iocid: str): + """Remove per-IOC bookkeeping once an IOC has fully departed. - Note this is not foolproof as the thread may still be running. - """ - if not err.check(defer.CancelledError): - log.error("CF_COMMIT FAILURE: %s", err) - if self.cancelled: - if not err.check(defer.CancelledError): - raise defer.CancelledError() - return err - else: - d.callback(None) + Called after the per-IOC lock is released. Only prunes when the lock + is free (no new commit queued) and the IOC is no longer in self.iocs. + Always returns result so it is safe to use with addBoth. + """ + lock = self._ioc_locks.get(iocid) + if lock is not None and not lock.locked and iocid not in self.iocs: + self._ioc_locks.pop(iocid, None) + self._cancelled.pop(iocid, None) + self._ioc_channels.pop(iocid, None) + return result - def chain_result(result): - if self.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") - else: - d.callback(None) + def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder. - t.addCallbacks(chain_result, chain_error) + Uses a per-IOC DeferredLock so commits from different IOCs run in + parallel while transactions from the same IOC stay serialised. + """ + iocid = f"{transaction_record.source_address.host}:{transaction_record.source_address.port}" + lock = self._get_ioc_lock(iocid) + d = lock.run(self._commit_with_lock, transaction_record, iocid) + d.addBoth(self._prune_ioc_state, iocid) return d + @defer.inlineCallbacks + def _commit_with_lock(self, transaction: interfaces.ITransaction, iocid: str) -> defer.Deferred: + self._cancelled[iocid] = False + try: + result = yield deferToThread(self._prepare_commit, transaction, iocid) + if result is None: + return # disconnect-before-upload: nothing to push + ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap = result + yield self._push_to_cf_async(iocid, record_info_by_name, records_to_delete, ioc_info, iocs_snap, ciids_snap) + except defer.CancelledError: + raise + except CFUpdateAbortedError as err: + log.exception("CF_COMMIT ABORTED after exhausting retries: %s", err) + except Exception as err: + log.exception("CF_COMMIT FAILURE: %s", err) + raise + def transaction_to_record_infos( self, ioc_info: IOCInfo, transaction: interfaces.ITransaction ) -> Dict[str, RecordInfo]: @@ -300,10 +319,12 @@ def update_ioc_infos( if transaction.initial: self.iocs[iocid] = ioc_info if not transaction.connected: - records_to_delete.extend(self.channel_ioc_ids.keys()) + records_to_delete.extend(self._ioc_channels.get(iocid, set())) for record_name in record_info_by_name: - self.channel_ioc_ids[record_name].append(iocid) - self.iocs[iocid].channelcount += 1 + if iocid not in self.channel_ioc_ids[record_name]: + self.channel_ioc_ids[record_name].append(iocid) + self.iocs[iocid].channelcount += 1 + self._ioc_channels[iocid].add(record_name) if self.cf_config.alias_enabled: self._register_aliases(record_info_by_name[record_name].aliases, iocid) for record_name in records_to_delete: @@ -314,14 +335,24 @@ def update_ioc_infos( def _register_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: - self.channel_ioc_ids[alias].append(iocid) - self.iocs[iocid].channelcount += 1 + if iocid not in self.channel_ioc_ids[alias]: + self.channel_ioc_ids[alias].append(iocid) + self.iocs[iocid].channelcount += 1 + self._ioc_channels[iocid].add(alias) def _remove_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: self.remove_channel(alias, iocid) - def _commit_with_thread(self, transaction: interfaces.ITransaction): + def _prepare_commit(self, transaction: interfaces.ITransaction, iocid: str): + """Build IOC/record state and update shared dicts under _state_lock. + + Returns a tuple of (ioc_info, record_info_by_name, records_to_delete, + iocs_snapshot, channel_ioc_ids_snapshot) ready for the CF push, or + None if the IOC disconnected before completing its initial upload. + + Runs in a thread pool thread; must not touch the reactor. + """ host = transaction.source_address.host port = transaction.source_address.port @@ -368,22 +399,81 @@ def _commit_with_thread(self, transaction: interfaces.ITransaction): ) record_infos = self.transaction_to_record_infos(ioc_info, transaction) - records_to_delete = list(transaction.records_to_delete) log.debug("Delete records: %s", records_to_delete) - record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + if not transaction.connected and ioc_info.id not in self.iocs: log.warning( "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", host, port, ) - return - self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) - poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) - if not poll_success: - raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") + return None + + with self._state_lock: + self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) + iocs_snap = dict(self.iocs) + # Snapshot only channels this IOC owns plus any being deleted; that + # covers everything _handle_channel_is_old can look up without + # cloning the full channel map on every commit. + channels_to_snapshot = set(records_to_delete) | self._ioc_channels.get(iocid, set()) + ciids_snap = {k: list(self.channel_ioc_ids[k]) for k in channels_to_snapshot if k in self.channel_ioc_ids} + + return ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap + + @defer.inlineCallbacks + def _push_to_cf_async( + self, + iocid: str, + record_info_by_name: Dict[str, "RecordInfo"], + records_to_delete: List[str], + ioc_info: "IOCInfo", + iocs: Dict[str, "IOCInfo"], + channel_ioc_ids: Dict[str, List[str]], + ) -> defer.Deferred: + """Retry CF update until success, service stop, or retry limit. + + Each CF call runs in a thread pool thread via deferToThread. Retry + waits use task.deferLater so no thread is held between attempts. + """ + from twisted.internet import reactor + + count = 0 + sleep = 1.0 + log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) + while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + if not self.running or self._cancelled.get(iocid, False): + log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + return + count += 1 + t0 = time.monotonic() + try: + yield deferToThread( + self._update_channelfinder, record_info_by_name, records_to_delete, ioc_info, iocs, channel_ioc_ids + ) + elapsed = time.monotonic() - t0 + metrics.cf_commit_duration_seconds.observe(elapsed) + metrics.cf_commits_total.labels(result="success").inc() + log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) + return + except defer.CancelledError: + self._cancelled[iocid] = True + raise + except RequestException: + elapsed = time.monotonic() - t0 + log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) + retry_seconds = min(60.0, sleep) + log.info("CF push retry in %s seconds", retry_seconds) + try: + yield task.deferLater(reactor, retry_seconds, lambda: None) + except defer.CancelledError: + self._cancelled[iocid] = True + raise + sleep *= 1.5 + metrics.cf_commits_total.labels(result="cancelled").inc() + log.error("CF push gave up after %d attempts: %s", count, ioc_info) + raise CFUpdateAbortedError(f"Failed to commit transaction after {count} attempts: {ioc_info}") def remove_channel(self, record_name: str, iocid: str) -> None: """Unlink a channel from an IOC in channel_ioc_ids and decrement channelcount. @@ -392,6 +482,7 @@ def remove_channel(self, record_name: str, iocid: str) -> None: and deletes the IOC entry when its channelcount reaches zero. """ self.channel_ioc_ids[record_name].remove(iocid) + self._ioc_channels[iocid].discard(record_name) if iocid not in self.iocs: if len(self.channel_ioc_ids[record_name]) == 0: del self.channel_ioc_ids[record_name] @@ -440,41 +531,8 @@ def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) - def _push_to_cf( - self, - record_info_by_name: Dict[str, RecordInfo], - records_to_delete: List[str], - ioc_info: IOCInfo, - ) -> bool: - log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) - count = 0 - sleep = 1.0 - while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: - if not self.running: - log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) - return False - count += 1 - t0 = time.monotonic() - try: - self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) - elapsed = time.monotonic() - t0 - metrics.cf_commit_duration_seconds.observe(elapsed) - metrics.cf_commits_total.labels(result="success").inc() - log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) - return True - except RequestException: - elapsed = time.monotonic() - t0 - log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) - retry_seconds = min(60, sleep) - log.info("CF push retry in %s seconds", retry_seconds) - time.sleep(retry_seconds) - sleep *= 1.5 - metrics.cf_commits_total.labels(result="cancelled").inc() - log.error("CF push gave up after %d attempts: %s", count, ioc_info) - return False - - def _assert_not_cancelled(self, context: str) -> None: - if self.cancelled: + def _assert_not_cancelled(self, iocid: str, context: str) -> None: + if self._cancelled.get(iocid, False) or not self.running: raise defer.CancelledError(f"Processor cancelled: {context}") def _update_channelfinder( @@ -482,25 +540,32 @@ def _update_channelfinder( record_info_by_name: Dict[str, RecordInfo], records_to_delete: List[str], ioc_info: IOCInfo, + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: + """Push one IOC's changes to ChannelFinder. + + Uses iocs and channel_ioc_ids snapshots taken at commit time so reads + are consistent even when other IOC commits run concurrently in threads. + """ log.info("CF Update IOC: %s", ioc_info) log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) recceiverid = self.cf_config.recceiver_id new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id - if iocid not in self.iocs and record_info_by_name: - # Disconnect-before-upload is already logged in _commit_with_thread. + if iocid not in iocs and record_info_by_name: + # Disconnect-before-upload is already logged in _prepare_commit. log.warning( "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, - len(self.iocs), + len(iocs), ) if ioc_info.hostname is None or ioc_info.ioc_name is None: raise IOCMissingInfoError(ioc_info) - self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") + self._assert_not_cancelled(iocid, f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] log.debug("Find existing channels by IOCID: %s", ioc_info) @@ -516,11 +581,12 @@ def _update_channelfinder( channels, record_info_by_name, iocid, + iocs, + channel_ioc_ids, ) - # now pvNames contains a list of pv's new on this host/ioc existing_channels = self._get_existing_channels(new_channels) - self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}") + self._assert_not_cancelled(iocid, f"after fetching existing channels for {ioc_info}") self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid @@ -532,7 +598,6 @@ def _update_channelfinder( else: if old_channels and len(old_channels) != 0: self._cf_set_chunked(channels) - self._assert_not_cancelled(f"after setting channels for {ioc_info}") def _process_new_channels( self, @@ -584,6 +649,8 @@ def _handle_channels( channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], iocid: str, + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: """Handle channels already present in Channelfinder for this IOC. @@ -594,8 +661,10 @@ def _handle_channels( for cf_channel in old_channels: if not new_channels or cf_channel.name in records_to_delete: log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) - if cf_channel.name in self.channel_ioc_ids: - self._handle_channel_is_old(cf_channel, ioc_info, recceiverid, channels, record_info_by_name) + if cf_channel.name in channel_ioc_ids: + self._handle_channel_is_old( + cf_channel, ioc_info, recceiverid, channels, record_info_by_name, iocs, channel_ioc_ids + ) else: self._orphan_channel(cf_channel, ioc_info, channels, record_info_by_name) else: @@ -611,12 +680,14 @@ def _handle_channel_is_old( recceiverid: str, channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: """Channel exists in CF but not in this commit — re-assign to its last known IOC.""" - last_ioc_id = self.channel_ioc_ids[cf_channel.name][-1] - cf_channel.owner = self.iocs[last_ioc_id].owner + last_ioc_id = channel_ioc_ids[cf_channel.name][-1] + cf_channel.owner = iocs[last_ioc_id].owner cf_channel.properties = _merge_property_lists( - create_default_properties(ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel), + create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), cf_channel, self.managed_properties, ) @@ -627,13 +698,11 @@ def _handle_channel_is_old( for alias_name in record_info_by_name[cf_channel.name].aliases: # Legacy alias handling retained to avoid changing runtime behavior. alias_channel = CFChannel(alias_name, "", []) - if alias_name in self.channel_ioc_ids: - last_alias_ioc_id = self.channel_ioc_ids[alias_name][-1] - alias_channel.owner = self.iocs[last_alias_ioc_id].owner + if alias_name in channel_ioc_ids: + last_alias_ioc_id = channel_ioc_ids[alias_name][-1] + alias_channel.owner = iocs[last_alias_ioc_id].owner alias_channel.properties = _merge_property_lists( - create_default_properties( - ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel - ), + create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), alias_channel, self.managed_properties, ) diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 7e3643fe..2294e602 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -78,7 +78,7 @@ def test_is_no_op_when_no_active_channels(self): class TestUpdateChannelFinder: def _make_proc(self): proc, adapter = make_processor_with_mock() - proc.cancelled = False + proc.running = True proc.managed_properties = set() proc.record_property_names_list = set() proc.env_vars = {} From 22abde4a60d72a2a956b6e5d0356faad743ea1ff Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 15 May 2026 13:57:05 +0200 Subject: [PATCH 3/5] test(server): cover per-IOC locking and commit error paths deferToThread is monkeypatched to return synchronously-resolved Deferreds so the full _commit_with_lock callback chain can be exercised without a reactor. The prepare_result tuple matches the (ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap) signature of _prepare_commit; the push phase is controlled via _push_to_cf_async. Covers lock identity, iocid routing, prune lifecycle, and all four chain_error paths. Co-authored-by: Sky Brewer --- server/tests/unit/cf/test_processor.py | 194 +++++++++++++++++++++++-- 1 file changed, 179 insertions(+), 15 deletions(-) diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 2294e602..c9815822 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,9 +1,12 @@ -import time +from unittest.mock import MagicMock from requests import RequestException +from twisted.internet import defer +from twisted.internet.address import IPv4Address +from twisted.internet.defer import DeferredLock from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo -from recceiver.cf.processor import CFProcessor +from recceiver.cf.processor import CFProcessor, CFUpdateAbortedError from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc from tests.unit.cf.mock_adapter import MockCFAdapter from tests.unit.conftest import make_adapter @@ -20,6 +23,16 @@ def make_processor_with_mock(): return proc, adapter +_HOST_A = "1.2.3.4" # NOSONAR +_HOST_B = "5.6.7.8" # NOSONAR + + +def make_transaction(host: str = _HOST_A, port: int = 5064) -> MagicMock: + t = MagicMock() + t.source_address = IPv4Address("TCP", host, port) + return t + + class TestRemoveChannel: def test_missing_iocid_does_not_raise(self): proc = make_processor() @@ -89,7 +102,7 @@ def test_registers_new_channel_as_active(self): ioc = make_ioc() proc.iocs[ioc.id] = ioc - proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc) + proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc, proc.iocs, proc.channel_ioc_ids) assert "PV:1" in adapter._channels status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) @@ -115,30 +128,181 @@ def test_orphans_channel_absent_from_local_state(self): ] ) - proc._update_channelfinder({}, [], ioc) + proc._update_channelfinder({}, [], ioc, proc.iocs, proc.channel_ioc_ids) status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) assert status.value == PVStatus.INACTIVE.value class TestPushToCF: - def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): - monkeypatch.setattr(time, "sleep", lambda _: None) + """Tests for _push_to_cf_async: retry abandonment when stopped or cancelled.""" + + def _run_async_push(self, monkeypatch, processor, ioc, side_effect_fn): + iocid = ioc.id + processor._cancelled[iocid] = False + + def sync_thread(fn, *args): + try: + fn(*args) + return defer.succeed(None) + except Exception as exc: + return defer.fail(exc) + monkeypatch.setattr("recceiver.cf.processor.deferToThread", sync_thread) + monkeypatch.setattr("recceiver.cf.processor.task.deferLater", lambda *a, **kw: defer.succeed(None)) + monkeypatch.setattr(processor, "_update_channelfinder", side_effect_fn) + + results, errors = [], [] + processor._push_to_cf_async(iocid, {}, [], ioc, {}, {}).addCallbacks(results.append, errors.append) + return results, errors + + def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): processor = make_processor() processor.running = True processor.cf_config.push_always_retry = True + ioc = make_ioc() + call_count = [0] - call_count = 0 - - def failing_update(record_info_by_name, records_to_delete, ioc_info): - nonlocal call_count - call_count += 1 + def failing_update(*args): + call_count[0] += 1 processor.running = False raise RequestException("CF unreachable") - monkeypatch.setattr(processor, "_update_channelfinder", failing_update) - result = processor._push_to_cf({}, [], make_ioc()) + results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) + assert len(results) == 1 and len(errors) == 0 + assert call_count[0] == 1 + + def test_abandons_push_when_ioc_cancelled_during_retry(self, monkeypatch): + processor = make_processor() + processor.running = True + processor.cf_config.push_always_retry = True + ioc = make_ioc() + iocid = ioc.id + call_count = [0] + + def failing_update(*args): + call_count[0] += 1 + processor._cancelled[iocid] = True + raise RequestException("CF unreachable") + + results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) + assert len(results) == 1 and len(errors) == 0 + assert call_count[0] == 1 + + +class TestPerIocLocking: + def test_different_iocs_get_different_locks(self): + proc = make_processor() + lock_a = proc._get_ioc_lock(f"{_HOST_A}:5064") + lock_b = proc._get_ioc_lock(f"{_HOST_B}:5064") + assert lock_a is not lock_b + + def test_same_ioc_gets_same_lock(self): + proc = make_processor() + lock1 = proc._get_ioc_lock(f"{_HOST_A}:5064") + lock2 = proc._get_ioc_lock(f"{_HOST_A}:5064") + assert lock1 is lock2 + + def test_commit_routes_to_correct_iocid(self, monkeypatch): + proc = make_processor() + routed_iocids = [] + + def fake_lock_run(fn, transaction, iocid): + routed_iocids.append(iocid) + return defer.succeed(None) - assert result is False - assert call_count == 1 + lock = DeferredLock() + monkeypatch.setattr(lock, "run", fake_lock_run) + monkeypatch.setattr(proc, "_get_ioc_lock", lambda _iocid: lock) + + proc.commit(make_transaction(_HOST_A, 5064)) + assert routed_iocids == [f"{_HOST_A}:5064"] + + def test_prune_removes_state_after_ioc_disconnects(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + proc._ioc_locks[iocid] = DeferredLock() + proc._cancelled[iocid] = False + proc._ioc_channels[iocid].add("CHAN:1") + # iocid is NOT in proc.iocs → IOC has disconnected + + proc._prune_ioc_state(None, iocid) + + assert iocid not in proc._ioc_locks + assert iocid not in proc._cancelled + assert iocid not in proc._ioc_channels + + def test_prune_preserves_state_while_ioc_is_active(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + proc._ioc_locks[iocid] = DeferredLock() + proc._cancelled[iocid] = False + proc.iocs[iocid] = make_ioc() + + proc._prune_ioc_state(None, iocid) + + assert iocid in proc._ioc_locks + assert iocid in proc._cancelled + + def test_prune_passes_result_through(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + assert proc._prune_ioc_state("sentinel", iocid) == "sentinel" + + +class TestCommitErrorHandling: + """Test _commit_with_lock error routing without running real threads. + + _prepare_commit is simulated via a monkeypatched deferToThread; the CF + push phase is controlled by patching _push_to_cf_async directly so each + test exercises exactly one failure mode at a time. + """ + + def _run(self, monkeypatch, *, prepare_result, push_result=None): + proc = make_processor() + proc.running = True + iocid = f"{_HOST_A}:5064" + monkeypatch.setattr("recceiver.cf.processor.deferToThread", lambda *_a, **_kw: prepare_result) + if push_result is not None: + monkeypatch.setattr(proc, "_push_to_cf_async", lambda *_a, **_kw: push_result) + + results, errors = [], [] + proc._commit_with_lock(make_transaction(), iocid).addCallbacks(results.append, errors.append) + return results, errors + + def test_aborted_commit_resolves_chain(self, monkeypatch): + """CFUpdateAbortedError from exhausted retries lets the chain continue.""" + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.fail(CFUpdateAbortedError("retries exhausted")), + ) + assert len(results) == 1 and len(errors) == 0 + + def test_unexpected_error_errbacks_chain(self, monkeypatch): + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.fail(RuntimeError("unexpected")), + ) + assert len(results) == 0 and len(errors) == 1 + assert errors[0].check(RuntimeError) + + def test_service_stopped_cancel_errbacks_chain(self, monkeypatch): + results, errors = self._run( + monkeypatch, + prepare_result=defer.fail(defer.CancelledError("service stopped")), + ) + assert len(results) == 0 and len(errors) == 1 + assert errors[0].check(defer.CancelledError) + + def test_successful_commit_resolves_chain(self, monkeypatch): + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.succeed(None), + ) + assert len(results) == 1 and len(errors) == 0 From 6e27a3783ed13a2172ede8034be35a6632ae24e2 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 15 May 2026 14:12:46 +0200 Subject: [PATCH 4/5] chore(server): fix stale comments and naming after per-IOC lock change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The global lock is gone; update demo.conf and recast.py to reflect that pushAlwaysRetry and maxActive now operate per-IOC, and fix an incorrect default value and a long-standing typo in the same pass. Also rename single-letter CastFactory identifiers (P/P2 → proto/waiting, addr → _addr) to satisfy linter, and correct the _stop_service_with_lock docstring which still claimed the lifecycle lock prevents concurrent commits (commits now use per-IOC locks and are drained explicitly before the clean_on_stop sweep). --- server/demo.conf | 2 +- server/recceiver/cf/processor.py | 7 ++++--- server/recceiver/recast.py | 28 ++++++++++++++-------------- server/recceiver_full.conf | 2 +- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/demo.conf b/server/demo.conf index 3dea7727..88632767 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -134,5 +134,5 @@ #pushMaxRetries = 10 # Whether to retry polling indefinitely until success. Default is False. -# Enabling this holds the global CF commit lock until CF recovers; use with caution. +# Enabling this holds the per-IOC lock until CF recovers for that IOC; other IOCs are unaffected. #pushAlwaysRetry = False diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index ffe1e453..85bfc161 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -166,9 +166,10 @@ def stopService(self): def _stop_service_with_lock(self): """Stop the CFProcessor service with lock held. - If clean_on_stop is enabled, mark all channels as inactive. - The sweep runs in a background thread to avoid blocking the reactor. - The lock is held throughout, preventing new commits from interleaving. + If clean_on_stop is enabled, drain all in-flight per-IOC commits + first, then mark all channels inactive in a background thread. + Commits use _ioc_locks rather than self.lock and can still be running + when this is called; the drain waits for each lock before the sweep. """ log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index bfe69c52..8fd19122 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -361,34 +361,34 @@ class CastFactory(protocol.ServerFactory): maxActive = 3 def __init__(self): - # Flow control by limiting the number of concurrent - # "active" connectons Active means dumping lots of records. - # connections become "inactive" by calling isDone() + # Throttle concurrent uploading connections to control CF commit load. + # "Active" means currently uploading records; connections become + # "inactive" via isDone() once the upload completes. self.NActive = 0 self.Wait = [] - def isDone(self, P, active): + def isDone(self, proto, active): if not active: # connection closed before activation - self.Wait.remove(P) + self.Wait.remove(proto) elif len(self.Wait) > 0: # Others are waiting - P2 = self.Wait.pop(0) - P2.active = True - P2.transport.resumeProducing() - P2.connectionMade() + waiting = self.Wait.pop(0) + waiting.active = True + waiting.transport.resumeProducing() + waiting.connectionMade() else: self.NActive -= 1 - def buildProtocol(self, addr): + def buildProtocol(self, _addr): active = self.NActive < self.maxActive - P = self.protocol(active=active) - P.factory = self + proto = self.protocol(active=active) + proto.factory = self if active: self.NActive += 1 else: - self.Wait.append(P) - return P + self.Wait.append(proto) + return proto def addClient(self, proto, address): S = self.session(proto, address) diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index 5c62f472..6a35ed1d 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -128,7 +128,7 @@ verifySSL = True # Number of times to retry polling before giving up. Default is 10. pushMaxRetries = 10 -# Whether to retry polling indefinitely until success. Default is True. +# Whether to retry polling indefinitely until success. Default is False. pushAlwaysRetry = False # Interval in seconds between periodic CF status log lines (0 to disable) From 8fc2db4d6c3ce946eb4b2433c151a5180abd4254 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 15 May 2026 14:19:33 +0200 Subject: [PATCH 5/5] fix(server): sync commitSizeLimit default with CollectionSession.trlimit application.py unconditionally overwrote session.trlimit with the config value, which defaulted to 0 (no limit), silently negating the trlimit=5000 class default added in 89be20c. Reference CollectionSession.trlimit directly so the default cannot silently drift if the class default ever changes, and add a regression test. --- server/demo.conf | 5 ++--- server/recceiver/application.py | 4 ++-- server/tests/unit/test_application.py | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 5 deletions(-) create mode 100644 server/tests/unit/test_application.py diff --git a/server/demo.conf b/server/demo.conf index 88632767..a20757e5 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -48,9 +48,8 @@ # Doesn't effect IOC clients #commitInterval = 5.0 -# Maximum size before committing updates -# The default is 0 (which is no limit) -#commitSizeLimit = 0 +# Maximum records per commit. Default is 5000; set to 0 for no limit. +#commitSizeLimit = 5000 # Maximum concurrent "active" clients # to allow. diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 1a584551..ca2d6b9d 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -15,7 +15,7 @@ from . import metrics from .announcer import Announcer, SharedUDP from .processors import ProcessorController -from .recast import CastFactory +from .recast import CastFactory, CollectionSession log = logging.getLogger(__name__) @@ -46,7 +46,7 @@ def __init__(self, config): self.annperiod = float(config.get("announceInterval", "15.0")) self.tcptimeout = float(config.get("tcptimeout", "15.0")) self.commitperiod = float(config.get("commitInterval", "5.0")) - self.commitSizeLimit = int(config.get("commitSizeLimit", "0")) + self.commitSizeLimit = int(config.get("commitSizeLimit", str(CollectionSession.trlimit))) self.maxActive = int(config.get("maxActive", "20")) self.bind, _sep, portn = config.get("bind", "").strip().partition(":") self.addrlist = [] diff --git a/server/tests/unit/test_application.py b/server/tests/unit/test_application.py new file mode 100644 index 00000000..69a8c698 --- /dev/null +++ b/server/tests/unit/test_application.py @@ -0,0 +1,16 @@ +from recceiver.application import RecService +from recceiver.recast import CollectionSession + + +class TestRecServiceConfig: + def test_commit_size_limit_defaults_to_class_default(self): + svc = RecService({}) + assert svc.commitSizeLimit == CollectionSession.trlimit + + def test_commit_size_limit_reads_from_config(self): + svc = RecService({"commitSizeLimit": "100"}) + assert svc.commitSizeLimit == 100 + + def test_commit_size_limit_zero_disables_splitting(self): + svc = RecService({"commitSizeLimit": "0"}) + assert svc.commitSizeLimit == 0