Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions libensemble/tests/functionality_tests/test_asktell_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def sim_f(In):
gen_specs = {
"persis_in": ["x", "f", "sim_id"],
"out": [("x", float, (2,))],
"initial_batch_size": 20,
"batch_size": 10,
"initial_batch_size": 2,
"batch_size": 1,
"user": {
"lb": np.array([-3, -2]),
"ub": np.array([3, 2]),
Expand All @@ -56,7 +56,7 @@ def sim_f(In):

vocs = VOCS(variables=variables, objectives=objectives)

exit_criteria = {"gen_max": 201}
exit_criteria = {"gen_max": 11}

for test in range(3):
if test == 0:
Expand Down Expand Up @@ -88,5 +88,8 @@ def sim_f(In):
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs)

if is_manager:
# Basic sanity checks that we actually saved generated inputs/outputs.
assert len(H) >= 11, f"H has length {len(H)}"
assert np.any(np.linalg.norm(H["x"], axis=1) > 0.0), "All saved x values are zero"
assert np.any(H["f"] > 0.0), "All saved f values are zero"
print(H[["sim_id", "x", "f"]][:10])
assert len(H) >= 201, f"H has length {len(H)}"
10 changes: 8 additions & 2 deletions libensemble/tests/functionality_tests/test_cancel_in_alloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
"user": {"uniform_random_pause_ub": 10},
"user": {"uniform_random_pause_ub": 5},
}

gen_specs = {
Expand All @@ -62,7 +62,13 @@
exit_criteria = {"sim_max": 10, "wallclock_max": 300}

# Perform the run
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs, alloc_specs=alloc_specs)
H, persis_info, flag = libE(
sim_specs,
gen_specs,
exit_criteria,
libE_specs=libE_specs,
alloc_specs=alloc_specs,
)

if is_manager:
test = np.any(H["cancel_requested"]) and np.any(H["kill_sent"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
}

libE_specs["scheduler_opts"] = {"match_slots": True}
exit_criteria = {"sim_max": 40, "wallclock_max": 300}
exit_criteria = {"sim_max": 10, "wallclock_max": 300}

# Perform the run

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
},
}

exit_criteria = {"gen_max": 100, "wallclock_max": 300}
exit_criteria = {"gen_max": 10, "wallclock_max": 300}

# Perform the run
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs)
Expand Down
9 changes: 6 additions & 3 deletions libensemble/tests/functionality_tests/test_stats_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
"user": {"app": "helloworld"}, # helloworld or six_hump_camel
"user": {
"app": "helloworld",
"dry_run": True,
}, # dry_run avoids real MPI launches; stats format still exercised
}

gen_specs = {
Expand Down Expand Up @@ -86,13 +89,13 @@
# This can improve scheduling when tasks may run across multiple nodes
libE_specs["scheduler_opts"] = {"match_slots": False}

exit_criteria = {"sim_max": 40, "wallclock_max": 300}
exit_criteria = {"sim_max": 12, "wallclock_max": 60}

iterations = 2

# Note that libE_stats.txt output will be appended across libE calls.
for prob_id in range(iterations):
sim_specs["user"]["app"] = "six_hump_camel"
sim_specs["user"]["app"] = "helloworld"

libE_specs["ensemble_dir_path"] = (
"./ensemble_test_stats" + str(nworkers) + "_" + libE_specs.get("comms") + "_" + str(prob_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@

# Run with random num_procs/num_gpus for each simulation
gpu_test.persis_info = {}
gpu_test.exit_criteria = ExitCriteria(sim_max=20)
gpu_test.exit_criteria = ExitCriteria(sim_max=10)

gpu_test.run()
if gpu_test.is_manager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
},
)

gpu_test.exit_criteria = ExitCriteria(sim_max=40, wallclock_max=300)
gpu_test.exit_criteria = ExitCriteria(sim_max=10, wallclock_max=300)

if gpu_test.ready():
gpu_test.run()
Expand Down
18 changes: 13 additions & 5 deletions libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def six_hump_camel_func(x):

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

workflow = Ensemble(parse_args=True)

if workflow.is_manager:
Expand All @@ -58,14 +57,23 @@ def six_hump_camel_func(x):
n = 2

vocs = VOCS(
variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [0, 1], "edge_on_cube": [0, 1]},
variables={
"core": [-3, 3],
"edge": [-2, 2],
"core_on_cube": [0, 1],
"edge_on_cube": [0, 1],
},
objectives={"energy": "MINIMIZE"},
)

