diff --git a/.gitignore b/.gitignore index 479c7188..e6a25e22 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ temp_*.* .DS_Store .python-version .nox +.venv ### Visual Studio Code ### !.vscode/settings.json diff --git a/kernel_tuner/__init__.py b/kernel_tuner/__init__.py index 40b88d46..62f48362 100644 --- a/kernel_tuner/__init__.py +++ b/kernel_tuner/__init__.py @@ -1,5 +1,10 @@ from kernel_tuner.integration import store_results, create_device_targets -from kernel_tuner.interface import tune_kernel, tune_kernel_T1, run_kernel +from kernel_tuner.interface import ( + tune_kernel, + tune_kernel_T1, + tune_cache, + run_kernel, +) from importlib.metadata import version diff --git a/kernel_tuner/core.py b/kernel_tuner/core.py index 5352ced7..794c2f88 100644 --- a/kernel_tuner/core.py +++ b/kernel_tuner/core.py @@ -480,11 +480,14 @@ def benchmark(self, func, gpu_args, instance, verbose, objective, skip_nvml_sett print( f"skipping config {util.get_instance_string(instance.params)} reason: too many resources requested for launch" ) - result[objective] = util.RuntimeFailedConfig() + result['__error__'] = util.RuntimeFailedConfig() else: logging.debug("benchmark encountered runtime failure: " + str(e)) print("Error while benchmarking:", instance.name) raise e + + assert util.check_result_type(result), "The error in a result MUST be an actual error." + return result def check_kernel_output( @@ -567,7 +570,7 @@ def compile_and_benchmark(self, kernel_source, gpu_args, params, kernel_options, instance = self.create_kernel_instance(kernel_source, kernel_options, params, verbose) if isinstance(instance, util.ErrorConfig): - result[to.objective] = util.InvalidConfig() + result['__error__'] = util.InvalidConfig() else: # Preprocess the argument list. 
This is required to deal with `MixedPrecisionArray`s gpu_args = _preprocess_gpu_arguments(gpu_args, params) @@ -577,7 +580,7 @@ def compile_and_benchmark(self, kernel_source, gpu_args, params, kernel_options, start_compilation = time.perf_counter() func = self.compile_kernel(instance, verbose) if not func: - result[to.objective] = util.CompilationFailedConfig() + result['__error__'] = util.CompilationFailedConfig() else: # add shared memory arguments to compiled module if kernel_options.smem_args is not None: @@ -622,6 +625,8 @@ def compile_and_benchmark(self, kernel_source, gpu_args, params, kernel_options, result["verification_time"] = last_verification_time or 0 result["benchmark_time"] = last_benchmark_time or 0 + assert util.check_result_type(result), "The error in a result MUST be an actual error." + return result def compile_kernel(self, instance, verbose): diff --git a/kernel_tuner/file_utils.py b/kernel_tuner/file_utils.py index 7684eeb8..55fd8d10 100644 --- a/kernel_tuner/file_utils.py +++ b/kernel_tuner/file_utils.py @@ -20,7 +20,7 @@ def input_file_schema(): :returns: the current version of the T1 schemas and the JSON string of the schema :rtype: string, string - """ + """ current_version = "1.0.0" input_file = schema_dir.joinpath(f"T1/{current_version}/input-schema.json") with input_file.open() as fh: @@ -30,9 +30,9 @@ def input_file_schema(): def get_input_file(filepath: Path, validate=True) -> dict[str, any]: """Load the T1 input file from the given path, validates it and returns contents if valid. - :param filepath: Path to the input file to load. - :returns: the contents of the file if valid. - """ + :param filepath: Path to the input file to load. + :returns: the contents of the file if valid. 
+ """ with filepath.open() as fp: input_file = json.load(fp) if validate: @@ -57,20 +57,38 @@ def output_file_schema(target): return current_version, json_string -def get_configuration_validity(objective) -> str: +def get_configuration_validity(error) -> str: """Convert internal Kernel Tuner error to string.""" errorstring: str - if not isinstance(objective, util.ErrorConfig): + if not isinstance(error, util.ErrorConfig): errorstring = "correct" else: - if isinstance(objective, util.CompilationFailedConfig): + if isinstance(error, util.CompilationFailedConfig): errorstring = "compile" - elif isinstance(objective, util.RuntimeFailedConfig): + elif isinstance(error, util.RuntimeFailedConfig): errorstring = "runtime" - elif isinstance(objective, util.InvalidConfig): + elif isinstance(error, util.InvalidConfig): errorstring = "constraints" else: - raise ValueError(f"Unkown objective type {type(objective)}, value {objective}") + raise ValueError(f"Unkown error type {type(error)}, value {error}") + return errorstring + + +def get_configuration_validity2(result) -> str: + """Convert internal Kernel Tuner error to string.""" + errorstring: str + if "__error__" not in result: + errorstring = "correct" + else: + error = result["__error__"] + if isinstance(error, util.CompilationFailedConfig): + errorstring = "compile" + elif isinstance(error, util.RuntimeFailedConfig): + errorstring = "runtime" + elif isinstance(error, util.InvalidConfig): + errorstring = "constraints" + else: + raise ValueError(f"Unkown error type {type(error)}, value {error}") return errorstring @@ -103,6 +121,11 @@ def get_t4_results(results, tune_params, objective="time"): :type objective: string """ + assert not isinstance(objective, (list, tuple)) + + if isinstance(objective, (list, tuple)) and len(objective) > 1: + raise ValueError("The T4 format does not support multiple objectives.") + timing_keys = ["compile_time", "benchmark_time", "framework_time", "strategy_time", "verification_time"] 
not_measurement_keys = list(tune_params.keys()) + timing_keys + ["timestamp"] + ["times"] @@ -129,7 +152,8 @@ def get_t4_results(results, tune_params, objective="time"): out["times"] = timings # encode the validity of the configuration - out["invalidity"] = get_configuration_validity(result[objective]) + # out["invalidity"] = get_configuration_validity(result[objective]) + out["invalidity"] = get_configuration_validity2(result) # Kernel Tuner does not support producing results of configs that fail the correctness check # therefore correctness is always 1 @@ -143,10 +167,10 @@ def get_t4_results(results, tune_params, objective="time"): out["measurements"] = measurements # objectives - # In Kernel Tuner we currently support only one objective at a time, this can be a user-defined - # metric that combines scores from multiple different quantities into a single value to support - # multi-objective tuning however. - out["objectives"] = [objective] + # out["objectives"] = objective + objectives = [objective] if isinstance(objective, str) else list(objective) + assert isinstance(objectives, list) + out["objectives"] = objectives # append to output output_data.append(out) @@ -310,7 +334,7 @@ def load_module(module_name): spec = spec_from_file_location(module_name, file_path) if spec is None: raise ImportError(f"Could not load spec from {file_path}") - + # create a module from the spec and execute it module = module_from_spec(spec) spec.loader.exec_module(module) @@ -322,6 +346,6 @@ def load_module(module_name): module = load_module(file_path.stem) except ImportError: module = load_module(f"{file_path.parent.stem}.{file_path.stem}") - + # return the class from the module return getattr(module, class_name) diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py index 0641eb7e..032fc61c 100644 --- a/kernel_tuner/interface.py +++ b/kernel_tuner/interface.py @@ -65,6 +65,7 @@ pyatf_strategies, random_sample, simulated_annealing, + pymoo_minimize, skopt ) from 
kernel_tuner.strategies.wrapper import OptAlgWrapper @@ -86,6 +87,8 @@ "skopt": skopt, "firefly_algorithm": firefly_algorithm, "bayes_opt": bayes_opt, + "nsga2": pymoo_minimize, + "nsga3": pymoo_minimize, "pyatf_strategies": pyatf_strategies, } @@ -438,7 +441,7 @@ def __deepcopy__(self, _): """Optimization objective to sort results on, consisting of a string that also occurs in results as a metric or observed quantity, default 'time'. Please see :ref:`objectives`.""", - "string", + "str | list[str]", ), ), ( @@ -446,7 +449,7 @@ def __deepcopy__(self, _): ( """boolean that specifies whether the objective should be maximized (True) or minimized (False), default False.""", - "bool", + "bool | list[bool]", ), ), ( @@ -477,6 +480,7 @@ def __deepcopy__(self, _): ("metrics", ("specifies user-defined metrics, please see :ref:`metrics`.", "dict")), ("simulation_mode", ("Simulate an auto-tuning search from an existing cachefile", "bool")), ("observers", ("""A list of Observers to use during tuning, please see :ref:`observers`.""", "list")), + ("seed", ("""The random seed.""", "int")), ] ) @@ -590,6 +594,8 @@ def tune_kernel( observers=None, objective=None, objective_higher_is_better=None, + objectives=None, + seed=None, ): start_overhead_time = perf_counter() if log: @@ -599,8 +605,20 @@ def tune_kernel( _check_user_input(kernel_name, kernelsource, arguments, block_size_names) - # default objective if none is specified - objective, objective_higher_is_better = get_objective_defaults(objective, objective_higher_is_better) + if objectives: + if isinstance(objectives, dict): + objective = list(objectives.keys()) + objective_higher_is_better = list(objectives.values()) + else: + raise ValueError("objectives should be a dict of (objective, higher_is_better) pairs") + else: + objective, objective_higher_is_better = get_objective_defaults(objective, objective_higher_is_better) + objective = [objective] + objective_higher_is_better = [objective_higher_is_better] + + assert 
isinstance(objective, list) + assert isinstance(objective_higher_is_better, list) + assert len(objective) == len(objective_higher_is_better) # check for forbidden names in tune parameters util.check_tune_params_list(tune_params, observers, simulation_mode=simulation_mode) @@ -624,9 +642,9 @@ def tune_kernel( if "max_fevals" in strategy_options: tuning_options["max_fevals"] = strategy_options["max_fevals"] if "time_limit" in strategy_options: - tuning_options["time_limit"] = strategy_options["time_limit"] + tuning_options["time_limit"] = strategy_options["time_limit"] if "searchspace_construction_options" in strategy_options: - searchspace_construction_options = strategy_options["searchspace_construction_options"] + searchspace_construction_options = strategy_options["searchspace_construction_options"] # log the user inputs logging.debug("tune_kernel called") @@ -701,13 +719,33 @@ def preprocess_cache(filepath): # finished iterating over search space if results: # checks if results is not empty - best_config = util.get_best_config(results, objective, objective_higher_is_better) - # add the best configuration to env - env["best_config"] = best_config - if not device_options.quiet: - units = getattr(runner, "units", None) - print("best performing configuration:") - util.print_config_output(tune_params, best_config, device_options.quiet, metrics, units) + if len(objective) == 1: + objective = objective[0] + objective_higher_is_better = objective_higher_is_better[0] + best_config = util.get_best_config(results, objective, objective_higher_is_better) + # add the best configuration to env + env['best_config'] = best_config + if not device_options.quiet: + units = getattr(runner, "units", None) + keys = list(tune_params.keys()) + keys += [objective] + if metrics: + keys += list(metrics.keys()) + print(f"\nBEST PERFORMING CONFIGURATION FOR OBJECTIVE {objective}:") + print(util.get_config_string(best_config, keys, units)) + else: + pareto_front = 
util.get_pareto_results(results, objective, objective_higher_is_better) + # add the best configuration to env + env['best_config'] = pareto_front + if not device_options.quiet: + units = getattr(runner, "units", None) + keys = list(tune_params.keys()) + keys += list(objective) + if metrics: + keys += list(metrics.keys) + print(f"\nBEST PERFORMING CONFIGURATIONS FOR OBJECTIVES: {objective}:") + for best_config in pareto_front: + print(util.get_config_string(best_config, keys, units)) elif not device_options.quiet: print("no results to report") @@ -722,6 +760,28 @@ def preprocess_cache(filepath): tune_kernel.__doc__ = _tune_kernel_docstring + +def tune_cache(*, + cache_path, + restrictions = None, + **kwargs, +): + cache = util.read_cache(cache_path, open_cache=False) + tune_args = util.infer_args_from_cache(cache) + _restrictions = [util.infer_restrictions_from_cache(cache)] + + # Add the user provided restrictions + if restrictions: + if isinstance(restrictions, list): + _restrictions.extend(restrictions) + else: + raise ValueError("The restrictions must be a list()") + + tune_args.update(kwargs) + + return tune_kernel(**tune_args, cache=cache_path, restrictions=_restrictions, simulation_mode=True) + + _run_kernel_docstring = """Compile and run a single kernel Compiles and runs a single kernel once, given a specific instance of the kernels tuning parameters. @@ -869,7 +929,7 @@ def tune_kernel_T1( strategy_options: dict={}, ) -> tuple: """Call the tune function with a T1 input file. - + The device, strategy and strategy_options can be overridden by passing a strategy name and options, otherwise the input file specification is used. 
""" inputs = get_input_file(input_filepath) diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py index 5e53093b..58f3a5b9 100644 --- a/kernel_tuner/runners/sequential.py +++ b/kernel_tuner/runners/sequential.py @@ -5,6 +5,7 @@ from kernel_tuner.core import DeviceInterface from kernel_tuner.runners.runner import Runner +import kernel_tuner.util as util from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache @@ -44,8 +45,15 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob #move data to the GPU self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments) + # It is the task of the cost function to increment there counters + self.config_eval_count = 0 + self.infeasable_config_eval_count = 0 + def get_environment(self, tuning_options): - return self.dev.get_environment() + env = self.dev.get_environment() + env["config_eval_count"] = self.config_eval_count + env["infeasable_config_eval_count"] = self.infeasable_config_eval_count + return env def run(self, parameter_space, tuning_options): """Iterate through the entire parameter space using a single Python process. 
@@ -90,17 +98,19 @@ def run(self, parameter_space, tuning_options): result = self.dev.compile_and_benchmark(self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options) + assert util.check_result_type(result) + params.update(result) - if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig): + if '__error__' in result: logging.debug('kernel configuration was skipped silently due to compile or runtime failure') # only compute metrics on configs that have not errored - if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig): + if tuning_options.metrics and '__error__' not in params: params = process_metrics(params, tuning_options.metrics) # get the framework time by estimating based on other times - total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) + total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time) params['strategy_time'] = self.last_strategy_time params['framework_time'] = max(total_time - (params['compile_time'] + params['verification_time'] + params['benchmark_time'] + params['strategy_time']), 0) params['timestamp'] = str(datetime.now(timezone.utc)) @@ -113,6 +123,8 @@ def run(self, parameter_space, tuning_options): # add configuration to cache store_cache(x_int, params, tuning_options) + assert util.check_result_type(params) + # all visited configurations are added to results to provide a trace for optimization strategies results.append(params) diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py index 9695879d..344c8197 100644 --- a/kernel_tuner/runners/simulation.py +++ b/kernel_tuner/runners/simulation.py @@ -49,7 +49,8 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob :type iterations: int """ self.quiet = device_options.quiet - self.dev = SimulationDevice(1024, dict(device_name="Simulation"), self.quiet) + # NOTE(maric): had to increase max_threas so 
the default restraints would pass + self.dev = SimulationDevice(1_000_000_000, dict(device_name="Simulation"), self.quiet) self.kernel_source = kernel_source self.simulation_mode = True @@ -60,10 +61,16 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob self.last_strategy_time = 0 self.units = {} + # It is the task of the cost function to increment there counters + self.config_eval_count = 0 + self.infeasable_config_eval_count = 0 + def get_environment(self, tuning_options): env = self.dev.get_environment() env["simulation"] = True env["simulated_time"] = tuning_options.simulated_time + env["config_eval_count"] = self.config_eval_count + env["infeasable_config_eval_count"] = self.infeasable_config_eval_count return env def run(self, parameter_space, tuning_options): @@ -84,7 +91,7 @@ def run(self, parameter_space, tuning_options): results = [] - # iterate over parameter space + # iterate over parameter space for element in parameter_space: # check if element is in the cache @@ -93,7 +100,7 @@ def run(self, parameter_space, tuning_options): result = tuning_options.cache[x_int].copy() # only compute metrics on configs that have not errored - if tuning_options.metrics and not isinstance(result.get(tuning_options.objective), util.ErrorConfig): + if tuning_options.metrics and "__error__" not in result: result = util.process_metrics(result, tuning_options.metrics) @@ -148,7 +155,7 @@ def run(self, parameter_space, tuning_options): self.start_time = perf_counter() result['framework_time'] = total_time - self.last_strategy_time - result[tuning_options.objective] = util.InvalidConfig() + result["__error__"] = util.InvalidConfig() results.append(result) warn(f"Configuration {element} not in cache, does not pass restrictions. 
Will be treated as an InvalidConfig, but make sure you are evaluating the correct cache file.") continue diff --git a/kernel_tuner/searchspace.py b/kernel_tuner/searchspace.py index 5be3aabe..2c004c33 100644 --- a/kernel_tuner/searchspace.py +++ b/kernel_tuner/searchspace.py @@ -414,7 +414,7 @@ def __build_searchspace_pyATF(self, block_size_names: list, max_threads: int, so # Define a bogus cost function costfunc = CostFunction(":") # bash no-op - + # set data self.tune_params_pyatf = self.get_tune_params_pyatf(block_size_names, max_threads) @@ -464,7 +464,7 @@ def __parameter_space_list_to_lookup_and_return_type( parameter_space_dict, size_list, ) - + def __build_searchspace(self, block_size_names: list, max_threads: int, solver: Solver): """Compute valid configurations in a search space based on restrictions and max_threads.""" # instantiate the parameter space with all the variables @@ -505,7 +505,8 @@ def __add_restrictions(self, parameter_space: Problem) -> Problem: and not isinstance(restrictions[0], (Constraint, FunctionConstraint, str)) and callable(restrictions[0]) and len(signature(restrictions[0]).parameters) == 1 - and len(self.param_names) > 1): + and len(self.param_names) > 1 + ): restrictions = restrictions[0] if isinstance(restrictions, list): for restriction in restrictions: @@ -695,9 +696,9 @@ def get_list_dict(self) -> dict: return self.__dict def get_list_numpy(self) -> np.ndarray: - """Get the parameter space list as a NumPy array of tuples with mixed types. - - Rarely faster or more convenient than `get_list_param_indices_numpy` or `get_list_numpy_numeric`. + """Get the parameter space list as a NumPy array of tuples with mixed types. + + Rarely faster or more convenient than `get_list_param_indices_numpy` or `get_list_numpy_numeric`. Initializes the NumPy array if not yet done. 
Returns: @@ -715,8 +716,8 @@ def get_list_numpy(self) -> np.ndarray: return self.__numpy def get_list_param_indices_numpy(self) -> np.ndarray: - """Get the parameter space list as a 2D NumPy array of parameter value indices. - + """Get the parameter space list as a 2D NumPy array of parameter value indices. + Same as mapping `get_param_indices` over the searchspace, but faster. Assumes that the parameter configs have the same order as `tune_params`. @@ -732,7 +733,7 @@ def get_list_param_indices_numpy(self) -> np.ndarray: for param_name, param_values in self.tune_params.items(): tune_params_to_index_lookup.append({ value: index for index, value in enumerate(param_values) }) tune_params_from_index_lookup.append({ index: value for index, value in enumerate(param_values) }) - if (all_values_integer_nonnegative and + if (all_values_integer_nonnegative and not all(isinstance(v, int) and 0 <= v < 2**15 for v in param_values) ): all_values_integer_nonnegative = False @@ -776,7 +777,7 @@ def get_list_param_indices_numpy(self) -> np.ndarray: self.__list_param_indices = self.__list_param_indices.astype(np.int64) # else: # self.__list_param_indices = self.__list_param_indices.astype(np.int32) - # + # # the below types do not have a sizable performance benifit currently elif largest_index >= 2**15: # if the largest index is larger than 2**15, use int32 to avoid overflow @@ -809,8 +810,8 @@ def get_list_param_indices_numpy_max(self): return np.iinfo(self.get_list_param_indices_numpy().dtype).max def get_list_numpy_numeric(self) -> np.ndarray: - """Get the parameter space list as a 2D NumPy array of numeric values. - + """Get the parameter space list as a 2D NumPy array of numeric values. + This is a view of the NumPy array returned by `get_list_numpy`, but with only numeric values. If the searchspace contains non-numeric values, their index will be used instead. 
@@ -1112,11 +1113,11 @@ def __get_random_neighbor_adjacent(self, param_config: tuple) -> tuple: # as the selected param config does not differ anywhere, remove it from the matches if param_config_index is not None: matching_indices.remove(param_config_index) - + # if there are matching indices, return a random one if len(matching_indices) > 0: self.__add_to_neighbor_partial_cache(param_config, matching_indices, "adjacent", full_neighbors=allowed_index_difference == max_index_difference) - + # get a random index from the matching indices random_neighbor_index = choice(matching_indices) return self.get_param_configs_at_indices([random_neighbor_index])[0] @@ -1268,7 +1269,7 @@ def get_distributed_random_sample_indices(self, num_samples: int, sampling_facto num_samples = round(self.size / 2) if num_samples == self.size: return np.shuffle([range(self.size)]) - + # adjust the number of random samples if necessary sampling_factor = max(1, sampling_factor) num_random_samples = min(sampling_factor * num_samples, self.size) @@ -1345,9 +1346,9 @@ def get_LHS_sample_indices(self, num_samples: int) -> List[int]: # get the Latin Hypercube of samples target_samples_param_indices = LatinHypercube(len(self.params_values)).integers( - l_bounds=self.get_param_indices_lower_bounds(), - u_bounds=self.get_param_indices_upper_bounds(), - n=num_samples, + l_bounds=self.get_param_indices_lower_bounds(), + u_bounds=self.get_param_indices_upper_bounds(), + n=num_samples, endpoint=True) target_samples_param_indices = np.array(target_samples_param_indices, dtype=self.params_values_indices.dtype) @@ -1482,7 +1483,7 @@ def get_random_neighbor(self, param_config: tuple, neighbor_method=None, use_par random_neighbor = self.pop_random_partial_neighbor(param_config, neighbor_method) if random_neighbor is not None: return random_neighbor - + # check if there is a neighbor method to use if neighbor_method is None: neighbor_method = self.neighbor_method diff --git a/kernel_tuner/strategies/common.py 
b/kernel_tuner/strategies/common.py index b51274ce..f99525a6 100644 --- a/kernel_tuner/strategies/common.py +++ b/kernel_tuner/strategies/common.py @@ -103,7 +103,6 @@ def __init__( self.budget_spent_fraction = 0.0 self.invalid_return_value = invalid_value - def __call__(self, x, check_restrictions=True): """Cost function used by almost all strategies.""" self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time) @@ -115,6 +114,8 @@ def __call__(self, x, check_restrictions=True): # check if max_fevals is reached or time limit is exceeded self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options) + self.runner.config_eval_count += 1 + # snap values in x to nearest actual value for each parameter, unscale x if needed if self.snap: if self.scaling: @@ -133,7 +134,6 @@ def __call__(self, x, check_restrictions=True): if check_restrictions and self.searchspace.restrictions: legal = self.searchspace.is_param_config_valid(tuple(params)) - if not legal: if "constraint_aware" in self.tuning_options.strategy_options and self.tuning_options.strategy_options["constraint_aware"]: # attempt to repair @@ -146,9 +146,12 @@ def __call__(self, x, check_restrictions=True): if not legal: params_dict = dict(zip(self.searchspace.tune_params.keys(), params)) result = params_dict - result[self.tuning_options.objective] = util.InvalidConfig() + result['__error__'] = util.InvalidConfig() + self.runner.infeasable_config_eval_count += 1 if legal: + assert ('__error__' not in result), "A legal config MUST NOT have an error result." 
+ # compile and benchmark this instance res = self.runner.run([params], self.tuning_options) result = res[0] @@ -162,24 +165,25 @@ def __call__(self, x, check_restrictions=True): # upon returning from this function control will be given back to the strategy, so reset the start time self.runner.last_strategy_start_time = perf_counter() - # get numerical return value, taking optimization direction into account - return_value = result[self.tuning_options.objective] - if not isinstance(return_value, util.ErrorConfig): - # this is a valid configuration, so invert value in case of maximization - return_value = -return_value if self.tuning_options.objective_higher_is_better else return_value + # get the cost of the result + cost_vec = util.get_result_cost( + result, + self.tuning_options.objective, + self.tuning_options.objective_higher_is_better + ) + + if len(cost_vec) == 1: + cost_0 = cost_vec[0] + # include raw data in return if requested + if self.return_raw is not None: + try: + return cost_0, result[self.return_raw] + except KeyError: + return cost_0, [np.nan] + else: + return cost_0 else: - # this is not a valid configuration, replace with float max if needed - if not self.return_invalid: - return_value = self.invalid_return_value - - # include raw data in return if requested - if self.return_raw is not None: - try: - return return_value, result[self.return_raw] - except KeyError: - return return_value, [np.nan] - - return return_value + return cost_vec def get_start_pos(self): """Get starting position for optimization.""" @@ -317,7 +321,6 @@ def scale_from_params(params, tune_params, eps): return x - def unscale_and_snap_to_nearest_valid(x, params, searchspace, eps): """Helper func to snap to the nearest valid configuration""" # params is nearest unscaled point, but is not valid diff --git a/kernel_tuner/strategies/genetic_algorithm.py b/kernel_tuner/strategies/genetic_algorithm.py index 804758ee..d602b585 100644 --- a/kernel_tuner/strategies/genetic_algorithm.py 
+++ b/kernel_tuner/strategies/genetic_algorithm.py @@ -61,8 +61,8 @@ def tune(searchspace: Searchspace, runner, tuning_options): # 'best_score' is used only for printing if tuning_options.verbose and cost_func.results: best_score = get_best_config( - cost_func.results, tuning_options.objective, tuning_options.objective_higher_is_better - )[tuning_options.objective] + cost_func.results, tuning_options.objective[0], tuning_options.objective_higher_is_better + )[tuning_options.objective[0]] if tuning_options.verbose: print("Generation %d, best_score %f" % (generation, best_score)) @@ -249,4 +249,3 @@ def disruptive_uniform_crossover(dna1, dna2): "uniform": uniform_crossover, "disruptive_uniform": disruptive_uniform_crossover, } - diff --git a/kernel_tuner/strategies/pymoo_minimize.py b/kernel_tuner/strategies/pymoo_minimize.py new file mode 100644 index 00000000..dcc337af --- /dev/null +++ b/kernel_tuner/strategies/pymoo_minimize.py @@ -0,0 +1,286 @@ +"""The Pymoo strategy that uses a minimizer method for searching through the parameter space.""" + +from typing import assert_never +import numpy as np + +from pymoo.algorithms.moo.nsga2 import NSGA2 +from pymoo.algorithms.moo.nsga3 import NSGA3 +from pymoo.core.algorithm import Algorithm +from pymoo.core.problem import ElementwiseProblem +from pymoo.core.duplicate import ElementwiseDuplicateElimination +from pymoo.core.termination import NoTermination, Termination +from pymoo.core.sampling import Sampling +from pymoo.core.mutation import Mutation +from pymoo.core.repair import Repair +from pymoo.operators.crossover.ux import UniformCrossover +from pymoo.operators.crossover.pntx import SinglePointCrossover, TwoPointCrossover +from pymoo.util.ref_dirs import get_reference_directions + +from kernel_tuner import util +from kernel_tuner.runners.runner import Runner +from kernel_tuner.searchspace import Searchspace +from kernel_tuner.strategies.common import ( + CostFunc, + get_strategy_docstring, +) + +from enum import 
StrEnum + +class SupportedAlgos(StrEnum): + NSGA2 = "nsga2" + NSGA3 = "nsga3" + +supported_algos = [ algo.value for algo in SupportedAlgos ] + +crossover_oper_dict = { + "uniform-crossover": UniformCrossover, + "single-point-crossover": SinglePointCrossover, + "two-point-crossover": TwoPointCrossover, +} +supported_crossover_oper_names = list(crossover_oper_dict.keys()) + +_options = { + "pop_size": ("Initial population size", 20), + "crossover_operator": (f"The crossover operator, can be one of {supported_crossover_oper_names}", "two-point-crossover"), + "crossover_prob": ("Crossover probability", 1.0), + "mutation_prob": ("Mutation probability", 0.1), + "ref_dirs_list": ("The list of reference directions on the unit hyperplane in the objective space to guide NSGA-III, see https://pymoo.org/misc/reference_directions.html for more information.", []), +} + +_option_defaults = { key: option_pair[1] for key, option_pair in _options.items() } + + +def tune( + searchspace: Searchspace, + runner: Runner, + tuning_options, +): + algo_name: str = tuning_options.strategy + strategy_options = tuning_options.strategy_options + + algo_name = algo_name.lower() + if algo_name in SupportedAlgos: + algo_name = SupportedAlgos(algo_name) + else: + raise ValueError(f"\"{algo_name}\" is not supported. 
The supported algorithms are: {supported_algos}\n") + + pop_size = strategy_options.get("pop_size", _option_defaults["pop_size"]) + crossover_oper = strategy_options.get("crossover_operator", _option_defaults["crossover_operator"]) + crossover_prob = strategy_options.get("crossover_prob", _option_defaults["crossover_prob"]) + mutation_prob = strategy_options.get("mutation_prob", _option_defaults["mutation_prob"]) + ref_dirs_list = strategy_options.get("ref_dirs_list", _option_defaults["ref_dirs_list"]) + + if algo_name == "nsga3" and len(ref_dirs_list) == 0: + ref_dirs_list = get_reference_directions("energy", len(tuning_options.objective), pop_size) + + if "x0" in strategy_options: + raise ValueError(f"\"x0\" is not a supported option.") + + if crossover_oper in crossover_oper_dict: + crossover_oper = crossover_oper_dict[crossover_oper] + else: + raise ValueError(f"Unsupported crossover method {crossover_oper}") + + cost_func = CostFunc(searchspace, tuning_options, runner, scaling=False) + + problem = TuningProblem( + cost_func = cost_func, + n_var = len(tuning_options.tune_params), + n_obj = len(tuning_options.objective), + ) + + sampling = TuningSearchspaceRandomSampling(searchspace) + crossover = crossover_oper(prob = crossover_prob) + mutation = TuningParamConfigNeighborhoodMutation(prob = mutation_prob, searchspace = searchspace) + repair = TuningParamConfigRepair() + eliminate_duplicates = TuningParamConfigDuplicateElimination() + + # algorithm_type = get_algorithm(method) + algo: Algorithm + match algo_name: + case SupportedAlgos.NSGA2: + algo = NSGA2( + pop_size = pop_size, + sampling = sampling, + crossover = crossover, + mutation = mutation, + repair = repair, + eliminate_duplicates = eliminate_duplicates, + ) + case SupportedAlgos.NSGA3: + algo = NSGA3( + pop_size = pop_size, + ref_dirs = ref_dirs_list, + sampling = sampling, + crossover = crossover, + mutation = mutation, + repair = repair, + eliminate_duplicates = eliminate_duplicates, + ) + case _ as 
class TuningProblem(ElementwiseProblem):
    """Adapts Kernel Tuner's CostFunc to pymoo's element-wise problem interface."""

    def __init__(self, cost_func: CostFunc, n_var: int, n_obj: int, **kwargs):
        """Wrap a cost function as a pymoo problem.

        :param cost_func: callable that evaluates one parameter configuration.
        :param n_var: number of tunable parameters (decision variables).
        :param n_obj: number of optimization objectives.
        """
        super().__init__(n_var=n_var, n_obj=n_obj, **kwargs)
        self.cost_func = cost_func
        self.searchspace = cost_func.searchspace
        self.tuning_options = cost_func.tuning_options

    def _evaluate(self, x, out, *args, **kwargs):
        """Evaluate one configuration and store its objective vector in out["F"]."""
        # A copy of `x` is made to make sure sharing does not happen
        out["F"] = self.cost_func(tuple(x))

    def _calc_pareto_front(self, *args, **kwargs):
        """Return the true Pareto front as a float cost array, or None.

        Only computable in simulation mode, where the full cache of results
        is available up front.
        """
        # Can only compute the pareto front if we are in simulation mode.
        if not self.tuning_options.simulation_mode:
            return None

        objs = self.tuning_options.objective
        directions = self.tuning_options.objective_higher_is_better
        optima = util.get_pareto_results(
            list(self.tuning_options.cache.values()),
            objs,
            directions,
        )
        costs = [util.get_result_cost(entry, objs, directions) for entry in optima]
        return np.array(costs, dtype=float)


class TuningTermination(Termination):
    """Termination criterion driven by Kernel Tuner's stop-criterion check."""

    def __init__(self, tuning_options):
        super().__init__()
        self.tuning_options = tuning_options
        # holds the StopCriterionReached exception once the criterion fires
        self.reason = None

    def _update(self, algorithm):
        """Return 1.0 (fully terminated) once the stop criterion fires, else 0.0."""
        try:
            util.check_stop_criterion(self.tuning_options)
        except util.StopCriterionReached as e:
            self.terminate()
            self.reason = e
            return 1.0
        print(f"progress: {len(self.tuning_options.unique_results) / self.tuning_options.max_fevals}")
        return 0.0


