Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions server/demo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
# Doesn't affect IOC clients
#commitInterval = 5.0

# Maximum size before committing updates
# The default is 0 (which is no limit)
#commitSizeLimit = 0
# Maximum records per commit. Default is 5000; set to 0 for no limit.
#commitSizeLimit = 5000

# Maximum concurrent "active" clients
# to allow.
Expand Down Expand Up @@ -134,5 +133,5 @@
#pushMaxRetries = 10

# Whether to retry polling indefinitely until success. Default is False.
# Enabling this holds the global CF commit lock until CF recovers; use with caution.
# Enabling this holds the per-IOC lock until CF recovers for that IOC; other IOCs are unaffected.
#pushAlwaysRetry = False
4 changes: 2 additions & 2 deletions server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from . import metrics
from .announcer import Announcer, SharedUDP
from .processors import ProcessorController
from .recast import CastFactory
from .recast import CastFactory, CollectionSession

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(self, config):
self.annperiod = float(config.get("announceInterval", "15.0"))
self.tcptimeout = float(config.get("tcptimeout", "15.0"))
self.commitperiod = float(config.get("commitInterval", "5.0"))
self.commitSizeLimit = int(config.get("commitSizeLimit", "0"))
self.commitSizeLimit = int(config.get("commitSizeLimit", str(CollectionSession.trlimit)))
self.maxActive = int(config.get("maxActive", "20"))
self.bind, _sep, portn = config.get("bind", "").strip().partition(":")
self.addrlist = []
Expand Down
288 changes: 179 additions & 109 deletions server/recceiver/cf/processor.py

Large diffs are not rendered by default.

36 changes: 17 additions & 19 deletions server/recceiver/recast.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,8 @@ def __init__(self, proto, endpoint):
def close(self):
log.info("Close session from %s", self.ep)

# Do not cancel self._commit_chain here. Any data commit that is still queued
# behind the global lock must be allowed to complete so that channels
# are registered as active in CF before the disconnect is processed.
# Do not cancel self._commit_chain here. Any pending data commit must complete
# so that channels are registered as active in CF before the disconnect is processed.
# The disconnect transaction is chained after self._commit_chain and will execute
# once all preceding commits have finished.
self.transaction = Transaction(self.ep, id(self))
Expand All @@ -297,11 +296,10 @@ def commit(_ignored):
def abort(err):
if err.check(defer.CancelledError):
log.info("Commit cancelled: %s", transaction)
return err
else:
log.error("Commit failure: %s", err)
self.proto.transport.loseConnection()
raise defer.CancelledError()
return None # always continue chain so disconnect commit runs

self._commit_chain.addCallback(commit).addErrback(abort)

Expand Down Expand Up @@ -363,34 +361,34 @@ class CastFactory(protocol.ServerFactory):
maxActive = 3

def __init__(self):
# Flow control by limiting the number of concurrent
# "active" connectons Active means dumping lots of records.
# connections become "inactive" by calling isDone()
# Throttle concurrent uploading connections to control CF commit load.
# "Active" means currently uploading records; connections become
# "inactive" via isDone() once the upload completes.
self.NActive = 0
self.Wait = []

def isDone(self, P, active):
def isDone(self, proto, active):
if not active:
# connection closed before activation
self.Wait.remove(P)
self.Wait.remove(proto)
elif len(self.Wait) > 0:
# Others are waiting
P2 = self.Wait.pop(0)
P2.active = True
P2.transport.resumeProducing()
P2.connectionMade()
waiting = self.Wait.pop(0)
waiting.active = True
waiting.transport.resumeProducing()
waiting.connectionMade()
else:
self.NActive -= 1

def buildProtocol(self, addr):
def buildProtocol(self, _addr):
active = self.NActive < self.maxActive
P = self.protocol(active=active)
P.factory = self
proto = self.protocol(active=active)
proto.factory = self
if active:
self.NActive += 1
else:
self.Wait.append(P)
return P
self.Wait.append(proto)
return proto

def addClient(self, proto, address):
S = self.session(proto, address)
Expand Down
2 changes: 1 addition & 1 deletion server/recceiver_full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ verifySSL = True
# Number of times to retry polling before giving up. Default is 10.
pushMaxRetries = 10

# Whether to retry polling indefinitely until success. Default is True.
# Whether to retry polling indefinitely until success. Default is False.
pushAlwaysRetry = False

# Interval in seconds between periodic CF status log lines (0 to disable)
Expand Down
196 changes: 180 additions & 16 deletions server/tests/unit/cf/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import time
from unittest.mock import MagicMock

from requests import RequestException
from twisted.internet import defer
from twisted.internet.address import IPv4Address
from twisted.internet.defer import DeferredLock

