From cde138ca1cea31807b03dbc57d352a24ecfecaaf Mon Sep 17 00:00:00 2001 From: Peco Myint <48025004+pecomyint@users.noreply.github.com> Date: Thu, 21 May 2026 15:05:18 -0500 Subject: [PATCH] Revert "fix: HPC consumer interpreter resolution + RSM/metadata robustness" --- .../hpc/analysis/hpc_rsm_consumer.py | 48 ++++++++----------- .../hpc/meta/hpc_metadata_consumer.py | 34 +++---------- src/dashpva/workflow/workflow.py | 6 +-- tests/unit/test_pvapy_resolves.py | 44 ----------------- 4 files changed, 29 insertions(+), 103 deletions(-) delete mode 100644 tests/unit/test_pvapy_resolves.py diff --git a/src/dashpva/consumers/hpc/analysis/hpc_rsm_consumer.py b/src/dashpva/consumers/hpc/analysis/hpc_rsm_consumer.py index 3d3cc13..c3a1339 100644 --- a/src/dashpva/consumers/hpc/analysis/hpc_rsm_consumer.py +++ b/src/dashpva/consumers/hpc/analysis/hpc_rsm_consumer.py @@ -8,7 +8,7 @@ import numpy as np import pvaccess as pva import xrayutilities as xu -from pvaccess import PvObject, NtAttribute +from pvaccess import PvObject from pvapy.hpc.adImageProcessor import AdImageProcessor from pvapy.utility.floatWithUnits import FloatWithUnits from pvapy.utility.timeUtility import TimeUtility @@ -150,7 +150,7 @@ def configure(self, configDict): ) self.config = config - self.hkl_config = self.config.get('HKL') or {} + self.hkl_config = self.config.get('HKL', {}) self.hkl_pv_channels = set() for section in self.hkl_config.values(): if isinstance(section, dict): @@ -206,25 +206,16 @@ def get_sample_and_detector_circles(self, hkl_attr: dict): return sample_circle_directions, sample_circle_positions, det_circle_directions, det_circle_positions def get_axis_directions(self, hkl_attr: dict): - """Get beam / reference / surface-normal direction triplets from hkl_attr. + # Get beam and reference directions + if len(hkl_attr) == len(self.hkl_pv_channels): + primary_beam_directions = [hkl_attr.get(f'PrimaryBeamDirection:AxisNumber{i}', None) for i in range(1,4)] + inplane_beam_direction = [hkl_attr.get(f'InplaneReferenceDirection:AxisNumber{i}', None) for i in range(1,4)] + sample_surface_normal_direction = [hkl_attr.get(f'SampleSurfaceNormalDirection:AxisNumber{i}', None) for i in range(1,4)] - PV names come from the active HKL config (which includes any prefix the - IOC publishes under), not hardcoded — so this works for `xidb:`, - `6idb:`, or unprefixed schemas without code changes. - """ - if len(hkl_attr) != len(self.hkl_pv_channels): + return primary_beam_directions, inplane_beam_direction, sample_surface_normal_direction + else: return None, None, None - def _triplet(section_name): - section = self.hkl_config.get(section_name, {}) or {} - return [hkl_attr.get(section.get(f'AXIS_NUMBER_{i}', ''), None) for i in range(1, 4)] - - primary_beam_directions = _triplet('PRIMARY_BEAM_DIRECTION') - # Section names match the (typo'd) keys used elsewhere in the config schema. - inplane_beam_direction = _triplet('INPLANE_REFERENCE_DIRECITON') - sample_surface_normal_direction = _triplet('SAMPLE_SURFACE_NORMAL_DIRECITON') - return primary_beam_directions, inplane_beam_direction, sample_surface_normal_direction - def get_ub_matrix(self, hkl_attr: dict): ub_matrix_key = self.hkl_config['SPEC'].get('UB_MATRIX_VALUE', '') @@ -259,18 +250,17 @@ def create_rsm(self, hkl_attr: dict, shape: tuple): en=energy, qconv=q_conv) - # Set up detector parameters — look up by the PV name in the active - # HKL config so any prefix (xidb:, 6idb:, none) works without edits. - ds_cfg = self.hkl_config.get('DETECTOR_SETUP', {}) or {} + # Set up detector parameters roi = [0, shape[0], 0, shape[1]] - pixel_dir1 = hkl_attr[ds_cfg['PIXEL_DIRECTION_1']] - pixel_dir2 = hkl_attr[ds_cfg['PIXEL_DIRECTION_2']] - cch1, cch2 = hkl_attr[ds_cfg['CENTER_CHANNEL_PIXEL']][:2] - nch1, nch2 = shape[0], shape[1] - size_xy = hkl_attr[ds_cfg['SIZE']] - pixel_width1 = size_xy[0] / nch1 - pixel_width2 = size_xy[1] / nch2 - distance = hkl_attr[ds_cfg['DISTANCE']] + pixel_dir1 = hkl_attr['DetectorSetup:PixelDirection1'] + pixel_dir2 = hkl_attr['DetectorSetup:PixelDirection2'] + cch1 = hkl_attr['DetectorSetup:CenterChannelPixel'][0] + cch2 = hkl_attr['DetectorSetup:CenterChannelPixel'][1] + nch1 = shape[0] + nch2 = shape[1] + pixel_width1 = hkl_attr['DetectorSetup:Size'][0] / nch1 + pixel_width2 = hkl_attr['DetectorSetup:Size'][1] / nch2 + distance = hkl_attr['DetectorSetup:Distance'] hxrd.Ang2Q.init_area( pixel_dir1, pixel_dir2, diff --git a/src/dashpva/consumers/hpc/meta/hpc_metadata_consumer.py b/src/dashpva/consumers/hpc/meta/hpc_metadata_consumer.py index aded51e..fc59408 100755 --- a/src/dashpva/consumers/hpc/meta/hpc_metadata_consumer.py +++ b/src/dashpva/consumers/hpc/meta/hpc_metadata_consumer.py @@ -54,16 +54,6 @@ def __init__(self, configDict={}): # The last object time self.lastFrameTimestamp = 0 - # Throttling for noisy timestamp-tolerance warnings - self._lastToleranceWarnTime = 0.0 - self._toleranceWarnSuppressed = 0 - self._toleranceWarnIntervalSec = 60.0 - # Silent tally of "Metadata channel X not found" occurrences. Was - # previously logged at ERROR level, which the associator subprocess - # piped into the workflow GUI text box on every frame. Counting only — - # inspect self._mdMissingChannels if you need the per-channel tallies. - self._mdMissingChannels = {} # channel -> total count - # COPIED FROM hpc_rsm_consumer.py - Type mapping for compression self.CODEC_PARAMETERS_MAP = { np.dtype('uint8'): pva.UBYTE, @@ -140,7 +130,7 @@ def configure(self, configDict): with open(self.path, "r") as config_file: self.config = toml.load(config_file) - self.hkl_config = self.config.get('HKL') or {} + self.hkl_config = self.config.get('HKL', {}) self.hkl_pv_channels = set() for section in self.hkl_config.values(): if isinstance(section, dict): @@ -161,11 +151,9 @@ def configure(self, configDict): def associateMetadata(self, mdChannel, frameId, frameTimestamp, frameAttributes): # self.logger.debug(f" current metadata map: {self.currentMetadataMap}") #modified since 3.8 env isn't working for me, works w/ 3.8 if mdChannel not in self.currentMetadataMap: - # Count silently — no logger.error, no print. The associator - # subprocess captures any ERROR-level output and pushes it into - # the workflow GUI text box on every frame, so logging here - # floods the UI. - self._mdMissingChannels[mdChannel] = self._mdMissingChannels.get(mdChannel, 0) + 1 + self.logger.error(f'Metadata channel {mdChannel} not found in current metadata map') + print(f'Metadata channel {mdChannel} not found in current metadata map') + return False mdObject = self.currentMetadataMap[mdChannel] @@ -207,17 +195,9 @@ def associateMetadata(self, mdChannel, frameId, frameTimestamp, frameAttributes) diff = abs(frameTimestamp - mdTimestamp2) self.logger.debug(f'Metadata {mdChannel} has value of {mdValue}, timestamp: {mdTimestamp} (with offset: {mdTimestamp2}), timestamp diff: {diff}') if diff > self.timestampTolerance: - now = time.time() - if now - self._lastToleranceWarnTime >= self._toleranceWarnIntervalSec: - suppressed = self._toleranceWarnSuppressed - suffix = f' ({suppressed} similar warnings suppressed)' if suppressed else '' - self.logger.warning( - f'[Metadata Associator] Rejecting {mdChannel}: timestamp diff {diff:.6f}s exceeds tolerance {self.timestampTolerance}s{suffix}' - ) - self._lastToleranceWarnTime = now - self._toleranceWarnSuppressed = 0 - else: - self._toleranceWarnSuppressed += 1 + self.logger.warning( + f'[Metadata Associator] Rejecting {mdChannel}: timestamp diff {diff:.6f}s exceeds tolerance {self.timestampTolerance}s' + ) self.nMetadataDiscarded += 1 return False self.nMetadataProcessed += 1 diff --git a/src/dashpva/workflow/workflow.py b/src/dashpva/workflow/workflow.py index 1428db5..8d56fdb 100644 --- a/src/dashpva/workflow/workflow.py +++ b/src/dashpva/workflow/workflow.py @@ -2078,7 +2078,7 @@ def run_associator_consumers(self): self._save_meta_assoc_last() cmd = [ - sys.executable, '-m', 'pvapy.cli.hpcConsumer', + 'pvapy-hpc-consumer', '--input-channel', self.lineEditInputChannelAssociator.text(), '--control-channel', self.lineEditControlChannelAssociator.text(), '--status-channel', self.lineEditStatusChannelAssociator.text(), @@ -2225,7 +2225,7 @@ def run_collector(self): producer_id_list = ','.join(producer_id_list) cmd = [ - sys.executable, '-m', 'pvapy.cli.hpcConsumer', + 'pvapy-hpc-collector', '--collector-id', str(self.spinBoxCollectorId.value()), '--producer-id-list', producer_id_list, '--input-channel', self.lineEditInputChannelCollector.text(), @@ -2286,7 +2286,7 @@ def run_analysis_consumer(self): self._save_analysis_last() cmd = [ - sys.executable, '-m', 'pvapy.cli.hpcConsumer', + 'pvapy-hpc-consumer', '--input-channel', self.lineEditInputChannelAnalysis.text(), '--control-channel', self.lineEditControlChannelAnalysis.text(), '--status-channel', self.lineEditStatusChannelAnalysis.text(), diff --git a/tests/unit/test_pvapy_resolves.py b/tests/unit/test_pvapy_resolves.py deleted file mode 100644 index 14dac3e..0000000 --- a/tests/unit/test_pvapy_resolves.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Verify pvapy.cli.hpcConsumer resolves from the same interpreter DashPVA launches.""" - -import subprocess -import sys -from pathlib import Path - -import pytest - -pytest.importorskip("pvapy", reason="pvapy not installed (standalone edition)") - - -def _run_probe(code: str) -> str: - result = subprocess.run( - [sys.executable, "-c", code], - capture_output=True, - text=True, - timeout=30, - ) - assert result.returncode == 0, ( - f"probe failed (rc={result.returncode}):\n" - f"stdout: {result.stdout}\nstderr: {result.stderr}" - ) - return result.stdout.strip() - - -def test_pvapy_cli_hpcconsumer_importable_from_sys_executable(): - """workflow.py invokes `sys.executable -m pvapy.cli.hpcConsumer`; this verifies it resolves.""" - out = _run_probe("import pvapy.cli.hpcConsumer as m; print(m.__file__)") - assert out, "expected pvapy.cli.hpcConsumer.__file__ to be non-empty" - assert Path(out).exists(), f"resolved path does not exist: {out}" - - -def test_pvapy_lives_under_sys_executable_prefix(): - """Catches the case where pvapy is on a different env than the launching interpreter.""" - exe_prefix = _run_probe("import sys; print(sys.prefix)") - pvapy_file = _run_probe("import pvapy; print(pvapy.__file__)") - - exe_prefix_resolved = Path(exe_prefix).resolve() - pvapy_resolved = Path(pvapy_file).resolve() - - assert str(pvapy_resolved).startswith(str(exe_prefix_resolved)), ( - f"pvapy at {pvapy_resolved} is not under interpreter prefix {exe_prefix_resolved} — " - f"DashPVA's `sys.executable -m pvapy.cli.hpcConsumer` will fail at runtime." - )