class TuningSearchspaceRandomSampling(Sampling):
    """Draws initial populations uniformly at random from the search space."""

    def __init__(self, searchspace):
        super().__init__()
        self.searchspace = searchspace

    def _do(self, problem, n_samples: int, **kwargs):
        """Return n_samples random parameter configurations as an object array."""
        drawn = self.searchspace.get_random_sample(n_samples)
        return np.array(drawn, dtype=object)


class TuningParamConfigNeighborhoodMutation(Mutation):
    """Mutates a configuration by moving it to a random Hamming neighbor."""

    def __init__(self, prob, searchspace: Searchspace, **kwargs):
        """:param prob: per-individual mutation probability (handled by pymoo).
        :param searchspace: search space used to look up neighbors.
        """
        super().__init__(prob=prob, **kwargs)
        self.searchspace = searchspace

    def _do(self, problem: TuningProblem, X: np.ndarray, **kwargs):
        """Replace each row of X in place with a random Hamming neighbor, if one exists."""
        for row in range(X.shape[0]):
            config = tuple(X[row])
            candidates = self.searchspace.get_neighbors_indices_no_cache(config, neighbor_method="Hamming")
            if len(candidates) > 0:
                # NOTE(review): uses the global numpy RNG, not pymoo's seeded one — confirm reproducibility is acceptable
                picked = candidates[np.random.choice(len(candidates))]
                replacement = self.searchspace.get_param_configs_at_indices([picked])[0]
                X[row] = np.array(replacement, dtype=object)
        return X


