diff --git a/AGENTS.md b/AGENTS.md index f5673f64a..2c4481ad3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,101 +1,78 @@ -Agent Contributor Guidelines and Information -============================================ - -Read the ``README.rst`` for an overview of libEnsemble. - -- libEnsemble uses a manager-worker architecture. Points are generated by a generator and sent to a worker, which runs a simulator. -- The manager determines how and when points get passed to workers via an allocation function. -- See ``libensemble/tests/regression_tests/test_1d_sampling.py`` for a simple example of the libEnsemble interface. - -Critical Repository Layout Information --------------------------------------- - -- ``libensemble/`` - Source code. -- ``/alloc_funcs`` - Allocation functions. Policies for passing work between the manager and workers. -- ``/comms`` - Modules and abstractions for communication between the manager and workers. -- ``/executors`` - An interface for launching executables, often simulations. -- ``/gen_classes`` - Generators that adhere to the `gest-api` standard. - Recommended over entries from ``/gen_funcs`` that perform similar functionality. -- ``/gen_funcs`` - Generator functions. Modules for producing points for simulations. (Legacy) -- ``/resources`` - Classes and functions for managing compute resources for MPI tasks, libensemble workers. -- ``/sim_funcs`` - Simulator functions. Modules for running simulations or performing experiments. -- ``/tests`` - Tests. - - ``/functionality_tests`` - Primarily tests libEnsemble code only. - - ``/regression_tests`` - Tests libEnsemble code with external code. Often more closely resembles actual use-cases. - - ``/unit_tests`` - Tests for individual modules. -- ``ensemble.py`` - The primary interface for parameterizing and running libEnsemble. The ``Ensemble`` class in this module wraps the lower-level ``libE`` function and automates argument parsing and state management. -- ``generators.py`` - Base classes for generators that adhere to the `gest-api` standard. -- ``history.py`` - Module for recording points that have been generated and simulation results. NumPy structured array. -- ``libE.py`` - libE main file. Previous primary interface for parameterizing and running libEnsemble. The primary interface in ``ensemble.py`` wraps this function. -- ``manager.py`` - Module for maintaining the history array and passing points between the workers. -- ``message_numbers.py`` - Constants that represent states of the ensemble. -- ``specs.py`` - Dataclasses for parameterizing the ensemble. Most importantly, contains ``LibeSpecs, SimSpecs, GenSpecs``. -- ``worker.py`` - Module for running generators and simulators. Communicates with the manager. -- ``examples/`` - The ``*_funcs`` and ``calling_scripts`` directories contain symlinks to examples further in the source code. -- ``/libE_submission_scripts`` - Example scripts for submitting libEnsemble jobs to HPC systems. -- ``/tutorials`` - Tutorials on how to use libEnsemble. - -Information about Generators ----------------------------- - -- Generators are functions or objects that produce points for simulations. -- The History array is a NumPy structured array that stores points that have been generated and simulation results. -Its fields match ``sim_specs/gen_specs["out"]`` or ``vocs`` attributes, plus additional reserved fields for metadata. -- Prior to libEnsemble v1.6.0, generators were plain functions. They often ran in "persistent" mode, meaning they executed in a -long-running loop, sending and receiving points to and from the manager until the ensemble was complete. -- A ``gest-api`` or "standardized" generator is a class that inherits from ``gest_api.Generator``, implements ``suggest`` and ``ingest`` methods (which process lists of dictionaries, not NumPy arrays), and is parameterized by a ``vocs``. -- See ``libensemble/gen_classes/external/sampling.py`` for simple examples of the pure ``gest-api`` interface. (Note: ``libensemble.generators.LibensembleGenerator`` exists to wrap legacy NumPy-based workflows, but pure ``gest_api.Generator`` is preferred). -- Generators are often used for simple sampling, optimization, calibration, uncertainty quantification, and other simulation-based tasks. -- **Automatic Variable Mapping**: When using ``LibensembleGenerator`` subclasses, they automatically map all ``VOCS`` variables to a single multi-dimensional ``"x"`` field in the History array if no explicit ``variables_mapping`` is provided. Pure ``gest_api.Generator`` classes handle variables natively. -- **Mandatory Input Fields**: Even for simple generators that don't ingest data, ``gen_specs["in"]`` or ``gen_specs["persis_in"]`` must be defined if using an allocation function like ``only_persistent_gens`` that attempts to send rows. If these are empty, the manager will raise an ``AssertionError`` stating that no fields were requested to be sent. -- **Default Allocator**: ``only_persistent_gens`` is the default allocator for standardized ``gest-api`` generators. It treats these generators as persistent entities that communicate throughout the run. - -General Guidelines ------------------- - -- If using classic ``sim_specs`` and ``gen_specs``, then ensure that ``sim_specs["out"]`` and ``gen_specs["in"]`` field names match, and vice-versa. -- As-of libEnsemble v1.6.0, ``SimSpecs`` and ``GenSpecs`` can also be parameterized by a ``vocs`` object, imported from ``gest_api.vocs`` (NOT xopt.vocs). -- ``VOCS`` contains variables, objectives, constraints, and other settings that define the problem. -See ``libensemble/tests/regression_tests/test_xopt_EI.py`` for an example of how to use it. -- An MPI distribution is not required for libEnsemble to run, but is required to use the ``MPIExecutor``. ``mpich`` is recommended. -- New tests are heavily encouraged for new features, bug fixes, or integrations. See ``libensemble/tests/regression_tests`` for examples. -- Never use destructive git commands unless explicitly requested. -- Code is in the ``black`` style. This should be enforced by ``pre-commit``. -- When writing new code, prefer the ``LibeSpecs``, ``SimSpecs``, and ``GenSpecs`` dataclasses over the classic ``sim_specs`` and ``gen_specs`` bare dictionaries. -- Read ``CONTRIBUTING.md`` for more information. -- The external ``libE-community-examples`` repository contains past use-cases, generators, and other examples. - -Development Environment ------------------------ - -- ``pixi`` is the recommended environment manager for libEnsemble development. See ``pyproject.toml`` for the list -of dependencies and the available testing environments. (Note: If ``pixi`` is not in your system path, it can often be found in ``/opt/homebrew/bin/pixi`` or ``/usr/local/bin/pixi``). -- Enter the development environment with ``pixi shell -e dev``. This environment contains the most common dependencies for development and testing. -- For one-off commands, use ``pixi run -e dev``. This will run a single command in the development environment. -- If ``pixi`` is not available or not preferred by the user, ``pip install -e .`` can be used instead. Other dependencies may need to be installed manually. -- If committing, use ``pre-commit`` to ensure that code style and formatting are consistent. See ``.pre-commit-config.yaml`` for -the configuration and ``pyproject.toml`` for other configuration. - -Testing -------- - -- Run tests with the ``run_tests.py`` script: ``python libensemble/tests/run_tests.py``. See ``libensemble/tests/run_tests.py`` for usage information. -- Some tests require third party software to be installed. When developing a feature or fixing a bug, since the entire test suite will be run on Github Actions, -for local development running individual tests is sufficient. -- Individual unit tests can be run with ``pixi run -e dev pytest path/to/test_file``. -- A libEnsemble run typically outputs an ``ensemble.log`` and ``libE_stats.txt`` file in the working directory. Check these files for tracebacks or run statistics. -- An "ensemble" or "workflow" directory may also be created, often containing per-simulation output directories - -Modernizing Scripts for libEnsemble 2.0 ---------------------------------------- - -When modernizing existing libEnsemble scripts (functionality tests, regression tests, or user examples) for version 2.0, follow these steps: - -- **Switch to `gest-api` Generators**: Replace legacy generator functions (from `libensemble.gen_funcs`) with standardized generator classes (from `libensemble.gen_classes` or other `gest-api` compatible sources). -- **Use `VOCS` for Parameterization**: Standardized generators are parameterized by a `VOCS` object (from `gest_api.vocs`). Define variables and objectives within this object. -- **Set `gen_specs["generator"]`**: Instead of `gen_f`, use the `generator` field in `GenSpecs` to pass the initialized generator class. -- **Remove Explicit `AllocSpecs`**: In libEnsemble 2.0, `only_persistent_gens` is the default allocator. Scripts that previously used `give_sim_work_first` or other simple allocators can often remove `alloc_specs` entirely when switching to standardized generators. -- **Generator Placement**: By default, generators run on the manager thread (Worker 0). This means all allocated workers are available for simulation tasks unless `gen_on_worker` is explicitly set to `True` in `libE_specs`. -- **Mandatory Fields**: Ensure `gen_specs["in"]` or `gen_specs["persis_in"]` includes at least one field (e.g., `["sim_id"]`) if feedback is sent back to the generator, to satisfy the allocator's requirements. -- **gest-api Simulators**: The gest-api pattern also applies to simulators. Set `SimSpecs.simulator` to a callable with signature `(input_dict: dict, **kwargs) -> dict` instead of providing a `sim_f`. libEnsemble automatically wraps it with `gest_api_sim` from `libensemble.sim_funcs.gest_api_wrapper` and handles all NumPy conversions. `SimSpecs.inputs` and `SimSpecs.outputs` can be derived automatically when `SimSpecs.vocs` is provided. -- **`safe_mode` is opt-in**: `libE_specs["safe_mode"]` defaults to `False`, meaning protected History fields (`gen_worker`, `gen_started_time`, `gen_ended_time`, `sim_worker`, `sim_started`, `sim_started_time`, `sim_ended`, `sim_ended_time`, `gen_informed`, `gen_informed_time`, `kill_sent`) are freely overwritable by default. Set `safe_mode=True` to enable protection. Overwriting these fields without understanding their purpose may crash libEnsemble. +# Agent Contributor Guidelines + +## Architecture + +Manager-worker framework: manager allocates points from a generator to workers running simulators. See `libensemble/tests/regression_tests/test_1d_sampling.py` for a minimal example. + +## Repository Layout + +Core paths relative to `libensemble/`: +- `alloc_funcs/` - Allocation policies. +- `comms/` - Manager-worker communication +- `executors/` - Launching executables +- `gen_classes/` - **Preferred**: gest-api generators +- `gen_funcs/` - Legacy generators +- `resources/` - Compute resource management +- `sim_funcs/` - Simulator functions +- `tests/{functionality,regression,unit}_tests/` +- `ensemble.py` - Primary interface (wraps `libE()`) +- `generators.py` - gest-api base classes +- `history.py` - NumPy structured array for input/output data +- `libE.py` - Entrypoint for libEnsemble, and also legacy top-level interface +- `manager.py` - History & worker coordination +- `specs.py` - `SimSpecs`, `GenSpecs`, `AllocSpecs`, `ExitCriteria`, `LibeSpecs` dataclasses +- `worker.py` - Runs simulators, communicates with manager. Can be configured to run generators as well + +## Specifications (Modern Configs) + +All configs use **dataclasses** from `specs.py`, not bare dicts (legacy): +- `SimSpecs` - simulator config (`sim_f`/`simulator`, `in`/`inputs`, `out`/`outputs`, `vocs`) +- `GenSpecs` - generator config (`gen_f`/`generator`, `in`/`inputs`, `out`/`outputs`, `persis_in`, `vocs`, `user`) +- `AllocSpecs` - allocation function config (`alloc_f`, `user`) +- `ExitCriteria` - termination conditions (`sim_max`, `wallclock_max`, `stop_val`) +- `LibeSpecs` - runtime config (`comms`, `nworkers`, `gen_on_worker`, `safe_mode`, etc.) + +These accept `vocs` from `gest_api.vocs` (not xopt.vocs). The dict-based `sim_specs`/`gen_specs` API still works but is legacy. + +## Generators + +- **gest-api** (preferred): class inheriting `gest_api.Generator`, implements `suggest(input_dicts)`/`ingest(output_dicts)`, parameterized by `VOCS`. See `libensemble/gen_classes/external/sampling.py`. +- Generators are used for sampling, optimization, calibration, uncertainty quantification, and other simulation-based tasks. +- **Legacy**: plain functions with persistent loops. Use `LibensembleGenerator` to wrap into gest-api. +- History array: NumPy structured array with fields from `sim_specs/gen_specs["out"]` or `vocs` attributes plus reserved metadata fields. +- **Automatic Variable Mapping**: `LibensembleGenerator` maps all VOCS vars to `"x"` field unless `variables_mapping` is provided. +- **Mandatory Input Fields**: `gen_specs["in"]`/`["persis_in"]` must have >=1 field (e.g. `["sim_id"]`) when using `only_persistent_gens` allocator. +- **Default Allocator**: `only_persistent_gens` for gest-api generators. + +## Conventions + +- Match output fields ↔ input fields (e.g., `SimSpecs.out` ↔ `GenSpecs.in`, and vice-versa). +- Always use the dataclass configs from `specs.py` (`SimSpecs`, `GenSpecs`, etc.) over legacy bare dicts. +- `SimSpecs`/`GenSpecs` accept `vocs` from `gest_api.vocs` (not xopt.vocs). +- Code style: `black` (enforced by pre-commit via `pre-commit`). +- No destructive git commands without explicit request. + +## Development + +- **pixi** recommended. Enter: `pixi shell -e dev`. One-off: `pixi run -e dev `. (Path: `/opt/homebrew/bin/pixi` or `/usr/local/bin/pixi`.) +- Fallback: `pip install -e .` (may need manual dependency installs). +- Pre-commit: `pre-commit` (config in `.pre-commit-config.yaml`, `pyproject.toml`). + +## Testing + +- Full suite: `python libensemble/tests/run_tests.py` +- Single unit test: `pixi run -e dev pytest path/to/test_file` +- Single regression/functionality test: `pixi run -e dev python path/to/test_file -n 4` +- Check `ensemble.log` and `libE_stats.txt` for run diagnostics. + +## Modernizing for libEnsemble 2.0 + +When updating scripts from legacy patterns: + +- **Generators**: Replace `gen_f` with gest-api `Generator` class set via `gen_specs["generator"]`. +- **VOCS**: Parameterize generators with `VOCS` from `gest_api.vocs`. +- **AllocSpecs**: `AllocSpecs` dataclass replaces bare dict. Often removable — `only_persistent_gens` is the default allocator. +- **Generator placement**: Runs on manager (Worker 0) by default. Set `LibeSpecs(gen_on_worker=True)` to run on a dedicated worker. +- **Input fields**: `GenSpecs.inputs`/`persis_in` must contain >=1 field. +- **Simulators**: Use `SimSpecs.simulator` with `(input_dict, **kwargs) -> dict` instead of `sim_f`. libEnsemble auto-wraps via `gest_api_sim`. `inputs`/`outputs` auto-derived from `vocs`. +- **safe_mode**: `LibeSpecs(safe_mode=False)` by default (protected History fields overwritable). Set `True` to guard metadata fields (`gen_worker`, `sim_worker`, `sim_started`, `sim_ended`, etc.). diff --git a/docs/advanced_installation/advanced_installation.rst b/docs/advanced_installation/advanced_installation.rst index fc2e8546a..ea30fd4d0 100644 --- a/docs/advanced_installation/advanced_installation.rst +++ b/docs/advanced_installation/advanced_installation.rst @@ -31,7 +31,18 @@ Further recommendations for selected HPC systems are given in the Globus Compute -------------- -`Globus Compute`_ may be installed optionally to submit simulation function instances to remote Globus Compute endpoints. +`Globus Compute`_ may be installed optionally to submit simulation function +instances to remote Globus Compute endpoints:: + + pip install globus-compute-sdk + +This is an optional dependency; libEnsemble operates normally without it. +If Globus Compute is not installed and a ``globus_compute_endpoint`` is +configured, libEnsemble will warn and fall back to local execution. + +See :ref:`Globus Compute - Remote User Functions` for +usage, and the :doc:`GlobusComputeExecutor API reference` +for the full executor interface. .. _Globus Compute: https://www.globus.org/compute .. _Python: http://www.python.org diff --git a/docs/conf.py b/docs/conf.py index 8647bbf4f..08ea11276 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -31,7 +31,7 @@ def __getattr__(cls, name): return MagicMock() -autodoc_mock_imports = ["ax", "gpcam", "IPython", "matplotlib", "pandas", "scipy", "surmise"] +autodoc_mock_imports = ["ax", "globus_compute_sdk", "gpcam", "IPython", "matplotlib", "pandas", "scipy", "surmise"] MOCK_MODULES = [ "argparse", diff --git a/docs/executor/ex_globus_compute.rst b/docs/executor/ex_globus_compute.rst new file mode 100644 index 000000000..8b902775b --- /dev/null +++ b/docs/executor/ex_globus_compute.rst @@ -0,0 +1,57 @@ +Globus Compute Executor +======================= + +`Overview `__ \|\| `Base Executor `__ \|\| `MPI Executor `__ \|\| **Globus Compute Executor** + +The :class:`GlobusComputeExecutor` +submits Python callables to a remote `Globus Compute`_ endpoint instead of +launching local subprocesses. It can be used inside simulator functions in the +same way as the :doc:`MPI Executor`, retrieving it from +``libE_info["executor"]``. + +See :ref:`Globus Compute - Remote User Functions` for an +overview of the two GC integration modes (manager-side GC-only and user-facing +executor). + +.. note:: + + ``globus-compute-sdk`` must be installed to use this executor:: + + pip install globus-compute-sdk + + Users must also authenticate via Globus_ and have an active + `Globus Compute endpoint`_ running on the target system. + +GlobusComputeExecutor +--------------------- + +.. autoclass:: libensemble.executors.globus_compute_executor.GlobusComputeExecutor + :members: register_app, submit, set_workerID, set_worker_info + :show-inheritance: + + .. automethod:: __init__ + +GlobusComputeTask +----------------- + +Tasks are created and returned by +:meth:`GlobusComputeExecutor.submit()`. +Each task wraps a ``concurrent.futures.Future`` from the Globus Compute SDK +and exposes the same polling interface as other libEnsemble tasks. + +.. autoclass:: libensemble.executors.globus_compute_executor.GlobusComputeTask + :members: poll, wait, kill, result, running, done, cancelled + +**Task states**: ``RUNNING`` | ``FINISHED`` | ``FAILED`` | ``USER_KILLED`` + +**Key attributes**: + +:task.state: (string) Current task state — one of the values above. +:task.finished: (bool) True once the task has completed (successfully or not). +:task.success: (bool) True if the remote callable returned without raising. +:task.runtime: (float) Elapsed wall-clock seconds since submission. +:task.submit_time: (float) Time since epoch at submission. + +.. _Globus Compute: https://www.globus.org/compute +.. _Globus: https://www.globus.org/ +.. _Globus Compute endpoint: https://globus-compute.readthedocs.io/en/latest/endpoints.html diff --git a/docs/executor/ex_index.rst b/docs/executor/ex_index.rst index a4f33cb39..1de648ade 100644 --- a/docs/executor/ex_index.rst +++ b/docs/executor/ex_index.rst @@ -1,6 +1,6 @@ .. _executor_index: -**Overview** \|\| `Base Executor `__ \|\| `MPI Executor `__ +**Overview** \|\| `Base Executor `__ \|\| `MPI Executor `__ \|\| `Globus Compute Executor `__ Executors ========= @@ -14,8 +14,12 @@ portable interface for running and managing user applications. ex_overview ex_base ex_mpi + ex_globus_compute The **Executor** provides a portable interface for running applications on any system and -any number of compute resources. +any number of compute resources. The :doc:`MPI Executor` launches MPI +applications on local resources; the +:doc:`Globus Compute Executor` submits Python callables to +remote Globus Compute endpoints. Please select from the sections above or the sidebar navigation to read more. diff --git a/docs/executor/ex_overview.rst b/docs/executor/ex_overview.rst index f53510b73..47415979c 100644 --- a/docs/executor/ex_overview.rst +++ b/docs/executor/ex_overview.rst @@ -156,4 +156,19 @@ which partitions resources among workers, ensuring that runs utilize different resources (e.g., nodes). Furthermore, the ``MPIExecutor`` offers resilience via the feature of re-launching tasks that fail to start because of system factors. +Remote Execution with Globus Compute +------------------------------------- + +The :doc:`GlobusComputeExecutor` submits Python callables +to remote `Globus Compute`_ endpoints instead of launching local subprocesses. +It exposes the same ``submit()`` / ``poll()`` / ``kill()`` interface as other +libEnsemble executors and can be retrieved from ``libE_info["executor"]`` +inside simulator functions. + +See :ref:`Globus Compute - Remote User Functions` for an +overview of all Globus Compute integration modes and the +:doc:`GlobusComputeExecutor API reference` for the full +interface. + .. _concurrent futures: https://docs.python.org/library/concurrent.futures.html +.. _Globus Compute: https://www.globus.org/compute diff --git a/docs/platforms/globus_compute.rst b/docs/platforms/globus_compute.rst new file mode 100644 index 000000000..8934e8dc4 --- /dev/null +++ b/docs/platforms/globus_compute.rst @@ -0,0 +1,145 @@ +.. _globus_compute_ref: + +====================================== +Globus Compute - Remote User Functions +====================================== + +`Globus Compute`_ (formerly funcX) is a distributed, high-performance +function-as-a-service platform. When libEnsemble is running on a resource with +internet access (laptops, login nodes, other servers, etc.), it can offload +simulator calls to remote Globus Compute endpoints: + + .. image:: ../images/funcxmodel.png + :alt: running_with_globus_compute + :scale: 50 + :align: center + +This is useful for running ensembles across machines and heterogeneous resources. +There are **two approaches**, described below. + +.. dropdown:: **Caveats** + + The following caveats apply to all Globus Compute modes: + + 1. Simulator functions submitted to Globus Compute must be *non-persistent*, + since manager-worker communicators cannot be serialized or used by a + remote resource. + + 2. ``Executor.manager_poll()`` is not available inside remotely executed + functions. Control over remote work is limited to inspecting return + values and exceptions when tasks complete. + + 3. Globus Compute imposes a `handful of task-rate and data limits`_ on + submitted functions. + + 4. Users are responsible for authenticating via Globus_ and maintaining their + `Globus Compute endpoints`_ on their target systems. + +.. _gc_only_mode: + +Manager-side GC (GC-only mode) +------------------------------- + +The recommended approach for most use cases. When +``globus_compute_endpoint`` is set in :class:`SimSpecs` +and ``gen_on_worker`` is not set (the default), libEnsemble enters +**GC-only mode**: no local worker processes are launched. The manager +submits simulation work directly to Globus Compute and polls futures for +results. The generator still runs as a local thread on the manager. + +``nworkers`` controls the maximum number of simultaneously in-flight +Globus Compute tasks (virtual concurrency). The default is 1. + +This mode supports both the :ref:`gest-api simulator format` +(``SimSpecs.simulator``) and the legacy ``sim_f`` format. + +.. code-block:: python + + from libensemble import Ensemble + from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + + def my_sim(input_dict: dict, **kwargs) -> dict: + """gest-api simulator — runs remotely on the GC endpoint.""" + return {"f": input_dict["x"] ** 2} + + + sim_specs = SimSpecs( + simulator=my_sim, + vocs=vocs, + globus_compute_endpoint="3af6dc24-3f27-4c49-8d11-e301ade15353", + ) + + libE_specs = LibeSpecs(nworkers=4) # up to 4 concurrent GC tasks + + workflow = Ensemble( + sim_specs=sim_specs, + gen_specs=gen_specs, + libE_specs=libE_specs, + exit_criteria=ExitCriteria(sim_max=20), + ) + H, _, _ = workflow.run() + +Users can also define ``Executor`` instances within their remote simulator +functions and submit MPI applications normally, as long as libEnsemble and +the target application are accessible on the remote system:: + + # Within the remote simulator function + from libensemble.executors import MPIExecutor + exctr = MPIExecutor() + exctr.register_app(full_path="/home/user/forces.x", app_name="forces") + task = exctr.submit(app_name="forces", num_procs=64) + +.. note:: + + Both the simulator callable and any VOCS object must be **picklable**, + as they are serialized and shipped to the remote Globus Compute endpoint. + +.. _gc_executor_approach: + +GlobusComputeExecutor (user-facing) +------------------------------------ + +For workflows where the simulation function itself orchestrates remote +calls, like fanning out to multiple endpoints or mixing local +and remote work. Use the +:class:`GlobusComputeExecutor` +directly inside the simulator. + +Create and register the executor in the top-level script: + +.. code-block:: python + + from libensemble.executors import GlobusComputeExecutor + + exctr = GlobusComputeExecutor(endpoint_id="3af6dc24-3f27-4c49-8d11-e301ade15353") + +Then use it inside the simulator function: + +.. code-block:: python + + import time + + + def my_sim(H, persis_info, sim_specs, libE_info): + exctr = libE_info["executor"] + + task = exctr.submit(func=my_remote_func, app_args=H["x"][0]) + + while not task.finished: + task.poll() + if exctr.manager_kill_received(): + task.kill() + break + time.sleep(0.1) + + return H_o, persis_info + +See the :doc:`GlobusComputeExecutor API reference<../executor/ex_globus_compute>` for +the full interface including ``register_app``, ``submit``, and +:class:`GlobusComputeTask` methods. + +.. _Globus Compute: https://www.globus.org/compute +.. _Globus Compute endpoints: https://globus-compute.readthedocs.io/en/latest/endpoints.html +.. _Globus: https://www.globus.org/ +.. _handful of task-rate and data limits: https://globus-compute.readthedocs.io/en/latest/limits.html diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst index d1de30c45..3a16a5eed 100644 --- a/docs/platforms/platforms_index.rst +++ b/docs/platforms/platforms_index.rst @@ -159,60 +159,9 @@ will better manage simulation and generation functions that contain considerable computational work or I/O. Therefore the second option is to use Globus Compute to isolate this work from the workers. -.. _globus_compute_ref: - -Globus Compute - Remote User Functions --------------------------------------- - -If libEnsemble is running on some resource with -internet access (laptops, login nodes, other servers, etc.), workers can be instructed to -launch generator or simulator user function instances to separate resources from -themselves via `Globus Compute`_ (formerly funcX), a distributed, high-performance function-as-a-service platform: - - .. image:: ../images/funcxmodel.png - :alt: running_with_globus_compute - :scale: 50 - :align: center - -This is useful for running ensembles across machines and heterogeneous resources, but -comes with several caveats: - - 1. User functions registered with Globus Compute must be *non-persistent*, since - manager-worker communicators can't be serialized or used by a remote resource. - - 2. Likewise, the ``Executor.manager_poll()`` capability is disabled. The only - available control over remote functions by workers is processing return values - or exceptions when they complete. - - 3. Globus Compute imposes a `handful of task-rate and data limits`_ on submitted functions. - - 4. Users are responsible for authenticating via Globus_ and maintaining their - `Globus Compute endpoints`_ on their target systems. - -Users can still define Executor instances within their user functions and submit -MPI applications normally, as long as libEnsemble and the target application are -accessible on the remote system:: - - # Within remote user function - from libensemble.executors import MPIExecutor - exctr = MPIExecutor() - exctr.register_app(full_path="/home/user/forces.x", app_name="forces") - task = exctr.submit(app_name="forces", num_procs=64) - -Specify a Globus Compute endpoint in :class:`sim_specs` via the ``globus_compute_endpoint`` -argument. For example:: - - from libensemble.specs import SimSpecs - - sim_specs = SimSpecs( - sim_f = sim_f, - inputs = ["x"], - out = [("f", float)], - globus_compute_endpoint = "3af6dc24-3f27-4c49-8d11-e301ade15353", - ) - -See the ``libensemble/tests/scaling_tests/globus_compute_forces`` directory for a complete -remote-simulation example. +See :doc:`Globus Compute - Remote User Functions` for the two +integration approaches (manager-side GC-only mode and the user-facing +``GlobusComputeExecutor``). Instructions for Specific Platforms ----------------------------------- @@ -231,9 +180,5 @@ libEnsemble on specific HPC systems. perlmutter polaris srun + globus_compute example_scripts - -.. _Globus Compute: https://www.globus.org/compute -.. _Globus Compute endpoints: https://globus-compute.readthedocs.io/en/latest/endpoints.html -.. _Globus: https://www.globus.org/ -.. _handful of task-rate and data limits: https://globus-compute.readthedocs.io/en/latest/limits.html diff --git a/docs/running_libE.rst b/docs/running_libE.rst index 6329e13e2..2677eb8d2 100644 --- a/docs/running_libE.rst +++ b/docs/running_libE.rst @@ -83,9 +83,9 @@ if using an :class:`Ensemble` object with **Reverse-ssh interface** Set ``comms`` to ``ssh`` to launch workers on remote ssh-accessible systems. This -co-locates workers, functions, and any applications. User -functions can also be persistent, unlike when launching remote functions via -:ref:`Globus Compute`. +co-locates workers, functions, and any applications. Simulator functions can be +persistent, unlike those submitted to :ref:`Globus Compute`, +which must be non-persistent. The remote working directory and Python need to be specified. This may resemble:: diff --git a/libensemble/executors/__init__.py b/libensemble/executors/__init__.py index 563fa3352..fdcae5c87 100644 --- a/libensemble/executors/__init__.py +++ b/libensemble/executors/__init__.py @@ -1,4 +1,5 @@ from libensemble.executors.executor import Executor +from libensemble.executors.globus_compute_executor import GlobusComputeExecutor, GlobusComputeTask from libensemble.executors.mpi_executor import MPIExecutor -__all__ = ["Executor", "MPIExecutor"] +__all__ = ["Executor", "GlobusComputeExecutor", "GlobusComputeTask", "MPIExecutor"] diff --git a/libensemble/executors/globus_compute_executor.py b/libensemble/executors/globus_compute_executor.py new file mode 100644 index 000000000..154c2762e --- /dev/null +++ b/libensemble/executors/globus_compute_executor.py @@ -0,0 +1,285 @@ +import logging +import os +from concurrent.futures import Future, TimeoutError +from typing import Any + +from libensemble.executors.executor import ( + Application, + Executor, + ExecutorException, + Task, + TimeoutExpired, +) +from libensemble.utils.globus_compute import GCSession +from libensemble.utils.timer import TaskTimer + +logger = logging.getLogger(__name__) + + +class GlobusComputeTask(Task): + """A :class:`~libensemble.executors.executor.Task` wrapping a + ``concurrent.futures.Future`` returned by Globus Compute. + + Instead of managing a local subprocess, this task polls a remote + computation via the future's ``done()`` / ``result()`` APIs. + """ + + def __init__(self, future, app=None, app_args=None, workerid=None): + self.id = next(Task.newid) + self.reset() + self.timer = TaskTimer() + self.app = app + self.app_args = app_args + self.workerID = workerid + self._gc_future = future + + worker_name = f"_worker{self.workerID}" if self.workerID else "" + self.name = Task.prefix + f"_{app.name}{worker_name}_{self.id}" + self.stdout = "" + self.stderr = "" + self.workdir = None + self.dry_run = False + self.runline = None + self.run_attempts = 0 + self.env = {} + self.ngpus_req = 0 + + self.state = "RUNNING" + self.timer.start() + self.submit_time = self.timer.tstart + + def _check_poll(self): + if self.finished: + return False + return True + + def poll(self): + if not self._check_poll(): + return + if self._gc_future.done(): + try: + self._gc_future.result() + self.finished = True + self.success = True + self.state = "FINISHED" + except Exception: + self.finished = True + self.success = False + self.state = "FAILED" + self.calc_task_timing() + else: + self.state = "RUNNING" + self.runtime = self.timer.elapsed + + def wait(self, timeout=None): + if not self._check_poll(): + return + try: + self._gc_future.result(timeout=timeout) + self.finished = True + self.success = True + self.state = "FINISHED" + except TimeoutError: + raise TimeoutExpired(self.name, timeout) + except Exception: + self.finished = True + self.success = False + self.state = "FAILED" + self.calc_task_timing() + + def kill(self, wait_time=None): + self._gc_future.cancel() + self.state = "USER_KILLED" + self.finished = True + self.calc_task_timing() + + def result(self, timeout=None): + self.wait(timeout=timeout) + return self.state + + def running(self): + self.poll() + return self.state == "RUNNING" + + def done(self): + self.poll() + return self.finished + + def cancelled(self): + self.poll() + return self.state == "USER_KILLED" + + +class GlobusComputeExecutor(Executor): + """An :class:`~libensemble.executors.executor.Executor` that submits + Python callables to Globus Compute instead of launching local subprocesses. + + Usage in a top-level script:: + + from libensemble.executors.globus_compute_executor import GlobusComputeExecutor + + exctr = GlobusComputeExecutor(endpoint_id="...") + + Inside a simulator function:: + + task = info["executor"].submit(func=my_remote_func, app_args=...) + while not task.finished: + task.poll() + if info["executor"].manager_kill_received(): + task.kill() + break + time.sleep(0.1) + """ + + def __init__(self, endpoint_id: str): + self.manager_signal = None + self.default_apps: dict[str, Application | None] = {"sim": None, "gen": None} + self.apps: dict[str, Application] = {} + self.wait_time = 60 + self.list_of_tasks: list[GlobusComputeTask] = [] + self.workerID = None + self.comm = None + self.last_task = 0 + self.base_dir = os.getcwd() + + self.endpoint_id = endpoint_id + self._gc_executor = None + self._func_cache: dict[int, str] = {} + + def _ensure_gc(self): + if self._gc_executor is None: + self._gc_executor = GCSession.get_or_create_executor(self.endpoint_id) + return self._gc_executor + + def _get_func_id(self, func) -> str: + key = id(func) + if key in self._func_cache: + return self._func_cache[key] + executor = self._ensure_gc() + if executor is None: + raise RuntimeError( + "Globus Compute SDK is not installed. " "Install it with: pip install globus-compute-sdk" + ) + fid = executor.register_function(func) + self._func_cache[key] = fid + return fid + + def register_app( + self, + full_path: str, + app_name: str | None = None, + calc_type: str | None = None, + desc: str | None = None, + precedent: str = "", + pyobj: Any | None = None, + ) -> None: + """Register an application. + + If *pyobj* is provided the application is treated as a remote + Python callable. Otherwise the base-class behaviour applies + (local executable). + """ + if not app_name: + app_name = os.path.split(full_path)[1] + + app = Application(full_path, app_name, calc_type, desc, pyobj, precedent) + self.apps[app_name] = app + + if calc_type is not None: + if calc_type not in self.default_apps: + raise ExecutorException(f"Unrecognized calculation type {calc_type}") + self.default_apps[calc_type] = app + + def submit( + self, + calc_type: str | None = None, + app_name: str | None = None, + app_args: str | None = None, + func: Any = None, + stdout: str | None = None, + stderr: str | None = None, + dry_run: bool = False, + wait_on_start: bool = False, + **kwargs, + ) -> GlobusComputeTask: + """Submit a function or registered application to Globus Compute. + + Parameters + ---------- + calc_type : str, optional + Calculation type (``"sim"`` or ``"gen"``). Used with *app_name*. + app_name : str, optional + Name of a previously registered application. + app_args : str, optional + Arguments passed alongside the function. + func : Callable, optional + A Python callable to execute remotely. Takes precedence over + *app_name* / *calc_type*. + stdout, stderr : str, optional + Ignored (stubs for API compatibility). + dry_run : bool, optional + If True, return a task without actually submitting. + wait_on_start : bool, optional + If True, block until the task is reported as started. + + Returns + ------- + GlobusComputeTask + """ + if dry_run: + raise NotImplementedError("dry_run is not supported for GlobusComputeExecutor") + + if func is not None: + fid = self._get_func_id(func) + app = Application(full_path="", name=func.__name__, calc_type="sim", pyobj=func) + elif app_name is not None: + app = self.get_app(app_name) + if app.pyobj is not None: + fid = self._get_func_id(app.pyobj) + else: + raise ValueError( + f"Application '{app_name}' has no pyobj callable registered. " + "Use the `func=...` argument, or register an app with `pyobj=`." + ) + elif calc_type is not None: + app = self.default_app(calc_type) + if app.pyobj is not None: + fid = self._get_func_id(app.pyobj) + else: + raise ValueError( + f"Default {calc_type} app has no pyobj callable. " + "Use the `func=...` argument, or register an app with `pyobj=`." + ) + else: + raise ValueError("One of `func`, `app_name`, or `calc_type` must be provided") + + args = app_args + future: Future = self._ensure_gc().submit_to_registered_function(fid, args) + task = GlobusComputeTask(future, app=app, app_args=args, workerid=self.workerID) + self.list_of_tasks.append(task) + + if wait_on_start: + task.wait() + + return task + + def set_workerID(self, workerid) -> None: + """Sets the worker ID for this executor.""" + self.workerID = workerid + + def set_worker_info(self, comm=None, workerid=None) -> None: + """Sets worker info for this executor.""" + self.workerID = workerid + self.comm = comm + + def serial_setup(self): + pass + + def set_resources(self, resources): + pass + + def add_platform_info(self, platform_info=None): + pass + + def set_gen_procs_gpus(self, libE_info): + pass diff --git a/libensemble/libE.py b/libensemble/libE.py index 219e2cd8c..811b126b4 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -262,6 +262,24 @@ def libE( libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local} + # Detect GC-only mode: manager submits directly to Globus Compute. + # nworkers is repurposed as virtual concurrency (max concurrent GC sims). + # Must run before Resources.init_resources so the resource manager is + # skipped (GC-only has no real nodes to partition). + if sim_specs.get("globus_compute_endpoint"): + libE_specs["_gc_only"] = True + if libE_specs.get("gen_on_worker"): + logger.info("GC-only mode: gen_on_worker is ignored (generator runs on manager)") + libE_specs["gen_on_worker"] = False + # Force local comms (no MPI workers needed) + if libE_specs.get("comms", "mpi") != "local": + libE_specs["comms"] = "local" + logger.info("GC-only mode: switching to local comms (no workers needed)") + # GC-only has no real nodes; disable resource manager to avoid ZeroDivisionError + if not libE_specs.get("disable_resource_manager"): + libE_specs["disable_resource_manager"] = True + logger.info("GC-only mode: disabling resource manager (no local nodes)") + Resources.init_resources(libE_specs, platform_info) if Executor.executor is not None: Executor.executor.add_platform_info(platform_info) @@ -485,6 +503,9 @@ def kill_proc_team(wcomms, timeout): def libE_local(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Main routine for thread/process launch of libE.""" + if libE_specs.get("_gc_only"): + return _libE_local_gc_only(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0) + resources = Resources.resources if resources is not None: local_host = [socket.gethostname()] @@ -523,6 +544,25 @@ def cleanup(): ) +def _libE_local_gc_only(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): + """GC-only variant: skip worker processes entirely.""" + hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0) + + if not libE_specs["disable_log_files"]: + exit_logger = manager_logging_config(specs=libE_specs) + else: + exit_logger = None + + def cleanup(): + """Handler to clean up GC resources.""" + if exit_logger is not None: + exit_logger() + + return manager( + [], sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, hist, on_cleanup=cleanup + ) + + # ==================== TCP version ================================= diff --git a/libensemble/manager.py b/libensemble/manager.py index 7995d2da9..c2a7e2e90 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -3,8 +3,10 @@ ============================ """ +import concurrent.futures import cProfile import glob +import inspect import logging import os import platform @@ -30,11 +32,14 @@ MAN_SIGNAL_KILL, PERSIS_STOP, STOP_TAG, + TASK_FAILED, + WORKER_DONE, calc_type_strings, ) from libensemble.resources.resources import Resources from libensemble.tools.fields_keys import protected_libE_fields from libensemble.tools.tools import _USER_CALC_DIR_WARNING +from libensemble.utils.globus_compute import GCSession from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges from libensemble.utils.output_directory import EnsembleDirectory from libensemble.utils.timer import Timer @@ -243,6 +248,17 @@ def __init__( local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs) self.wcomms = [local_worker_comm] + self.wcomms + # If GC-only mode, add virtual worker slots for concurrent sims. + # nworkers controls concurrency (default 1 when unset). + if libE_specs.get("_gc_only"): + n_virtual = libE_specs.get("nworkers", 1) or 1 + virtual_W = np.zeros(n_virtual, dtype=Manager.worker_dtype) + start_id = len(self.W) + virtual_W["worker_id"] = np.arange(start_id, start_id + n_virtual) + virtual_W["gen_worker"] = False + self.W = np.concatenate([self.W, virtual_W]) + self.wcomms = self.wcomms + [None] * n_virtual + self.W = _WorkerIndexer(self.W, 1 - gen_on_worker) # if gen on worker, then no additional worker self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker) @@ -574,9 +590,19 @@ def _kill_cancelled_sims(self) -> None: logger.debug(f"Manager sending kill signals to H indices {kill_sim_rows}") kill_ids = self.hist.H["sim_id"][kill_sim_rows] kill_on_workers = self.hist.H["sim_worker"][kill_sim_rows] - for w in kill_on_workers: - self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL) - self.hist.H["kill_sent"][kill_ids] = True + + if self.libE_specs.get("_gc_only"): + # Cancel in-flight GC futures for these sim_ids + sim_ids_to_kill = set(kill_ids) + for future, (sim_id, w) in list(self._gc_futures.items()): + if sim_id in sim_ids_to_kill: + future.cancel() + del self._gc_futures[future] + self.hist.H["kill_sent"][kill_ids] = True + else: + for w in kill_on_workers: + self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL) + self.hist.H["kill_sent"][kill_ids] = True # --- Handle termination @@ -692,6 +718,204 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: return output + # --- Globus Compute (manager-side) helpers + + def _init_gc(self) -> None: + """Initialize the Globus Compute executor for manager-side submission.""" + endpoint = self.sim_specs.get("globus_compute_endpoint", "") + if not endpoint: + raise ValueError("_gc_only mode requires globus_compute_endpoint in sim_specs") + + self._gc_executor, self._gc_fid = GCSession.get_or_create(endpoint, self.sim_specs["sim_f"]) + self._gc_futures: dict[concurrent.futures.Future, tuple[int, int]] = {} + self._gc_nparams = len(inspect.signature(self.sim_specs["sim_f"]).parameters) + + def _gc_submit(self, Work: dict, w: int) -> None: + """Submit simulation work to Globus Compute instead of a local worker.""" + sim_ids = Work["libE_info"]["H_rows"] + H_fields = Work["H_fields"] + + for sim_id in sim_ids: + calc_in = np.empty(1, dtype=[(name, self.hist.H.dtype.fields[name][0]) for name in H_fields]) + for name in H_fields: + calc_in[name] = self.hist.H[name][sim_id] + + p_info = Work.get("persis_info", {}) + + libE_info = dict(Work["libE_info"]) + libE_info["comm"] = None + libE_info.pop("executor", None) + + args = [calc_in, p_info, self.sim_specs, libE_info][: self._gc_nparams] + + future = self._gc_executor.submit_to_registered_function(self._gc_fid, args) + self._gc_futures[future] = (sim_id, w) + + @staticmethod + def _normalize_gc_result(result): + """Normalize a sim_f return value to a 3-tuple ``(out, persis_info, calc_status)``. + + Sim functions may return: + - 3-tuple: ``(H_o, persis_info, calc_status)`` + - 2-tuple: ``(H_o, persis_info)`` — common with gest-api wrappers + + In the 2-tuple case, *calc_status* defaults to ``WORKER_DONE``. + """ + if isinstance(result, (tuple, list)): + if len(result) >= 3: + return result[0], result[1], result[2] + elif len(result) == 2: + if isinstance(result[1], (int, str)): + return result[0], {}, result[1] + return result[0], result[1], WORKER_DONE + else: + return result[0], {}, WORKER_DONE + return result, {}, WORKER_DONE + + def _gather_gc_results(self, persis_info: dict) -> dict: + """Poll completed GC futures *and* the gen worker comm, then update history.""" + time.sleep(0.0001) # Avoid busy-wait (matches _receive_from_workers) + + # Poll the gen worker thread (w=0) so generator output reaches history + new_stuff = True + while new_stuff: + new_stuff = False + for w in self.W["worker_id"]: + if self.wcomms[w] is not None and self.wcomms[w].mail_flag(): + new_stuff = True + self._handle_msg_from_worker(persis_info, w) + + # Drain completed GC futures + done = [f for f in self._gc_futures if f.done()] + for future in done: + sim_id, w = self._gc_futures.pop(future) + try: + out, p_info, calc_status = self._normalize_gc_result(future.result()) + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": calc_status, + "calc_out": out, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": p_info or {}, + } + except Exception as e: + logger.error(f"GC task failed for sim_id {sim_id}: {e}") + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": TASK_FAILED, + "calc_out": None, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": {}, + } + self._update_state_on_worker_msg(persis_info, D_recv, w) + + self._init_every_k_save() + return persis_info + + def _gc_cancel_futures(self) -> None: + """Cancel any in-flight GC futures.""" + for future in list(self._gc_futures.keys()): + future.cancel() + self._gc_futures.clear() + + def _run_gc_only(self, persis_info: dict) -> tuple[dict, int, int]: + """Manager main loop variation for GC-only mode.""" + self._init_gc() + try: + while not self.term_test(): + self._kill_cancelled_sims() + persis_info = self._gather_gc_results(persis_info) + Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info) + if flag: + break + + for w in Work: + if self._sim_max_given(): + break + self._check_work_order(Work[w], w) + if Work[w]["tag"] == EVAL_GEN_TAG: + # Gen work goes to the generator thread via wcomms + self._send_work_order(Work[w], w) + else: + # Sim work is submitted directly to Globus Compute + self._gc_submit(Work[w], w) + self._update_state_on_alloc(Work[w], w) + + assert self.term_test() or any( + self.W["active"] != 0 + ), "alloc_f did not return any work, although all workers are idle." + except WorkerException as e: + report_worker_exc(e) + raise LoggedException(e.args[0], e.args[1]) from None + except Exception as e: + logger.error(traceback.format_exc()) + raise LoggedException(e.args) from None + finally: + result = self._gc_final_receive_and_kill(persis_info) + self.wcomms = [] + sys.stdout.flush() + sys.stderr.flush() + return result + + def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]: + """Drain remaining GC futures and shut down the generator thread.""" + if any(self.W["persis_state"]): + for w in self.W["worker_id"][self.W["persis_state"] > 0]: + logger.debug(f"Manager sending PERSIS_STOP to worker {w}") + if self.libE_specs.get("final_gen_send", False): + rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0] + work = { + "H_fields": self.gen_specs["persis_in"], + "persis_info": persis_info.get(w), + "tag": PERSIS_STOP, + "libE_info": {"persistent": True, "H_rows": rows_to_send}, + } + self._check_work_order(work, w, force=True) + self._send_work_order(work, w) + self.hist.update_history_to_gen(rows_to_send) + else: + self.wcomms[w].send(PERSIS_STOP, MAN_SIGNAL_KILL) + if not self.W[w]["active"]: + self.W[w]["active"] = self.W[w]["persis_state"] + self.persis_pending.append(w) + + exit_flag = 0 + while self._gc_futures: + done = [f for f in self._gc_futures if f.done()] + if not done: + time.sleep(0.1) + continue + for future in done: + sim_id, w = self._gc_futures.pop(future) + try: + out, p_info, calc_status = self._normalize_gc_result(future.result()) + D_recv = { + "calc_type": EVAL_SIM_TAG, + "calc_status": calc_status, + "calc_out": out, + "libE_info": {"keep_state": False, "H_rows": [sim_id]}, + "persis_info": p_info or {}, + } + except Exception: + continue + self._update_state_on_worker_msg(persis_info, D_recv, w) + + # Drain any pending gen-thread messages (e.g. FINISHED_PERSISTENT_GEN_TAG) + # before sending STOP_TAG, otherwise _clean_up_thread deadlocks. + if 0 in self.W["worker_id"] and self.wcomms[0] is not None: + while self.wcomms[0].mail_flag(): + self._handle_msg_from_worker(persis_info, 0) + self.wcomms[0].send(STOP_TAG, MAN_SIGNAL_FINISH) + + self._init_every_k_save(complete=True) + self._clean_up_thread() + + if self.live_data is not None: + self.live_data.finalize(self.hist) + + persis_info["num_gens_started"] = 0 + return persis_info, exit_flag, self.elapsed() + # --- Main loop def run(self, persis_info: dict) -> tuple[dict, int, int]: @@ -699,6 +923,9 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: logger.debug(f"Manager initiated on node {socket.gethostname()}") logger.info(f"Manager exit_criteria: {self.exit_criteria}") + if self.libE_specs.get("_gc_only"): + return self._run_gc_only(persis_info) + # Continue receiving and giving until termination test is satisfied try: while not self.term_test(): diff --git a/libensemble/specs.py b/libensemble/specs.py index 9ee04baa3..3ca5fcc20 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -116,9 +116,11 @@ class SimSpecs(BaseModel): globus_compute_endpoint: str | None = "" """ - A Globus Compute (https://www.globus.org/compute) ID corresponding to an active endpoint on a remote system. - libEnsemble's workers will submit simulator function instances to this endpoint instead of - calling them locally. + A `Globus Compute `_ endpoint ID corresponding + to an active endpoint on a remote system. When set, libEnsemble enters + **GC-only mode**: no local worker processes are launched and the manager + submits simulation work directly to the Globus Compute endpoint. + ``nworkers`` controls the maximum number of concurrent in-flight tasks. """ threaded: bool | None = False diff --git a/libensemble/tests/functionality_tests/test_gc_manager_submit.py b/libensemble/tests/functionality_tests/test_gc_manager_submit.py new file mode 100644 index 000000000..f4f945b89 --- /dev/null +++ b/libensemble/tests/functionality_tests/test_gc_manager_submit.py @@ -0,0 +1,127 @@ +""" +Tests libEnsemble's manager-side Globus Compute submission (GC-only mode). + +The manager submits simulation work directly to a mocked Globus Compute +endpoint instead of dispatching to local workers. The generator runs on +the manager thread as normal. + +Execute via: + python test_gc_manager_submit.py + +No MPI or local workers are needed — GC-only mode uses local comms with +nworkers acting as the maximum number of concurrent in-flight GC futures. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 1 + +import concurrent.futures +from unittest import mock + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.sampling import UniformSample +from libensemble.libE import libE +from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs +from libensemble.utils.globus_compute import GCSession + +# ── Simulator ───────────────────────────────────────────────────────────────── + + +def norm_sim(H, persis_info, sim_specs, libE_info): + """Evaluate the Euclidean norm of each input point.""" + H_o = np.zeros(len(H), dtype=sim_specs["out"]) + for i in range(len(H)): + H_o["f"][i] = float(np.linalg.norm(H["x"][i])) + return H_o, persis_info + + +# ── Fake GC infrastructure ──────────────────────────────────────────────────── + + +def _make_done_future(value): + """Return a Future that is already resolved with *value*.""" + f = concurrent.futures.Future() + f.set_result(value) + return f + + +def _make_gc_executor(sim_f): + """Return a mock GC executor whose submit_to_registered_function calls + *sim_f* synchronously and wraps the result in a resolved Future.""" + + executor = mock.MagicMock() + executor.register_function.return_value = "mock-fid" + + def fake_submit(fid, args): + result = sim_f(*args) + return _make_done_future(result) + + executor.submit_to_registered_function.side_effect = fake_submit + return executor + + +# ── Main ────────────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + GCSession.clear() + + ENDPOINT = "mock-endpoint-uuid" + SIM_MAX = 20 + N_VIRTUAL_WORKERS = 4 # max concurrent GC futures (no real workers needed) + + vocs = VOCS( + variables={"x0": [-3.0, 3.0], "x1": [-2.0, 2.0]}, + objectives={"f": "MINIMIZE"}, + ) + + sim_specs = SimSpecs( + sim_f=norm_sim, + inputs=["x"], + outputs=[("f", float)], + globus_compute_endpoint=ENDPOINT, + ) + + gen_specs = GenSpecs( + generator=UniformSample(vocs), + inputs=["sim_id"], + persis_in=["f", "sim_id"], + outputs=[("x", float, (2,))], + batch_size=N_VIRTUAL_WORKERS, + ) + + libE_specs = LibeSpecs( + nworkers=N_VIRTUAL_WORKERS, + comms="local", + disable_log_files=True, + safe_mode=False, + ) + + exit_criteria = ExitCriteria(sim_max=SIM_MAX) + + mock_executor = _make_gc_executor(norm_sim) + + with mock.patch.object(GCSession, "_create_executor", return_value=mock_executor): + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs) + + assert flag == 0, f"libEnsemble exited with unexpected flag {flag}" + assert ( + np.sum(H["sim_ended"]) >= SIM_MAX + ), f"Expected at least {SIM_MAX} completed sims, got {np.sum(H['sim_ended'])}" + + completed = H[H["sim_ended"]] + assert len(completed) >= SIM_MAX + + # Every completed point must have a non-negative norm + assert np.all(completed["f"] >= 0.0), "Unexpected negative norm value" + + # Verify the executor was actually used (not bypassed) + assert mock_executor.submit_to_registered_function.call_count >= SIM_MAX, ( + f"Expected at least {SIM_MAX} GC submissions, " f"got {mock_executor.submit_to_registered_function.call_count}" + ) + + print(f"\nGC-only mode: {np.sum(H['sim_ended'])} sims completed via mocked Globus Compute.") + print(f"Best f value: {completed['f'].min():.6f}") + print("\nlibEnsemble GC-only functionality test passed.") diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py index b92850143..5863ba678 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py @@ -73,7 +73,7 @@ "xtol_abs": 1e-6, "ftol_abs": 1e-6, "dist_to_bound_multiple": 0.5, - "max_active_runs": 6, + "max_active_runs": nworkers - 1, "lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, @@ -84,7 +84,13 @@ exit_criteria = {"sim_max": 2000} # Perform the run - H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, alloc_specs=alloc_specs, libE_specs=libE_specs) + H, persis_info, flag = libE( + sim_specs, + gen_specs, + exit_criteria, + alloc_specs=alloc_specs, + libE_specs=libE_specs, + ) if is_manager: print("[Manager]:", H[np.where(H["local_min"])]["x"]) diff --git a/libensemble/tests/unit_tests/test_globus_compute.py b/libensemble/tests/unit_tests/test_globus_compute.py new file mode 100644 index 000000000..474eb601a --- /dev/null +++ b/libensemble/tests/unit_tests/test_globus_compute.py @@ -0,0 +1,511 @@ +from unittest import mock + +import numpy as np +import pytest + +from libensemble.executors.globus_compute_executor import ( + GlobusComputeExecutor, + GlobusComputeTask, +) +from libensemble.manager import Manager +from libensemble.message_numbers import TASK_FAILED, WORKER_DONE +from libensemble.utils.globus_compute import GCSession + +# ────────────────────────────────────────────── +# GCSession +# ────────────────────────────────────────────── + + +class TestGCSession: + def setup_method(self): + GCSession.clear() + + def test_get_or_create_executor(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_create.return_value = mock_exec + + ex1 = GCSession.get_or_create_executor("ep-1") + assert ex1 is mock_exec + mock_create.assert_called_once_with("ep-1") + + # Cache hit + ex2 = GCSession.get_or_create_executor("ep-1") + assert ex2 is mock_exec + mock_create.assert_called_once() # No second call + + def test_get_or_create_caches_func_id(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-42" + mock_create.return_value = mock_exec + + def my_func(): + pass + + ex1, fid1 = GCSession.get_or_create("ep-1", my_func) + assert ex1 is mock_exec + assert fid1 == "fid-42" + mock_exec.register_function.assert_called_once_with(my_func) + + # Second call — same (executor, fid) returned, no re-registration + ex2, fid2 = GCSession.get_or_create("ep-1", my_func) + assert ex2 is mock_exec + assert fid2 == "fid-42" + mock_exec.register_function.assert_called_once() + + def test_register_function(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-99" + mock_create.return_value = mock_exec + + def f(): + pass + + ex, fid = GCSession.register_function("ep-1", f) + assert ex is mock_exec + assert fid == "fid-99" + mock_exec.register_function.assert_called_once_with(f) + + def test_module_not_found_returns_none(self): + # Force _create_executor to simulate missing SDK + GCSession._create_executor = classmethod(lambda cls, eid: None) + + ex = GCSession.get_or_create_executor("ep-1") + assert ex is None + + ex, fid = GCSession.get_or_create("ep-1", lambda: None) + assert ex is None + assert fid is None + + def test_thread_safety(self): + import threading + + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid" + mock_create.return_value = mock_exec + + errors = [] + + def access(): + try: + for _ in range(100): + GCSession.get_or_create_executor("ep-t") + GCSession.get_or_create("ep-t", lambda: None) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=access) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Thread safety errors: {errors}" + + +# ────────────────────────────────────────────── +# GlobusComputeTask +# ────────────────────────────────────────────── + + +class TestGlobusComputeTask: + def make_task(self, future=None, app=None): + if app is None: + from libensemble.executors.executor import Application + + app = Application("", name="test_func", calc_type="sim", pyobj=lambda: None) + if future is None: + future = mock.MagicMock() + future.done.return_value = False + return GlobusComputeTask(future, app=app) + + def test_initial_state(self): + task = self.make_task() + assert task.state == "RUNNING" + assert not task.finished + assert task._gc_future is not None + + def test_poll_running(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + task.poll() + assert task.state == "RUNNING" + assert not task.finished + + def test_poll_finished_success(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = None # No exception + task = self.make_task(future=future) + task.poll() + assert task.state == "FINISHED" + assert task.finished + assert task.success + + def test_poll_finished_failure(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.side_effect = RuntimeError("boom") + task = self.make_task(future=future) + task.poll() + assert task.state == "FAILED" + assert task.finished + assert not task.success + + def test_wait_timeout(self): + future = mock.MagicMock() + future.result.side_effect = TimeoutError("timed out") + task = self.make_task(future=future) + with pytest.raises(Exception, match="timed out"): + task.wait(timeout=0.001) + + def test_kill(self): + future = mock.MagicMock() + task = self.make_task(future=future) + task.kill() + assert task.state == "USER_KILLED" + assert task.finished + future.cancel.assert_called_once() + + def test_running(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + assert task.running() + + def test_done(self): + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = None + task = self.make_task(future=future) + assert task.done() + + def test_not_done(self): + future = mock.MagicMock() + future.done.return_value = False + task = self.make_task(future=future) + assert not task.done() + + +# ────────────────────────────────────────────── +# GlobusComputeExecutor +# ────────────────────────────────────────────── + + +class TestGlobusComputeExecutor: + def setup_method(self): + GCSession.clear() + + def test_init(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + assert exctr.endpoint_id == "ep-test" + assert exctr._gc_executor is None # Lazy init + assert exctr.workerID is None + + def test_submit_with_func(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-xyz" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + future_mock = mock.MagicMock() + mock_exec.submit_to_registered_function.return_value = future_mock + + def my_func(x): + return x * 2 + + task = exctr.submit(func=my_func, app_args="hello") + assert isinstance(task, GlobusComputeTask) + assert task._gc_future is future_mock + assert task.app is not None + assert task.app.name == "my_func" + + def test_submit_with_registered_app_pyobj(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-app" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + def app_func(): + return 42 + + exctr.register_app("/fake/path", app_name="myapp", calc_type="sim", pyobj=app_func) + + future_mock = mock.MagicMock() + mock_exec.submit_to_registered_function.return_value = future_mock + + task = exctr.submit(app_name="myapp") + assert isinstance(task, GlobusComputeTask) + assert task.app.name == "myapp" + + def test_submit_without_func_or_app_raises(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + with pytest.raises(ValueError): + exctr.submit() + + def test_register_function_caching(self): + with mock.patch.object(GCSession, "_create_executor") as mock_create: + mock_exec = mock.MagicMock() + mock_exec.register_function.return_value = "fid-cached" + mock_create.return_value = mock_exec + + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr._ensure_gc() + + def my_func(): + pass + + fid1 = exctr._get_func_id(my_func) + fid2 = exctr._get_func_id(my_func) + assert fid1 == fid2 + # Should only register once + assert mock_exec.register_function.call_count == 1 + + def test_manager_kill_received(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + # Set up a mock comm so manager_poll doesn't assert + mock_comm = mock.MagicMock() + mock_comm.mail_flag.return_value = False + exctr.set_worker_info(comm=mock_comm, workerid=1) + assert exctr.manager_kill_received() is False + + def test_register_app_no_pyobj(self): + exctr = GlobusComputeExecutor(endpoint_id="ep-test") + exctr.register_app("/bin/echo", app_name="echo", calc_type="sim") + app = exctr.get_app("echo") + assert app.pyobj is None + assert app.full_path == "/bin/echo" + + +# ────────────────────────────────────────────── +# _normalize_gc_result (Manager static method) +# ────────────────────────────────────────────── + + +class TestNormalizeGcResult: + """Tests for Manager._normalize_gc_result which normalizes sim_f + return values (2-tuple from gest-api, 3-tuple from legacy) to a + consistent 3-tuple ``(out, persis_info, calc_status)``.""" + + def test_3_tuple_passthrough(self): + """Legacy sim_f returns (H_o, persis_info, calc_status).""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {"a": 1}, WORKER_DONE) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"a": 1} + assert status == WORKER_DONE + + def test_3_tuple_with_task_failed(self): + """Legacy sim_f returns a failure status.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {}, TASK_FAILED) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == TASK_FAILED + + def test_2_tuple_with_persis_info(self): + """gest-api wrapper returns (H_o, persis_info) — no calc_status.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {"key": "val"}) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"key": "val"} + assert status == WORKER_DONE + + def test_2_tuple_with_empty_persis_info(self): + """gest-api wrapper returns (H_o, {}).""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, {}) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_2_tuple_with_int_status(self): + """Some sim_f may return (H_o, calc_status) without persis_info.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, WORKER_DONE) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_2_tuple_with_str_status(self): + """Sim_f returns (H_o, 'some_status_string').""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o, "Custom status") + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == "Custom status" + + def test_1_tuple(self): + """Edge case: single-element tuple.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = (H_o,) + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_non_tuple(self): + """Edge case: bare numpy array (not wrapped in tuple).""" + H_o = np.zeros(1, dtype=[("y", float)]) + out, p_info, status = Manager._normalize_gc_result(H_o) + assert out is H_o + assert p_info == {} + assert status == WORKER_DONE + + def test_list_result(self): + """Result returned as a list instead of tuple.""" + H_o = np.zeros(1, dtype=[("y", float)]) + result = [H_o, {"x": 1}, WORKER_DONE] + out, p_info, status = Manager._normalize_gc_result(result) + assert out is H_o + assert p_info == {"x": 1} + assert status == WORKER_DONE + + +# ────────────────────────────────────────────── +# Manager-side GC with gest-api sim_f +# ────────────────────────────────────────────── + + +class TestGatherGcResultsGestApi: + """Tests that _gather_gc_results correctly handles futures returning + 2-tuples (gest-api) and 3-tuples (legacy).""" + + def setup_method(self): + GCSession.clear() + + def _make_mock_manager(self): + """Create a minimal mock Manager with just enough structure for + _gather_gc_results to run.""" + mgr = mock.MagicMock(spec=Manager) + mgr._gc_futures = {} + mgr._normalize_gc_result = Manager._normalize_gc_result + mgr._gather_gc_results = lambda pi: Manager._gather_gc_results(mgr, pi) + mgr._init_every_k_save = mock.MagicMock() + + # Minimal W array with one gen worker (w=0) + W = np.zeros(2, dtype=[("worker_id", int)]) + W["worker_id"] = [0, 1] + mgr.W = W + + # wcomms: gen worker has a comm, virtual worker is None + mock_comm = mock.MagicMock() + mock_comm.mail_flag.return_value = False + mgr.wcomms = {0: mock_comm, 1: None} + + return mgr + + def test_2_tuple_gest_api_future(self): + """Future returns (H_o, persis_info) — gest-api sim via gest_api_sim wrapper.""" + mgr = self._make_mock_manager() + + H_o = np.zeros(1, dtype=[("y", float)]) + H_o["y"] = 42.0 + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = (H_o, {"some": "info"}) + + mgr._gc_futures[future] = (0, 1) # sim_id=0, virtual_w=1 + mgr._gather_gc_results({}) + + mgr._update_state_on_worker_msg.assert_called_once() + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == WORKER_DONE + assert D_recv["calc_out"] is H_o + assert D_recv["persis_info"] == {"some": "info"} + assert D_recv["libE_info"]["H_rows"] == [0] + + def test_3_tuple_legacy_future(self): + """Future returns (H_o, persis_info, calc_status) — legacy sim_f.""" + mgr = self._make_mock_manager() + + H_o = np.zeros(1, dtype=[("y", float)]) + future = mock.MagicMock() + future.done.return_value = True + future.result.return_value = (H_o, {}, WORKER_DONE) + + mgr._gc_futures[future] = (5, 1) + mgr._gather_gc_results({}) + + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == WORKER_DONE + assert D_recv["calc_out"] is H_o + + def test_failed_future(self): + """Future raises an exception — should get TASK_FAILED status.""" + mgr = self._make_mock_manager() + + future = mock.MagicMock() + future.done.return_value = True + future.result.side_effect = RuntimeError("remote error") + + mgr._gc_futures[future] = (3, 1) + mgr._gather_gc_results({}) + + call_args = mgr._update_state_on_worker_msg.call_args + D_recv = call_args[0][1] + assert D_recv["calc_status"] == TASK_FAILED + assert D_recv["calc_out"] is None + + def test_multiple_futures_mixed(self): + """Mix of 2-tuple and 3-tuple futures drained in one call.""" + mgr = self._make_mock_manager() + + H_o1 = np.zeros(1, dtype=[("y", float)]) + H_o1["y"] = 1.0 + H_o2 = np.zeros(1, dtype=[("y", float)]) + H_o2["y"] = 2.0 + + f1 = mock.MagicMock() + f1.done.return_value = True + f1.result.return_value = (H_o1, {}) # 2-tuple (gest-api) + + f2 = mock.MagicMock() + f2.done.return_value = True + f2.result.return_value = (H_o2, {"p": 2}, WORKER_DONE) # 3-tuple (legacy) + + mgr._gc_futures[f1] = (0, 1) + mgr._gc_futures[f2] = (1, 1) + mgr._gather_gc_results({}) + + assert mgr._update_state_on_worker_msg.call_count == 2 + + # Verify both calls got correct calc_status + for call in mgr._update_state_on_worker_msg.call_args_list: + D_recv = call[0][1] + assert D_recv["calc_status"] == WORKER_DONE + + def test_not_done_future_skipped(self): + """Futures that are not done should be skipped and remain in dict.""" + mgr = self._make_mock_manager() + + f = mock.MagicMock() + f.done.return_value = False + + mgr._gc_futures[f] = (0, 1) + mgr._gather_gc_results({}) + + mgr._update_state_on_worker_msg.assert_not_called() + assert f in mgr._gc_futures + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py index 79fda7c28..8765e077a 100644 --- a/libensemble/tests/unit_tests/test_ufunc_runners.py +++ b/libensemble/tests/unit_tests/test_ufunc_runners.py @@ -1,6 +1,4 @@ -import mock import numpy as np -import pytest import libensemble.tests.unit_tests.setup as setup from libensemble.tools.fields_keys import libE_fields @@ -68,75 +66,7 @@ def tupilize(arg1, arg2): assert result == (calc_in, {}) -@pytest.mark.extra -def test_globus_compute_runner_init(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - sim_specs["globus_compute_endpoint"] = "1234" - - with mock.patch("globus_compute_sdk.Executor"): - runner = Runner.from_specs(sim_specs) - - assert hasattr( - runner, "globus_compute_executor" - ), "Globus ComputeExecutor should have been instantiated when globus_compute_endpoint found in specs" - - -@pytest.mark.extra -def test_globus_compute_runner_pass(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - sim_specs["globus_compute_endpoint"] = "1234" - - with mock.patch("globus_compute_sdk.Executor"): - runner = Runner.from_specs(sim_specs) - - # Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception - globus_compute_mock = mock.Mock() - globus_compute_future = mock.Mock() - globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future - globus_compute_future.exception.return_value = None - globus_compute_future.result.return_value = (True, True) - - runner.globus_compute_executor = globus_compute_mock - runners = {1: runner.run} - - libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} - - out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1}) - - assert all([out, persis_info]), "Globus Compute runner correctly returned results" - - -@pytest.mark.extra -def test_globus_compute_runner_fail(): - calc_in, sim_specs, gen_specs = get_ufunc_args() - - sim_specs["globus_compute_endpoint"] = "4321" - - with mock.patch("globus_compute_sdk.Executor"): - runner = Runner.from_specs(sim_specs) - - # Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception - globus_compute_mock = mock.Mock() - globus_compute_future = mock.Mock() - globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future - globus_compute_future.exception.return_value = Exception - - runner.globus_compute_executor = globus_compute_mock - runners = {1: runner.run} - - libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"} - - with pytest.raises(Exception): - out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1}) - pytest.fail("Expected exception") - - if __name__ == "__main__": test_normal_runners() test_thread_runners() test_persis_info_from_none() - test_globus_compute_runner_init() - test_globus_compute_runner_pass() - test_globus_compute_runner_fail() diff --git a/libensemble/utils/globus_compute.py b/libensemble/utils/globus_compute.py new file mode 100644 index 000000000..ee7648e63 --- /dev/null +++ b/libensemble/utils/globus_compute.py @@ -0,0 +1,88 @@ +import logging +import threading + +logger = logging.getLogger(__name__) + + +class GCSession: + """Per-process singleton cache for Globus Compute executors. + + Caches executor instances keyed by endpoint_id, ensuring only one + executor per endpoint per process. Thread-safe via ``threading.Lock``. + """ + + _instances: dict[str, tuple] = {} + _lock = threading.Lock() + + @classmethod + def get_or_create_executor(cls, endpoint_id: str): + """Get or create a cached executor for the given endpoint. + + Unlike :meth:`get_or_create`, this does **not** register a function. + """ + with cls._lock: + if endpoint_id in cls._instances: + return cls._instances[endpoint_id][0] + + executor = cls._create_executor(endpoint_id) + if executor is None: + return None + + cls._instances[endpoint_id] = (executor, None) + return executor + + @classmethod + def get_or_create(cls, endpoint_id: str, func): + """Get or create a cached ``(executor, func_id)`` pair. + + The first call for an endpoint creates the executor and registers + the callable. Subsequent calls return the cached pair (the + registered function is re-used). + """ + with cls._lock: + if endpoint_id in cls._instances: + executor, existing_fid = cls._instances[endpoint_id] + if existing_fid is not None: + return executor, existing_fid + # Executor exists but no function registered yet + func_id = executor.register_function(func) + cls._instances[endpoint_id] = (executor, func_id) + return executor, func_id + + executor = cls._create_executor(endpoint_id) + if executor is None: + return None, None + + func_id = executor.register_function(func) + cls._instances[endpoint_id] = (executor, func_id) + return executor, func_id + + @classmethod + def register_function(cls, endpoint_id: str, func): + """Register an additional function with an existing executor. + + Returns ``(executor, func_id)``. Unlike :meth:`get_or_create`, + this always registers and never caches the func_id (caller should + cache it themselves). + """ + executor = cls.get_or_create_executor(endpoint_id) + if executor is None: + return None, None + func_id = executor.register_function(func) + return executor, func_id + + @classmethod + def _create_executor(cls, endpoint_id: str): + try: + from globus_compute_sdk import Executor + except ModuleNotFoundError: + logger.warning("Globus Compute use detected but Globus Compute not importable. " "Is it installed?") + logger.warning("Running function evaluations normally on local resources.") + return None + return Executor(endpoint_id=endpoint_id) + + @classmethod + def clear(cls): + """Clear the cache (primarily for testing).""" + with cls._lock: + cls._instances.clear() diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index b6ac823e1..d55c56a99 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -17,8 +17,6 @@ class Runner: @classmethod def from_specs(cls, specs): - if len(specs.get("globus_compute_endpoint", "")) > 0: - return GlobusComputeRunner(specs) if specs.get("threaded"): return ThreadRunner(specs) if (generator := specs.get("generator")) is not None: @@ -67,36 +65,6 @@ def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, int | Non return out -class GlobusComputeRunner(Runner): - def __init__(self, specs): - super().__init__(specs) - self.globus_compute_executor = self._get_globus_compute_executor()(endpoint_id=specs["globus_compute_endpoint"]) - self.globus_compute_fid = self.globus_compute_executor.register_function(self.f) - - def _get_globus_compute_executor(self): - try: - from globus_compute_sdk import Executor - except ModuleNotFoundError: - logger.warning("Globus Compute use detected but Globus Compute not importable. Is it installed?") - logger.warning("Running function evaluations normally on local resources.") - return None - else: - return Executor - - def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, int | None): - from libensemble.worker import Worker - - libE_info["comm"] = None # 'comm' object not pickle-able - Worker._set_executor(0, None) # ditto for executor - - args = self._truncate_args(calc_in, persis_info, libE_info) - task_fut = self.globus_compute_executor.submit_to_registered_function(self.globus_compute_fid, args) - return task_fut.result() - - def shutdown(self) -> None: - self.globus_compute_executor.shutdown() - - class ThreadRunner(Runner): def __init__(self, specs): super().__init__(specs) @@ -174,8 +142,7 @@ def _create_initial_sample(self, sample_method, num_points): } if sample_method not in samplers: raise ValueError( - f"Unknown initial_sample_method: {sample_method!r}. " - f"Supported: {list(samplers.keys())}" + f"Unknown initial_sample_method: {sample_method!r}. " f"Supported: {list(samplers.keys())}" ) sampler = samplers[sample_method](vocs=self.specs.get("vocs")) else: diff --git a/pixi.lock b/pixi.lock index c4714fc82..fee9aeca6 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:dd8596dc0210788ddfda1f23f01aa4a83aaf85d1242d5b68a530e350e14c174b -size 1084218 +oid sha256:8c0540f7b1f5d52ecbcaeb82c64807c8867e7bcdabc09057d5c4ab128190dfca +size 1092087 diff --git a/pyproject.toml b/pyproject.toml index 45bb84e22..1b1c0ddec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,7 @@ libensemble = { path = ".", editable = true } [tool.pixi.environments] default = [] basic = ["basic"] -extra = ["basic", "extra"] +extra = ["basic", "extra", "globus"] docs = ["docs", "basic"] dev = ["dev", "basic", "extra", "docs"] @@ -122,6 +122,14 @@ pandas = "<3" numpy = "<2.4" proxystore = ">=0.7.1,<0.9" +# globus-compute-sdk and its conda dependency overrides +[tool.pixi.feature.globus.dependencies] +dill = "==0.3.9" +globus-sdk = ">=4.4.0,<5" + +[tool.pixi.feature.globus.pypi-dependencies] +globus-compute-sdk = ">=4.10.1,<5" + [tool.pixi.feature.docs.dependencies] sphinx = ">=8.2.3,<9" sphinxcontrib-bibtex = ">=2.6.5,<3"