aposmm = APOSMM(
vocs,
max_active_runs=max(1, workflow.nworkers - 1),
variables_mapping={"x": ["core", "edge"], "x_on_cube": ["core_on_cube", "edge_on_cube"], "f": ["energy"]},
max_active_runs=6,
variables_mapping={
"x": ["core", "edge"],
"x_on_cube": ["core_on_cube", "edge_on_cube"],
"f": ["energy"],
},
initial_sample_size=100,
sample_points=np.round(minima, 1),
localopt_method="LN_BOBYQA",
Expand All @@ -82,7 +90,7 @@ def six_hump_camel_func(x):
)

workflow.sim_specs = SimSpecs(simulator=six_hump_camel_func, vocs=vocs)
workflow.exit_criteria = ExitCriteria(sim_max=2000, wallclock_max=600)
workflow.exit_criteria = ExitCriteria(sim_max=3000, wallclock_max=600)

# Perform the run
H, _, _ = workflow.run()
Expand Down
9 changes: 9 additions & 0 deletions libensemble/tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ def cleanup(root_dir):
"opt_*.txt_flag",
"test_executor_forces_tutorial",
"test_executor_forces_tutorial_2",
# Coverage output generated by merge_coverage_reports
"coverage.xml",
# Cache files created by Ensemble/calling scripts
".libe_cache_*.meta.json",
# Artifacts from forces build step
"forces_app",
"scaling_tests/forces/forces_app/forces.x",
# Task output scripts in unit tests
"libe_task_*.sh",
]
dirs_to_clean = UNIT_TEST_DIRS + [REG_TEST_SUBDIR, FUNC_TEST_SUBDIR]
for dir_path in dirs_to_clean:
Expand Down
22 changes: 21 additions & 1 deletion libensemble/tests/unit_tests/test_asktell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np

from libensemble.utils.misc import unmap_numpy_array
from libensemble.utils.misc import map_numpy_array, unmap_numpy_array


def _check_conversion(H, npp, mapping={}):
Expand Down Expand Up @@ -94,6 +94,22 @@ def test_awkward_H():
_check_conversion(H, npp)


def test_map_numpy_array_skips_missing_mapping_sources():
"""Test mapping only uses entries represented in the source array."""

dtype = [("x0", float), ("x1", float), ("priority", float)]
H = np.zeros(2, dtype=dtype)
H[0] = (1.1, 2.2, 0.5)
H[1] = (3.3, 4.4, 0.25)

mapping = {"x": ["x0", "x1"], "f": ["energy"]}
H_mapped = map_numpy_array(H, mapping)

assert H_mapped.dtype.names == ("x", "priority")
assert np.array_equal(H_mapped["x"], [[1.1, 2.2], [3.3, 4.4]])
assert np.array_equal(H_mapped["priority"], H["priority"])


def test_unmap_numpy_array_basic():
"""Test basic unmapping of x and x_on_cube arrays"""

Expand Down Expand Up @@ -148,6 +164,10 @@ def test_unmap_numpy_array_edge_cases():
H_none = unmap_numpy_array(None, {"x": ["x0", "x1"]})
assert H_none is None

# Mapping entries for absent fields are ignored
H_unmapped = unmap_numpy_array(H, {"missing": ["y"], "x": ["x0", "x1"]})
assert H_unmapped.dtype.names == ("sim_id", "x0", "x1", "f")