from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo
from recceiver.cf.processor import CFProcessor
from recceiver.cf.processor import CFProcessor, CFUpdateAbortedError
from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc
from tests.unit.cf.mock_adapter import MockCFAdapter
from tests.unit.conftest import make_adapter
Expand All @@ -20,6 +23,16 @@ def make_processor_with_mock():
return proc, adapter


_HOST_A = "1.2.3.4" # NOSONAR
_HOST_B = "5.6.7.8" # NOSONAR


def make_transaction(host: str = _HOST_A, port: int = 5064) -> MagicMock:
    """Build a mock commit transaction whose source_address is host:port."""
    transaction = MagicMock()
    transaction.source_address = IPv4Address("TCP", host, port)
    return transaction


class TestRemoveChannel:
def test_missing_iocid_does_not_raise(self):
proc = make_processor()
Expand Down Expand Up @@ -78,7 +91,7 @@ def test_is_no_op_when_no_active_channels(self):
class TestUpdateChannelFinder:
def _make_proc(self):
proc, adapter = make_processor_with_mock()
proc.cancelled = False
proc.running = True
proc.managed_properties = set()
proc.record_property_names_list = set()
proc.env_vars = {}
Expand All @@ -89,7 +102,7 @@ def test_registers_new_channel_as_active(self):
ioc = make_ioc()
proc.iocs[ioc.id] = ioc

proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc)
proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc, proc.iocs, proc.channel_ioc_ids)

assert "PV:1" in adapter._channels
status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value)
Expand All @@ -115,30 +128,181 @@ def test_orphans_channel_absent_from_local_state(self):
]
)

proc._update_channelfinder({}, [], ioc)
proc._update_channelfinder({}, [], ioc, proc.iocs, proc.channel_ioc_ids)

status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value)
assert status.value == PVStatus.INACTIVE.value


class TestPushToCF:
def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch):
monkeypatch.setattr(time, "sleep", lambda _: None)
"""Tests for _push_to_cf_async: retry abandonment when stopped or cancelled."""

def _run_async_push(self, monkeypatch, processor, ioc, side_effect_fn):
iocid = ioc.id
processor._cancelled[iocid] = False

def sync_thread(fn, *args):
try:
fn(*args)
return defer.succeed(None)
except Exception as exc:
return defer.fail(exc)

monkeypatch.setattr("recceiver.cf.processor.deferToThread", sync_thread)
monkeypatch.setattr("recceiver.cf.processor.task.deferLater", lambda *a, **kw: defer.succeed(None))
monkeypatch.setattr(processor, "_update_channelfinder", side_effect_fn)

results, errors = [], []
processor._push_to_cf_async(iocid, {}, [], ioc, {}, {}).addCallbacks(results.append, errors.append)
return results, errors

def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch):
processor = make_processor()
processor.running = True
processor.cf_config.push_always_retry = True
ioc = make_ioc()
call_count = [0]

call_count = 0

def failing_update(record_info_by_name, records_to_delete, ioc_info):
nonlocal call_count
call_count += 1
def failing_update(*args):
call_count[0] += 1
processor.running = False
raise RequestException("CF unreachable")

monkeypatch.setattr(processor, "_update_channelfinder", failing_update)
result = processor._push_to_cf({}, [], make_ioc())
results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update)
assert len(results) == 1 and len(errors) == 0
assert call_count[0] == 1

def test_abandons_push_when_ioc_cancelled_during_retry(self, monkeypatch):
processor = make_processor()
processor.running = True
processor.cf_config.push_always_retry = True
ioc = make_ioc()
iocid = ioc.id
call_count = [0]

def failing_update(*args):
call_count[0] += 1
processor._cancelled[iocid] = True
raise RequestException("CF unreachable")

results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update)
assert len(results) == 1 and len(errors) == 0
assert call_count[0] == 1