class TuningParamConfigRepair(Repair):
    """Repairs invalid configurations by snapping them to a nearby valid one."""

    def _do(self, problem: TuningProblem, X: np.ndarray, **kwargs):
        """Replace each invalid row of X with a random neighbor.

        Neighborhood definitions are tried from strictest to loosest; the
        first one that yields any neighbor wins.
        """
        for row in range(X.shape[0]):
            config = tuple(X[row])
            if problem.searchspace.is_param_config_valid(config):
                continue
            for method in ["strictly-adjacent", "adjacent", "Hamming"]:
                candidates = problem.searchspace.get_neighbors_indices_no_cache(config, method)
                if len(candidates) > 0:
                    picked = candidates[np.random.choice(len(candidates))]
                    replacement = problem.searchspace.get_param_configs_at_indices([picked])[0]
                    X[row] = np.array(replacement, dtype=object)
                    break
        return X


class TuningParamConfigDuplicateElimination(ElementwiseDuplicateElimination):
    """Considers two individuals duplicates when all their parameter values match."""

    def is_equal(self, a, b):
        return np.all(a.X == b.X)
def get_pareto_results(
    results: list[dict],
    objectives: list[str],
    objective_higher_is_better: list[bool],
    mark_optima=True
):
    """Select the non-dominated (Pareto-optimal) subset of results.

    Failed results (carrying an '__error__' key) are assigned the worst
    possible cost in every objective so they never dominate anything.

    :param results: list of result dicts, one per evaluated configuration.
    :param objectives: names of the objective keys to read from each result.
    :param objective_higher_is_better: per-objective maximization flags.
    :param mark_optima: when True, tag each Pareto-optimal result dict in
        place with result["optimal"] = True.
    :returns: the Pareto-optimal result dicts (references, not copies).
    """
    assert isinstance(results, list)
    assert isinstance(objectives, list)

    costs = np.empty((len(results), len(objectives)), dtype=float)
    for i, entry in enumerate(results):
        if "__error__" in entry:
            # worst possible cost on every axis for failed configurations
            costs[i, :] = sys.float_info.max
            continue
        for j, (name, maximize) in enumerate(zip(objectives, objective_higher_is_better)):
            value = entry[name]
            # negate for maximizers to optimize through minimization
            costs[i, j] = -value if maximize else value

    front = [results[k] for k in find_non_dominated(costs)]
    if mark_optima:
        for entry in front:
            entry["optimal"] = True
    return front
