From 7d0cbe8a22a7d24bc508e044600ccc2058f55ed1 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 14 May 2026 09:33:29 -0500 Subject: [PATCH 01/14] for starters, rewrite AGENTS.md for slimness --- AGENTS.md | 179 ++++++++++++++++++++++++------------------------------ 1 file changed, 78 insertions(+), 101 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index f5673f64a..2c4481ad3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,101 +1,78 @@ -Agent Contributor Guidelines and Information -============================================ - -Read the ``README.rst`` for an overview of libEnsemble. - -- libEnsemble uses a manager-worker architecture. Points are generated by a generator and sent to a worker, which runs a simulator. -- The manager determines how and when points get passed to workers via an allocation function. -- See ``libensemble/tests/regression_tests/test_1d_sampling.py`` for a simple example of the libEnsemble interface. - -Critical Repository Layout Information --------------------------------------- - -- ``libensemble/`` - Source code. -- ``/alloc_funcs`` - Allocation functions. Policies for passing work between the manager and workers. -- ``/comms`` - Modules and abstractions for communication between the manager and workers. -- ``/executors`` - An interface for launching executables, often simulations. -- ``/gen_classes`` - Generators that adhere to the `gest-api` standard. - Recommended over entries from ``/gen_funcs`` that perform similar functionality. -- ``/gen_funcs`` - Generator functions. Modules for producing points for simulations. (Legacy) -- ``/resources`` - Classes and functions for managing compute resources for MPI tasks, libensemble workers. -- ``/sim_funcs`` - Simulator functions. Modules for running simulations or performing experiments. -- ``/tests`` - Tests. - - ``/functionality_tests`` - Primarily tests libEnsemble code only. - - ``/regression_tests`` - Tests libEnsemble code with external code. Often more closely resembles actual use-cases. - - ``/unit_tests`` - Tests for individual modules. -- ``ensemble.py`` - The primary interface for parameterizing and running libEnsemble. The ``Ensemble`` class in this module wraps the lower-level ``libE`` function and automates argument parsing and state management. -- ``generators.py`` - Base classes for generators that adhere to the `gest-api` standard. -- ``history.py`` - Module for recording points that have been generated and simulation results. NumPy structured array. -- ``libE.py`` - libE main file. Previous primary interface for parameterizing and running libEnsemble. The primary interface in ``ensemble.py`` wraps this function. -- ``manager.py`` - Module for maintaining the history array and passing points between the workers. -- ``message_numbers.py`` - Constants that represent states of the ensemble. -- ``specs.py`` - Dataclasses for parameterizing the ensemble. Most importantly, contains ``LibeSpecs, SimSpecs, GenSpecs``. -- ``worker.py`` - Module for running generators and simulators. Communicates with the manager. -- ``examples/`` - The ``*_funcs`` and ``calling_scripts`` directories contain symlinks to examples further in the source code. -- ``/libE_submission_scripts`` - Example scripts for submitting libEnsemble jobs to HPC systems. -- ``/tutorials`` - Tutorials on how to use libEnsemble. - -Information about Generators ----------------------------- - -- Generators are functions or objects that produce points for simulations. -- The History array is a NumPy structured array that stores points that have been generated and simulation results. -Its fields match ``sim_specs/gen_specs["out"]`` or ``vocs`` attributes, plus additional reserved fields for metadata. -- Prior to libEnsemble v1.6.0, generators were plain functions. They often ran in "persistent" mode, meaning they executed in a -long-running loop, sending and receiving points to and from the manager until the ensemble was complete. -- A ``gest-api`` or "standardized" generator is a class that inherits from ``gest_api.Generator``, implements ``suggest`` and ``ingest`` methods (which process lists of dictionaries, not NumPy arrays), and is parameterized by a ``vocs``. -- See ``libensemble/gen_classes/external/sampling.py`` for simple examples of the pure ``gest-api`` interface. (Note: ``libensemble.generators.LibensembleGenerator`` exists to wrap legacy NumPy-based workflows, but pure ``gest_api.Generator`` is preferred). -- Generators are often used for simple sampling, optimization, calibration, uncertainty quantification, and other simulation-based tasks. -- **Automatic Variable Mapping**: When using ``LibensembleGenerator`` subclasses, they automatically map all ``VOCS`` variables to a single multi-dimensional ``"x"`` field in the History array if no explicit ``variables_mapping`` is provided. Pure ``gest_api.Generator`` classes handle variables natively. -- **Mandatory Input Fields**: Even for simple generators that don't ingest data, ``gen_specs["in"]`` or ``gen_specs["persis_in"]`` must be defined if using an allocation function like ``only_persistent_gens`` that attempts to send rows. If these are empty, the manager will raise an ``AssertionError`` stating that no fields were requested to be sent. -- **Default Allocator**: ``only_persistent_gens`` is the default allocator for standardized ``gest-api`` generators. It treats these generators as persistent entities that communicate throughout the run. - -General Guidelines ------------------- - -- If using classic ``sim_specs`` and ``gen_specs``, then ensure that ``sim_specs["out"]`` and ``gen_specs["in"]`` field names match, and vice-versa. -- As-of libEnsemble v1.6.0, ``SimSpecs`` and ``GenSpecs`` can also be parameterized by a ``vocs`` object, imported from ``gest_api.vocs`` (NOT xopt.vocs). -- ``VOCS`` contains variables, objectives, constraints, and other settings that define the problem. -See ``libensemble/tests/regression_tests/test_xopt_EI.py`` for an example of how to use it. -- An MPI distribution is not required for libEnsemble to run, but is required to use the ``MPIExecutor``. ``mpich`` is recommended. -- New tests are heavily encouraged for new features, bug fixes, or integrations. See ``libensemble/tests/regression_tests`` for examples. -- Never use destructive git commands unless explicitly requested. -- Code is in the ``black`` style. This should be enforced by ``pre-commit``. -- When writing new code, prefer the ``LibeSpecs``, ``SimSpecs``, and ``GenSpecs`` dataclasses over the classic ``sim_specs`` and ``gen_specs`` bare dictionaries. -- Read ``CONTRIBUTING.md`` for more information. -- The external ``libE-community-examples`` repository contains past use-cases, generators, and other examples. - -Development Environment ------------------------ - -- ``pixi`` is the recommended environment manager for libEnsemble development. See ``pyproject.toml`` for the list -of dependencies and the available testing environments. (Note: If ``pixi`` is not in your system path, it can often be found in ``/opt/homebrew/bin/pixi`` or ``/usr/local/bin/pixi``). -- Enter the development environment with ``pixi shell -e dev``. This environment contains the most common dependencies for development and testing. -- For one-off commands, use ``pixi run -e dev``. This will run a single command in the development environment. -- If ``pixi`` is not available or not preferred by the user, ``pip install -e .`` can be used instead. Other dependencies may need to be installed manually. -- If committing, use ``pre-commit`` to ensure that code style and formatting are consistent. See ``.pre-commit-config.yaml`` for -the configuration and ``pyproject.toml`` for other configuration. - -Testing -------- - -- Run tests with the ``run_tests.py`` script: ``python libensemble/tests/run_tests.py``. See ``libensemble/tests/run_tests.py`` for usage information. -- Some tests require third party software to be installed. When developing a feature or fixing a bug, since the entire test suite will be run on Github Actions, -for local development running individual tests is sufficient. -- Individual unit tests can be run with ``pixi run -e dev pytest path/to/test_file``. -- A libEnsemble run typically outputs an ``ensemble.log`` and ``libE_stats.txt`` file in the working directory. Check these files for tracebacks or run statistics. -- An "ensemble" or "workflow" directory may also be created, often containing per-simulation output directories - -Modernizing Scripts for libEnsemble 2.0 ---------------------------------------- - -When modernizing existing libEnsemble scripts (functionality tests, regression tests, or user examples) for version 2.0, follow these steps: - -- **Switch to `gest-api` Generators**: Replace legacy generator functions (from `libensemble.gen_funcs`) with standardized generator classes (from `libensemble.gen_classes` or other `gest-api` compatible sources). -- **Use `VOCS` for Parameterization**: Standardized generators are parameterized by a `VOCS` object (from `gest_api.vocs`). Define variables and objectives within this object. -- **Set `gen_specs["generator"]`**: Instead of `gen_f`, use the `generator` field in `GenSpecs` to pass the initialized generator class. -- **Remove Explicit `AllocSpecs`**: In libEnsemble 2.0, `only_persistent_gens` is the default allocator. Scripts that previously used `give_sim_work_first` or other simple allocators can often remove `alloc_specs` entirely when switching to standardized generators. -- **Generator Placement**: By default, generators run on the manager thread (Worker 0). This means all allocated workers are available for simulation tasks unless `gen_on_worker` is explicitly set to `True` in `libE_specs`. -- **Mandatory Fields**: Ensure `gen_specs["in"]` or `gen_specs["persis_in"]` includes at least one field (e.g., `["sim_id"]`) if feedback is sent back to the generator, to satisfy the allocator's requirements. -- **gest-api Simulators**: The gest-api pattern also applies to simulators. Set `SimSpecs.simulator` to a callable with signature `(input_dict: dict, **kwargs) -> dict` instead of providing a `sim_f`. libEnsemble automatically wraps it with `gest_api_sim` from `libensemble.sim_funcs.gest_api_wrapper` and handles all NumPy conversions. `SimSpecs.inputs` and `SimSpecs.outputs` can be derived automatically when `SimSpecs.vocs` is provided. -- **`safe_mode` is opt-in**: `libE_specs["safe_mode"]` defaults to `False`, meaning protected History fields (`gen_worker`, `gen_started_time`, `gen_ended_time`, `sim_worker`, `sim_started`, `sim_started_time`, `sim_ended`, `sim_ended_time`, `gen_informed`, `gen_informed_time`, `kill_sent`) are freely overwritable by default. Set `safe_mode=True` to enable protection. Overwriting these fields without understanding their purpose may crash libEnsemble. +# Agent Contributor Guidelines + +## Architecture + +Manager-worker framework: manager allocates points from a generator to workers running simulators. See `libensemble/tests/regression_tests/test_1d_sampling.py` for a minimal example. + +## Repository Layout + +Core paths relative to `libensemble/`: +- `alloc_funcs/` - Allocation policies. +- `comms/` - Manager-worker communication +- `executors/` - Launching executables +- `gen_classes/` - **Preferred**: gest-api generators +- `gen_funcs/` - Legacy generators +- `resources/` - Compute resource management +- `sim_funcs/` - Simulator functions +- `tests/{functionality,regression,unit}_tests/` +- `ensemble.py` - Primary interface (wraps `libE()`) +- `generators.py` - gest-api base classes +- `history.py` - NumPy structured array for input/output data +- `libE.py` - Entrypoint for libEnsemble, and also legacy top-level interface +- `manager.py` - History & worker coordination +- `specs.py` - `SimSpecs`, `GenSpecs`, `AllocSpecs`, `ExitCriteria`, `LibeSpecs` dataclasses +- `worker.py` - Runs simulators, communicates with manager. Can be configured to run generators as well + +## Specifications (Modern Configs) + +All configs use **dataclasses** from `specs.py`, not bare dicts (legacy): +- `SimSpecs` - simulator config (`sim_f`/`simulator`, `in`/`inputs`, `out`/`outputs`, `vocs`) +- `GenSpecs` - generator config (`gen_f`/`generator`, `in`/`inputs`, `out`/`outputs`, `persis_in`, `vocs`, `user`) +- `AllocSpecs` - allocation function config (`alloc_f`, `user`) +- `ExitCriteria` - termination conditions (`sim_max`, `wallclock_max`, `stop_val`) +- `LibeSpecs` - runtime config (`comms`, `nworkers`, `gen_on_worker`, `safe_mode`, etc.) + +These accept `vocs` from `gest_api.vocs` (not xopt.vocs). The dict-based `sim_specs`/`gen_specs` API still works but is legacy. + +## Generators + +- **gest-api** (preferred): class inheriting `gest_api.Generator`, implements `suggest(input_dicts)`/`ingest(output_dicts)`, parameterized by `VOCS`. See `libensemble/gen_classes/external/sampling.py`. +- Generators are used for sampling, optimization, calibration, uncertainty quantification, and other simulation-based tasks. +- **Legacy**: plain functions with persistent loops. Use `LibensembleGenerator` to wrap into gest-api. +- History array: NumPy structured array with fields from `sim_specs/gen_specs["out"]` or `vocs` attributes plus reserved metadata fields. +- **Automatic Variable Mapping**: `LibensembleGenerator` maps all VOCS vars to `"x"` field unless `variables_mapping` is provided. +- **Mandatory Input Fields**: `gen_specs["in"]`/`["persis_in"]` must have >=1 field (e.g. `["sim_id"]`) when using `only_persistent_gens` allocator. +- **Default Allocator**: `only_persistent_gens` for gest-api generators. + +## Conventions + +- Match output fields ↔ input fields (e.g., `SimSpecs.out` ↔ `GenSpecs.in`, and vice-versa). +- Always use the dataclass configs from `specs.py` (`SimSpecs`, `GenSpecs`, etc.) over legacy bare dicts. +- `SimSpecs`/`GenSpecs` accept `vocs` from `gest_api.vocs` (not xopt.vocs). +- Code style: `black` (enforced by pre-commit via `pre-commit`). +- No destructive git commands without explicit request. + +## Development + +- **pixi** recommended. Enter: `pixi shell -e dev`. One-off: `pixi run -e dev `. (Path: `/opt/homebrew/bin/pixi` or `/usr/local/bin/pixi`.) +- Fallback: `pip install -e .` (may need manual dependency installs). +- Pre-commit: `pre-commit` (config in `.pre-commit-config.yaml`, `pyproject.toml`). + +## Testing + +- Full suite: `python libensemble/tests/run_tests.py` +- Single unit test: `pixi run -e dev pytest path/to/test_file` +- Single regression/functionality test: `pixi run -e dev python path/to/test_file -n 4` +- Check `ensemble.log` and `libE_stats.txt` for run diagnostics. + +## Modernizing for libEnsemble 2.0 + +When updating scripts from legacy patterns: + +- **Generators**: Replace `gen_f` with gest-api `Generator` class set via `gen_specs["generator"]`. +- **VOCS**: Parameterize generators with `VOCS` from `gest_api.vocs`. +- **AllocSpecs**: `AllocSpecs` dataclass replaces bare dict. Often removable — `only_persistent_gens` is the default allocator. +- **Generator placement**: Runs on manager (Worker 0) by default. Set `LibeSpecs(gen_on_worker=True)` to run on a dedicated worker. +- **Input fields**: `GenSpecs.inputs`/`persis_in` must contain >=1 field. +- **Simulators**: Use `SimSpecs.simulator` with `(input_dict, **kwargs) -> dict` instead of `sim_f`. libEnsemble auto-wraps via `gest_api_sim`. `inputs`/`outputs` auto-derived from `vocs`. +- **safe_mode**: `LibeSpecs(safe_mode=False)` by default (protected History fields overwritable). Set `True` to guard metadata fields (`gen_worker`, `sim_worker`, `sim_started`, `sim_ended`, etc.). From dda20b31050569e4d9b3305726199917ecfff5bb Mon Sep 17 00:00:00 2001 From: jlnav Date: Mon, 18 May 2026 12:32:54 -0500 Subject: [PATCH 02/14] enormously refactored manager-only approach to globus-compute. virtual worker-slots represent parallel globus-compute-submitted tasks --- libensemble/executors/__init__.py | 3 +- libensemble/libE.py | 32 +++ libensemble/manager.py | 185 +++++++++++++++++- .../tests/unit_tests/test_ufunc_runners.py | 41 ++-- libensemble/utils/runners.py | 20 +- pixi.lock | 4 +- 6 files changed, 251 insertions(+), 34 deletions(-) diff --git a/libensemble/executors/__init__.py b/libensemble/executors/__init__.py index 563fa3352..fdcae5c87 100644 --- a/libensemble/executors/__init__.py +++ b/libensemble/executors/__init__.py @@ -1,4 +1,5 @@ from libensemble.executors.executor import Executor +from libensemble.executors.globus_compute_executor import GlobusComputeExecutor, GlobusComputeTask from libensemble.executors.mpi_executor import MPIExecutor -__all__ = ["Executor", "MPIExecutor"] +__all__ = ["Executor", "GlobusComputeExecutor", "GlobusComputeTask", "MPIExecutor"] diff --git a/libensemble/libE.py b/libensemble/libE.py index 219e2cd8c..ee86d5a25 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -266,6 +266,16 @@ def libE( if Executor.executor is not None: Executor.executor.add_platform_info(platform_info) + # Detect GC-only mode: manager submits directly to Globus Compute. + # Requires globus_compute_endpoint set and gen_on_worker not set. + # nworkers is repurposed as virtual concurrency (max concurrent GC sims). + if sim_specs.get("globus_compute_endpoint") and not libE_specs.get("gen_on_worker"): + libE_specs["_gc_only"] = True + # Force local comms (no MPI workers needed) + if libE_specs.get("comms", "mpi") != "local": + libE_specs["comms"] = "local" + logger.info("GC-only mode: switching to local comms (no workers needed)") + # Reset gen counter. AllocSupport.gen_counter = 0 @@ -485,6 +495,9 @@ def kill_proc_team(wcomms, timeout): def libE_local(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Main routine for thread/process launch of libE.""" + if libE_specs.get("_gc_only"): + return _libE_local_gc_only(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0) + resources = Resources.resources if resources is not None: local_host = [socket.gethostname()] @@ -523,6 +536,25 @@ def cleanup(): ) +def _libE_local_gc_only(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): + """GC-only variant: skip worker processes entirely.""" + hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0) + + if not libE_specs["disable_log_files"]: + exit_logger = manager_logging_config(specs=libE_specs) + else: + exit_logger = None + + def cleanup(): + """Handler to clean up GC resources.""" + if exit_logger is not None: + exit_logger() + + return manager( + [], sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, hist, on_cleanup=cleanup + ) + + # ==================== TCP version ================================= diff --git a/libensemble/manager.py b/libensemble/manager.py index 7995d2da9..11661ffc3 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -3,8 +3,10 @@ ============================ """ +import concurrent.futures import cProfile import glob +import inspect import logging import os import platform @@ -30,11 +32,13 @@ MAN_SIGNAL_KILL, PERSIS_STOP, STOP_TAG, + TASK_FAILED, calc_type_strings, ) from libensemble.resources.resources import Resources from libensemble.tools.fields_keys import protected_libE_fields from libensemble.tools.tools import _USER_CALC_DIR_WARNING +from libensemble.utils.globus_compute import GCSession from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges from libensemble.utils.output_directory import EnsembleDirectory from libensemble.utils.timer import Timer @@ -243,6 +247,17 @@ def __init__( local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs) self.wcomms = [local_worker_comm] + self.wcomms + # If GC-only mode, add virtual worker slots for concurrent sims. + # nworkers controls concurrency (default 1 when unset). + if libE_specs.get("_gc_only"): + n_virtual = libE_specs.get("nworkers", 1) or 1 + virtual_W = np.zeros(n_virtual, dtype=Manager.worker_dtype) + start_id = len(self.W) + virtual_W["worker_id"] = np.arange(start_id, start_id + n_virtual) + virtual_W["gen_worker"] = False + self.W = np.concatenate([self.W, virtual_W]) + self.wcomms = self.wcomms + [None] * n_virtual + self.W = _WorkerIndexer(self.W, 1 - gen_on_worker) # if gen on worker, then no additional worker self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker) @@ -574,9 +589,19 @@ def _kill_cancelled_sims(self) -> None: logger.debug(f"Manager sending kill signals to H indices {kill_sim_rows}") kill_ids = self.hist.H["sim_id"][kill_sim_rows] kill_on_workers = self.hist.H["sim_worker"][kill_sim_rows] - for w in kill_on_workers: - self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL) - self.hist.H["kill_sent"][kill_ids] = True + + if self.libE_specs.get("_gc_only"): + # Cancel in-flight GC futures for these sim_ids + sim_ids_to_kill = set(kill_ids) + for future, (sim_id, w) in list(self._gc_futures.items()): + if sim_id in sim_ids_to_kill: + future.cancel() + del self._gc_futures[future] + self.hist.H["kill_sent"][kill_ids] = True + else: + for w in kill_on_workers: + self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL) + self.hist.H["kill_sent"][kill_ids] = True # --- Handle termination @@ -692,6 +717,157 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: return output + # --- Globus Compute (manager-side) helpers + + def _init_gc(self) -> None: + """Initialize the Globus Compute executor for manager-side submission.""" + endpoint = self.sim_specs.get("globus_compute_endpoint", "") + if not endpoint: + raise ValueError("_gc_only mode requires globus_compute_endpoint in sim_specs") + + self._gc_executor, self._gc_fid = GCSession.get_or_create(endpoint, self.sim_specs["sim_f"]) + self._gc_futures: dict[concurrent.futures.Future, tuple[int, int]] = {} + self._gc_nparams = len(inspect.signature(self.sim_specs["sim_f"]).parameters) + + def _gc_submit(self, Work: dict, w: int) -> None: + """Submit simulation work to Globus Compute instead of a local worker.""" + sim_ids = Work["libE_info"]["H_rows"] + H_fields = Work["H_fields"] + + for sim_id in sim_ids: + calc_in = np.empty(1, dtype=[(name, self.hist.H.dtype.fields[name][0]) for name in H_fields]) + for name in H_fields: + calc_in[name] = self.hist.H[name][sim_id] + + p_info = Work.get("persis_info", {}) + + libE_info = dict(Work["libE_info"]) + libE_info["comm"] = None + libE_info.pop("executor", None) + + args = [calc_in, p_info, self.sim_specs, libE_info][: self._gc_nparams] + + future = self._gc_executor.submit_to_registered_function(self._gc_fid, args) + self._gc_futures[future] = (sim_id, w) + + def _gather_gc_results(self, persis_info: dict) -> dict: + """Poll completed GC futures and update history.""" + done = [f for f in self._gc_futures if f.done()] + for future in done: + sim_id, w = self._gc_futures.pop(future) + try: + out, p_info, calc_status = future.result() + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": calc_status, + "calc_out": out, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": p_info or {}, + } + except Exception as e: + logger.error(f"GC task failed for sim_id {sim_id}: {e}") + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": TASK_FAILED, + "calc_out": None, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": {}, + } + self._update_state_on_worker_msg(persis_info, D_recv, w) + return persis_info + + def _gc_cancel_futures(self) -> None: + """Cancel any in-flight GC futures.""" + for future in list(self._gc_futures.keys()): + future.cancel() + self._gc_futures.clear() + + def _run_gc_only(self, persis_info: dict) -> tuple[dict, int, int]: + """Manager main loop variation for GC-only mode.""" + self._init_gc() + try: + while not self.term_test(): + self._kill_cancelled_sims() + persis_info = self._gather_gc_results(persis_info) + Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info) + if flag: + break + + for w in Work: + if self._sim_max_given(): + break + self._check_work_order(Work[w], w) + self._gc_submit(Work[w], w) + self._update_state_on_alloc(Work[w], w) + + assert self.term_test() or any( + self.W["active"] != 0 + ), "alloc_f did not return any work, although all workers are idle." + except WorkerException as e: + report_worker_exc(e) + raise LoggedException(e.args[0], e.args[1]) from None + except Exception as e: + logger.error(traceback.format_exc()) + raise LoggedException(e.args) from None + finally: + result = self._gc_final_receive_and_kill(persis_info) + self.wcomms = [] + sys.stdout.flush() + sys.stderr.flush() + return result + + def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]: + """Drain remaining GC futures and shut down the generator thread.""" + if any(self.W["persis_state"]): + for w in self.W["worker_id"][self.W["persis_state"] > 0]: + logger.debug(f"Manager sending PERSIS_STOP to worker {w}") + if self.libE_specs.get("final_gen_send", False): + rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0] + work = { + "H_fields": self.gen_specs["persis_in"], + "persis_info": persis_info.get(w), + "tag": PERSIS_STOP, + "libE_info": {"persistent": True, "H_rows": rows_to_send}, + } + self._check_work_order(work, w, force=True) + self._send_work_order(work, w) + self.hist.update_history_to_gen(rows_to_send) + else: + self.wcomms[w].send(PERSIS_STOP, MAN_SIGNAL_KILL) + if not self.W[w]["active"]: + self.W[w]["active"] = self.W[w]["persis_state"] + self.persis_pending.append(w) + + exit_flag = 0 + while self._gc_futures: + done = [f for f in self._gc_futures if f.done()] + if not done: + time.sleep(0.1) + continue + for future in done: + sim_id, w = self._gc_futures.pop(future) + try: + out, p_info, calc_status = future.result() + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": calc_status, + "calc_out": out, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": p_info or {}, + } + except Exception: + continue + self._update_state_on_worker_msg(persis_info, D_recv, w) + + self._init_every_k_save(complete=True) + self._clean_up_thread() + + if self.live_data is not None: + self.live_data.finalize(self.hist) + + persis_info["num_gens_started"] = 0 + return persis_info, exit_flag, self.elapsed() + # --- Main loop def run(self, persis_info: dict) -> tuple[dict, int, int]: @@ -699,6 +875,9 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: logger.debug(f"Manager initiated on node {socket.gethostname()}") logger.info(f"Manager exit_criteria: {self.exit_criteria}") + if self.libE_specs.get("_gc_only"): + return self._run_gc_only(persis_info) + # Continue receiving and giving until termination test is satisfied try: while not self.term_test(): diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py index 79fda7c28..e4e813a8e 100644 --- a/libensemble/tests/unit_tests/test_ufunc_runners.py +++ b/libensemble/tests/unit_tests/test_ufunc_runners.py @@ -1,9 +1,11 @@ -import mock +from unittest import mock + import numpy as np import pytest import libensemble.tests.unit_tests.setup as setup from libensemble.tools.fields_keys import libE_fields +from libensemble.utils.globus_compute import GCSession from libensemble.utils.runners import Runner @@ -74,7 +76,11 @@ def test_globus_compute_runner_init(): sim_specs["globus_compute_endpoint"] = "1234" - with mock.patch("globus_compute_sdk.Executor"): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-123" + mock_create.return_value = mock_exec + runner = Runner.from_specs(sim_specs) assert hasattr( @@ -88,17 +94,19 @@ def test_globus_compute_runner_pass(): sim_specs["globus_compute_endpoint"] = "1234" - with mock.patch("globus_compute_sdk.Executor"): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-123" + mock_create.return_value = mock_exec + runner = Runner.from_specs(sim_specs) - # Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception - globus_compute_mock = mock.Mock() - globus_compute_future = mock.Mock() - globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future + # Creating Mock Globus Compute future object - no exception + globus_compute_future = mock.MagicMock() globus_compute_future.exception.return_value = None globus_compute_future.result.return_value = (True, True) - runner.globus_compute_executor = globus_compute_mock + runner.globus_compute_executor.submit_to_registered_function.return_value = globus_compute_future runners = {1: runner.run} libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} @@ -112,18 +120,21 @@ def test_globus_compute_runner_pass(): def test_globus_compute_runner_fail(): calc_in, sim_specs, gen_specs = get_ufunc_args() + GCSession.clear() sim_specs["globus_compute_endpoint"] = "4321" - with mock.patch("globus_compute_sdk.Executor"): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-432" + mock_create.return_value = mock_exec + runner = Runner.from_specs(sim_specs) - # Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception - globus_compute_mock = mock.Mock() - globus_compute_future = mock.Mock() - globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future - globus_compute_future.exception.return_value = Exception + # Creating Mock Globus Compute future object - yes exception + globus_compute_future = mock.MagicMock() + globus_compute_future.exception.return_value = Exception("boom") - runner.globus_compute_executor = globus_compute_mock + runner.globus_compute_executor.submit_to_registered_function.return_value = globus_compute_future runners = {1: runner.run} libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index da554fad9..a10615659 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -9,6 +9,7 @@ from libensemble.generators import LibensembleGenerator, PersistentGenInterfacer from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG from libensemble.tools.persistent_support import PersistentSupport +from libensemble.utils.globus_compute import GCSession from libensemble.utils.misc import list_dicts_to_np, map_numpy_array, np_to_list_dicts, unmap_numpy_array logger = logging.getLogger(__name__) @@ -68,20 +69,12 @@ def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, int | Non class GlobusComputeRunner(Runner): + _session = GCSession() + def __init__(self, specs): super().__init__(specs) - self.globus_compute_executor = self._get_globus_compute_executor()(endpoint_id=specs["globus_compute_endpoint"]) - self.globus_compute_fid = self.globus_compute_executor.register_function(self.f) - - def _get_globus_compute_executor(self): - try: - from globus_compute_sdk import Executor - except ModuleNotFoundError: - logger.warning("Globus Compute use detected but Globus Compute not importable. Is it installed?") - logger.warning("Running function evaluations normally on local resources.") - return None - else: - return Executor + endpoint = specs["globus_compute_endpoint"] + self.globus_compute_executor, self.globus_compute_fid = self._session.get_or_create(endpoint, self.f) def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, int | None): from libensemble.worker import Worker @@ -94,7 +87,8 @@ def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> ( return task_fut.result() def shutdown(self) -> None: - self.globus_compute_executor.shutdown() + if self.globus_compute_executor is not None: + self.globus_compute_executor.shutdown() class ThreadRunner(Runner): diff --git a/pixi.lock b/pixi.lock index 8fa829e09..bcbef304b 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:95763b1326e2a21dd0aabb08e20c13f004f5fa6aef87a33614101e024318c05e -size 1085121 +oid sha256:ced3f2f78fb35503d1e2f2db0f7bc925e6e9e9a20bc83f9207cd03a4918d1639 +size 1084213 From 9cac41c3d7538bf2f62349db45d8f41104158216 Mon Sep 17 00:00:00 2001 From: jlnav Date: Mon, 18 May 2026 13:19:28 -0500 Subject: [PATCH 03/14] adding globus-compute-executor file. adding globus-compute-utils. adding unit tests for those utils --- .../executors/globus_compute_executor.py | 286 +++++++++++++++ libensemble/manager.py | 16 +- .../tests/unit_tests/test_globus_compute.py | 333 ++++++++++++++++++ libensemble/utils/globus_compute.py | 88 +++++ 4 files changed, 722 insertions(+), 1 deletion(-) create mode 100644 libensemble/executors/globus_compute_executor.py create mode 100644 libensemble/tests/unit_tests/test_globus_compute.py create mode 100644 libensemble/utils/globus_compute.py diff --git a/libensemble/executors/globus_compute_executor.py b/libensemble/executors/globus_compute_executor.py new file mode 100644 index 000000000..d6d1f3628 --- /dev/null +++ b/libensemble/executors/globus_compute_executor.py @@ -0,0 +1,286 @@ +import logging +import os +from concurrent.futures import Future, TimeoutError +from typing import Any + +from libensemble.executors.executor import ( + Application, + Executor, + ExecutorException, + Task, + TimeoutExpired, +) +from libensemble.utils.globus_compute import GCSession +from libensemble.utils.timer import TaskTimer + +logger = logging.getLogger(__name__) + + +class GlobusComputeTask(Task): + """A :class:`~libensemble.executors.executor.Task` wrapping a + ``concurrent.futures.Future`` returned by Globus Compute. + + Instead of managing a local subprocess, this task polls a remote + computation via the future's ``done()`` / ``result()`` APIs. + """ + + def __init__(self, future, app=None, app_args=None, workerid=None): + self.id = next(Task.newid) + self.reset() + self.timer = TaskTimer() + self.app = app + self.app_args = app_args + self.workerID = workerid + self._gc_future = future + + worker_name = f"_worker{self.workerID}" if self.workerID else "" + self.name = Task.prefix + f"_{app.name}{worker_name}_{self.id}" + self.stdout = "" + self.stderr = "" + self.workdir = None + self.dry_run = False + self.runline = None + self.run_attempts = 0 + self.env = {} + self.ngpus_req = 0 + + self.state = "RUNNING" + self.timer.start() + self.submit_time = self.timer.tstart + + def _check_poll(self): + if self.finished: + return False + return True + + def poll(self): + if not self._check_poll(): + return + if self._gc_future.done(): + try: + self._gc_future.result() + self.finished = True + self.success = True + self.state = "FINISHED" + except Exception: + self.finished = True + self.success = False + self.state = "FAILED" + self.calc_task_timing() + else: + self.state = "RUNNING" + self.runtime = self.timer.elapsed + + def wait(self, timeout=None): + if not self._check_poll(): + return + try: + self._gc_future.result(timeout=timeout) + self.finished = True + self.success = True + self.state = "FINISHED" + except TimeoutError: + raise TimeoutExpired(self.name, timeout) + except Exception: + self.finished = True + self.success = False + self.state = "FAILED" + self.calc_task_timing() + + def kill(self, wait_time=None): + self._gc_future.cancel() + self.state = "USER_KILLED" + self.finished = True + self.calc_task_timing() + + def result(self, timeout=None): + self.wait(timeout=timeout) + return self.state + + def running(self): + self.poll() + return self.state == "RUNNING" + + def done(self): + self.poll() + return self.finished + + def cancelled(self): + self.poll() + return self.state == "USER_KILLED" + + +class GlobusComputeExecutor(Executor): + """An :class:`~libensemble.executors.executor.Executor` that submits + Python callables to Globus Compute instead of launching local subprocesses. + + Usage in a calling script:: + + from libensemble.executors.globus_compute_executor import GlobusComputeExecutor + + exctr = GlobusComputeExecutor(endpoint_id="...") + Executor.executor = exctr + + Inside a simulator function:: + + task = info["executor"].submit(func=my_remote_func, app_args=...) + while not task.finished: + task.poll() + if info["executor"].manager_kill_received(): + task.kill() + break + time.sleep(0.1) + """ + + def __init__(self, endpoint_id: str): + self.manager_signal = None + self.default_apps: dict[str, Application | None] = {"sim": None, "gen": None} + self.apps: dict[str, Application] = {} + self.wait_time = 60 + self.list_of_tasks: list[GlobusComputeTask] = [] + self.workerID = None + self.comm = None + self.last_task = 0 + self.base_dir = os.getcwd() + + self.endpoint_id = endpoint_id + self._gc_executor = None + self._func_cache: dict[int, str] = {} + + def _ensure_gc(self): + if self._gc_executor is None: + self._gc_executor = GCSession.get_or_create_executor(self.endpoint_id) + return self._gc_executor + + def _get_func_id(self, func) -> str: + key = id(func) + if key in self._func_cache: + return self._func_cache[key] + executor = self._ensure_gc() + if executor is None: + raise RuntimeError( + "Globus Compute SDK is not installed. " "Install it with: pip install globus-compute-sdk" + ) + fid = executor.register_function(func) + self._func_cache[key] = fid + return fid + + def register_app( + self, + full_path: str, + app_name: str | None = None, + calc_type: str | None = None, + desc: str | None = None, + precedent: str = "", + pyobj: Any | None = None, + ) -> None: + """Register an application. + + If *pyobj* is provided the application is treated as a remote + Python callable. Otherwise the base-class behaviour applies + (local executable). + """ + if not app_name: + app_name = os.path.split(full_path)[1] + + app = Application(full_path, app_name, calc_type, desc, pyobj, precedent) + self.apps[app_name] = app + + if calc_type is not None: + if calc_type not in self.default_apps: + raise ExecutorException(f"Unrecognized calculation type {calc_type}") + self.default_apps[calc_type] = app + + def submit( + self, + calc_type: str | None = None, + app_name: str | None = None, + app_args: str | None = None, + func: Any = None, + stdout: str | None = None, + stderr: str | None = None, + dry_run: bool = False, + wait_on_start: bool = False, + **kwargs, + ) -> GlobusComputeTask: + """Submit a function or registered application to Globus Compute. + + Parameters + ---------- + calc_type : str, optional + Calculation type (``"sim"`` or ``"gen"``). Used with *app_name*. + app_name : str, optional + Name of a previously registered application. + app_args : str, optional + Arguments passed alongside the function. + func : callable, optional + A Python callable to execute remotely. Takes precedence over + *app_name* / *calc_type*. + stdout, stderr : str, optional + Ignored (stubs for API compatibility). + dry_run : bool, optional + If True, return a task without actually submitting. + wait_on_start : bool, optional + If True, block until the task is reported as started. + + Returns + ------- + GlobusComputeTask + """ + if dry_run: + raise NotImplementedError("dry_run is not supported for GlobusComputeExecutor") + + if func is not None: + fid = self._get_func_id(func) + app = Application(full_path="", name=func.__name__, calc_type="sim", pyobj=func) + elif app_name is not None: + app = self.get_app(app_name) + if app.pyobj is not None: + fid = self._get_func_id(app.pyobj) + else: + raise ValueError( + f"Application '{app_name}' has no pyobj callable registered. " + "Use the `func=...` argument, or register an app with `pyobj=`." + ) + elif calc_type is not None: + app = self.default_app(calc_type) + if app.pyobj is not None: + fid = self._get_func_id(app.pyobj) + else: + raise ValueError( + f"Default {calc_type} app has no pyobj callable. " + "Use the `func=...` argument, or register an app with `pyobj=`." + ) + else: + raise ValueError("One of `func`, `app_name`, or `calc_type` must be provided") + + args = app_args + future: Future = self._ensure_gc().submit_to_registered_function(fid, args) + task = GlobusComputeTask(future, app=app, app_args=args, workerid=self.workerID) + self.list_of_tasks.append(task) + + if wait_on_start: + task.wait() + + return task + + def set_workerID(self, workerid) -> None: + """Sets the worker ID for this executor.""" + self.workerID = workerid + + def set_worker_info(self, comm=None, workerid=None) -> None: + """Sets worker info for this executor.""" + self.workerID = workerid + self.comm = comm + + def serial_setup(self): + pass + + def set_resources(self, resources): + pass + + def add_platform_info(self, platform_info=None): + pass + + def set_gen_procs_gpus(self, libE_info): + pass diff --git a/libensemble/manager.py b/libensemble/manager.py index 11661ffc3..b785e6797 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -751,7 +751,19 @@ def _gc_submit(self, Work: dict, w: int) -> None: self._gc_futures[future] = (sim_id, w) def _gather_gc_results(self, persis_info: dict) -> dict: - """Poll completed GC futures and update history.""" + """Poll completed GC futures *and* the gen worker comm, then update history.""" + time.sleep(0.0001) # Avoid busy-wait (matches _receive_from_workers) + + # Poll the gen worker thread (w=0) so generator output reaches history + new_stuff = True + while new_stuff: + new_stuff = False + for w in self.W["worker_id"]: + if self.wcomms[w] is not None and self.wcomms[w].mail_flag(): + new_stuff = True + self._handle_msg_from_worker(persis_info, w) + + # Drain completed GC futures done = [f for f in self._gc_futures if f.done()] for future in done: sim_id, w = self._gc_futures.pop(future) @@ -774,6 +786,8 @@ def _gather_gc_results(self, persis_info: dict) -> dict: "persis_info": {}, } self._update_state_on_worker_msg(persis_info, D_recv, w) + + self._init_every_k_save() return persis_info def _gc_cancel_futures(self) -> None: diff --git a/libensemble/tests/unit_tests/test_globus_compute.py b/libensemble/tests/unit_tests/test_globus_compute.py new file mode 100644 index 000000000..a7506b75b --- /dev/null +++ b/libensemble/tests/unit_tests/test_globus_compute.py @@ -0,0 +1,333 @@ +from unittest import mock + +import pytest + +from libensemble.executors.globus_compute_executor import ( + GlobusComputeExecutor, + GlobusComputeTask, +) +from libensemble.utils.globus_compute import GCSession + +# ────────────────────────────────────────────── +# GCSession +# ────────────────────────────────────────────── + + +class TestGCSession: + def setup_method(self): + GCSession.clear() + + def test_get_or_create_executor(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_create.return_value = mock_exec + + ex1 = GCSession.get_or_create_executor("ep-1") + assert ex1 is mock_exec + mock_create.assert_called_once_with("ep-1") + + # Cache hit + ex2 = GCSession.get_or_create_executor("ep-1") + assert ex2 is mock_exec + mock_create.assert_called_once() # No second call + + def test_get_or_create_caches_func_id(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-42" + mock_create.return_value = mock_exec + + def my_func(): + pass + + ex1, fid1 = GCSession.get_or_create("ep-1", my_func) + assert ex1 is mock_exec + assert fid1 == "fid-42" + mock_exec.register_function.assert_called_once_with(my_func) + + # Second call — same (executor, fid) returned, no re-registration + ex2, fid2 = GCSession.get_or_create("ep-1", my_func) + assert ex2 is mock_exec + assert fid2 == "fid-42" + mock_exec.register_function.assert_called_once() + + def test_register_function(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-99" + mock_create.return_value = mock_exec + + def f(): + pass + + ex, fid = GCSession.register_function("ep-1", f) + assert ex is mock_exec + assert fid == "fid-99" + mock_exec.register_function.assert_called_once_with(f) + + def test_module_not_found_returns_none(self): + # Force _create_executor to simulate missing SDK + GCSession._create_executor = classmethod(lambda cls, eid: None) + + ex = GCSession.get_or_create_executor("ep-1") + assert ex is None + + ex, fid = GCSession.get_or_create("ep-1", lambda: None) + assert ex is None + assert fid is None + + def test_thread_safety(self): + import threading + + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid" + mock_create.return_value = mock_exec + + errors = [] + + def access(): + try: + for _ in range(100): + GCSession.get_or_create_executor("ep-t") + GCSession.get_or_create("ep-t", lambda: None) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=access) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Thread safety errors: {errors}" + + +# ────────────────────────────────────────────── +# GlobusComputeTask +# ────────────────────────────────────────────── + + +class TestGlobusComputeTask: + def make_task(self, future=None, app=None): + if app is None: + from libensemble.executors.executor import Application + + app = Application("", name="test_func", calc_type="sim", pyobj=lambda: None) + if future is None: + future = mock.MagicMock() + future.done.return_value = False + return GlobusComputeTask(future, app=app) + + def test_initial_state(self): + task = self.make_task() + assert task.state == "RUNNING" + assert not task.finished + assert task._gc_future is not None + + def test_poll_running(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + task.poll() + assert task.state == "RUNNING" + assert not task.finished + + def test_poll_finished_success(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = None # No exception + task = self.make_task(future=future) + task.poll() + assert task.state == "FINISHED" + assert task.finished + assert task.success + + def test_poll_finished_failure(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.side_effect = RuntimeError("boom") + task = self.make_task(future=future) + task.poll() + assert task.state == "FAILED" + assert task.finished + assert not task.success + + def test_wait_timeout(self): + future = mock.MagicMock() + future.result.side_effect = TimeoutError("timed out") + task = self.make_task(future=future) + with pytest.raises(Exception, match="timed out"): + task.wait(timeout=0.001) + + def test_kill(self): + future = mock.MagicMock() + task = self.make_task(future=future) + task.kill() + assert task.state == "USER_KILLED" + assert task.finished + future.cancel.assert_called_once() + + def test_running(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + assert task.running() + + def test_done(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = None + task = self.make_task(future=future) + assert task.done() + + def test_not_done(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + assert not task.done() + + +# ────────────────────────────────────────────── +# GlobusComputeExecutor +# ────────────────────────────────────────────── + + +class TestGlobusComputeExecutor: + def setup_method(self): + GCSession.clear() + + def test_init(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + assert exctr.endpoint_id == "ep-test" + assert exctr._gc_executor is None # Lazy init + assert exctr.workerID is None + + def test_submit_with_func(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-xyz" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + future_mock = mock.MagicMock() + mock_exec.submit_to_registered_function.return_value = future_mock + + def my_func(x): + return x * 2 + + task = exctr.submit(func=my_func, app_args="hello") + assert isinstance(task, GlobusComputeTask) + assert task._gc_future is future_mock + assert task.app is not None + assert task.app.name == "my_func" + + def test_submit_with_registered_app_pyobj(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-app" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + def app_func(): + return 42 + + exctr.register_app("/fake/path", app_name="myapp", calc_type="sim", pyobj=app_func) + + future_mock = mock.MagicMock() + mock_exec.submit_to_registered_function.return_value = future_mock + + task = exctr.submit(app_name="myapp") + assert isinstance(task, GlobusComputeTask) + assert task.app.name == "myapp" + + def test_submit_without_func_or_app_raises(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + with pytest.raises(ValueError): + exctr.submit() + + def test_register_function_caching(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-cached" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + def my_func(): + pass + + fid1 = exctr._get_func_id(my_func) + fid2 = exctr._get_func_id(my_func) + assert fid1 == fid2 + # Should only register once + assert mock_exec.register_function.call_count == 1 + + def test_manager_kill_received(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + # Set up a mock comm so manager_poll doesn't assert + mock_comm = mock.MagicMock() + mock_comm.mail_flag.return_value = False + exctr.set_worker_info(comm=mock_comm, workerid=1) + assert exctr.manager_kill_received() is False + + def test_register_app_no_pyobj(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr.register_app("/bin/echo", app_name="echo", calc_type="sim") + app = exctr.get_app("echo") + assert app.pyobj is None + assert app.full_path == "/bin/echo" + + +# ────────────────────────────────────────────── +# Refactored GlobusComputeRunner +# ────────────────────────────────────────────── + + +class TestGlobusComputeRunnerRefactored: + def setup_method(self): + GCSession.clear() + + def test_runner_uses_gcsession(self): + from libensemble.utils.runners import GlobusComputeRunner + + assert hasattr(GlobusComputeRunner, "_session") + assert isinstance(GlobusComputeRunner._session, GCSession) + + def test_runner_from_specs_with_endpoint(self): + import libensemble.tests.unit_tests.setup as setup + + sim_specs, _, _ = setup.make_criteria_and_specs_0() + sim_specs["globus_compute_endpoint"] = "ep-123" + + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-runner" + mock_create.return_value = mock_exec + + from libensemble.utils.runners import GlobusComputeRunner, Runner + + runner = Runner.from_specs(sim_specs) + assert isinstance(runner, GlobusComputeRunner) + assert runner.globus_compute_executor is mock_exec + assert runner.globus_compute_fid == "fid-runner" + + def test_runner_from_specs_without_endpoint(self): + import libensemble.tests.unit_tests.setup as setup + + sim_specs, _, _ = setup.make_criteria_and_specs_0() + assert not sim_specs.get("globus_compute_endpoint") + + from libensemble.utils.runners import Runner + + runner = Runner.from_specs(sim_specs) + assert not hasattr(runner, "globus_compute_executor") or runner.globus_compute_executor is None + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/libensemble/utils/globus_compute.py b/libensemble/utils/globus_compute.py new file mode 100644 index 000000000..ee7648e63 --- /dev/null +++ b/libensemble/utils/globus_compute.py @@ -0,0 +1,88 @@ +import logging +import threading + +logger = logging.getLogger(__name__) + + +class GCSession: + """Per-process singleton cache for Globus Compute executors. + + Caches executor instances keyed by endpoint_id, ensuring only one + executor per endpoint per process. Thread-safe via ``threading.Lock``. + """ + + _instances: dict[str, tuple] = {} + _lock = threading.Lock() + + @classmethod + def get_or_create_executor(cls, endpoint_id: str): + """Get or create a cached executor for the given endpoint. + + Unlike :meth:`get_or_create`, this does **not** register a function. + """ + with cls._lock: + if endpoint_id in cls._instances: + return cls._instances[endpoint_id][0] + + executor = cls._create_executor(endpoint_id) + if executor is None: + return None + + cls._instances[endpoint_id] = (executor, None) + return executor + + @classmethod + def get_or_create(cls, endpoint_id: str, func): + """Get or create a cached ``(executor, func_id)`` pair. + + The first call for an endpoint creates the executor and registers + the callable. Subsequent calls return the cached pair (the + registered function is re-used). + """ + with cls._lock: + if endpoint_id in cls._instances: + executor, existing_fid = cls._instances[endpoint_id] + if existing_fid is not None: + return executor, existing_fid + # Executor exists but no function registered yet + func_id = executor.register_function(func) + cls._instances[endpoint_id] = (executor, func_id) + return executor, func_id + + executor = cls._create_executor(endpoint_id) + if executor is None: + return None, None + + func_id = executor.register_function(func) + cls._instances[endpoint_id] = (executor, func_id) + return executor, func_id + + @classmethod + def register_function(cls, endpoint_id: str, func): + """Register an additional function with an existing executor. + + Returns ``(executor, func_id)``. Unlike :meth:`get_or_create`, + this always registers and never caches the func_id (caller should + cache it themselves). + """ + executor = cls.get_or_create_executor(endpoint_id) + if executor is None: + return None, None + func_id = executor.register_function(func) + return executor, func_id + + @classmethod + def _create_executor(cls, endpoint_id: str): + try: + from globus_compute_sdk import Executor + except ModuleNotFoundError: + logger.warning("Globus Compute use detected but Globus Compute not importable. " "Is it installed?") + logger.warning("Running function evaluations normally on local resources.") + return None + return Executor(endpoint_id=endpoint_id) + + @classmethod + def clear(cls): + """Clear the cache (primarily for testing).""" + with cls._lock: + cls._instances.clear() From 76fa11b893748dd48baae8975c549edbabd619aa Mon Sep 17 00:00:00 2001 From: jlnav Date: Mon, 18 May 2026 13:56:37 -0500 Subject: [PATCH 04/14] support submitting gest-api-styled sims to globus-compute --- libensemble/manager.py | 26 +- .../tests/unit_tests/test_globus_compute.py | 223 ++++++++++++++++++ 2 files changed, 247 insertions(+), 2 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index b785e6797..0e469e726 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -33,6 +33,7 @@ PERSIS_STOP, STOP_TAG, TASK_FAILED, + WORKER_DONE, calc_type_strings, ) from libensemble.resources.resources import Resources @@ -750,6 +751,27 @@ def _gc_submit(self, Work: dict, w: int) -> None: future = self._gc_executor.submit_to_registered_function(self._gc_fid, args) self._gc_futures[future] = (sim_id, w) + @staticmethod + def _normalize_gc_result(result): + """Normalize a sim_f return value to a 3-tuple ``(out, persis_info, calc_status)``. + + Sim functions may return: + - 3-tuple: ``(H_o, persis_info, calc_status)`` + - 2-tuple: ``(H_o, persis_info)`` — common with gest-api wrappers + + In the 2-tuple case, *calc_status* defaults to ``WORKER_DONE``. + """ + if isinstance(result, (tuple, list)): + if len(result) >= 3: + return result[0], result[1], result[2] + elif len(result) == 2: + if isinstance(result[1], (int, str)): + return result[0], {}, result[1] + return result[0], result[1], WORKER_DONE + else: + return result[0], {}, WORKER_DONE + return result, {}, WORKER_DONE + def _gather_gc_results(self, persis_info: dict) -> dict: """Poll completed GC futures *and* the gen worker comm, then update history.""" time.sleep(0.0001) # Avoid busy-wait (matches _receive_from_workers) @@ -768,7 +790,7 @@ def _gather_gc_results(self, persis_info: dict) -> dict: for future in done: sim_id, w = self._gc_futures.pop(future) try: - out, p_info, calc_status = future.result() + out, p_info, calc_status = self._normalize_gc_result(future.result()) D_recv = { "calc_type": EVAL_SIM_TAG, "calc_status": calc_status, @@ -861,7 +883,7 @@ def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int] for future in done: sim_id, w = self._gc_futures.pop(future) try: - out, p_info, calc_status = future.result() + out, p_info, calc_status = self._normalize_gc_result(future.result()) D_recv = { "calc_type": EVAL_SIM_TAG, "calc_status": calc_status, diff --git a/libensemble/tests/unit_tests/test_globus_compute.py b/libensemble/tests/unit_tests/test_globus_compute.py index a7506b75b..2e780e6ec 100644 --- a/libensemble/tests/unit_tests/test_globus_compute.py +++ b/libensemble/tests/unit_tests/test_globus_compute.py @@ -1,11 +1,14 @@ from unittest import mock +import numpy as np import pytest from libensemble.executors.globus_compute_executor import ( GlobusComputeExecutor, GlobusComputeTask, ) +from libensemble.manager import Manager +from libensemble.message_numbers import TASK_FAILED, WORKER_DONE from libensemble.utils.globus_compute import GCSession # ────────────────────────────────────────────── @@ -329,5 +332,225 @@ def test_runner_from_specs_without_endpoint(self): assert not hasattr(runner, "globus_compute_executor") or runner.globus_compute_executor is None +# ────────────────────────────────────────────── +# _normalize_gc_result (Manager static method) +# ────────────────────────────────────────────── + + +class TestNormalizeGcResult: + """Tests for Manager._normalize_gc_result which normalizes sim_f + return values (2-tuple from gest-api, 3-tuple from legacy) to a + consistent 3-tuple ``(out, persis_info, calc_status)``.""" + + def test_3_tuple_passthrough(self): + """Legacy sim_f returns (H_o, persis_info, calc_status).""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {"a": 1}, WORKER_DONE) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"a": 1} + assert status == WORKER_DONE + + def test_3_tuple_with_task_failed(self): + """Legacy sim_f returns a failure status.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {}, TASK_FAILED) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == TASK_FAILED + + def test_2_tuple_with_persis_info(self): + """gest-api wrapper returns (H_o, persis_info) — no calc_status.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {"key": "val"}) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"key": "val"} + assert status == WORKER_DONE + + def test_2_tuple_with_empty_persis_info(self): + """gest-api wrapper returns (H_o, {}).""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {}) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_2_tuple_with_int_status(self): + """Some sim_f may return (H_o, calc_status) without persis_info.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, WORKER_DONE) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_2_tuple_with_str_status(self): + """Sim_f returns (H_o, 'some_status_string').""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, "Custom status") + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == "Custom status" + + def test_1_tuple(self): + """Edge case: single-element tuple.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o,) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_non_tuple(self): + """Edge case: bare numpy array (not wrapped in tuple).""" + H_o = np.zeros(1, dtype=[("y", float)]) + out, p_info, status = Manager._normalize_gc_result(H_o) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_list_result(self): + """Result returned as a list instead of tuple.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = [H_o, {"x": 1}, WORKER_DONE] + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"x": 1} + assert status == WORKER_DONE + + +# ────────────────────────────────────────────── +# Manager-side GC with gest-api sim_f +# ────────────────────────────────────────────── + + +class TestGatherGcResultsGestApi: + """Tests that _gather_gc_results correctly handles futures returning + 2-tuples (gest-api) and 3-tuples (legacy).""" + + def setup_method(self): + GCSession.clear() + + def _make_mock_manager(self): + """Create a minimal mock Manager with just enough structure for + _gather_gc_results to run.""" + mgr = mock.MagicMock(spec=Manager) + mgr._gc_futures = {} + mgr._normalize_gc_result = Manager._normalize_gc_result + mgr._gather_gc_results = lambda pi: Manager._gather_gc_results(mgr, pi) + mgr._init_every_k_save = mock.MagicMock() + + # Minimal W array with one gen worker (w=0) + W = np.zeros(2, dtype=[("worker_id", int)]) + W["worker_id"] = [0, 1] + mgr.W = W + + # wcomms: gen worker has a comm, virtual worker is None + mock_comm = mock.MagicMock() + mock_comm.mail_flag.return_value = False + mgr.wcomms = {0: mock_comm, 1: None} + + return mgr + + def test_2_tuple_gest_api_future(self): + """Future returns (H_o, persis_info) — gest-api sim via gest_api_sim wrapper.""" + mgr = self._make_mock_manager() + + H_o = np.zeros(1, dtype=[("y", float)]) + H_o["y"] = 42.0 + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = (H_o, {"some": "info"}) + + mgr._gc_futures[future] = (0, 1) # sim_id=0, virtual_w=1 + mgr._gather_gc_results({}) + + mgr._update_state_on_worker_msg.assert_called_once() + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == WORKER_DONE + assert D_recv["calc_out"] is H_o + assert D_recv["persis_info"] == {"some": "info"} + assert D_recv["libE_info"]["H_rows"] == [0] + + def test_3_tuple_legacy_future(self): + """Future returns (H_o, persis_info, calc_status) — legacy sim_f.""" + mgr = self._make_mock_manager() + + H_o = np.zeros(1, dtype=[("y", float)]) + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = (H_o, {}, WORKER_DONE) + + mgr._gc_futures[future] = (5, 1) + mgr._gather_gc_results({}) + + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == WORKER_DONE + assert D_recv["calc_out"] is H_o + + def test_failed_future(self): + """Future raises an exception — should get TASK_FAILED status.""" + mgr = self._make_mock_manager() + + future = mock.MagicMock() + future.done.return_value = True + future.result.side_effect = RuntimeError("remote error") + + mgr._gc_futures[future] = (3, 1) + mgr._gather_gc_results({}) + + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == TASK_FAILED + assert D_recv["calc_out"] is None + + def test_multiple_futures_mixed(self): + """Mix of 2-tuple and 3-tuple futures drained in one call.""" + mgr = self._make_mock_manager() + + H_o1 = np.zeros(1, dtype=[("y", float)]) + H_o1["y"] = 1.0 + H_o2 = np.zeros(1, dtype=[("y", float)]) + H_o2["y"] = 2.0 + + f1 = mock.MagicMock() + f1.done.return_value = True + f1.result.return_value = (H_o1, {}) # 2-tuple (gest-api) + + f2 = mock.MagicMock() + f2.done.return_value = True + f2.result.return_value = (H_o2, {"p": 2}, WORKER_DONE) # 3-tuple (legacy) + + mgr._gc_futures[f1] = (0, 1) + mgr._gc_futures[f2] = (1, 1) + mgr._gather_gc_results({}) + + assert mgr._update_state_on_worker_msg.call_count == 2 + + # Verify both calls got correct calc_status + for call in mgr._update_state_on_worker_msg.call_args_list: + D_recv = call[0][1] + assert D_recv["calc_status"] == WORKER_DONE + + def test_not_done_future_skipped(self): + """Futures that are not done should be skipped and remain in dict.""" + mgr = self._make_mock_manager() + + f = mock.MagicMock() + f.done.return_value = False + + mgr._gc_futures[f] = (0, 1) + mgr._gather_gc_results({}) + + mgr._update_state_on_worker_msg.assert_not_called() + assert f in mgr._gc_futures + + if __name__ == "__main__": pytest.main([__file__]) From 9efa8a826f68ab413c0c460ebb58588b95722c84 Mon Sep 17 00:00:00 2001 From: jlnav Date: Mon, 18 May 2026 16:08:07 -0500 Subject: [PATCH 05/14] document the two new globus compute approaches, and remove the old approach --- .../advanced_installation.rst | 13 +- docs/executor/ex_globus_compute.rst | 57 ++++++++ docs/executor/ex_index.rst | 8 +- docs/executor/ex_overview.rst | 15 ++ docs/platforms/platforms_index.rst | 137 ++++++++++++++---- docs/running_libE.rst | 4 +- libensemble/libE.py | 6 +- libensemble/specs.py | 8 +- .../tests/unit_tests/test_globus_compute.py | 45 ------ .../tests/unit_tests/test_ufunc_runners.py | 81 ----------- libensemble/utils/runners.py | 26 ---- 11 files changed, 211 insertions(+), 189 deletions(-) create mode 100644 docs/executor/ex_globus_compute.rst diff --git a/docs/advanced_installation/advanced_installation.rst b/docs/advanced_installation/advanced_installation.rst index fc2e8546a..ea30fd4d0 100644 --- a/docs/advanced_installation/advanced_installation.rst +++ b/docs/advanced_installation/advanced_installation.rst @@ -31,7 +31,18 @@ Further recommendations for selected HPC systems are given in the Globus Compute -------------- -`Globus Compute`_ may be installed optionally to submit simulation function instances to remote Globus Compute endpoints. +`Globus Compute`_ may be installed optionally to submit simulation function +instances to remote Globus Compute endpoints:: + + pip install globus-compute-sdk + +This is an optional dependency; libEnsemble operates normally without it. +If Globus Compute is not installed and a ``globus_compute_endpoint`` is +configured, libEnsemble will warn and fall back to local execution. + +See :ref:`Globus Compute - Remote User Functions` for +usage, and the :doc:`GlobusComputeExecutor API reference` +for the full executor interface. .. _Globus Compute: https://www.globus.org/compute .. _Python: http://www.python.org diff --git a/docs/executor/ex_globus_compute.rst b/docs/executor/ex_globus_compute.rst new file mode 100644 index 000000000..9b8c4572a --- /dev/null +++ b/docs/executor/ex_globus_compute.rst @@ -0,0 +1,57 @@ +Globus Compute Executor +======================= + +`Overview `__ \|\| `Base Executor `__ \|\| `MPI Executor `__ \|\| **Globus Compute Executor** + +The :class:`GlobusComputeExecutor` +submits Python callables to a remote `Globus Compute`_ endpoint instead of +launching local subprocesses. It can be used inside simulator functions in the +same way as the :doc:`MPI Executor`, retrieving it from +``libE_info["executor"]``. + +See :ref:`Globus Compute - Remote User Functions` for an +overview of the three GC integration modes (manager-side GC-only, user-facing +executor, and legacy drop-in). + +.. note:: + + ``globus-compute-sdk`` must be installed to use this executor:: + + pip install globus-compute-sdk + + Users must also authenticate via Globus_ and have an active + `Globus Compute endpoint`_ running on the target system. + +GlobusComputeExecutor +--------------------- + +.. autoclass:: libensemble.executors.globus_compute_executor.GlobusComputeExecutor + :members: register_app, submit, set_workerID, set_worker_info + :show-inheritance: + + .. automethod:: __init__ + +GlobusComputeTask +----------------- + +Tasks are created and returned by +:meth:`GlobusComputeExecutor.submit()`. +Each task wraps a ``concurrent.futures.Future`` from the Globus Compute SDK +and exposes the same polling interface as other libEnsemble tasks. + +.. autoclass:: libensemble.executors.globus_compute_executor.GlobusComputeTask + :members: poll, wait, kill, result, running, done, cancelled + +**Task states**: ``RUNNING`` | ``FINISHED`` | ``FAILED`` | ``USER_KILLED`` + +**Key attributes**: + +:task.state: (string) Current task state — one of the values above. +:task.finished: (bool) True once the task has completed (successfully or not). +:task.success: (bool) True if the remote callable returned without raising. +:task.runtime: (float) Elapsed wall-clock seconds since submission. +:task.submit_time: (float) Time since epoch at submission. + +.. _Globus Compute: https://www.globus.org/compute +.. _Globus: https://www.globus.org/ +.. _Globus Compute endpoint: https://globus-compute.readthedocs.io/en/latest/endpoints.html diff --git a/docs/executor/ex_index.rst b/docs/executor/ex_index.rst index a4f33cb39..1de648ade 100644 --- a/docs/executor/ex_index.rst +++ b/docs/executor/ex_index.rst @@ -1,6 +1,6 @@ .. _executor_index: -**Overview** \|\| `Base Executor `__ \|\| `MPI Executor `__ +**Overview** \|\| `Base Executor `__ \|\| `MPI Executor `__ \|\| `Globus Compute Executor `__ Executors ========= @@ -14,8 +14,12 @@ portable interface for running and managing user applications. ex_overview ex_base ex_mpi + ex_globus_compute The **Executor** provides a portable interface for running applications on any system and -any number of compute resources. +any number of compute resources. The :doc:`MPI Executor` launches MPI +applications on local resources; the +:doc:`Globus Compute Executor` submits Python callables to +remote Globus Compute endpoints. Please select from the sections above or the sidebar navigation to read more. diff --git a/docs/executor/ex_overview.rst b/docs/executor/ex_overview.rst index f53510b73..e091e94ce 100644 --- a/docs/executor/ex_overview.rst +++ b/docs/executor/ex_overview.rst @@ -156,4 +156,19 @@ which partitions resources among workers, ensuring that runs utilize different resources (e.g., nodes). Furthermore, the ``MPIExecutor`` offers resilience via the feature of re-launching tasks that fail to start because of system factors. +Remote Execution with Globus Compute +------------------------------------- + +The :doc:`GlobusComputeExecutor` submits Python callables +to remote `Globus Compute`_ endpoints instead of launching local subprocesses. +It exposes the same ``submit()`` / ``poll()`` / ``kill()`` interface as other +libEnsemble executors and can be retrieved from ``libE_info["executor"]`` +inside simulator functions. + +See :ref:`Globus Compute - Remote User Functions` for an +overview of all three GC integration modes and the +:doc:`GlobusComputeExecutor API reference` for the full +interface. + .. _concurrent futures: https://docs.python.org/library/concurrent.futures.html +.. _Globus Compute: https://www.globus.org/compute diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst index d1de30c45..d141809f9 100644 --- a/docs/platforms/platforms_index.rst +++ b/docs/platforms/platforms_index.rst @@ -164,55 +164,138 @@ to isolate this work from the workers. Globus Compute - Remote User Functions -------------------------------------- -If libEnsemble is running on some resource with -internet access (laptops, login nodes, other servers, etc.), workers can be instructed to -launch generator or simulator user function instances to separate resources from -themselves via `Globus Compute`_ (formerly funcX), a distributed, high-performance function-as-a-service platform: +`Globus Compute`_ (formerly funcX) is a distributed, high-performance +function-as-a-service platform. When libEnsemble is running on a resource with +internet access (laptops, login nodes, other servers, etc.), it can offload +simulator calls to remote Globus Compute endpoints: .. image:: ../images/funcxmodel.png :alt: running_with_globus_compute :scale: 50 :align: center -This is useful for running ensembles across machines and heterogeneous resources, but -comes with several caveats: +This is useful for running ensembles across machines and heterogeneous resources. +There are **three approaches**, described below. - 1. User functions registered with Globus Compute must be *non-persistent*, since - manager-worker communicators can't be serialized or used by a remote resource. +The following caveats apply to all Globus Compute modes: - 2. Likewise, the ``Executor.manager_poll()`` capability is disabled. The only - available control over remote functions by workers is processing return values - or exceptions when they complete. + 1. Simulator functions submitted to Globus Compute must be *non-persistent*, + since manager-worker communicators cannot be serialized or used by a + remote resource. - 3. Globus Compute imposes a `handful of task-rate and data limits`_ on submitted functions. + 2. ``Executor.manager_poll()`` is not available inside remotely executed + functions. Control over remote work is limited to inspecting return + values and exceptions when tasks complete. + + 3. Globus Compute imposes a `handful of task-rate and data limits`_ on + submitted functions. 4. Users are responsible for authenticating via Globus_ and maintaining their `Globus Compute endpoints`_ on their target systems. -Users can still define Executor instances within their user functions and submit -MPI applications normally, as long as libEnsemble and the target application are -accessible on the remote system:: +.. _gc_only_mode: + +Manager-side GC (GC-only mode) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The recommended approach for most use cases. When +``globus_compute_endpoint`` is set in :class:`SimSpecs` +and ``gen_on_worker`` is not set (the default), libEnsemble enters +**GC-only mode**: no local worker processes are launched. The manager +submits simulation work directly to Globus Compute and polls futures for +results. The generator still runs as a local thread on the manager. + +``nworkers`` controls the maximum number of simultaneously in-flight +Globus Compute tasks (virtual concurrency). The default is 1. + +This mode supports both the :ref:`gest-api simulator format` +(``SimSpecs.simulator``) and the legacy ``sim_f`` format. + +.. code-block:: python + + from libensemble import Ensemble + from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + + def my_sim(input_dict: dict, **kwargs) -> dict: + """gest-api simulator — runs remotely on the GC endpoint.""" + return {"f": input_dict["x"] ** 2} + + + sim_specs = SimSpecs( + simulator=my_sim, + vocs=vocs, + globus_compute_endpoint="3af6dc24-3f27-4c49-8d11-e301ade15353", + ) + + libE_specs = LibeSpecs(nworkers=4) # up to 4 concurrent GC tasks + + workflow = Ensemble( + sim_specs=sim_specs, + gen_specs=gen_specs, + libE_specs=libE_specs, + exit_criteria=ExitCriteria(sim_max=20), + ) + H, _, _ = workflow.run() + +Users can also define ``Executor`` instances within their remote simulator +functions and submit MPI applications normally, as long as libEnsemble and +the target application are accessible on the remote system:: - # Within remote user function + # Within the remote simulator function from libensemble.executors import MPIExecutor exctr = MPIExecutor() exctr.register_app(full_path="/home/user/forces.x", app_name="forces") task = exctr.submit(app_name="forces", num_procs=64) -Specify a Globus Compute endpoint in :class:`sim_specs` via the ``globus_compute_endpoint`` -argument. For example:: +.. note:: - from libensemble.specs import SimSpecs + Both the simulator callable and any VOCS object must be **picklable**, + as they are serialized and shipped to the remote Globus Compute endpoint. - sim_specs = SimSpecs( - sim_f = sim_f, - inputs = ["x"], - out = [("f", float)], - globus_compute_endpoint = "3af6dc24-3f27-4c49-8d11-e301ade15353", - ) +.. _gc_executor_approach: + +GlobusComputeExecutor (user-facing) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For workflows where the simulation function itself orchestrates remote +calls, for example, fanning out to multiple endpoints or mixing local +and remote work, use the +:class:`GlobusComputeExecutor` +directly inside the simulator. + +Create and register the executor in the calling script: + +.. code-block:: python + + from libensemble.executors import GlobusComputeExecutor + + exctr = GlobusComputeExecutor(endpoint_id="3af6dc24-3f27-4c49-8d11-e301ade15353") + +Then use it inside the simulator function: + +.. code-block:: python + + import time + + + def my_sim(H, persis_info, sim_specs, libE_info): + exctr = libE_info["executor"] + + task = exctr.submit(func=my_remote_func, app_args=H["x"][0]) + + while not task.finished: + task.poll() + if exctr.manager_kill_received(): + task.kill() + break + time.sleep(0.1) + + return H_o, persis_info -See the ``libensemble/tests/scaling_tests/globus_compute_forces`` directory for a complete -remote-simulation example. +See the :doc:`GlobusComputeExecutor API reference<../executor/ex_globus_compute>` for +the full interface including ``register_app``, ``submit``, and +:class:`GlobusComputeTask` methods. Instructions for Specific Platforms ----------------------------------- diff --git a/docs/running_libE.rst b/docs/running_libE.rst index 6329e13e2..4b0b333b5 100644 --- a/docs/running_libE.rst +++ b/docs/running_libE.rst @@ -84,8 +84,8 @@ if using an :class:`Ensemble` object with Set ``comms`` to ``ssh`` to launch workers on remote ssh-accessible systems. This co-locates workers, functions, and any applications. User -functions can also be persistent, unlike when launching remote functions via -:ref:`Globus Compute`. +functions can also be persistent, unlike simulator functions submitted to +:ref:`Globus Compute`, which must be non-persistent. The remote working directory and Python need to be specified. This may resemble:: diff --git a/libensemble/libE.py b/libensemble/libE.py index ee86d5a25..750bad539 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -267,10 +267,12 @@ def libE( Executor.executor.add_platform_info(platform_info) # Detect GC-only mode: manager submits directly to Globus Compute. - # Requires globus_compute_endpoint set and gen_on_worker not set. # nworkers is repurposed as virtual concurrency (max concurrent GC sims). - if sim_specs.get("globus_compute_endpoint") and not libE_specs.get("gen_on_worker"): + if sim_specs.get("globus_compute_endpoint"): libE_specs["_gc_only"] = True + if libE_specs.get("gen_on_worker"): + logger.info("GC-only mode: gen_on_worker is ignored (generator runs on manager)") + libE_specs["gen_on_worker"] = False # Force local comms (no MPI workers needed) if libE_specs.get("comms", "mpi") != "local": libE_specs["comms"] = "local" diff --git a/libensemble/specs.py b/libensemble/specs.py index 2f33d447f..3ebcd0397 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -116,9 +116,11 @@ class SimSpecs(BaseModel): globus_compute_endpoint: str | None = "" """ - A Globus Compute (https://www.globus.org/compute) ID corresponding to an active endpoint on a remote system. - libEnsemble's workers will submit simulator function instances to this endpoint instead of - calling them locally. + A `Globus Compute `_ endpoint ID corresponding + to an active endpoint on a remote system. When set, libEnsemble enters + **GC-only mode**: no local worker processes are launched and the manager + submits simulation work directly to the Globus Compute endpoint. + ``nworkers`` controls the maximum number of concurrent in-flight tasks. """ threaded: bool | None = False diff --git a/libensemble/tests/unit_tests/test_globus_compute.py b/libensemble/tests/unit_tests/test_globus_compute.py index 2e780e6ec..474eb601a 100644 --- a/libensemble/tests/unit_tests/test_globus_compute.py +++ b/libensemble/tests/unit_tests/test_globus_compute.py @@ -287,51 +287,6 @@ def test_register_app_no_pyobj(self): assert app.full_path == "/bin/echo" -# ────────────────────────────────────────────── -# Refactored GlobusComputeRunner -# ────────────────────────────────────────────── - - -class TestGlobusComputeRunnerRefactored: - def setup_method(self): - GCSession.clear() - - def test_runner_uses_gcsession(self): - from libensemble.utils.runners import GlobusComputeRunner - - assert hasattr(GlobusComputeRunner, "_session") - assert isinstance(GlobusComputeRunner._session, GCSession) - - def test_runner_from_specs_with_endpoint(self): - import libensemble.tests.unit_tests.setup as setup - - sim_specs, _, _ = setup.make_criteria_and_specs_0() - sim_specs["globus_compute_endpoint"] = "ep-123" - - with mock.patch.object(GCSession, "_create_executor") as mock_create: - mock_exec = mock.MagicMock() - mock_exec.register_function.return_value = "fid-runner" - mock_create.return_value = mock_exec - - from libensemble.utils.runners import GlobusComputeRunner, Runner - - runner = Runner.from_specs(sim_specs) - assert isinstance(runner, GlobusComputeRunner) - assert runner.globus_compute_executor is mock_exec - assert runner.globus_compute_fid == "fid-runner" - - def test_runner_from_specs_without_endpoint(self): - import libensemble.tests.unit_tests.setup as setup - - sim_specs, _, _ = setup.make_criteria_and_specs_0() - assert not sim_specs.get("globus_compute_endpoint") - - from libensemble.utils.runners import Runner - - runner = Runner.from_specs(sim_specs) - assert not hasattr(runner, "globus_compute_executor") or runner.globus_compute_executor is None - - # ────────────────────────────────────────────── # _normalize_gc_result (Manager static method) # ────────────────────────────────────────────── diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py index e4e813a8e..8765e077a 100644 --- a/libensemble/tests/unit_tests/test_ufunc_runners.py +++ b/libensemble/tests/unit_tests/test_ufunc_runners.py @@ -1,11 +1,7 @@ -from unittest import mock - import numpy as np -import pytest import libensemble.tests.unit_tests.setup as setup from libensemble.tools.fields_keys import libE_fields -from libensemble.utils.globus_compute import GCSession from libensemble.utils.runners import Runner @@ -70,84 +66,7 @@ def tupilize(arg1, arg2): assert result == (calc_in, {}) -@pytest.mark.extra -def test_globus_compute_runner_init(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - sim_specs["globus_compute_endpoint"] = "1234" - - with mock.patch.object(GCSession, "_create_executor") as mock_create: - mock_exec = mock.MagicMock() - mock_exec.register_function.return_value = "fid-123" - mock_create.return_value = mock_exec - - runner = Runner.from_specs(sim_specs) - - assert hasattr( - runner, "globus_compute_executor" - ), "Globus ComputeExecutor should have been instantiated when globus_compute_endpoint found in specs" - - -@pytest.mark.extra -def test_globus_compute_runner_pass(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - sim_specs["globus_compute_endpoint"] = "1234" - - with mock.patch.object(GCSession, "_create_executor") as mock_create: - mock_exec = mock.MagicMock() - mock_exec.register_function.return_value = "fid-123" - mock_create.return_value = mock_exec - - runner = Runner.from_specs(sim_specs) - - # Creating Mock Globus Compute future object - no exception - globus_compute_future = mock.MagicMock() - globus_compute_future.exception.return_value = None - globus_compute_future.result.return_value = (True, True) - - runner.globus_compute_executor.submit_to_registered_function.return_value = globus_compute_future - runners = {1: runner.run} - - libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} - - out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1}) - - assert all([out, persis_info]), "Globus Compute runner correctly returned results" - - -@pytest.mark.extra -def test_globus_compute_runner_fail(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - GCSession.clear() - sim_specs["globus_compute_endpoint"] = "4321" - - with mock.patch.object(GCSession, "_create_executor") as mock_create: - mock_exec = mock.MagicMock() - mock_exec.register_function.return_value = "fid-432" - mock_create.return_value = mock_exec - - runner = Runner.from_specs(sim_specs) - - # Creating Mock Globus Compute future object - yes exception - globus_compute_future = mock.MagicMock() - globus_compute_future.exception.return_value = Exception("boom") - - runner.globus_compute_executor.submit_to_registered_function.return_value = globus_compute_future - runners = {1: runner.run} - - libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} - - with pytest.raises(Exception): - out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1}) - pytest.fail("Expected exception") - - if __name__ == "__main__": test_normal_runners() test_thread_runners() test_persis_info_from_none() - test_globus_compute_runner_init() - test_globus_compute_runner_pass() - test_globus_compute_runner_fail() diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index a10615659..1c11cfbee 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -9,7 +9,6 @@ from libensemble.generators import LibensembleGenerator, PersistentGenInterfacer from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG from libensemble.tools.persistent_support import PersistentSupport -from libensemble.utils.globus_compute import GCSession from libensemble.utils.misc import list_dicts_to_np, map_numpy_array, np_to_list_dicts, unmap_numpy_array logger = logging.getLogger(__name__) @@ -18,8 +17,6 @@ class Runner: @classmethod def from_specs(cls, specs): - if len(specs.get("globus_compute_endpoint", "")) > 0: - return GlobusComputeRunner(specs) if specs.get("threaded"): return ThreadRunner(specs) if (generator := specs.get("generator")) is not None: @@ -68,29 +65,6 @@ def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, int | Non return out -class GlobusComputeRunner(Runner): - _session = GCSession() - - def __init__(self, specs): - super().__init__(specs) - endpoint = specs["globus_compute_endpoint"] - self.globus_compute_executor, self.globus_compute_fid = self._session.get_or_create(endpoint, self.f) - - def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, int | None): - from libensemble.worker import Worker - - libE_info["comm"] = None # 'comm' object not pickle-able - Worker._set_executor(0, None) # ditto for executor - - args = self._truncate_args(calc_in, persis_info, libE_info) - task_fut = self.globus_compute_executor.submit_to_registered_function(self.globus_compute_fid, args) - return task_fut.result() - - def shutdown(self) -> None: - if self.globus_compute_executor is not None: - self.globus_compute_executor.shutdown() - - class ThreadRunner(Runner): def __init__(self, specs): super().__init__(specs) From f4bbce472512254532ffb8bf0a1ea62d3c0538ec Mon Sep 17 00:00:00 2001 From: jlnav Date: Mon, 18 May 2026 17:03:39 -0500 Subject: [PATCH 06/14] docs bugfixes. move globus compute content into own doc for listing in sidebar and toctree --- docs/conf.py | 2 +- docs/executor/ex_globus_compute.rst | 6 +- docs/platforms/globus_compute.rst | 143 +++++++++++++++++ docs/platforms/platforms_index.rst | 146 +----------------- .../executors/globus_compute_executor.py | 5 +- 5 files changed, 153 insertions(+), 149 deletions(-) create mode 100644 docs/platforms/globus_compute.rst diff --git a/docs/conf.py b/docs/conf.py index 8647bbf4f..08ea11276 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -31,7 +31,7 @@ def __getattr__(cls, name): return MagicMock() -autodoc_mock_imports = ["ax", "gpcam", "IPython", "matplotlib", "pandas", "scipy", "surmise"] +autodoc_mock_imports = ["ax", "globus_compute_sdk", "gpcam", "IPython", "matplotlib", "pandas", "scipy", "surmise"] MOCK_MODULES = [ "argparse", diff --git a/docs/executor/ex_globus_compute.rst b/docs/executor/ex_globus_compute.rst index 9b8c4572a..8b902775b 100644 --- a/docs/executor/ex_globus_compute.rst +++ b/docs/executor/ex_globus_compute.rst @@ -3,15 +3,15 @@ Globus Compute Executor `Overview `__ \|\| `Base Executor `__ \|\| `MPI Executor `__ \|\| **Globus Compute Executor** -The :class:`GlobusComputeExecutor` +The :class:`GlobusComputeExecutor` submits Python callables to a remote `Globus Compute`_ endpoint instead of launching local subprocesses. It can be used inside simulator functions in the same way as the :doc:`MPI Executor`, retrieving it from ``libE_info["executor"]``. See :ref:`Globus Compute - Remote User Functions` for an -overview of the three GC integration modes (manager-side GC-only, user-facing -executor, and legacy drop-in). +overview of the two GC integration modes (manager-side GC-only and user-facing +executor). .. note:: diff --git a/docs/platforms/globus_compute.rst b/docs/platforms/globus_compute.rst new file mode 100644 index 000000000..5ea258fb3 --- /dev/null +++ b/docs/platforms/globus_compute.rst @@ -0,0 +1,143 @@ +.. _globus_compute_ref: + +====================================== +Globus Compute - Remote User Functions +====================================== + +`Globus Compute`_ (formerly funcX) is a distributed, high-performance +function-as-a-service platform. When libEnsemble is running on a resource with +internet access (laptops, login nodes, other servers, etc.), it can offload +simulator calls to remote Globus Compute endpoints: + + .. image:: ../images/funcxmodel.png + :alt: running_with_globus_compute + :scale: 50 + :align: center + +This is useful for running ensembles across machines and heterogeneous resources. +There are **two approaches**, described below. + +The following caveats apply to all Globus Compute modes: + + 1. Simulator functions submitted to Globus Compute must be *non-persistent*, + since manager-worker communicators cannot be serialized or used by a + remote resource. + + 2. ``Executor.manager_poll()`` is not available inside remotely executed + functions. Control over remote work is limited to inspecting return + values and exceptions when tasks complete. + + 3. Globus Compute imposes a `handful of task-rate and data limits`_ on + submitted functions. + + 4. Users are responsible for authenticating via Globus_ and maintaining their + `Globus Compute endpoints`_ on their target systems. + +.. _gc_only_mode: + +Manager-side GC (GC-only mode) +------------------------------- + +The recommended approach for most use cases. When +``globus_compute_endpoint`` is set in :class:`SimSpecs` +and ``gen_on_worker`` is not set (the default), libEnsemble enters +**GC-only mode**: no local worker processes are launched. The manager +submits simulation work directly to Globus Compute and polls futures for +results. The generator still runs as a local thread on the manager. + +``nworkers`` controls the maximum number of simultaneously in-flight +Globus Compute tasks (virtual concurrency). The default is 1. + +This mode supports both the :ref:`gest-api simulator format` +(``SimSpecs.simulator``) and the legacy ``sim_f`` format. + +.. code-block:: python + + from libensemble import Ensemble + from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + + def my_sim(input_dict: dict, **kwargs) -> dict: + """gest-api simulator — runs remotely on the GC endpoint.""" + return {"f": input_dict["x"] ** 2} + + + sim_specs = SimSpecs( + simulator=my_sim, + vocs=vocs, + globus_compute_endpoint="3af6dc24-3f27-4c49-8d11-e301ade15353", + ) + + libE_specs = LibeSpecs(nworkers=4) # up to 4 concurrent GC tasks + + workflow = Ensemble( + sim_specs=sim_specs, + gen_specs=gen_specs, + libE_specs=libE_specs, + exit_criteria=ExitCriteria(sim_max=20), + ) + H, _, _ = workflow.run() + +Users can also define ``Executor`` instances within their remote simulator +functions and submit MPI applications normally, as long as libEnsemble and +the target application are accessible on the remote system:: + + # Within the remote simulator function + from libensemble.executors import MPIExecutor + exctr = MPIExecutor() + exctr.register_app(full_path="/home/user/forces.x", app_name="forces") + task = exctr.submit(app_name="forces", num_procs=64) + +.. note:: + + Both the simulator callable and any VOCS object must be **picklable**, + as they are serialized and shipped to the remote Globus Compute endpoint. + +.. _gc_executor_approach: + +GlobusComputeExecutor (user-facing) +------------------------------------ + +For workflows where the simulation function itself orchestrates remote +calls, for example, fanning out to multiple endpoints or mixing local +and remote work, use the +:class:`GlobusComputeExecutor` +directly inside the simulator. + +Create and register the executor in the calling script: + +.. code-block:: python + + from libensemble.executors import GlobusComputeExecutor + + exctr = GlobusComputeExecutor(endpoint_id="3af6dc24-3f27-4c49-8d11-e301ade15353") + +Then use it inside the simulator function: + +.. code-block:: python + + import time + + + def my_sim(H, persis_info, sim_specs, libE_info): + exctr = libE_info["executor"] + + task = exctr.submit(func=my_remote_func, app_args=H["x"][0]) + + while not task.finished: + task.poll() + if exctr.manager_kill_received(): + task.kill() + break + time.sleep(0.1) + + return H_o, persis_info + +See the :doc:`GlobusComputeExecutor API reference<../executor/ex_globus_compute>` for +the full interface including ``register_app``, ``submit``, and +:class:`GlobusComputeTask` methods. + +.. _Globus Compute: https://www.globus.org/compute +.. _Globus Compute endpoints: https://globus-compute.readthedocs.io/en/latest/endpoints.html +.. _Globus: https://www.globus.org/ +.. _handful of task-rate and data limits: https://globus-compute.readthedocs.io/en/latest/limits.html diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst index d141809f9..3a16a5eed 100644 --- a/docs/platforms/platforms_index.rst +++ b/docs/platforms/platforms_index.rst @@ -159,143 +159,9 @@ will better manage simulation and generation functions that contain considerable computational work or I/O. Therefore the second option is to use Globus Compute to isolate this work from the workers. -.. _globus_compute_ref: - -Globus Compute - Remote User Functions --------------------------------------- - -`Globus Compute`_ (formerly funcX) is a distributed, high-performance -function-as-a-service platform. When libEnsemble is running on a resource with -internet access (laptops, login nodes, other servers, etc.), it can offload -simulator calls to remote Globus Compute endpoints: - - .. image:: ../images/funcxmodel.png - :alt: running_with_globus_compute - :scale: 50 - :align: center - -This is useful for running ensembles across machines and heterogeneous resources. -There are **three approaches**, described below. - -The following caveats apply to all Globus Compute modes: - - 1. Simulator functions submitted to Globus Compute must be *non-persistent*, - since manager-worker communicators cannot be serialized or used by a - remote resource. - - 2. ``Executor.manager_poll()`` is not available inside remotely executed - functions. Control over remote work is limited to inspecting return - values and exceptions when tasks complete. - - 3. Globus Compute imposes a `handful of task-rate and data limits`_ on - submitted functions. - - 4. Users are responsible for authenticating via Globus_ and maintaining their - `Globus Compute endpoints`_ on their target systems. - -.. _gc_only_mode: - -Manager-side GC (GC-only mode) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The recommended approach for most use cases. When -``globus_compute_endpoint`` is set in :class:`SimSpecs` -and ``gen_on_worker`` is not set (the default), libEnsemble enters -**GC-only mode**: no local worker processes are launched. The manager -submits simulation work directly to Globus Compute and polls futures for -results. The generator still runs as a local thread on the manager. - -``nworkers`` controls the maximum number of simultaneously in-flight -Globus Compute tasks (virtual concurrency). The default is 1. - -This mode supports both the :ref:`gest-api simulator format` -(``SimSpecs.simulator``) and the legacy ``sim_f`` format. - -.. code-block:: python - - from libensemble import Ensemble - from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs - - - def my_sim(input_dict: dict, **kwargs) -> dict: - """gest-api simulator — runs remotely on the GC endpoint.""" - return {"f": input_dict["x"] ** 2} - - - sim_specs = SimSpecs( - simulator=my_sim, - vocs=vocs, - globus_compute_endpoint="3af6dc24-3f27-4c49-8d11-e301ade15353", - ) - - libE_specs = LibeSpecs(nworkers=4) # up to 4 concurrent GC tasks - - workflow = Ensemble( - sim_specs=sim_specs, - gen_specs=gen_specs, - libE_specs=libE_specs, - exit_criteria=ExitCriteria(sim_max=20), - ) - H, _, _ = workflow.run() - -Users can also define ``Executor`` instances within their remote simulator -functions and submit MPI applications normally, as long as libEnsemble and -the target application are accessible on the remote system:: - - # Within the remote simulator function - from libensemble.executors import MPIExecutor - exctr = MPIExecutor() - exctr.register_app(full_path="/home/user/forces.x", app_name="forces") - task = exctr.submit(app_name="forces", num_procs=64) - -.. note:: - - Both the simulator callable and any VOCS object must be **picklable**, - as they are serialized and shipped to the remote Globus Compute endpoint. - -.. _gc_executor_approach: - -GlobusComputeExecutor (user-facing) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -For workflows where the simulation function itself orchestrates remote -calls, for example, fanning out to multiple endpoints or mixing local -and remote work, use the -:class:`GlobusComputeExecutor` -directly inside the simulator. - -Create and register the executor in the calling script: - -.. code-block:: python - - from libensemble.executors import GlobusComputeExecutor - - exctr = GlobusComputeExecutor(endpoint_id="3af6dc24-3f27-4c49-8d11-e301ade15353") - -Then use it inside the simulator function: - -.. code-block:: python - - import time - - - def my_sim(H, persis_info, sim_specs, libE_info): - exctr = libE_info["executor"] - - task = exctr.submit(func=my_remote_func, app_args=H["x"][0]) - - while not task.finished: - task.poll() - if exctr.manager_kill_received(): - task.kill() - break - time.sleep(0.1) - - return H_o, persis_info - -See the :doc:`GlobusComputeExecutor API reference<../executor/ex_globus_compute>` for -the full interface including ``register_app``, ``submit``, and -:class:`GlobusComputeTask` methods. +See :doc:`Globus Compute - Remote User Functions` for the two +integration approaches (manager-side GC-only mode and the user-facing +``GlobusComputeExecutor``). Instructions for Specific Platforms ----------------------------------- @@ -314,9 +180,5 @@ libEnsemble on specific HPC systems. perlmutter polaris srun + globus_compute example_scripts - -.. _Globus Compute: https://www.globus.org/compute -.. _Globus Compute endpoints: https://globus-compute.readthedocs.io/en/latest/endpoints.html -.. _Globus: https://www.globus.org/ -.. _handful of task-rate and data limits: https://globus-compute.readthedocs.io/en/latest/limits.html diff --git a/libensemble/executors/globus_compute_executor.py b/libensemble/executors/globus_compute_executor.py index d6d1f3628..154c2762e 100644 --- a/libensemble/executors/globus_compute_executor.py +++ b/libensemble/executors/globus_compute_executor.py @@ -114,12 +114,11 @@ class GlobusComputeExecutor(Executor): """An :class:`~libensemble.executors.executor.Executor` that submits Python callables to Globus Compute instead of launching local subprocesses. - Usage in a calling script:: + Usage in a top-level script:: from libensemble.executors.globus_compute_executor import GlobusComputeExecutor exctr = GlobusComputeExecutor(endpoint_id="...") - Executor.executor = exctr Inside a simulator function:: @@ -213,7 +212,7 @@ def submit( Name of a previously registered application. app_args : str, optional Arguments passed alongside the function. - func : callable, optional + func : Callable, optional A Python callable to execute remotely. Takes precedence over *app_name* / *calc_type*. stdout, stderr : str, optional From 43357398bb7911e0066fcfab43ae7b82f22a8e9b Mon Sep 17 00:00:00 2001 From: jlnav Date: Tue, 19 May 2026 10:48:17 -0500 Subject: [PATCH 07/14] bugfixes, plus new globus pixi environment, plus first functionality test --- libensemble/libE.py | 14 +- libensemble/manager.py | 14 +- .../test_gc_manager_submit.py | 127 ++++++++++++++++++ pixi.lock | 4 +- pyproject.toml | 10 +- 5 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 libensemble/tests/functionality_tests/test_gc_manager_submit.py diff --git a/libensemble/libE.py b/libensemble/libE.py index 750bad539..811b126b4 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -262,12 +262,10 @@ def libE( libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local} - Resources.init_resources(libE_specs, platform_info) - if Executor.executor is not None: - Executor.executor.add_platform_info(platform_info) - # Detect GC-only mode: manager submits directly to Globus Compute. # nworkers is repurposed as virtual concurrency (max concurrent GC sims). + # Must run before Resources.init_resources so the resource manager is + # skipped (GC-only has no real nodes to partition). if sim_specs.get("globus_compute_endpoint"): libE_specs["_gc_only"] = True if libE_specs.get("gen_on_worker"): @@ -277,6 +275,14 @@ def libE( if libE_specs.get("comms", "mpi") != "local": libE_specs["comms"] = "local" logger.info("GC-only mode: switching to local comms (no workers needed)") + # GC-only has no real nodes; disable resource manager to avoid ZeroDivisionError + if not libE_specs.get("disable_resource_manager"): + libE_specs["disable_resource_manager"] = True + logger.info("GC-only mode: disabling resource manager (no local nodes)") + + Resources.init_resources(libE_specs, platform_info) + if Executor.executor is not None: + Executor.executor.add_platform_info(platform_info) # Reset gen counter. AllocSupport.gen_counter = 0 diff --git a/libensemble/manager.py b/libensemble/manager.py index 0e469e726..c2a7e2e90 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -833,7 +833,12 @@ def _run_gc_only(self, persis_info: dict) -> tuple[dict, int, int]: if self._sim_max_given(): break self._check_work_order(Work[w], w) - self._gc_submit(Work[w], w) + if Work[w]["tag"] == EVAL_GEN_TAG: + # Gen work goes to the generator thread via wcomms + self._send_work_order(Work[w], w) + else: + # Sim work is submitted directly to Globus Compute + self._gc_submit(Work[w], w) self._update_state_on_alloc(Work[w], w) assert self.term_test() or any( @@ -895,6 +900,13 @@ def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int] continue self._update_state_on_worker_msg(persis_info, D_recv, w) + # Drain any pending gen-thread messages (e.g. FINISHED_PERSISTENT_GEN_TAG) + # before sending STOP_TAG, otherwise _clean_up_thread deadlocks. + if 0 in self.W["worker_id"] and self.wcomms[0] is not None: + while self.wcomms[0].mail_flag(): + self._handle_msg_from_worker(persis_info, 0) + self.wcomms[0].send(STOP_TAG, MAN_SIGNAL_FINISH) + self._init_every_k_save(complete=True) self._clean_up_thread() diff --git a/libensemble/tests/functionality_tests/test_gc_manager_submit.py b/libensemble/tests/functionality_tests/test_gc_manager_submit.py new file mode 100644 index 000000000..f4f945b89 --- /dev/null +++ b/libensemble/tests/functionality_tests/test_gc_manager_submit.py @@ -0,0 +1,127 @@ +""" +Tests libEnsemble's manager-side Globus Compute submission (GC-only mode). + +The manager submits simulation work directly to a mocked Globus Compute +endpoint instead of dispatching to local workers. The generator runs on +the manager thread as normal. + +Execute via: + python test_gc_manager_submit.py + +No MPI or local workers are needed — GC-only mode uses local comms with +nworkers acting as the maximum number of concurrent in-flight GC futures. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 1 + +import concurrent.futures +from unittest import mock + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.sampling import UniformSample +from libensemble.libE import libE +from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs +from libensemble.utils.globus_compute import GCSession + +# ── Simulator ───────────────────────────────────────────────────────────────── + + +def norm_sim(H, persis_info, sim_specs, libE_info): + """Evaluate the Euclidean norm of each input point.""" + H_o = np.zeros(len(H), dtype=sim_specs["out"]) + for i in range(len(H)): + H_o["f"][i] = float(np.linalg.norm(H["x"][i])) + return H_o, persis_info + + +# ── Fake GC infrastructure ──────────────────────────────────────────────────── + + +def _make_done_future(value): + """Return a Future that is already resolved with *value*.""" + f = concurrent.futures.Future() + f.set_result(value) + return f + + +def _make_gc_executor(sim_f): + """Return a mock GC executor whose submit_to_registered_function calls + *sim_f* synchronously and wraps the result in a resolved Future.""" + + executor = mock.MagicMock() + executor.register_function.return_value = "mock-fid" + + def fake_submit(fid, args): + result = sim_f(*args) + return _make_done_future(result) + + executor.submit_to_registered_function.side_effect = fake_submit + return executor + + +# ── Main ────────────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + GCSession.clear() + + ENDPOINT = "mock-endpoint-uuid" + SIM_MAX = 20 + N_VIRTUAL_WORKERS = 4 # max concurrent GC futures (no real workers needed) + + vocs = VOCS( + variables={"x0": [-3.0, 3.0], "x1": [-2.0, 2.0]}, + objectives={"f": "MINIMIZE"}, + ) + + sim_specs = SimSpecs( + sim_f=norm_sim, + inputs=["x"], + outputs=[("f", float)], + globus_compute_endpoint=ENDPOINT, + ) + + gen_specs = GenSpecs( + generator=UniformSample(vocs), + inputs=["sim_id"], + persis_in=["f", "sim_id"], + outputs=[("x", float, (2,))], + batch_size=N_VIRTUAL_WORKERS, + ) + + libE_specs = LibeSpecs( + nworkers=N_VIRTUAL_WORKERS, + comms="local", + disable_log_files=True, + safe_mode=False, + ) + + exit_criteria = ExitCriteria(sim_max=SIM_MAX) + + mock_executor = _make_gc_executor(norm_sim) + + with mock.patch.object(GCSession, "_create_executor", return_value=mock_executor): + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs) + + assert flag == 0, f"libEnsemble exited with unexpected flag {flag}" + assert ( + np.sum(H["sim_ended"]) >= SIM_MAX + ), f"Expected at least {SIM_MAX} completed sims, got {np.sum(H['sim_ended'])}" + + completed = H[H["sim_ended"]] + assert len(completed) >= SIM_MAX + + # Every completed point must have a non-negative norm + assert np.all(completed["f"] >= 0.0), "Unexpected negative norm value" + + # Verify the executor was actually used (not bypassed) + assert mock_executor.submit_to_registered_function.call_count >= SIM_MAX, ( + f"Expected at least {SIM_MAX} GC submissions, " f"got {mock_executor.submit_to_registered_function.call_count}" + ) + + print(f"\nGC-only mode: {np.sum(H['sim_ended'])} sims completed via mocked Globus Compute.") + print(f"Best f value: {completed['f'].min():.6f}") + print("\nlibEnsemble GC-only functionality test passed.") diff --git a/pixi.lock b/pixi.lock index bcbef304b..83ccef9f5 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ced3f2f78fb35503d1e2f2db0f7bc925e6e9e9a20bc83f9207cd03a4918d1639 -size 1084213 +oid sha256:208fe492b2e23d9debe0868b9c13734516d4ccd2c6c8ca0afc96c12d58a152de +size 1091480 diff --git a/pyproject.toml b/pyproject.toml index de3bb6e8e..2d53cc373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,7 @@ libensemble = { path = ".", editable = true } [tool.pixi.environments] default = [] basic = ["basic"] -extra = ["basic", "extra"] +extra = ["basic", "extra", "globus"] docs = ["docs", "basic"] dev = ["dev", "basic", "extra", "docs"] @@ -122,6 +122,14 @@ pandas = "<3" numpy = "<2.4" proxystore = ">=0.7.1,<0.9" +# globus-compute-sdk and its conda dependency overrides +[tool.pixi.feature.globus.dependencies] +dill = "==0.3.9" +globus-sdk = ">=4.4.0,<5" + +[tool.pixi.feature.globus.pypi-dependencies] +globus-compute-sdk = ">=4.10.1,<5" + [tool.pixi.feature.docs.dependencies] sphinx = ">=8.2.3,<9" sphinxcontrib-bibtex = ">=2.6.5,<3" From 623360a957f52f76cf98c040c571ef7e2140dda0 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 21 May 2026 08:44:11 -0500 Subject: [PATCH 08/14] some docs reorg and rewording --- docs/executor/ex_overview.rst | 2 +- docs/platforms/globus_compute.rst | 30 ++++++++++++++++-------------- docs/running_libE.rst | 6 +++--- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/docs/executor/ex_overview.rst b/docs/executor/ex_overview.rst index e091e94ce..47415979c 100644 --- a/docs/executor/ex_overview.rst +++ b/docs/executor/ex_overview.rst @@ -166,7 +166,7 @@ libEnsemble executors and can be retrieved from ``libE_info["executor"]`` inside simulator functions. See :ref:`Globus Compute - Remote User Functions` for an -overview of all three GC integration modes and the +overview of all Globus Compute integration modes and the :doc:`GlobusComputeExecutor API reference` for the full interface. diff --git a/docs/platforms/globus_compute.rst b/docs/platforms/globus_compute.rst index 5ea258fb3..8934e8dc4 100644 --- a/docs/platforms/globus_compute.rst +++ b/docs/platforms/globus_compute.rst @@ -17,21 +17,23 @@ simulator calls to remote Globus Compute endpoints: This is useful for running ensembles across machines and heterogeneous resources. There are **two approaches**, described below. -The following caveats apply to all Globus Compute modes: +.. dropdown:: **Caveats** - 1. Simulator functions submitted to Globus Compute must be *non-persistent*, - since manager-worker communicators cannot be serialized or used by a - remote resource. + The following caveats apply to all Globus Compute modes: - 2. ``Executor.manager_poll()`` is not available inside remotely executed - functions. Control over remote work is limited to inspecting return - values and exceptions when tasks complete. + 1. Simulator functions submitted to Globus Compute must be *non-persistent*, + since manager-worker communicators cannot be serialized or used by a + remote resource. - 3. Globus Compute imposes a `handful of task-rate and data limits`_ on - submitted functions. + 2. ``Executor.manager_poll()`` is not available inside remotely executed + functions. Control over remote work is limited to inspecting return + values and exceptions when tasks complete. - 4. Users are responsible for authenticating via Globus_ and maintaining their - `Globus Compute endpoints`_ on their target systems. + 3. Globus Compute imposes a `handful of task-rate and data limits`_ on + submitted functions. + + 4. Users are responsible for authenticating via Globus_ and maintaining their + `Globus Compute endpoints`_ on their target systems. .. _gc_only_mode: @@ -99,12 +101,12 @@ GlobusComputeExecutor (user-facing) ------------------------------------ For workflows where the simulation function itself orchestrates remote -calls, for example, fanning out to multiple endpoints or mixing local -and remote work, use the +calls, like fanning out to multiple endpoints or mixing local +and remote work. Use the :class:`GlobusComputeExecutor` directly inside the simulator. -Create and register the executor in the calling script: +Create and register the executor in the top-level script: .. code-block:: python diff --git a/docs/running_libE.rst b/docs/running_libE.rst index 4b0b333b5..2677eb8d2 100644 --- a/docs/running_libE.rst +++ b/docs/running_libE.rst @@ -83,9 +83,9 @@ if using an :class:`Ensemble` object with **Reverse-ssh interface** Set ``comms`` to ``ssh`` to launch workers on remote ssh-accessible systems. This -co-locates workers, functions, and any applications. User -functions can also be persistent, unlike simulator functions submitted to -:ref:`Globus Compute`, which must be non-persistent. +co-locates workers, functions, and any applications. Simulator functions can be +persistent, unlike those submitted to :ref:`Globus Compute`, +which must be non-persistent. The remote working directory and Python need to be specified. This may resemble:: From ed1b37e6e246b2398c908a888e3e34335898630d Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 21 May 2026 15:20:32 -0500 Subject: [PATCH 09/14] tweak max_active_runs to not stress github actions --- .../regression_tests/test_persistent_aposmm_nlopt.py | 10 ++++++++-- pixi.lock | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py index b92850143..5863ba678 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py @@ -73,7 +73,7 @@ "xtol_abs": 1e-6, "ftol_abs": 1e-6, "dist_to_bound_multiple": 0.5, - "max_active_runs": 6, + "max_active_runs": nworkers - 1, "lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, @@ -84,7 +84,13 @@ exit_criteria = {"sim_max": 2000} # Perform the run - H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, alloc_specs=alloc_specs, libE_specs=libE_specs) + H, persis_info, flag = libE( + sim_specs, + gen_specs, + exit_criteria, + alloc_specs=alloc_specs, + libE_specs=libE_specs, + ) if is_manager: print("[Manager]:", H[np.where(H["local_min"])]["x"]) diff --git a/pixi.lock b/pixi.lock index c2bd95892..a5ecb6ac6 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ebf0a7fd0995baf1d9df2871b599292fe9e451e35d64df541c8f06615d9c4fc4 -size 1095827 +oid sha256:51eba5da4a7f0a3df15af88fd0ced5335e095055db46a86f282130da7c4745b8 +size 1091485 From e6c132576eff6d1d9b9d01ef004630affe3ad0c3 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 21 May 2026 16:04:53 -0500 Subject: [PATCH 10/14] bump pixi version, and formatting? --- .github/workflows/basic.yml | 178 +++++++++++++++--------------- .github/workflows/extra.yml | 214 ++++++++++++++++++------------------ 2 files changed, 196 insertions(+), 196 deletions(-) diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index 41e347ce1..161da14ff 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -9,93 +9,93 @@ on: - synchronize jobs: - test-libE: - if: '! github.event.pull_request.draft' - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - mpi-version: [mpich] - python-version: ["py311", "py312", "py313", "py314"] - comms-type: [m, l] - include: - - os: macos-latest - python-version: "py311" - mpi-version: mpich - comms-type: m - - os: macos-latest - python-version: "py311" - mpi-version: mpich - comms-type: l - + test-libE: + if: "! github.event.pull_request.draft" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + mpi-version: [mpich] + python-version: ["py311", "py312", "py313", "py314"] + comms-type: [m, l] + include: + - os: macos-latest + python-version: "py311" + mpi-version: mpich + comms-type: m + - os: macos-latest + python-version: "py311" + mpi-version: mpich + comms-type: l + + env: + HYDRA_LAUNCHER: "fork" + TERM: xterm-256color + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + defaults: + run: + shell: bash -l {0} + + steps: + - uses: actions/checkout@v6 + with: + lfs: true + + - name: Checkout lockfile + run: git lfs checkout + + - uses: prefix-dev/setup-pixi@v0.9.6 + with: + pixi-version: v0.66.0 + frozen: true + environments: ${{ matrix.python-version }} + activate-environment: ${{ matrix.python-version }} + + - name: Install minq + run: | + pixi run -e ${{ matrix.python-version }} ./install/install_minq.sh + + - name: Install libEnsemble, test flake8 + run: | + pip install -e . + flake8 libensemble + + - name: Install mypy + run: pip install mypy + + - name: Run mypy (limited scope) + run: mypy + + - name: Remove various tests on newer pythons + if: matrix.python-version == 'py311' || matrix.python-version == 'py312' || matrix.python-version == 'py313' || matrix.python-version == 'py314' + run: | + rm ./libensemble/tests/functionality_tests/test_local_sine_tutorial*.py # matplotlib errors on py312 + + - name: Run simple tests, Ubuntu + if: matrix.os == 'ubuntu-latest' + run: | + ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} + + - name: Run simple tests, macOS + if: matrix.os == 'macos-latest' + run: | + pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} + + - name: Merge coverage + run: | + mv libensemble/tests/.cov* . + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v6 env: - HYDRA_LAUNCHER: "fork" - TERM: xterm-256color - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - defaults: - run: - shell: bash -l {0} - - steps: - - uses: actions/checkout@v6 - with: - lfs: true - - - name: Checkout lockfile - run: git lfs checkout - - - uses: prefix-dev/setup-pixi@v0.9.5 - with: - pixi-version: v0.55.0 - frozen: true - environments: ${{ matrix.python-version }} - activate-environment: ${{ matrix.python-version }} - - - name: Install minq - run: | - pixi run -e ${{ matrix.python-version }} ./install/install_minq.sh - - - name: Install libEnsemble, test flake8 - run: | - pip install -e . - flake8 libensemble - - - name: Install mypy - run: pip install mypy - - - name: Run mypy (limited scope) - run: mypy - - - name: Remove various tests on newer pythons - if: matrix.python-version == 'py311' || matrix.python-version == 'py312' || matrix.python-version == 'py313' || matrix.python-version == 'py314' - run: | - rm ./libensemble/tests/functionality_tests/test_local_sine_tutorial*.py # matplotlib errors on py312 - - - name: Run simple tests, Ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} - - - name: Run simple tests, macOS - if: matrix.os == 'macos-latest' - run: | - pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} - - - name: Merge coverage - run: | - mv libensemble/tests/.cov* . - - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v6 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - spellcheck: - name: Spellcheck release branch - if: contains(github.base_ref, 'develop') - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v6 - - uses: crate-ci/typos@v1.46.1 + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + spellcheck: + name: Spellcheck release branch + if: contains(github.base_ref, 'develop') + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: crate-ci/typos@v1.46.1 diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index 6f4a65a73..a0e05c8f5 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -4,111 +4,111 @@ on: workflow_dispatch: jobs: - test-libE: - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - mpi-version: [mpich] - python-version: ["py311e", "py312e", "py313e", "py314e"] - comms-type: [m, l] - include: - - os: macos-latest - python-version: "py312e" - mpi-version: mpich - comms-type: m - - os: macos-latest - python-version: "py312e" - mpi-version: mpich - comms-type: l - - os: ubuntu-latest - python-version: "py312e" - mpi-version: mpich - comms-type: t - - os: ubuntu-latest - mpi-version: openmpi - python-version: "py312e" - comms-type: l - + test-libE: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + mpi-version: [mpich] + python-version: ["py311e", "py312e", "py313e", "py314e"] + comms-type: [m, l] + include: + - os: macos-latest + python-version: "py312e" + mpi-version: mpich + comms-type: m + - os: macos-latest + python-version: "py312e" + mpi-version: mpich + comms-type: l + - os: ubuntu-latest + python-version: "py312e" + mpi-version: mpich + comms-type: t + - os: ubuntu-latest + mpi-version: openmpi + python-version: "py312e" + comms-type: l + + env: + HYDRA_LAUNCHER: "fork" + TERM: xterm-256color + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + defaults: + run: + shell: bash -l {0} + + steps: + - uses: actions/checkout@v6 + with: + lfs: true + + - name: Checkout lockfile + run: git lfs checkout + + - uses: prefix-dev/setup-pixi@v0.9.6 + with: + pixi-version: v0.66.0 + cache: true + frozen: true + environments: ${{ matrix.python-version }} + activate-environment: ${{ matrix.python-version }} + + - name: Install other testing dependencies + run: | + pixi run -e ${{ matrix.python-version }} install/install_minq.sh + + - name: Install libEnsemble, flake8, lock environment + run: | + pip install -e . + flake8 libensemble + + - name: Install gpcam + if: matrix.python-version != 'py313e' && matrix.python-version != 'py314e' + run: | + pixi run -e ${{ matrix.python-version }} pip install gpcam==8.1.13 + + - name: Remove test using octave, gpcam, globus-compute on Python 3.13 + if: matrix.python-version == 'py313e' || matrix.python-version == 'py314e' + run: | + rm ./libensemble/tests/unit_tests/test_ufunc_runners.py # needs globus-compute + rm ./libensemble/tests/regression_tests/test_gpCAM.py # needs gpcam, which doesn't build on 3.13 + rm ./libensemble/tests/regression_tests/test_asktell_gpCAM.py # needs gpcam, which doesn't build on 3.13 + rm ./libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py # needs ax-platform, which doesn't yet support 3.14 + rm ./libensemble/tests/regression_tests/test_optimas_ax_mf.py # needs ax-platform, which doesn't yet support 3.14 + rm ./libensemble/tests/regression_tests/test_optimas_ax_sf.py # needs ax-platform, which doesn't yet support 3.14 + + - name: Start Redis + if: matrix.os == 'ubuntu-latest' + uses: supercharge/redis-github-action@v2 + with: + redis-version: 7 + + - name: Run extensive tests, Ubuntu + if: matrix.os == 'ubuntu-latest' + run: | + ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} + + - name: Run extensive tests, macOS + if: matrix.os == 'macos-latest' + run: | + pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} + + - name: Merge coverage + run: | + mv libensemble/tests/.cov* . + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v6 env: - HYDRA_LAUNCHER: 'fork' - TERM: xterm-256color - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - defaults: - run: - shell: bash -l {0} - - steps: - - uses: actions/checkout@v6 - with: - lfs: true - - - name: Checkout lockfile - run: git lfs checkout - - - uses: prefix-dev/setup-pixi@v0.9.5 - with: - pixi-version: v0.55.0 - cache: true - frozen: true - environments: ${{ matrix.python-version }} - activate-environment: ${{ matrix.python-version }} - - - name: Install other testing dependencies - run: | - pixi run -e ${{ matrix.python-version }} install/install_minq.sh - - - name: Install libEnsemble, flake8, lock environment - run: | - pip install -e . - flake8 libensemble - - - name: Install gpcam - if: matrix.python-version != 'py313e' && matrix.python-version != 'py314e' - run: | - pixi run -e ${{ matrix.python-version }} pip install gpcam==8.1.13 - - - name: Remove test using octave, gpcam, globus-compute on Python 3.13 - if: matrix.python-version == 'py313e' || matrix.python-version == 'py314e' - run: | - rm ./libensemble/tests/unit_tests/test_ufunc_runners.py # needs globus-compute - rm ./libensemble/tests/regression_tests/test_gpCAM.py # needs gpcam, which doesn't build on 3.13 - rm ./libensemble/tests/regression_tests/test_asktell_gpCAM.py # needs gpcam, which doesn't build on 3.13 - rm ./libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py # needs ax-platform, which doesn't yet support 3.14 - rm ./libensemble/tests/regression_tests/test_optimas_ax_mf.py # needs ax-platform, which doesn't yet support 3.14 - rm ./libensemble/tests/regression_tests/test_optimas_ax_sf.py # needs ax-platform, which doesn't yet support 3.14 - - - name: Start Redis - if: matrix.os == 'ubuntu-latest' - uses: supercharge/redis-github-action@v2 - with: - redis-version: 7 - - - name: Run extensive tests, Ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} - - - name: Run extensive tests, macOS - if: matrix.os == 'macos-latest' - run: | - pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} - - - name: Merge coverage - run: | - mv libensemble/tests/.cov* . - - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v6 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - spellcheck: - name: Spellcheck release branch - if: contains(github.base_ref, 'develop') - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v6 - - uses: crate-ci/typos@v1.46.1 + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + spellcheck: + name: Spellcheck release branch + if: contains(github.base_ref, 'develop') + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: crate-ci/typos@v1.46.1 From b0a2260f76b487b54723def74e1364f1ee557ddd Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 21 May 2026 16:14:35 -0500 Subject: [PATCH 11/14] bump pixi versions again? --- .github/workflows/basic.yml | 2 +- .github/workflows/extra.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index 161da14ff..3bbff171c 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -48,7 +48,7 @@ jobs: - uses: prefix-dev/setup-pixi@v0.9.6 with: - pixi-version: v0.66.0 + pixi-version: v0.68.1 frozen: true environments: ${{ matrix.python-version }} activate-environment: ${{ matrix.python-version }} diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index a0e05c8f5..033d2daff 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -50,7 +50,7 @@ jobs: - uses: prefix-dev/setup-pixi@v0.9.6 with: - pixi-version: v0.66.0 + pixi-version: v0.68.1 cache: true frozen: true environments: ${{ matrix.python-version }} From 8b191ebc14006b80ad1a1dbfd20f4fd64c910ae9 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 22 May 2026 13:41:47 -0500 Subject: [PATCH 12/14] update deps --- pixi.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pixi.lock b/pixi.lock index 5a34a18fe..fee9aeca6 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d6381c470a649b8cf1dcd835c1ee4e007f9a4589830f5e31aca94ac4e4610a45 -size 1084439 +oid sha256:8c0540f7b1f5d52ecbcaeb82c64807c8867e7bcdabc09057d5c4ab128190dfca +size 1092087 From 3f4627522d144dd2c12a7547d8cb87d741e7f773 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 22 May 2026 15:28:50 -0500 Subject: [PATCH 13/14] fix the borked ci files --- .github/workflows/basic.yml | 72 +++-------------------------------- .github/workflows/extra.yml | 75 +------------------------------------ 2 files changed, 8 insertions(+), 139 deletions(-) diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index 55427dd57..79e8c9434 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -90,72 +90,12 @@ jobs: - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v6 env: - HYDRA_LAUNCHER: "fork" - TERM: xterm-256color - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - defaults: - run: - shell: bash -l {0} - - steps: - - uses: actions/checkout@v6 - with: - lfs: true - - - name: Checkout lockfile - run: git lfs checkout - - - uses: prefix-dev/setup-pixi@v0.9.5 - with: - pixi-version: v0.55.0 - frozen: true - environments: ${{ matrix.python-version }} - activate-environment: ${{ matrix.python-version }} - - - name: Install minq - run: | - pixi run -e ${{ matrix.python-version }} ./install/install_minq.sh - - - name: Install libEnsemble, test flake8 - run: | - pip install -e . - flake8 libensemble - - - name: Install mypy - run: pip install mypy - - - name: Run mypy (limited scope) - run: mypy - - - name: Remove various tests on newer pythons - if: matrix.python-version == 'py311' || matrix.python-version == 'py312' || matrix.python-version == 'py313' || matrix.python-version == 'py314' - run: | - rm ./libensemble/tests/functionality_tests/test_local_sine_tutorial*.py # matplotlib errors on py312 - - - name: Run simple tests, Ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} - - - name: Run simple tests, macOS - if: matrix.os == 'macos-latest' - run: | - pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -A "-W error" -${{ matrix.comms-type }} - - - name: Merge coverage - run: | - mv libensemble/tests/.cov* . - - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v6 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - spellcheck: - name: Spellcheck release branch - if: contains(github.base_ref, 'develop') - runs-on: ubuntu-latest - steps: + spellcheck: + name: Spellcheck release branch + if: contains(github.base_ref, 'develop') + runs-on: ubuntu-latest + steps: - uses: actions/checkout@v6 - uses: crate-ci/typos@v1.46.2 diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index 95bcffd41..a4a368211 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -103,80 +103,9 @@ jobs: - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v6 env: - HYDRA_LAUNCHER: 'fork' - TERM: xterm-256color - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - defaults: - run: - shell: bash -l {0} - - steps: - - uses: actions/checkout@v6 - with: - lfs: true - - - name: Checkout lockfile - run: git lfs checkout - - - uses: prefix-dev/setup-pixi@v0.9.5 - with: - pixi-version: v0.55.0 - cache: true - frozen: true - environments: ${{ matrix.python-version }} - activate-environment: ${{ matrix.python-version }} - - - name: Install other testing dependencies - run: | - pixi run -e ${{ matrix.python-version }} install/install_minq.sh - - - name: Install libEnsemble, flake8, lock environment - run: | - pip install -e . - flake8 libensemble - - - name: Install gpcam - if: matrix.python-version != 'py313e' && matrix.python-version != 'py314e' - run: | - pixi run -e ${{ matrix.python-version }} pip install gpcam==8.1.13 - - - name: Remove test using octave, gpcam, globus-compute on Python 3.13 - if: matrix.python-version == 'py313e' || matrix.python-version == 'py314e' - run: | - rm ./libensemble/tests/unit_tests/test_ufunc_runners.py # needs globus-compute - rm ./libensemble/tests/regression_tests/test_gpCAM.py # needs gpcam, which doesn't build on 3.13 - rm ./libensemble/tests/regression_tests/test_asktell_gpCAM.py # needs gpcam, which doesn't build on 3.13 - rm ./libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py # needs ax-platform, which doesn't yet support 3.14 - rm ./libensemble/tests/regression_tests/test_optimas_ax_mf.py # needs ax-platform, which doesn't yet support 3.14 - rm ./libensemble/tests/regression_tests/test_optimas_ax_sf.py # needs ax-platform, which doesn't yet support 3.14 - - - name: Start Redis - if: matrix.os == 'ubuntu-latest' - uses: supercharge/redis-github-action@v2 - with: - redis-version: 7 - - - name: Run extensive tests, Ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} - - - name: Run extensive tests, macOS - if: matrix.os == 'macos-latest' - run: | - pixi run -e ${{ matrix.python-version }} ./libensemble/tests/run_tests.py -e -${{ matrix.comms-type }} - - - name: Merge coverage - run: | - mv libensemble/tests/.cov* . - - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v6 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - spellcheck: + spellcheck: name: Spellcheck release branch if: contains(github.base_ref, 'develop') runs-on: ubuntu-latest From a37c27a9d315a0514cb7723adaff63bf9c7a098a Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 22 May 2026 15:50:50 -0500 Subject: [PATCH 14/14] decrement max_active_runs to not stress the ci --- .../test_asktell_aposmm_nlopt.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index bbc2c89d1..ac1226cf7 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -52,9 +52,7 @@ def six_hump_camel_func(x): # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": - for run in range(3): - workflow = Ensemble(parse_args=True) if workflow.is_manager: @@ -63,14 +61,23 @@ def six_hump_camel_func(x): n = 2 vocs = VOCS( - variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [-3, 3], "edge_on_cube": [-2, 2]}, + variables={ + "core": [-3, 3], + "edge": [-2, 2], + "core_on_cube": [-3, 3], + "edge_on_cube": [-2, 2], + }, objectives={"energy": "MINIMIZE"}, ) aposmm = APOSMM( vocs, - max_active_runs=workflow.nworkers, # should this match nworkers always? practically? - variables_mapping={"x": ["core", "edge"], "x_on_cube": ["core_on_cube", "edge_on_cube"], "f": ["energy"]}, + max_active_runs=workflow.nworkers - 1, # should this match nworkers always? + variables_mapping={ + "x": ["core", "edge"], + "x_on_cube": ["core_on_cube", "edge_on_cube"], + "f": ["energy"], + }, initial_sample_size=100, sample_points=minima, localopt_method="LN_BOBYQA",