if __name__ == "__main__":
# test_awkward_list_dict()
Expand Down
48 changes: 29 additions & 19 deletions libensemble/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,30 @@ def unmap_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray:
"""
if not mapping or array is None:
return array
# Create new dtype with unmapped fields

active_mapping = {field: mapping[field] for field in array.dtype.names if field in mapping}
if not active_mapping:
return array

new_fields = []
for field in array.dtype.names:
if field in mapping:
for var_name in mapping[field]:
if field in active_mapping:
for var_name in active_mapping[field]:
new_fields.append((var_name, array[field].dtype.type))
else:
# Preserve the original field structure including per-row shape
field_dtype = array.dtype[field]
new_fields.append((field, field_dtype))
unmapped_array = np.zeros(len(array), dtype=new_fields)
for field in array.dtype.names:
if field in mapping:
if field in active_mapping:
# Unmap array fields
if len(array[field].shape) == 1:
# Scalar field mapped to single variable
unmapped_array[mapping[field][0]] = array[field]
unmapped_array[active_mapping[field][0]] = array[field]
else:
# Multi-dimensional field
for i, var_name in enumerate(mapping[field]):
for i, var_name in enumerate(active_mapping[field]):
unmapped_array[var_name] = array[field][:, i]
else:
# Copy non-mapped fields
Expand All @@ -230,16 +234,25 @@ def map_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray:
if not mapping or array is None:
return array

# Create new dtype with mapped fields
# Some mappings may apply only on ingest. For example, generator suggestions
# usually contain variables but not objective values.
active_mapping = {
mapped_name: val_list
for mapped_name, val_list in mapping.items()
if all(val in array.dtype.names for val in val_list)
}
if not active_mapping:
return array

new_fields: list[tuple] = []

# Track fields processed by mapping to avoid duplication
mapped_source_fields = set()
for key, val_list in mapping.items():
for val_list in active_mapping.values():
mapped_source_fields.update(val_list)

# First add mapped fields from the mapping definition
for mapped_name, val_list in mapping.items():
for mapped_name, val_list in active_mapping.items():
first_var = val_list[0]
# We assume all components have the same type, take from first
base_type = array.dtype[first_var]
Expand All @@ -257,20 +270,17 @@ def map_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray:
# remove duplicates from new_fields
new_fields = list(dict.fromkeys(new_fields))

# Create the new array
mapped_array = np.zeros(len(array), dtype=new_fields)

# Fill the new array
for field in mapped_array.dtype.names:
# Mapped field: stack the source columns
val_list = mapping[field]
if len(val_list) == 1:
mapped_array[field] = array[val_list[0]]
if field in active_mapping:
val_list = active_mapping[field]
if len(val_list) == 1:
mapped_array[field] = array[val_list[0]]
else:
mapped_array[field] = np.stack([array[val] for val in val_list], axis=1)
else:
# Stack columns horizontally for each row
# We need to extract each column, then stack them along axis 1
cols = [array[val] for val in val_list]
mapped_array[field] = np.stack(cols, axis=1)
mapped_array[field] = array[field]

return mapped_array

Expand Down
17 changes: 11 additions & 6 deletions libensemble/utils/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ def _create_initial_sample(self, sample_method, num_points):
}
if sample_method not in samplers:
raise ValueError(
f"Unknown initial_sample_method: {sample_method!r}. "
f"Supported: {list(samplers.keys())}"
f"Unknown initial_sample_method: {sample_method!r}. " f"Supported: {list(samplers.keys())}"
)
sampler = samplers[sample_method](vocs=self.specs.get("vocs"))
else:
Expand Down Expand Up @@ -234,24 +233,30 @@ def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (

class LibensembleGenRunner(StandardGenRunner):
def _get_initial_suggest(self, libE_info) -> npt.NDArray:
"""Get initial batch from generator based on generator type"""
"""Get initial batch from a LibensembleGenerator.

LibensembleGenerator.suggest_numpy emits VOCS-field-named structured arrays
(e.g. x0/x1, energy). The manager-side history expects mapped fields (x, f)
unless the user explicitly requested otherwise.
"""
initial_batch = self.specs.get("initial_batch_size") or self.specs.get("batch_size") or libE_info["batch_size"]
H_out = self.gen.suggest_numpy(initial_batch)
return H_out
return map_numpy_array(H_out, mapping=getattr(self.gen, "variables_mapping", {}))

def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list):
numpy_out = self.gen.suggest_numpy(batch_size)
numpy_out = map_numpy_array(numpy_out, mapping=getattr(self.gen, "variables_mapping", {}))
if callable(getattr(self.gen, "suggest_updates", None)):
updates = self.gen.suggest_updates()
else:
updates = None
return numpy_out, updates

def _convert_ingest(self, x: npt.NDArray) -> list:
self.gen.ingest_numpy(x)
self.gen.ingest_numpy(unmap_numpy_array(x, mapping=getattr(self.gen, "variables_mapping", {})))

def _convert_initial_ingest(self, x: npt.NDArray) -> list:
self.gen.ingest_numpy(x)
self.gen.ingest_numpy(unmap_numpy_array(x, mapping=getattr(self.gen, "variables_mapping", {})))


class LibensembleGenThreadRunner(StandardGenRunner):
Expand Down
Loading