class TestPerIocLocking:
    """Per-IOC locking on CFProcessor: lock identity per IOC id, routing of
    commits to the owning IOC's lock, and pruning of per-IOC bookkeeping
    once an IOC has disconnected."""

    def test_different_iocs_get_different_locks(self):
        proc = make_processor()
        first = proc._get_ioc_lock(f"{_HOST_A}:5064")
        second = proc._get_ioc_lock(f"{_HOST_B}:5064")
        assert first is not second

    def test_same_ioc_gets_same_lock(self):
        proc = make_processor()
        key = f"{_HOST_A}:5064"
        # Repeated lookups for one IOC id must return the identical lock object.
        assert proc._get_ioc_lock(key) is proc._get_ioc_lock(key)

    def test_commit_routes_to_correct_iocid(self, monkeypatch):
        proc = make_processor()
        seen_iocids = []

        def record_run(fn, transaction, iocid):
            seen_iocids.append(iocid)
            return defer.succeed(None)

        shared_lock = DeferredLock()
        monkeypatch.setattr(shared_lock, "run", record_run)
        monkeypatch.setattr(proc, "_get_ioc_lock", lambda _iocid: shared_lock)

        proc.commit(make_transaction(_HOST_A, 5064))
        assert seen_iocids == [f"{_HOST_A}:5064"]

    def test_prune_removes_state_after_ioc_disconnects(self):
        proc = make_processor()
        iocid = f"{_HOST_A}:5064"
        proc._ioc_locks[iocid] = DeferredLock()
        proc._cancelled[iocid] = False
        proc._ioc_channels[iocid].add("CHAN:1")
        # iocid deliberately absent from proc.iocs -> simulates a disconnected IOC

        proc._prune_ioc_state(None, iocid)

        assert iocid not in proc._ioc_locks
        assert iocid not in proc._cancelled
        assert iocid not in proc._ioc_channels

    def test_prune_preserves_state_while_ioc_is_active(self):
        proc = make_processor()
        iocid = f"{_HOST_A}:5064"
        proc._ioc_locks[iocid] = DeferredLock()
        proc._cancelled[iocid] = False
        proc.iocs[iocid] = make_ioc()

        proc._prune_ioc_state(None, iocid)

        # A still-connected IOC keeps its lock and cancellation flag.
        assert iocid in proc._ioc_locks
        assert iocid in proc._cancelled

    def test_prune_passes_result_through(self):
        proc = make_processor()
        assert proc._prune_ioc_state("sentinel", f"{_HOST_A}:5064") == "sentinel"


class TestCommitErrorHandling:
"""Test _commit_with_lock error routing without running real threads.

assert result is False
assert call_count == 1
_prepare_commit is simulated via a monkeypatched deferToThread; the CF
push phase is controlled by patching _push_to_cf_async directly so each
test exercises exactly one failure mode at a time.
"""

def _run(self, monkeypatch, *, prepare_result, push_result=None):
proc = make_processor()
proc.running = True
iocid = f"{_HOST_A}:5064"
monkeypatch.setattr("recceiver.cf.processor.deferToThread", lambda *_a, **_kw: prepare_result)
if push_result is not None:
monkeypatch.setattr(proc, "_push_to_cf_async", lambda *_a, **_kw: push_result)

results, errors = [], []
proc._commit_with_lock(make_transaction(), iocid).addCallbacks(results.append, errors.append)
return results, errors

def test_aborted_commit_resolves_chain(self, monkeypatch):
"""CFUpdateAbortedError from exhausted retries lets the chain continue."""
ioc = make_ioc()
results, errors = self._run(
monkeypatch,
prepare_result=defer.succeed((ioc, {}, [], {}, {})),
push_result=defer.fail(CFUpdateAbortedError("retries exhausted")),
)
assert len(results) == 1 and len(errors) == 0

def test_unexpected_error_errbacks_chain(self, monkeypatch):
ioc = make_ioc()
results, errors = self._run(
monkeypatch,
prepare_result=defer.succeed((ioc, {}, [], {}, {})),
push_result=defer.fail(RuntimeError("unexpected")),
)
assert len(results) == 0 and len(errors) == 1
assert errors[0].check(RuntimeError)

def test_service_stopped_cancel_errbacks_chain(self, monkeypatch):
results, errors = self._run(
monkeypatch,
prepare_result=defer.fail(defer.CancelledError("service stopped")),
)
assert len(results) == 0 and len(errors) == 1
assert errors[0].check(defer.CancelledError)

def test_successful_commit_resolves_chain(self, monkeypatch):
ioc = make_ioc()
results, errors = self._run(
monkeypatch,
prepare_result=defer.succeed((ioc, {}, [], {}, {})),
push_result=defer.succeed(None),
)
assert len(results) == 1 and len(errors) == 0
16 changes: 16 additions & 0 deletions server/tests/unit/test_application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from recceiver.application import RecService
from recceiver.recast import CollectionSession


class TestRecServiceConfig:
    """Parsing of the commitSizeLimit option in RecService.__init__."""

    def test_commit_size_limit_defaults_to_class_default(self):
        # An empty config falls back to CollectionSession's class-level limit.
        service = RecService({})
        assert service.commitSizeLimit == CollectionSession.trlimit

    def test_commit_size_limit_reads_from_config(self):
        # An explicit config value overrides the default and is parsed as int.
        service = RecService({"commitSizeLimit": "100"})
        assert service.commitSizeLimit == 100

    def test_commit_size_limit_zero_disables_splitting(self):
        # Zero is the "no limit" sentinel.
        service = RecService({"commitSizeLimit": "0"})
        assert service.commitSizeLimit == 0
Loading
Loading