diff --git a/server/demo.conf b/server/demo.conf index 3dea7727..a20757e5 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -48,9 +48,8 @@ # Doesn't effect IOC clients #commitInterval = 5.0 -# Maximum size before committing updates -# The default is 0 (which is no limit) -#commitSizeLimit = 0 +# Maximum records per commit. Default is 5000; set to 0 for no limit. +#commitSizeLimit = 5000 # Maximum concurrent "active" clients # to allow. @@ -134,5 +133,5 @@ #pushMaxRetries = 10 # Whether to retry polling indefinitely until success. Default is False. -# Enabling this holds the global CF commit lock until CF recovers; use with caution. +# Enabling this holds the per-IOC lock until CF recovers for that IOC; other IOCs are unaffected. #pushAlwaysRetry = False diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 1a584551..3f68c6a2 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -15,7 +15,7 @@ from . import metrics from .announcer import Announcer, SharedUDP from .processors import ProcessorController -from .recast import CastFactory +from .recast import CastFactory, CollectionSession log = logging.getLogger(__name__) @@ -46,7 +46,7 @@ def __init__(self, config): self.annperiod = float(config.get("announceInterval", "15.0")) self.tcptimeout = float(config.get("tcptimeout", "15.0")) self.commitperiod = float(config.get("commitInterval", "5.0")) - self.commitSizeLimit = int(config.get("commitSizeLimit", "0")) + self.commitSizeLimit = int(config.get("commitSizeLimit", str(CollectionSession.trlimit))) self.maxActive = int(config.get("maxActive", "20")) self.bind, _sep, portn = config.get("bind", "").strip().partition(":") self.addrlist = [] @@ -75,6 +75,12 @@ def __init__(self, config): def privilegedStartService(self): log.info("Starting RecService") + # Each active IOC commit holds a thread for the CF HTTP call; size the + # pool so all maxActive commits can run concurrently (+ headroom for + # clean_service and other thread users). Twisted's default of 10 would + # otherwise throttle commits even when maxActive slots are available. + self.reactor.suggestThreadPoolSize(self.maxActive + 10) + # Start TCP server on random port self.tcpFactory = CastFactory() self.tcpFactory.protocol.timeout = self.tcptimeout diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index ec49445e..06688057 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -1,5 +1,6 @@ import datetime import logging +import threading import time from collections import defaultdict from typing import Callable, Dict, List, Optional, Set @@ -29,6 +30,10 @@ log = logging.getLogger(__name__) +class CFUpdateAbortedError(Exception): + """Raised when a CF push is abandoned after exhausting all retries.""" + + @implementer(interfaces.IProcessor) class CFProcessor(service.Service): """IProcessor plugin that synchronises IOC record data to Channelfinder. @@ -44,7 +49,11 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter): self.iocs: Dict[str, IOCInfo] = {} self.client: Optional[ChannelFinderAdapter] = None self.current_time: Callable[[Optional[str]], str] = get_current_time - self.lock: DeferredLock = DeferredLock() + self.lock: DeferredLock = DeferredLock() # lifecycle lock: serialises start/stop + self._ioc_locks: Dict[str, DeferredLock] = {} + self._ioc_channels: Dict[str, Set[str]] = defaultdict(set) # iocid → set of channel names + self._state_lock: threading.Lock = threading.Lock() + self._cancelled: Dict[str, bool] = {} self._statusLoop = None def startService(self): @@ -157,63 +166,74 @@ def stopService(self): def _stop_service_with_lock(self): """Stop the CFProcessor service with lock held. - If clean_on_stop is enabled, mark all channels as inactive. - The sweep runs in a background thread to avoid blocking the reactor. - The lock is held throughout, preventing new commits from interleaving. + If clean_on_stop is enabled, drain all in-flight per-IOC commits + first, then mark all channels inactive in a background thread. + Commits use _ioc_locks rather than self.lock and can still be running + when this is called; the drain waits for each lock before the sweep. """ log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: - return deferToThread(self.clean_service) + drains = [lock.run(lambda: None) for lock in self._ioc_locks.values()] + d = defer.DeferredList(drains, consumeErrors=True) + d.addCallback(lambda _: deferToThread(self.clean_service)) + return d def _start_background_clean(self): log.info("CF Clean: background startup sweep beginning") deferToThread(self.clean_service).addErrback(lambda err: log.error("CF Clean background sweep failed: %s", err)) - # @defer.inlineCallbacks # Twisted v16 does not support cancellation! - def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: - """Commit a transaction to Channelfinder.""" - return self.lock.run(self._commit_with_lock, transaction_record) - - def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: - self.cancelled = False - - t = deferToThread(self._commit_with_thread, transaction) + def _get_ioc_lock(self, iocid: str) -> DeferredLock: + """Return the per-IOC DeferredLock, creating it on first use. - def cancel_commit(d: defer.Deferred): - self.cancelled = True - d.callback(None) - - d: defer.Deferred = defer.Deferred(cancel_commit) - - def wait_for_thread(_ignored): - if self.cancelled: - return t - - d.addCallback(wait_for_thread) + Must only be called from the reactor thread. + """ + if iocid not in self._ioc_locks: + self._ioc_locks[iocid] = DeferredLock() + return self._ioc_locks[iocid] - def chain_error(err): - """Handle errors from the commit thread. + def _prune_ioc_state(self, result, iocid: str): + """Remove per-IOC bookkeeping once an IOC has fully departed. - Note this is not foolproof as the thread may still be running. - """ - if not err.check(defer.CancelledError): - log.error("CF_COMMIT FAILURE: %s", err) - if self.cancelled: - if not err.check(defer.CancelledError): - raise defer.CancelledError() - return err - else: - d.callback(None) + Called after the per-IOC lock is released. Only prunes when the lock + is free (no new commit queued) and the IOC is no longer in self.iocs. + Always returns result so it is safe to use with addBoth. + """ + lock = self._ioc_locks.get(iocid) + if lock is not None and not lock.locked and iocid not in self.iocs: + self._ioc_locks.pop(iocid, None) + self._cancelled.pop(iocid, None) + self._ioc_channels.pop(iocid, None) + return result - def chain_result(result): - if self.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") - else: - d.callback(None) + def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder. - t.addCallbacks(chain_result, chain_error) + Uses a per-IOC DeferredLock so commits from different IOCs run in + parallel while transactions from the same IOC stay serialised. + """ + iocid = f"{transaction_record.source_address.host}:{transaction_record.source_address.port}" + lock = self._get_ioc_lock(iocid) + d = lock.run(self._commit_with_lock, transaction_record, iocid) + d.addBoth(self._prune_ioc_state, iocid) return d + @defer.inlineCallbacks + def _commit_with_lock(self, transaction: interfaces.ITransaction, iocid: str) -> defer.Deferred: + self._cancelled[iocid] = False + try: + result = yield deferToThread(self._prepare_commit, transaction, iocid) + if result is None: + return # disconnect-before-upload: nothing to push + ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap = result + yield self._push_to_cf_async(iocid, record_info_by_name, records_to_delete, ioc_info, iocs_snap, ciids_snap) + except defer.CancelledError: + raise + except CFUpdateAbortedError as err: + log.exception("CF_COMMIT ABORTED after exhausting retries: %s", err) + except Exception as err: + log.exception("CF_COMMIT FAILURE: %s", err) + raise + def transaction_to_record_infos( self, ioc_info: IOCInfo, transaction: interfaces.ITransaction ) -> Dict[str, RecordInfo]: @@ -300,10 +320,12 @@ def update_ioc_infos( if transaction.initial: self.iocs[iocid] = ioc_info if not transaction.connected: - records_to_delete.extend(self.channel_ioc_ids.keys()) + records_to_delete.extend(self._ioc_channels.get(iocid, set())) for record_name in record_info_by_name: - self.channel_ioc_ids[record_name].append(iocid) - self.iocs[iocid].channelcount += 1 + if iocid not in self.channel_ioc_ids[record_name]: + self.channel_ioc_ids[record_name].append(iocid) + self.iocs[iocid].channelcount += 1 + self._ioc_channels[iocid].add(record_name) if self.cf_config.alias_enabled: self._register_aliases(record_info_by_name[record_name].aliases, iocid) for record_name in records_to_delete: @@ -314,14 +336,24 @@ def update_ioc_infos( def _register_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: - self.channel_ioc_ids[alias].append(iocid) - self.iocs[iocid].channelcount += 1 + if iocid not in self.channel_ioc_ids[alias]: + self.channel_ioc_ids[alias].append(iocid) + self.iocs[iocid].channelcount += 1 + self._ioc_channels[iocid].add(alias) def _remove_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: self.remove_channel(alias, iocid) - def _commit_with_thread(self, transaction: interfaces.ITransaction): + def _prepare_commit(self, transaction: interfaces.ITransaction, iocid: str): + """Build IOC/record state and update shared dicts under _state_lock. + + Returns a tuple of (ioc_info, record_info_by_name, records_to_delete, + iocs_snapshot, channel_ioc_ids_snapshot) ready for the CF push, or + None if the IOC disconnected before completing its initial upload. + + Runs in a thread pool thread; must not touch the reactor. + """ host = transaction.source_address.host port = transaction.source_address.port @@ -368,22 +400,83 @@ def _commit_with_thread(self, transaction: interfaces.ITransaction): ) record_infos = self.transaction_to_record_infos(ioc_info, transaction) - records_to_delete = list(transaction.records_to_delete) log.debug("Delete records: %s", records_to_delete) - record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + if not transaction.connected and ioc_info.id not in self.iocs: log.warning( "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", host, port, ) - return - self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) - poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) - if not poll_success: - raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") + return None + + with self._state_lock: + self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) + # Snapshot only the channels and IOCs this commit touches; that covers + # everything _handle_channel_is_old / create_default_properties can look + # up without cloning the full maps on every commit. + channels_to_snapshot = set(records_to_delete) | self._ioc_channels.get(iocid, set()) + ciids_snap = {k: list(self.channel_ioc_ids[k]) for k in channels_to_snapshot if k in self.channel_ioc_ids} + needed_ioc_ids = {iid for ch in channels_to_snapshot for iid in self.channel_ioc_ids.get(ch, [])} + needed_ioc_ids.add(iocid) + iocs_snap = {iid: self.iocs[iid] for iid in needed_ioc_ids if iid in self.iocs} + + return ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap + + @defer.inlineCallbacks + def _push_to_cf_async( + self, + iocid: str, + record_info_by_name: Dict[str, "RecordInfo"], + records_to_delete: List[str], + ioc_info: "IOCInfo", + iocs: Dict[str, "IOCInfo"], + channel_ioc_ids: Dict[str, List[str]], + ) -> defer.Deferred: + """Retry CF update until success, service stop, or retry limit. + + Each CF call runs in a thread pool thread via deferToThread. Retry + waits use task.deferLater so no thread is held between attempts. + """ + from twisted.internet import reactor + + count = 0 + sleep = 1.0 + log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) + while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + if not self.running or self._cancelled.get(iocid, False): + log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + return + count += 1 + t0 = time.monotonic() + try: + yield deferToThread( + self._update_channelfinder, record_info_by_name, records_to_delete, ioc_info, iocs, channel_ioc_ids + ) + elapsed = time.monotonic() - t0 + metrics.cf_commit_duration_seconds.observe(elapsed) + metrics.cf_commits_total.labels(result="success").inc() + log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) + return + except defer.CancelledError: + self._cancelled[iocid] = True + raise + except RequestException: + elapsed = time.monotonic() - t0 + log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) + retry_seconds = min(60.0, sleep) + log.info("CF push retry in %s seconds", retry_seconds) + try: + yield task.deferLater(reactor, retry_seconds, lambda: None) + except defer.CancelledError: + self._cancelled[iocid] = True + raise + sleep *= 1.5 + metrics.cf_commits_total.labels(result="cancelled").inc() + log.error("CF push gave up after %d attempts: %s", count, ioc_info) + raise CFUpdateAbortedError(f"Failed to commit transaction after {count} attempts: {ioc_info}") def remove_channel(self, record_name: str, iocid: str) -> None: """Unlink a channel from an IOC in channel_ioc_ids and decrement channelcount. @@ -392,6 +485,7 @@ def remove_channel(self, record_name: str, iocid: str) -> None: and deletes the IOC entry when its channelcount reaches zero. """ self.channel_ioc_ids[record_name].remove(iocid) + self._ioc_channels[iocid].discard(record_name) if iocid not in self.iocs: if len(self.channel_ioc_ids[record_name]) == 0: del self.channel_ioc_ids[record_name] @@ -440,41 +534,8 @@ def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) - def _push_to_cf( - self, - record_info_by_name: Dict[str, RecordInfo], - records_to_delete: List[str], - ioc_info: IOCInfo, - ) -> bool: - log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) - count = 0 - sleep = 1.0 - while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: - if not self.running: - log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) - return False - count += 1 - t0 = time.monotonic() - try: - self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) - elapsed = time.monotonic() - t0 - metrics.cf_commit_duration_seconds.observe(elapsed) - metrics.cf_commits_total.labels(result="success").inc() - log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) - return True - except RequestException: - elapsed = time.monotonic() - t0 - log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) - retry_seconds = min(60, sleep) - log.info("CF push retry in %s seconds", retry_seconds) - time.sleep(retry_seconds) - sleep *= 1.5 - metrics.cf_commits_total.labels(result="cancelled").inc() - log.error("CF push gave up after %d attempts: %s", count, ioc_info) - return False - - def _assert_not_cancelled(self, context: str) -> None: - if self.cancelled: + def _assert_not_cancelled(self, iocid: str, context: str) -> None: + if self._cancelled.get(iocid, False) or not self.running: raise defer.CancelledError(f"Processor cancelled: {context}") def _update_channelfinder( @@ -482,25 +543,32 @@ def _update_channelfinder( record_info_by_name: Dict[str, RecordInfo], records_to_delete: List[str], ioc_info: IOCInfo, + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: + """Push one IOC's changes to ChannelFinder. + + Uses iocs and channel_ioc_ids snapshots taken at commit time so reads + are consistent even when other IOC commits run concurrently in threads. + """ log.info("CF Update IOC: %s", ioc_info) log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) recceiverid = self.cf_config.recceiver_id new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id - if iocid not in self.iocs and record_info_by_name: - # Disconnect-before-upload is already logged in _commit_with_thread. + if iocid not in iocs and record_info_by_name: + # Disconnect-before-upload is already logged in _prepare_commit. log.warning( "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, - len(self.iocs), + len(iocs), ) if ioc_info.hostname is None or ioc_info.ioc_name is None: raise IOCMissingInfoError(ioc_info) - self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") + self._assert_not_cancelled(iocid, f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] log.debug("Find existing channels by IOCID: %s", ioc_info) @@ -516,11 +584,12 @@ def _update_channelfinder( channels, record_info_by_name, iocid, + iocs, + channel_ioc_ids, ) - # now pvNames contains a list of pv's new on this host/ioc existing_channels = self._get_existing_channels(new_channels) - self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}") + self._assert_not_cancelled(iocid, f"after fetching existing channels for {ioc_info}") self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid @@ -532,7 +601,6 @@ def _update_channelfinder( else: if old_channels and len(old_channels) != 0: self._cf_set_chunked(channels) - self._assert_not_cancelled(f"after setting channels for {ioc_info}") def _process_new_channels( self, @@ -584,6 +652,8 @@ def _handle_channels( channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], iocid: str, + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: """Handle channels already present in Channelfinder for this IOC. @@ -594,8 +664,10 @@ def _handle_channels( for cf_channel in old_channels: if not new_channels or cf_channel.name in records_to_delete: log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) - if cf_channel.name in self.channel_ioc_ids: - self._handle_channel_is_old(cf_channel, ioc_info, recceiverid, channels, record_info_by_name) + if cf_channel.name in channel_ioc_ids: + self._handle_channel_is_old( + cf_channel, ioc_info, recceiverid, channels, record_info_by_name, iocs, channel_ioc_ids + ) else: self._orphan_channel(cf_channel, ioc_info, channels, record_info_by_name) else: @@ -611,12 +683,14 @@ def _handle_channel_is_old( recceiverid: str, channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], + iocs: Dict[str, IOCInfo], + channel_ioc_ids: Dict[str, List[str]], ) -> None: """Channel exists in CF but not in this commit — re-assign to its last known IOC.""" - last_ioc_id = self.channel_ioc_ids[cf_channel.name][-1] - cf_channel.owner = self.iocs[last_ioc_id].owner + last_ioc_id = channel_ioc_ids[cf_channel.name][-1] + cf_channel.owner = iocs[last_ioc_id].owner cf_channel.properties = _merge_property_lists( - create_default_properties(ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel), + create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), cf_channel, self.managed_properties, ) @@ -627,13 +701,11 @@ def _handle_channel_is_old( for alias_name in record_info_by_name[cf_channel.name].aliases: # Legacy alias handling retained to avoid changing runtime behavior. alias_channel = CFChannel(alias_name, "", []) - if alias_name in self.channel_ioc_ids: - last_alias_ioc_id = self.channel_ioc_ids[alias_name][-1] - alias_channel.owner = self.iocs[last_alias_ioc_id].owner + if alias_name in channel_ioc_ids: + last_alias_ioc_id = channel_ioc_ids[alias_name][-1] + alias_channel.owner = iocs[last_alias_ioc_id].owner alias_channel.properties = _merge_property_lists( - create_default_properties( - ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel - ), + create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), alias_channel, self.managed_properties, ) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 6cf56897..8fd19122 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -270,9 +270,8 @@ def __init__(self, proto, endpoint): def close(self): log.info("Close session from %s", self.ep) - # Do not cancel self._commit_chain here. Any data commit that is still queued - # behind the global lock must be allowed to complete so that channels - # are registered as active in CF before the disconnect is processed. + # Do not cancel self._commit_chain here. Any pending data commit must complete + # so that channels are registered as active in CF before the disconnect is processed. # The disconnect transaction is chained after self._commit_chain and will execute # once all preceding commits have finished. self.transaction = Transaction(self.ep, id(self)) @@ -297,11 +296,10 @@ def commit(_ignored): def abort(err): if err.check(defer.CancelledError): log.info("Commit cancelled: %s", transaction) - return err else: log.error("Commit failure: %s", err) self.proto.transport.loseConnection() - raise defer.CancelledError() + return None # always continue chain so disconnect commit runs self._commit_chain.addCallback(commit).addErrback(abort) @@ -363,34 +361,34 @@ class CastFactory(protocol.ServerFactory): maxActive = 3 def __init__(self): - # Flow control by limiting the number of concurrent - # "active" connectons Active means dumping lots of records. - # connections become "inactive" by calling isDone() + # Throttle concurrent uploading connections to control CF commit load. + # "Active" means currently uploading records; connections become + # "inactive" via isDone() once the upload completes. self.NActive = 0 self.Wait = [] - def isDone(self, P, active): + def isDone(self, proto, active): if not active: # connection closed before activation - self.Wait.remove(P) + self.Wait.remove(proto) elif len(self.Wait) > 0: # Others are waiting - P2 = self.Wait.pop(0) - P2.active = True - P2.transport.resumeProducing() - P2.connectionMade() + waiting = self.Wait.pop(0) + waiting.active = True + waiting.transport.resumeProducing() + waiting.connectionMade() else: self.NActive -= 1 - def buildProtocol(self, addr): + def buildProtocol(self, _addr): active = self.NActive < self.maxActive - P = self.protocol(active=active) - P.factory = self + proto = self.protocol(active=active) + proto.factory = self if active: self.NActive += 1 else: - self.Wait.append(P) - return P + self.Wait.append(proto) + return proto def addClient(self, proto, address): S = self.session(proto, address) diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index 5c62f472..6a35ed1d 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -128,7 +128,7 @@ verifySSL = True # Number of times to retry polling before giving up. Default is 10. pushMaxRetries = 10 -# Whether to retry polling indefinitely until success. Default is True. +# Whether to retry polling indefinitely until success. Default is False. pushAlwaysRetry = False # Interval in seconds between periodic CF status log lines (0 to disable) diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 7e3643fe..8cbc649b 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,9 +1,12 @@ -import time +from unittest.mock import MagicMock from requests import RequestException +from twisted.internet import defer +from twisted.internet.address import IPv4Address +from twisted.internet.defer import DeferredLock from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo -from recceiver.cf.processor import CFProcessor +from recceiver.cf.processor import CFProcessor, CFUpdateAbortedError from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc from tests.unit.cf.mock_adapter import MockCFAdapter from tests.unit.conftest import make_adapter @@ -20,6 +23,16 @@ def make_processor_with_mock(): return proc, adapter +_HOST_A = "1.2.3.4" # NOSONAR +_HOST_B = "5.6.7.8" # NOSONAR + + +def make_transaction(host: str = _HOST_A, port: int = 5064) -> MagicMock: + t = MagicMock() + t.source_address = IPv4Address("TCP", host, port) + return t + + class TestRemoveChannel: def test_missing_iocid_does_not_raise(self): proc = make_processor() @@ -78,7 +91,7 @@ def test_is_no_op_when_no_active_channels(self): class TestUpdateChannelFinder: def _make_proc(self): proc, adapter = make_processor_with_mock() - proc.cancelled = False + proc.running = True proc.managed_properties = set() proc.record_property_names_list = set() proc.env_vars = {} @@ -89,7 +102,7 @@ def test_registers_new_channel_as_active(self): ioc = make_ioc() proc.iocs[ioc.id] = ioc - proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc) + proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc, proc.iocs, proc.channel_ioc_ids) assert "PV:1" in adapter._channels status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) @@ -115,30 +128,223 @@ def test_orphans_channel_absent_from_local_state(self): ] ) - proc._update_channelfinder({}, [], ioc) + proc._update_channelfinder({}, [], ioc, proc.iocs, proc.channel_ioc_ids) status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) assert status.value == PVStatus.INACTIVE.value class TestPushToCF: - def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): - monkeypatch.setattr(time, "sleep", lambda _: None) + """Tests for _push_to_cf_async: retry abandonment when stopped or cancelled.""" + + def _run_async_push(self, monkeypatch, processor, ioc, side_effect_fn): + iocid = ioc.id + processor._cancelled[iocid] = False + + def sync_thread(fn, *args): + try: + fn(*args) + return defer.succeed(None) + except Exception as exc: + return defer.fail(exc) + + monkeypatch.setattr("recceiver.cf.processor.deferToThread", sync_thread) + monkeypatch.setattr("recceiver.cf.processor.task.deferLater", lambda *a, **kw: defer.succeed(None)) + monkeypatch.setattr(processor, "_update_channelfinder", side_effect_fn) + results, errors = [], [] + processor._push_to_cf_async(iocid, {}, [], ioc, {}, {}).addCallbacks(results.append, errors.append) + return results, errors + + def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): processor = make_processor() processor.running = True processor.cf_config.push_always_retry = True + ioc = make_ioc() + call_count = [0] - call_count = 0 - - def failing_update(record_info_by_name, records_to_delete, ioc_info): - nonlocal call_count - call_count += 1 + def failing_update(*args): + call_count[0] += 1 processor.running = False raise RequestException("CF unreachable") - monkeypatch.setattr(processor, "_update_channelfinder", failing_update) - result = processor._push_to_cf({}, [], make_ioc()) + results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) + assert len(results) == 1 and len(errors) == 0 + assert call_count[0] == 1 + + def test_abandons_push_when_ioc_cancelled_during_retry(self, monkeypatch): + processor = make_processor() + processor.running = True + processor.cf_config.push_always_retry = True + ioc = make_ioc() + iocid = ioc.id + call_count = [0] + + def failing_update(*args): + call_count[0] += 1 + processor._cancelled[iocid] = True + raise RequestException("CF unreachable") + + results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) + assert len(results) == 1 and len(errors) == 0 + assert call_count[0] == 1 + + +class TestPerIocLocking: + def test_different_iocs_get_different_locks(self): + proc = make_processor() + lock_a = proc._get_ioc_lock(f"{_HOST_A}:5064") + lock_b = proc._get_ioc_lock(f"{_HOST_B}:5064") + assert lock_a is not lock_b + + def test_same_ioc_gets_same_lock(self): + proc = make_processor() + lock1 = proc._get_ioc_lock(f"{_HOST_A}:5064") + lock2 = proc._get_ioc_lock(f"{_HOST_A}:5064") + assert lock1 is lock2 + + def test_commit_routes_to_correct_iocid(self, monkeypatch): + proc = make_processor() + routed_iocids = [] + + def fake_lock_run(fn, transaction, iocid): + routed_iocids.append(iocid) + return defer.succeed(None) + + lock = DeferredLock() + monkeypatch.setattr(lock, "run", fake_lock_run) + monkeypatch.setattr(proc, "_get_ioc_lock", lambda _iocid: lock) + + proc.commit(make_transaction(_HOST_A, 5064)) + assert routed_iocids == [f"{_HOST_A}:5064"] + + def test_prune_removes_state_after_ioc_disconnects(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + proc._ioc_locks[iocid] = DeferredLock() + proc._cancelled[iocid] = False + proc._ioc_channels[iocid].add("CHAN:1") + # iocid is NOT in proc.iocs → IOC has disconnected + + proc._prune_ioc_state(None, iocid) + + assert iocid not in proc._ioc_locks + assert iocid not in proc._cancelled + assert iocid not in proc._ioc_channels + + def test_prune_preserves_state_while_ioc_is_active(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + proc._ioc_locks[iocid] = DeferredLock() + proc._cancelled[iocid] = False + proc.iocs[iocid] = make_ioc() + + proc._prune_ioc_state(None, iocid) + + assert iocid in proc._ioc_locks + assert iocid in proc._cancelled + + def test_prune_passes_result_through(self): + proc = make_processor() + iocid = f"{_HOST_A}:5064" + assert proc._prune_ioc_state("sentinel", iocid) == "sentinel" + + +class TestCommitErrorHandling: + """Test _commit_with_lock error routing without running real threads. + + _prepare_commit is simulated via a monkeypatched deferToThread; the CF + push phase is controlled by patching _push_to_cf_async directly so each + test exercises exactly one failure mode at a time. + """ + + def _run(self, monkeypatch, *, prepare_result, push_result=None): + proc = make_processor() + proc.running = True + iocid = f"{_HOST_A}:5064" + monkeypatch.setattr("recceiver.cf.processor.deferToThread", lambda *_a, **_kw: prepare_result) + if push_result is not None: + monkeypatch.setattr(proc, "_push_to_cf_async", lambda *_a, **_kw: push_result) + + results, errors = [], [] + proc._commit_with_lock(make_transaction(), iocid).addCallbacks(results.append, errors.append) + return results, errors + + def test_aborted_commit_resolves_chain(self, monkeypatch): + """CFUpdateAbortedError from exhausted retries lets the chain continue.""" + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.fail(CFUpdateAbortedError("retries exhausted")), + ) + assert len(results) == 1 and len(errors) == 0 + + def test_unexpected_error_errbacks_chain(self, monkeypatch): + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.fail(RuntimeError("unexpected")), + ) + assert len(results) == 0 and len(errors) == 1 + assert errors[0].check(RuntimeError) + + def test_service_stopped_cancel_errbacks_chain(self, monkeypatch): + results, errors = self._run( + monkeypatch, + prepare_result=defer.fail(defer.CancelledError("service stopped")), + ) + assert len(results) == 0 and len(errors) == 1 + assert errors[0].check(defer.CancelledError) + + def test_successful_commit_resolves_chain(self, monkeypatch): + ioc = make_ioc() + results, errors = self._run( + monkeypatch, + prepare_result=defer.succeed((ioc, {}, [], {}, {})), + push_result=defer.succeed(None), + ) + assert len(results) == 1 and len(errors) == 0 + + +class TestPrepareCommitSnapshot: + """iocs_snap must contain only IOCs referenced by the commit's channels.""" + + def test_iocs_snap_excludes_unrelated_iocs(self, monkeypatch): + proc = make_processor() + proc.running = True + + iocid = f"{_HOST_A}:5064" + other_iocid = f"{_HOST_B}:5064" + unrelated_iocid = "9.9.9.9:5064" # NOSONAR + + proc.iocs[iocid] = make_ioc() + proc.iocs[other_iocid] = make_ioc() + proc.iocs[unrelated_iocid] = make_ioc() + + # CHAN:1 is co-owned by iocid and other_iocid + proc.channel_ioc_ids["CHAN:1"].extend([iocid, other_iocid]) + proc._ioc_channels[iocid].add("CHAN:1") + + # CHAN:99 belongs to unrelated_iocid only — not touched by this commit + proc.channel_ioc_ids["CHAN:99"].append(unrelated_iocid) + proc._ioc_channels[unrelated_iocid].add("CHAN:99") + + monkeypatch.setattr(proc, "update_ioc_infos", lambda *_: None) + + t = make_transaction(_HOST_A, 5064) + t.records_to_add = {} + t.record_infos_to_add = {} + t.aliases = {} + t.records_to_delete = [] + t.connected = True + t.client_infos = {"IOCNAME": "IOC1", "HOSTNAME": "ioc1.example.com"} + + result = proc._prepare_commit(t, iocid) - assert result is False - assert call_count == 1 + assert result is not None + _, _, _, iocs_snap, _ = result + assert iocid in iocs_snap + assert other_iocid in iocs_snap + assert unrelated_iocid not in iocs_snap diff --git a/server/tests/unit/test_application.py b/server/tests/unit/test_application.py new file mode 100644 index 00000000..69a8c698 --- /dev/null +++ b/server/tests/unit/test_application.py @@ -0,0 +1,16 @@ +from recceiver.application import RecService +from recceiver.recast import CollectionSession + + +class TestRecServiceConfig: + def test_commit_size_limit_defaults_to_class_default(self): + svc = RecService({}) + assert svc.commitSizeLimit == CollectionSession.trlimit + + def test_commit_size_limit_reads_from_config(self): + svc = RecService({"commitSizeLimit": "100"}) + assert svc.commitSizeLimit == 100 + + def test_commit_size_limit_zero_disables_splitting(self): + svc = RecService({"commitSizeLimit": "0"}) + assert svc.commitSizeLimit == 0 diff --git a/server/tests/unit/test_recast.py b/server/tests/unit/test_recast.py index 026a323b..19c63452 100644 --- a/server/tests/unit/test_recast.py +++ b/server/tests/unit/test_recast.py @@ -1,5 +1,6 @@ from unittest.mock import MagicMock +from twisted.internet import defer from twisted.internet.address import IPv4Address from recceiver.recast import CollectionSession @@ -13,6 +14,47 @@ def _make_session() -> CollectionSession: return session +class TestCollectionSessionAbort: + def test_disconnect_commit_runs_after_data_commit_cancels(self): + session = _make_session() + committed = [] + + def mock_commit(t): + committed.append(t) + if t.connected: + return defer.fail(defer.CancelledError("CF unavailable")) + return None + + session.factory.commit.side_effect = mock_commit + + session.ioc_info("IOCNAME", "IOC1") + session.flush() + session.close() + + assert len(committed) == 2 + assert committed[-1].connected is False + + def test_disconnect_commit_runs_after_unexpected_commit_error(self): + session = _make_session() + committed = [] + + def mock_commit(t): + committed.append(t) + if t.connected: + return defer.fail(RuntimeError("unexpected")) + return None + + session.factory.commit.side_effect = mock_commit + + session.ioc_info("IOCNAME", "IOC1") + session.flush() + session.close() + + assert len(committed) == 2 + assert committed[-1].connected is False + session.proto.transport.loseConnection.assert_called_once() + + class TestCollectionSessionFlush: def test_client_infos_carried_forward_after_intermediate_flush(self): session = _make_session()