+ element["__error__"] = error_configs[v] return cache_data @@ -1303,3 +1364,31 @@ def dump_cache(obj: str, tuning_options): if isinstance(tuning_options.cache, dict) and tuning_options.cachefile: with open(tuning_options.cachefile, "a") as cachefile: cachefile.write(obj) + + +def infer_restrictions_from_cache(cache: dict): + param_names = cache["tune_params_keys"] + valid_param_config_set = set( + tuple(result[param_name] for param_name in param_names) + for result in cache['cache'].values() + if '__error__' not in result + ) + + def restrictions_func(*param_values) -> bool: + nonlocal valid_param_config_set + return param_values in valid_param_config_set + + return FunctionConstraint(restrictions_func) + + +def infer_args_from_cache(cache: dict) -> dict: + inferred_args = dict( + kernel_name = cache['kernel_name'], + kernel_source = "", + problem_size = tuple(cache['problem_size']), + arguments = [], + tune_params = cache['tune_params'], + # restrictions = infer_restrictions_from_cache(cache), + ) + + return inferred_args diff --git a/noxfile.py b/noxfile.py index a81d33b5..97fdd2f7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -39,7 +39,7 @@ def create_settings(session: Session) -> None: venvbackend = nox.options.default_venv_backend envdir = "" # conversion from old notenv.txt - if noxenv_file_path.exists(): + if noxenv_file_path.exists(): venvbackend = noxenv_file_path.read_text().strip() noxenv_file_path.unlink() # write the settings @@ -92,7 +92,7 @@ def check_development_environment(session: Session) -> None: # packages = re.findall(r"• Installing .* | • Updating .*", output, flags=re.MULTILINE) # assert packages is not None session.warn(f""" - Your development environment is out of date ({installs} installs, {updates} updates). + Your development environment is out of date ({installs} installs, {updates} updates). Update with 'poetry install --sync', using '--with' and '-E' for optional dependencies, extras respectively. 
Note: {removals} packages are not in the specification (i.e. installed manually) and may be removed. To preview changes, run 'poetry install --sync --dry-run' (with optional dependencies and extras).""") diff --git a/pyproject.toml b/pyproject.toml index e19ab82b..c06c5c7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ dependencies = [ "xmltodict", "pandas>=2.0.0", "scikit-learn>=1.0.2", + "pymoo>=0.6.1.6", ] # NOTE Torch can be used with Kernel Tuner, but is not a dependency, should be up to the user to use it diff --git a/test/strategies/test_common.py b/test/strategies/test_common.py index 90f6c63e..e73df0b3 100644 --- a/test/strategies/test_common.py +++ b/test/strategies/test_common.py @@ -22,6 +22,8 @@ def fake_runner(): runner = Mock() runner.last_strategy_start_time = perf_counter() runner.run.return_value = [fake_result] + runner.config_eval_count = 0 + runner.infeasable_config_eval_count = 0 return runner @@ -32,7 +34,7 @@ def test_cost_func(): x = [1, 4] tuning_options = Options(scaling=False, snap=False, tune_params=tune_params, restrictions=None, strategy_options={}, cache={}, unique_results={}, - objective="time", objective_higher_is_better=False, metrics=None) + objective=["time"], objective_higher_is_better=[False], metrics=None) runner = fake_runner() time = CostFunc(Searchspace(tune_params, None, 1024), tuning_options, runner)(x) @@ -45,7 +47,7 @@ def restrictions(x, y): restrictions=restrictions, strategy_options={}, verbose=True, cache={}, unique_results={}, objective="time", objective_higher_is_better=False, metrics=None) - + with raises(StopCriterionReached): time = CostFunc(Searchspace(tune_params, restrictions, 1024), tuning_options, runner)(x) assert time == sys.float_info.max diff --git a/test/strategies/test_strategies.py b/test/strategies/test_strategies.py index ea5a2994..7a2a6df4 100644 --- a/test/strategies/test_strategies.py +++ b/test/strategies/test_strategies.py @@ -127,7 +127,7 @@ def 
test_strategies(vector_add, strategy): # check if strategy respects user-specified starting point (x0) x0 = [256, 'alg_2', 15, True, 2.45] filter_options["x0"] = x0 - if not strategy in ["brute_force", "random_sample", "bayes_opt", "pyatf_strategies"]: + if not strategy in ["brute_force", "random_sample", "bayes_opt", "pyatf_strategies", "nsga2", "nsga3"]: results, _ = kernel_tuner.tune_kernel(*vector_add, restrictions=restrictions, strategy=strategy, strategy_options=filter_options, verbose=False, cache=cache_filename, simulation_mode=True) assert results[0]["block_size_x"] == x0[0] diff --git a/test/test_runners.py b/test/test_runners.py index 3a0a26e2..d542c1dc 100644 --- a/test/test_runners.py +++ b/test/test_runners.py @@ -257,7 +257,7 @@ def test_interface_handles_compile_failures(env): failed_config = [ record for record in results if record["block_size_x"] == 256 ][0] - assert isinstance(failed_config["time"], util.CompilationFailedConfig) + assert isinstance(failed_config["__error__"], util.CompilationFailedConfig) @skip_if_no_pycuda