diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7e2dd099c1..e4296b683c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -461,6 +461,8 @@ if (NOT SKIP_GRPC_BUILD) src/grpc/grpc_settings_mapper.cpp src/grpc/grpc_service_mapper.cpp src/grpc/client/grpc_client.cpp + src/grpc/client/grpc_client_env.cpp + src/grpc/client/cython_grpc_client.cpp src/grpc/client/solve_remote.cpp ) list(APPEND CUOPT_SRC_FILES ${GRPC_INFRA_FILES}) diff --git a/cpp/include/cuopt/grpc/cython_grpc_client.hpp b/cpp/include/cuopt/grpc/cython_grpc_client.hpp new file mode 100644 index 0000000000..074bd0065f --- /dev/null +++ b/cpp/include/cuopt/grpc/cython_grpc_client.hpp @@ -0,0 +1,152 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights + * reserved. SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +namespace cuopt::linear_programming { +template +class solver_settings_t; +namespace io { +template +class data_model_view_t; +} // namespace io +} // namespace cuopt::linear_programming + +namespace cuopt::cython { + +/** Mirrors cuopt::linear_programming::job_status_t for the Python bindings. 
*/ +enum class grpc_job_status_t : int { + QUEUED = 0, + PROCESSING = 1, + COMPLETED = 2, + FAILED = 3, + CANCELLED = 4, + NOT_FOUND = 5, +}; + +struct grpc_submit_result_t { + bool success = false; + std::string error_message; + std::string job_id; + bool is_mip = false; +}; + +struct grpc_status_result_t { + bool success = false; + std::string error_message; + grpc_job_status_t status = grpc_job_status_t::NOT_FOUND; + std::string message; + int64_t result_size_bytes = 0; +}; + +struct grpc_result_outcome_t { + bool not_ready = false; + bool success = false; + std::string error_message; + std::unique_ptr solution; +}; + +struct grpc_logs_result_t { + bool success = false; + std::string error_message; + std::vector lines; +}; + +struct grpc_incumbent_entry_t { + int64_t index = 0; + double objective = 0.0; + std::vector assignment; +}; + +struct grpc_incumbents_result_t { + bool success = false; + std::string error_message; + std::vector incumbents; + int64_t next_index = 0; + bool job_complete = false; +}; + +/** + * C callback for streaming log lines from Cython (one line at a time). + * Uses C-compatible ``int`` fields so Cython ``bint`` function pointers match. + */ +using grpc_log_line_callback_t = int (*)(const char* line, + size_t line_len, + int job_complete, + void* user_data); + +/** + * @brief Owning wrapper around grpc_client_t for Cython. 
+ */ +class grpc_python_client_t { + public: + grpc_python_client_t(const std::string& host, int port); + ~grpc_python_client_t(); + + grpc_python_client_t(const grpc_python_client_t&) = delete; + grpc_python_client_t& operator=(const grpc_python_client_t&) = delete; + + bool connect(std::string& error_out); + + grpc_submit_result_t submit( + cuopt::linear_programming::io::data_model_view_t* data_model, + cuopt::linear_programming::solver_settings_t* settings, + bool enable_incumbents = false); + + grpc_status_result_t status(const std::string& job_id); + + /** + * @param timeout_seconds 0 = block on WaitForCompletion; >0 = poll with timeout. + */ + grpc_status_result_t wait(const std::string& job_id, int timeout_seconds); + + bool cancel(const std::string& job_id, std::string& error_out); + + bool delete_job(const std::string& job_id, std::string& error_out); + + /** + * @param is_mip When true, fetch a MIP result; otherwise LP. + */ + grpc_result_outcome_t result(const std::string& job_id, bool is_mip); + + /** + * @brief Block until the job completes, collecting all solver log lines. + */ + grpc_logs_result_t fetch_logs(const std::string& job_id, int64_t from_byte = 0); + + /** + * @brief Stream solver log lines until the job completes. + * + * Invokes @p callback for each line. Return false from the callback to stop + * early. Blocks the calling thread for the lifetime of the stream. + */ + bool stream_logs(const std::string& job_id, + int64_t from_byte, + grpc_log_line_callback_t callback, + void* user_data); + + /** + * @brief Poll GetIncumbents until the job completes. 
+ */ + grpc_incumbents_result_t fetch_incumbents(const std::string& job_id, + int64_t from_index = 0, + int32_t max_count = 0); + + std::string last_error() const; + + private: + struct impl_t; + std::unique_ptr impl_; +}; + +} // namespace cuopt::cython diff --git a/cpp/include/cuopt/grpc/grpc_client_env.hpp b/cpp/include/cuopt/grpc/grpc_client_env.hpp new file mode 100644 index 0000000000..bdd99dd466 --- /dev/null +++ b/cpp/include/cuopt/grpc/grpc_client_env.hpp @@ -0,0 +1,22 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights + * reserved. SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "grpc_client.hpp" + +namespace cuopt::linear_programming { + +/** + * @brief Apply CUOPT_GRPC_* / CUOPT_TLS_* / CUOPT_CHUNK_SIZE env overrides. + */ +void apply_grpc_client_env_overrides(grpc_client_config_t& config); + +/** + * @brief Build grpc_client_config_t from host, port, and environment overrides. + */ +grpc_client_config_t make_grpc_client_config(const std::string& host, int port); + +} // namespace cuopt::linear_programming diff --git a/cpp/src/grpc/client/cython_grpc_client.cpp b/cpp/src/grpc/client/cython_grpc_client.cpp new file mode 100644 index 0000000000..40726b0f11 --- /dev/null +++ b/cpp/src/grpc/client/cython_grpc_client.cpp @@ -0,0 +1,265 @@ +/* clang-format off */ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ +/* clang-format on */ + +#include +#include + +#include +#include +#include +#include + +#include "grpc_client.hpp" + +#include +#include +#include + +namespace cuopt::cython { + +namespace { + +grpc_job_status_t map_job_status(cuopt::linear_programming::job_status_t status) +{ + using js = cuopt::linear_programming::job_status_t; + switch (status) { + case js::QUEUED: return grpc_job_status_t::QUEUED; + case js::PROCESSING: return grpc_job_status_t::PROCESSING; + case js::COMPLETED: return grpc_job_status_t::COMPLETED; + case js::FAILED: return grpc_job_status_t::FAILED; + case js::CANCELLED: return grpc_job_status_t::CANCELLED; + case js::NOT_FOUND: return grpc_job_status_t::NOT_FOUND; + default: return grpc_job_status_t::NOT_FOUND; + } +} + +grpc_status_result_t map_status_result(const cuopt::linear_programming::job_status_result_t& in) +{ + grpc_status_result_t out; + out.success = in.success; + out.error_message = in.error_message; + out.status = map_job_status(in.status); + out.message = in.message; + out.result_size_bytes = in.result_size_bytes; + return out; +} + +bool is_in_flight(grpc_job_status_t status) +{ + return status == grpc_job_status_t::QUEUED || status == grpc_job_status_t::PROCESSING; +} + +} // namespace + +struct grpc_python_client_t::impl_t { + cuopt::linear_programming::grpc_client_t client; + explicit impl_t(cuopt::linear_programming::grpc_client_config_t config) + : client(std::move(config)) + { + } +}; + +grpc_python_client_t::grpc_python_client_t(const std::string& host, int port) + : impl_(std::make_unique(cuopt::linear_programming::make_grpc_client_config(host, port))) +{ +} + +grpc_python_client_t::~grpc_python_client_t() = default; + +bool grpc_python_client_t::connect(std::string& error_out) +{ + if (!impl_->client.connect()) { + error_out = impl_->client.get_last_error(); + return false; + } + return true; +} + +grpc_submit_result_t grpc_python_client_t::submit( + 
cuopt::linear_programming::io::data_model_view_t* data_model, + cuopt::linear_programming::solver_settings_t* settings, + bool enable_incumbents) +{ + grpc_submit_result_t out; + if (data_model == nullptr || settings == nullptr) { + out.error_message = "data_model and settings must not be null"; + return out; + } + + cuopt::linear_programming::cpu_optimization_problem_t cpu_problem; + cuopt::linear_programming::populate_from_data_model_view( + &cpu_problem, data_model, settings, nullptr); + + const bool is_mip = + cpu_problem.get_problem_category() == cuopt::linear_programming::problem_category_t::MIP || + cpu_problem.get_problem_category() == cuopt::linear_programming::problem_category_t::IP; + + if (is_mip) { + auto sub = + impl_->client.submit_mip(cpu_problem, settings->get_mip_settings(), enable_incumbents); + out.success = sub.success; + out.error_message = sub.error_message; + out.job_id = sub.job_id; + out.is_mip = true; + } else { + auto sub = impl_->client.submit_lp(cpu_problem, settings->get_pdlp_settings()); + out.success = sub.success; + out.error_message = sub.error_message; + out.job_id = sub.job_id; + out.is_mip = false; + } + + return out; +} + +grpc_status_result_t grpc_python_client_t::status(const std::string& job_id) +{ + return map_status_result(impl_->client.check_status(job_id)); +} + +grpc_status_result_t grpc_python_client_t::wait(const std::string& job_id, int timeout_seconds) +{ + if (timeout_seconds < 0) { + grpc_status_result_t out; + out.success = false; + out.error_message = "timeout_seconds must be non-negative"; + out.status = grpc_job_status_t::NOT_FOUND; + return out; + } + + if (timeout_seconds == 0) { return map_status_result(impl_->client.wait_for_completion(job_id)); } + + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); + const int poll_ms = 1000; + + while (std::chrono::steady_clock::now() < deadline) { + auto st = status(job_id); + if (!st.success) { return st; } + if 
(!is_in_flight(st.status)) { return st; } + std::this_thread::sleep_for(std::chrono::milliseconds(poll_ms)); + } + + grpc_status_result_t out; + out.success = false; + out.error_message = "Timeout waiting for job completion"; + out.status = status(job_id).status; + return out; +} + +bool grpc_python_client_t::cancel(const std::string& job_id, std::string& error_out) +{ + auto result = impl_->client.cancel_job(job_id); + if (!result.success) { + error_out = result.error_message.empty() ? result.message : result.error_message; + return false; + } + return true; +} + +bool grpc_python_client_t::delete_job(const std::string& job_id, std::string& error_out) +{ + if (!impl_->client.delete_job(job_id)) { + error_out = impl_->client.get_last_error(); + return false; + } + return true; +} + +grpc_result_outcome_t grpc_python_client_t::result(const std::string& job_id, bool is_mip) +{ + grpc_result_outcome_t out; + + auto st = status(job_id); + if (!st.success) { + out.error_message = st.error_message; + return out; + } + if (is_in_flight(st.status)) { + out.not_ready = true; + return out; + } + if (st.status != grpc_job_status_t::COMPLETED) { + out.error_message = st.message.empty() ? 
std::string("Job is not completed") : st.message; + return out; + } + + out.solution = std::make_unique(); + + if (is_mip) { + auto remote = impl_->client.get_mip_result(job_id); + if (!remote.success) { + out.error_message = remote.error_message; + out.solution.reset(); + return out; + } + out.solution->problem_type = cuopt::linear_programming::problem_category_t::MIP; + out.solution->mip_ret = remote.solution->to_cpu_mip_ret_t(); + } else { + auto remote = impl_->client.get_lp_result(job_id); + if (!remote.success) { + out.error_message = remote.error_message; + out.solution.reset(); + return out; + } + out.solution->problem_type = cuopt::linear_programming::problem_category_t::LP; + out.solution->lp_ret = remote.solution->to_cpu_linear_programming_ret_t(); + } + + out.success = true; + return out; +} + +grpc_logs_result_t grpc_python_client_t::fetch_logs(const std::string& job_id, int64_t from_byte) +{ + grpc_logs_result_t out; + out.success = impl_->client.stream_logs( + job_id, from_byte, [&out](const std::string& line, bool /*job_complete*/) -> bool { + if (!line.empty()) { out.lines.push_back(line); } + return true; + }); + if (!out.success) { out.error_message = impl_->client.get_last_error(); } + return out; +} + +bool grpc_python_client_t::stream_logs(const std::string& job_id, + int64_t from_byte, + grpc_log_line_callback_t callback, + void* user_data) +{ + if (callback == nullptr) { return false; } + return impl_->client.stream_logs( + job_id, from_byte, [callback, user_data](const std::string& line, bool job_complete) -> bool { + return callback(line.data(), line.size(), job_complete ? 
/**
 * @brief Read an integer environment variable, falling back to a default.
 *
 * The whole value must be a valid integer (optionally surrounded by
 * whitespace). Values with trailing garbage such as "8192MB" or "1x" are
 * rejected instead of being silently truncated to their numeric prefix.
 *
 * @param name          Environment variable to read.
 * @param default_value Returned when the variable is unset, empty, malformed,
 *                      or out of range for a 64-bit integer.
 * @return The parsed value, or @p default_value.
 */
int64_t parse_env_int64(const char* name, int64_t default_value)
{
  const char* val = std::getenv(name);
  if (val == nullptr) return default_value;
  try {
    const std::string text(val);
    std::size_t consumed = 0;
    const long long parsed = std::stoll(text, &consumed);
    // std::stoll stops at the first character it cannot use; only trailing
    // whitespace is tolerated after the number.
    if (text.find_first_not_of(" \t\n\r\f\v", consumed) != std::string::npos) {
      return default_value;
    }
    return parsed;
  } catch (...) {
    // Deliberate best effort: invalid_argument / out_of_range mean "use the default".
    return default_value;
  }
}
empty"); } + if (port <= 0 || port > 65535) { + throw std::invalid_argument("gRPC port must be between 1 and 65535"); + } + + grpc_client_config_t config; + config.server_address = host + ":" + std::to_string(port); + apply_grpc_client_env_overrides(config); + return config; +} + +} // namespace cuopt::linear_programming diff --git a/cpp/src/grpc/client/solve_remote.cpp b/cpp/src/grpc/client/solve_remote.cpp index fb39a6d184..68c2cc5382 100644 --- a/cpp/src/grpc/client/solve_remote.cpp +++ b/cpp/src/grpc/client/solve_remote.cpp @@ -5,6 +5,7 @@ */ /* clang-format on */ +#include #include #include #include @@ -14,7 +15,6 @@ #include #include -#include #include #include #include @@ -45,17 +45,6 @@ static std::string get_grpc_server_address() return std::string(host) + ":" + std::string(port); } -static int64_t parse_env_int64(const char* name, int64_t default_value) -{ - const char* val = std::getenv(name); - if (val == nullptr) return default_value; - try { - return std::stoll(val); - } catch (...) { - return default_value; - } -} - // Derive client-side polling timeout from the solver's time_limit. // Returns 0 (no limit) when the solver has no finite time_limit. template @@ -67,58 +56,6 @@ static int solver_timeout_seconds(f_t time_limit) return static_cast(std::ceil(secs)); } -static std::string read_pem_file(const char* path) -{ - std::ifstream in(path, std::ios::binary); - if (!in.is_open()) { throw std::runtime_error(std::string("Cannot open TLS file: ") + path); } - std::ostringstream ss; - ss << in.rdbuf(); - return ss.str(); -} - -static const char* get_env(const char* name) -{ - const char* v = std::getenv(name); - return (v && v[0] != '\0') ? v : nullptr; -} - -// Apply env-var overrides for transfer, debug, and TLS configuration. 
-static void apply_env_overrides(grpc_client_config_t& config) -{ - constexpr int64_t kMinChunkSize = 4096; - constexpr int64_t kMaxChunkSize = 2LL * 1024 * 1024 * 1024; // 2 GiB - constexpr int64_t kMinMessageSize = 4096; - constexpr int64_t kMaxMessageSize = 2LL * 1024 * 1024 * 1024; - - auto chunk = parse_env_int64("CUOPT_CHUNK_SIZE", config.chunk_size_bytes); - if (chunk >= kMinChunkSize && chunk <= kMaxChunkSize) { config.chunk_size_bytes = chunk; } - - auto msg = parse_env_int64("CUOPT_MAX_MESSAGE_BYTES", config.max_message_bytes); - if (msg >= kMinMessageSize && msg <= kMaxMessageSize) { config.max_message_bytes = msg; } - - config.enable_debug_log = (parse_env_int64("CUOPT_GRPC_DEBUG", 0) != 0); - - // TLS configuration from environment variables - if (parse_env_int64("CUOPT_TLS_ENABLED", 0) != 0) { - config.enable_tls = true; - - const char* root_cert = get_env("CUOPT_TLS_ROOT_CERT"); - if (root_cert) { config.tls_root_certs = read_pem_file(root_cert); } - - const char* client_cert = get_env("CUOPT_TLS_CLIENT_CERT"); - const char* client_key = get_env("CUOPT_TLS_CLIENT_KEY"); - if (client_cert && client_key) { - config.tls_client_cert = read_pem_file(client_cert); - config.tls_client_key = read_pem_file(client_key); - } - } - - CUOPT_LOG_DEBUG("gRPC client config: chunk_size=%lld max_message=%lld tls=%s", - static_cast(config.chunk_size_bytes), - static_cast(config.max_message_bytes), - config.enable_tls ? "on" : "off"); -} - // ============================================================================ // Remote execution via gRPC // ============================================================================ @@ -136,7 +73,7 @@ std::unique_ptr> solve_lp_remote( grpc_client_config_t config; config.server_address = get_grpc_server_address(); config.timeout_seconds = solver_timeout_seconds(settings.time_limit); - apply_env_overrides(config); + apply_grpc_client_env_overrides(config); // Stream the server's solver log to the client. 
The server already // filters by the requested log level, so we just pass lines through to @@ -191,7 +128,7 @@ std::unique_ptr> solve_mip_remote( grpc_client_config_t config; config.server_address = get_grpc_server_address(); config.timeout_seconds = solver_timeout_seconds(settings.time_limit); - apply_env_overrides(config); + apply_grpc_client_env_overrides(config); // Stream server log — same passthrough logic as the LP callback above. std::unique_ptr log_file_stream; diff --git a/python/cuopt/conftest.py b/python/cuopt/conftest.py new file mode 100644 index 0000000000..3507342945 --- /dev/null +++ b/python/cuopt/conftest.py @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pytest_plugins must live in a top-level conftest (see pytest 8+ deprecation). +pytest_plugins = ["cuopt.tests.fixtures.grpc_server_fixtures"] diff --git a/python/cuopt/cuopt/CMakeLists.txt b/python/cuopt/cuopt/CMakeLists.txt index 8d3dc46a42..0c12cfb04d 100644 --- a/python/cuopt/cuopt/CMakeLists.txt +++ b/python/cuopt/cuopt/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(linear_programming/data_model) add_subdirectory(linear_programming/io) add_subdirectory(linear_programming/solver) add_subdirectory(linear_programming/solver_settings) +add_subdirectory(grpc) add_subdirectory(routing) diff --git a/python/cuopt/cuopt/__init__.py b/python/cuopt/cuopt/__init__.py index 08f36edcb9..ecb25a9185 100644 --- a/python/cuopt/cuopt/__init__.py +++ b/python/cuopt/cuopt/__init__.py @@ -13,7 +13,7 @@ # Lazy imports for linear_programming, routing, and distance_engine modules # This allows cuopt to be imported on CPU-only hosts when remote solve is configured -_submodules = ["linear_programming", "routing", "distance_engine"] +_submodules = ["linear_programming", "routing", "distance_engine", "grpc"] def __getattr__(name): diff --git a/python/cuopt/cuopt/grpc/CMakeLists.txt 
b/python/cuopt/cuopt/grpc/CMakeLists.txt new file mode 100644 index 0000000000..1d8c956888 --- /dev/null +++ b/python/cuopt/cuopt/grpc/CMakeLists.txt @@ -0,0 +1,6 @@ +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on + +add_subdirectory(numerical) diff --git a/python/cuopt/cuopt/grpc/__init__.py b/python/cuopt/cuopt/grpc/__init__.py new file mode 100644 index 0000000000..555cfb15b7 --- /dev/null +++ b/python/cuopt/cuopt/grpc/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""gRPC clients for remote cuOpt execution. + +This package is the namespace for domain-specific async clients: + +- :mod:`cuopt.grpc.numerical` — LP/MILP (submit, result, incumbents) +- :mod:`cuopt.grpc.routing` — VRP/TSP/PDP (future) + +Longer term, shared job lifecycle (connect, status, wait, cancel, delete, +logs) may live here as a base client type that domain clients extend or +compose. Import domain clients explicitly, e.g. +``from cuopt.grpc.numerical import Client``. +""" diff --git a/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt b/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt new file mode 100644 index 0000000000..0f60a2afed --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt @@ -0,0 +1,20 @@ +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on + +set(cython_sources grpc_client.pyx) +set(linked_libraries cuopt::cuopt) + +rapids_cython_create_modules( + CXX + SOURCE_FILES "${cython_sources}" + LINKED_LIBRARIES "${linked_libraries}" + MODULE_PREFIX grpc_ + ASSOCIATED_TARGETS cuopt +) + +foreach(_cython_target IN LISTS RAPIDS_CYTHON_CREATED_TARGETS) + target_include_directories( + ${_cython_target} PRIVATE "${CMAKE_CURRENT_LIST_DIR}/../../linear_programming/solver") +endforeach() diff --git a/python/cuopt/cuopt/grpc/numerical/__init__.py b/python/cuopt/cuopt/grpc/numerical/__init__.py new file mode 100644 index 0000000000..696a41f13d --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/__init__.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from cuopt.grpc.numerical.grpc_client import ( + Client, + GrpcError, + JobNotReadyError, + JobStatus, +) + +__all__ = ["Client", "GrpcError", "JobNotReadyError", "JobStatus"] diff --git a/python/cuopt/cuopt/grpc/numerical/grpc_client.pxd b/python/cuopt/cuopt/grpc/numerical/grpc_client.pxd new file mode 100644 index 0000000000..ec4aaa037b --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/grpc_client.pxd @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from libc.stdint cimport int64_t +from libcpp.memory cimport unique_ptr +from libcpp.string cimport string +from libcpp.vector cimport vector + +from cuopt.linear_programming.data_model.data_model cimport data_model_view_t +from cuopt.linear_programming.solver.solver cimport solver_ret_t +from cuopt.linear_programming.solver_settings.solver_settings cimport ( + solver_settings_t, +) + +cdef extern from "cuopt/grpc/cython_grpc_client.hpp" namespace "cuopt::cython": + ctypedef enum grpc_job_status_t "cuopt::cython::grpc_job_status_t": + QUEUED "cuopt::cython::grpc_job_status_t::QUEUED" + PROCESSING "cuopt::cython::grpc_job_status_t::PROCESSING" + COMPLETED "cuopt::cython::grpc_job_status_t::COMPLETED" + FAILED "cuopt::cython::grpc_job_status_t::FAILED" + CANCELLED "cuopt::cython::grpc_job_status_t::CANCELLED" + NOT_FOUND "cuopt::cython::grpc_job_status_t::NOT_FOUND" + + cdef cppclass grpc_submit_result_t: + bint success + string error_message + string job_id + bint is_mip + + cdef cppclass grpc_status_result_t: + bint success + string error_message + grpc_job_status_t status + string message + long long result_size_bytes + + cdef cppclass grpc_result_outcome_t: + bint not_ready + bint success + string error_message + unique_ptr[solver_ret_t] solution + + cdef cppclass grpc_logs_result_t: + bint success + string error_message + vector[string] lines + + cdef cppclass grpc_incumbent_entry_t: + int64_t index + double objective + vector[double] assignment + + cdef cppclass grpc_incumbents_result_t: + bint success + string error_message + vector[grpc_incumbent_entry_t] incumbents + int64_t next_index + bint job_complete + + ctypedef int (*grpc_log_line_callback_t)( + const char* line, size_t line_len, int job_complete, void* user_data + ) noexcept nogil + + cdef cppclass grpc_python_client_t: + grpc_python_client_t(const string& host, int port) except + + bint connect(string& error_out) + grpc_submit_result_t submit( + 
data_model_view_t[int, double]* data_model, + solver_settings_t[int, double]* settings, + bint enable_incumbents, + ) except + + grpc_status_result_t status(const string& job_id) except + + grpc_status_result_t wait(const string& job_id, int timeout_seconds) except + + bint cancel(const string& job_id, string& error_out) except + + bint delete_job(const string& job_id, string& error_out) except + + grpc_result_outcome_t result(const string& job_id, bint is_mip) except + + grpc_logs_result_t fetch_logs(const string& job_id, long long from_byte) except + + bint stream_logs( + const string& job_id, + long long from_byte, + grpc_log_line_callback_t callback, + void* user_data, + ) except + + grpc_incumbents_result_t fetch_incumbents( + const string& job_id, int64_t from_index, int max_count + ) except + + string last_error() diff --git a/python/cuopt/cuopt/grpc/numerical/grpc_client.pyx b/python/cuopt/cuopt/grpc/numerical/grpc_client.pyx new file mode 100644 index 0000000000..b4c2fd4ba4 --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/grpc_client.pyx @@ -0,0 +1,539 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True +# cython: language_level = 3 + +from cuopt.grpc.numerical.grpc_client cimport ( + grpc_incumbents_result_t, + grpc_job_status_t, + grpc_logs_result_t, + grpc_log_line_callback_t, + grpc_python_client_t, + grpc_result_outcome_t, + grpc_status_result_t, + grpc_submit_result_t, +) +from cuopt.linear_programming.data_model.data_model_wrapper cimport DataModel +from cuopt.linear_programming.solver.solver cimport solver_ret_t +from cuopt.linear_programming.solver.solver_wrapper cimport ( + build_solution_from_unique_ptr, +) +from cuopt.linear_programming.solver.solver_wrapper import ( + prepare_solver_settings, + type_cast, +) +from cuopt.linear_programming.solver_settings.solver_settings cimport ( + SolverSettings, +) + +from enum import IntEnum +import math +from libc.stdint cimport int64_t +from libc.stddef cimport size_t +from libcpp.memory cimport unique_ptr +from libcpp.string cimport string +from libcpp.utility cimport move +import threading +import time + +import numpy as np + + +class JobStatus(IntEnum): + QUEUED = grpc_job_status_t.QUEUED + PROCESSING = grpc_job_status_t.PROCESSING + COMPLETED = grpc_job_status_t.COMPLETED + FAILED = grpc_job_status_t.FAILED + CANCELLED = grpc_job_status_t.CANCELLED + NOT_FOUND = grpc_job_status_t.NOT_FOUND + + +class GrpcError(RuntimeError): + pass + + +class JobNotReadyError(GrpcError): + pass + + +cdef int _invoke_log_callback( + const char* line, + size_t line_len, + int job_complete, + void* userdata, +) noexcept nogil: + with gil: + try: + callback = userdata + text = line[:line_len].decode("utf-8") if line_len > 0 else "" + # Only an explicit False stops the stream. print()/append() return + # None and must not be treated as a stop signal. 
+ if _call_log_callback(callback, text, bool(job_complete)) is False: + return 0 + return 1 + except Exception as exc: + cb = userdata + state = getattr(cb, "state", None) + if state is not None: + state["error"] = exc + return 0 + + +def _call_log_callback(callback, line, job_complete): + """Invoke a log callback; accept both ``(line,)`` and ``(line, done)`` forms.""" + try: + return callback(line, job_complete) + except TypeError: + return callback(line) + + +class _LogStreamHandler: + """Bridge user callback with stream state for C log streaming.""" + + __slots__ = ("state", "callback") + + def __init__(self, state, callback): + self.state = state + self.callback = callback + + def __call__(self, line, job_complete): + self.state["lines"].append(line) + self.state["live_lines"] += 1 + try: + return _call_log_callback(self.callback, line, job_complete) + except Exception as exc: + self.state["error"] = exc + raise + + +def _call_incumbent_callback(callback, index, objective, assignment, job_complete): + try: + return callback(index, objective, assignment, job_complete) + except TypeError: + return callback(index, objective, assignment) + + +def _forward_incumbent_to_settings(settings, index, objective, assignment, job_complete): + from cuopt.linear_programming.internals import GetSolutionCallback + + if job_complete: + return True + for mip_callback in settings.get_mip_callbacks(): + if mip_callback is None: + continue + if isinstance(mip_callback, GetSolutionCallback): + solution = np.asarray(assignment, dtype=np.float64) + cost = np.array([objective], dtype=np.float64) + bound = np.array([math.nan], dtype=np.float64) + mip_callback.get_solution( + solution, cost, bound, mip_callback.user_data + ) + return True + + +cdef class Client: + cdef unique_ptr[grpc_python_client_t] _client + cdef dict _job_is_mip + cdef dict _log_threads + cdef dict _log_thread_errors + cdef dict _log_stream_state + cdef dict _incumbent_threads + cdef dict _incumbent_thread_errors + 
cdef str _host + cdef int _port + + def __init__(self, str host, int port): + cdef string host_cpp = host.encode("utf-8") + self._client.reset(new grpc_python_client_t(host_cpp, port)) + self._job_is_mip = {} + self._log_threads = {} + self._log_thread_errors = {} + self._log_stream_state = {} + self._incumbent_threads = {} + self._incumbent_thread_errors = {} + self._host = host + self._port = port + cdef string error_out + if not self._client.get().connect(error_out): + raise GrpcError(error_out.decode("utf-8")) + + def submit(self, problem, SolverSettings settings not None): + cdef DataModel data_model + cdef grpc_submit_result_t submit_result + cdef string job_id + cdef bint mip + + data_model = self._as_data_model(problem) + data_model.variable_types = type_cast( + data_model.variable_types, "S1", "variable_types" + ) + mip = _is_mip(data_model.get_variable_types()) + prepare_solver_settings(settings, data_model, mip) + data_model.set_data_model_view() + cdef bint enable_incumbents = False + if mip and settings.get_mip_callbacks(): + enable_incumbents = True + submit_result = self._client.get().submit( + data_model.c_data_model_view.get(), + settings.c_solver_settings.get(), + enable_incumbents, + ) + if not submit_result.success: + raise GrpcError(submit_result.error_message.decode("utf-8")) + job_id = submit_result.job_id + self._job_is_mip[job_id.decode("utf-8")] = bool(submit_result.is_mip) + return job_id.decode("utf-8") + + def status(self, str job_id): + cdef grpc_status_result_t status_result = self._client.get().status( + job_id.encode("utf-8") + ) + if not status_result.success: + raise GrpcError(status_result.error_message.decode("utf-8")) + return JobStatus(status_result.status) + + def wait(self, str job_id, timeout=None): + cdef int timeout_seconds = 0 if timeout is None else int(timeout) + cdef grpc_status_result_t wait_result = self._client.get().wait( + job_id.encode("utf-8"), timeout_seconds + ) + if not wait_result.success: + raise 
def result(self, str job_id, variable_names=None, is_mip=None):
    """Fetch the solution for ``job_id``.

    Returns ``None`` while the result is not ready and raises ``GrpcError``
    when the fetch fails.  ``is_mip`` overrides the MIP flag recorded by
    ``submit``; jobs unknown to this client default to non-MIP.
    """
    cdef grpc_result_outcome_t outcome
    cdef bint fetch_mip

    if is_mip is None:
        fetch_mip = self._job_is_mip.get(job_id, False)
    else:
        fetch_mip = bool(is_mip)

    outcome = self._client.get().result(job_id.encode("utf-8"), fetch_mip)
    if outcome.not_ready:
        return None
    if outcome.success:
        return build_solution_from_unique_ptr(
            move(outcome.solution), variable_names
        )
    raise GrpcError(outcome.error_message.decode("utf-8"))
def start_log_stream(self, str job_id, callback=print, from_byte=0):
    """
    Stream solver logs on a background thread until the job completes.

    ``callback`` is invoked as ``callback(line, job_complete)`` for each
    line (or as ``callback(line)`` when it only accepts one argument).
    Return ``False`` explicitly to stop early; other return values
    (including ``None`` from ``print``) keep the stream open.

    The default ``print`` callback prints the line only, not the
    ``job_complete`` flag.

    Call :meth:`join_log_stream` before :meth:`delete` to ensure all log
    lines were received. To collect lines in memory::

        lines = []
        client.start_log_stream(job_id, lines.append)

    Raises :class:`GrpcError` if a stream is already registered for
    ``job_id`` or the dedicated connection cannot be opened; in the latter
    case no stream state is left behind.
    """
    if job_id in self._log_threads:
        raise GrpcError(f"log stream already running for job {job_id}")

    if callback is print:
        # print() accepts any number of positionals, so it would be called as
        # print(line, job_complete) and emit "<line> False" for every line.
        def _print_line(line):
            print(line)

        callback = _print_line

    # Use a dedicated connection so StreamLogs can run concurrently with
    # status/result polling on this client.  Connect before registering any
    # state so a failed connect does not leave a half-registered stream.
    log_client = Client(self._host, self._port)

    state = {
        "lines": [],
        "callback": callback,
        "from_byte": from_byte,
        "live_lines": 0,
        "backfilled": False,
        "error": None,
    }
    self._log_stream_state[job_id] = state

    handler = _LogStreamHandler(state, callback)
    thread = threading.Thread(
        target=self._run_log_stream,
        args=(log_client, job_id, handler, from_byte),
        daemon=True,
    )
    self._log_threads[job_id] = thread
    thread.start()
    return thread
def _backfill_log_stream(self, str job_id, state):
    """Replay bulk logs into ``state`` when live streaming delivered nothing.

    Makes up to six fetch attempts.  A not-ready job is waited on (up to
    120 s) after the first attempt and re-polled after a 0.2 s pause on
    later ones.  On the first non-empty fetch every line is appended to
    ``state["lines"]``, passed to the stored callback with
    ``job_complete=True``, and ``state["backfilled"]`` is set.
    """
    cdef int attempt
    for attempt in range(6):
        try:
            bulk = self.logs(job_id, state["from_byte"])
        except JobNotReadyError:
            if attempt:
                time.sleep(0.2)
            else:
                # First miss: block until the job reaches a terminal state.
                self.wait(job_id, timeout=120)
            continue

        if not bulk:
            # Empty fetch: pause before retrying, except after the last try.
            if attempt != 5:
                time.sleep(0.2)
            continue

        for line in bulk:
            state["lines"].append(line)
            _call_log_callback(state["callback"], line, True)
        state["backfilled"] = True
        return
Each entry is a + dict with ``index``, ``objective``, and ``assignment`` (list of floats). + """ + cdef grpc_incumbents_result_t outcome = self._client.get().fetch_incumbents( + job_id.encode("utf-8"), from_index, 0 + ) + if not outcome.success: + msg = outcome.error_message.decode("utf-8") + if not msg: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "failed to fetch incumbents") + return [ + { + "index": entry.index, + "objective": entry.objective, + "assignment": [v for v in entry.assignment], + } + for entry in outcome.incumbents + ] + + def start_incumbent_stream( + self, + str job_id, + callback=None, + settings=None, + from_index=0, + poll_interval_ms=1000, + ): + """ + Poll for MIP incumbent solutions on a background thread until the job + completes. + + ``callback`` is invoked as ``callback(index, objective, assignment, + job_complete)``. Return ``False`` to cancel the job. ``assignment`` is + a list of variable values. + + Alternatively pass ``settings`` with :meth:`SolverSettings.set_mip_callback` + registered :class:`GetSolutionCallback` instances (same as local solve). + + Call :meth:`join_incumbent_stream` before :meth:`delete`. 
def join_incumbent_stream(self, str job_id, timeout=None):
    """Wait for a background incumbent poll started by :meth:`start_incumbent_stream`.

    ``timeout`` is forwarded to ``Thread.join``.  If the thread is still
    running when the timeout expires, the stream stays registered so a later
    call (or :meth:`delete`) can keep waiting for it, and this method returns
    without raising.  Once the thread has finished, its registration is
    removed and any exception it recorded is re-raised here.
    """
    thread = self._incumbent_threads.get(job_id)
    if thread is not None:
        thread.join(timeout)
        if thread.is_alive():
            # Timed out: keep the handle, otherwise the still-running thread
            # could never be joined again and its error would be orphaned.
            return
        self._incumbent_threads.pop(job_id, None)

    exc = self._incumbent_thread_errors.pop(job_id, None)
    if exc is not None:
        raise exc
outcome.error_message.decode("utf-8") + if not msg: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "incumbent poll failed") + + for entry in outcome.incumbents: + assignment = [] + for i in range(entry.assignment.size()): + assignment.append(entry.assignment[i]) + if _call_incumbent_callback( + callback, entry.index, entry.objective, assignment, False + ) is False: + self.cancel(job_id) + return + + next_index = outcome.next_index + job_complete = outcome.job_complete + if job_complete: + _call_incumbent_callback(callback, 0, 0.0, [], True) + return + + time.sleep(poll_seconds) + + cdef DataModel _as_data_model(self, problem): + from cuopt.linear_programming.data_model import DataModel as PyDataModel + from cuopt.linear_programming.problem import Problem + + if isinstance(problem, PyDataModel): + return problem + if isinstance(problem, Problem): + if problem.model is None: + problem._to_data_model() + return problem.model + raise TypeError( + "submit() expects a Problem or DataModel, got " + f"{type(problem).__name__}" + ) + + +def _is_mip(var_types): + if len(var_types) == 0: + return False + if len(set(map(type, var_types))) == 1: + if isinstance(var_types[0], bytes): + return b"I" in var_types or b"S" in var_types + return "I" in var_types or "S" in var_types + return any( + vt == "I" or vt == b"I" or vt == "S" or vt == b"S" + for vt in var_types + ) diff --git a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd new file mode 100644 index 0000000000..703fe38298 --- /dev/null +++ b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from libcpp.memory cimport unique_ptr + +from cuopt.linear_programming.solver.solver cimport solver_ret_t +cdef object build_solution_from_unique_ptr( + unique_ptr[solver_ret_t] sol_ret_ptr, + object variable_names) diff --git a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx index 925dd8797b..fd67f8e89c 100644 --- a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx +++ b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx @@ -129,6 +129,21 @@ cdef object _vector_to_numpy(const vector[double]& vec): return np.asarray( data_ptr, dtype=np.float64).copy() +def _vars_dict(variable_names, primal_solution): + if len(primal_solution) == 0: + # Unbounded/infeasible solves may return no primal vector; match legacy + # dict(zip(names, primal)) behavior (empty dict, not a length error). + return {} + if variable_names is None or len(variable_names) == 0: + return {f"x{i}": v for i, v in enumerate(primal_solution)} + if len(variable_names) != len(primal_solution): + raise ValueError( + f"variable_names length ({len(variable_names)}) does not match " + f"solution size ({len(primal_solution)})" + ) + return dict(zip(variable_names, primal_solution)) + + def type_cast(cudf_obj, np_type, name): if isinstance(cudf_obj, cudf.Series): cudf_type = cudf_obj.dtype @@ -234,6 +249,14 @@ cdef set_solver_setting( cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, DataModel data_model_obj, is_batch=False): + return create_solution_with_names( + move(sol_ret_ptr), data_model_obj.get_variable_names(), is_batch + ) + + +cdef create_solution_with_names(unique_ptr[solver_ret_t] sol_ret_ptr, + object variable_names, + bint is_batch=False): from cuopt.linear_programming.solution.solution import Solution @@ -256,7 +279,7 @@ cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, return Solution( ProblemCategory(sol_ret.problem_type), - 
def prepare_solver_settings(SolverSettings settings, data_model=None, mip=False):
    """Populate C++ solver settings from Python state for the next solve/submit.

    Python-callable wrapper that forwards all three arguments unchanged to
    ``set_solver_setting``.
    """
    set_solver_setting(settings, data_model, mip)
+# SPDX-License-Identifier: Apache-2.0 diff --git a/python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py b/python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py new file mode 100644 index 0000000000..4e325a682d --- /dev/null +++ b/python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py @@ -0,0 +1,204 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Shared cuopt_grpc_server helpers and pytest fixtures for LP tests. + +Registered via ``pytest_plugins`` in ``python/cuopt/conftest.py`` as +``cuopt.tests.fixtures.grpc_server_fixtures``. + +Class-scoped ``grpc_server`` starts one server per test class. Configure it on +the test class:: + + class TestMyGrpcFeature: + grpc_port_offset = GRPC_PORT_OFFSET_CLIENT + grpc_server_yield = "port" # or "env" (default) + + def test_foo(self, grpc_server): + ... + +Port offsets are added to ``CUOPT_TEST_PORT_BASE`` (default 18000) so parallel +test classes do not collide. +""" + +import os +import shutil +import signal +import socket +import subprocess +import time + +import pytest + +# Port offsets (added to CUOPT_TEST_PORT_BASE). Keep unique per test class. 
def find_grpc_server():
    """Return a path to an executable ``cuopt_grpc_server``, or ``None``.

    Search order: the ``CUOPT_GRPC_SERVER_PATH`` environment variable,
    ``PATH`` (via ``shutil.which``), a few build-tree locations relative to
    the current directory, then ``$CONDA_PREFIX/bin``.
    """

    def _runnable(path):
        return os.path.isfile(path) and os.access(path, os.X_OK)

    override = os.environ.get("CUOPT_GRPC_SERVER_PATH")
    if override and _runnable(override):
        return override

    on_path = shutil.which("cuopt_grpc_server")
    if on_path:
        return on_path

    relative_candidates = (
        "./cuopt_grpc_server",
        "../cpp/build/cuopt_grpc_server",
        "../../cpp/build/cuopt_grpc_server",
    )
    for relative in relative_candidates:
        if _runnable(relative):
            return os.path.abspath(relative)

    prefix = os.environ.get("CONDA_PREFIX", "")
    if prefix:
        conda_binary = os.path.join(prefix, "bin", "cuopt_grpc_server")
        if _runnable(conda_binary):
            return conda_binary
    return None
def start_grpc_server(port_offset):
    """Locate the server, start it on BASE + port_offset, return (proc, client_env).

    BASE is ``CUOPT_TEST_PORT_BASE`` (default 18000).  The test is skipped
    when the binary cannot be found or exits right after launch, and failed
    when no gRPC client can connect within 30 s.

    The spawned process is killed (if still running) and reaped on every
    path that does not return it to the caller, including unexpected
    exceptions from the readiness probe and ``KeyboardInterrupt``, so an
    aborted start never leaves a stray server holding the port.
    """
    server_bin = find_grpc_server()
    if server_bin is None:
        pytest.skip("cuopt_grpc_server not found")

    port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + port_offset
    client_env = client_remote_env(port)
    proc = subprocess.Popen(
        [
            server_bin,
            "--port",
            str(port),
            "--workers",
            "1",
            "--log-to-console",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=server_env(),
    )
    try:
        # Give the process a moment to fail fast (e.g. missing shared libs).
        time.sleep(0.5)
        if proc.poll() is not None:
            pytest.skip(
                f"cuopt_grpc_server exited immediately (rc={proc.returncode}), "
                "binary may be unable to load shared libraries in this environment"
            )
        if not wait_for_grpc_client(port, timeout=30):
            pytest.fail(
                "cuopt_grpc_server TCP port opened but gRPC client could not connect "
                "within 30s"
            )
    except BaseException:
        # BaseException: pytest.skip/pytest.fail and KeyboardInterrupt also
        # have to trigger cleanup of the child process.
        if proc.poll() is None:
            proc.kill()
        proc.wait()
        raise

    return proc, client_env
pytest.fail( + f"{cls.__name__} must set grpc_port_offset " + f"(e.g. GRPC_PORT_OFFSET_CLIENT)" + ) + + yield_kind = getattr(cls, "grpc_server_yield", "env") + if yield_kind not in ("env", "port"): + pytest.fail( + f"{cls.__name__}.grpc_server_yield must be 'env' or 'port', " + f"got {yield_kind!r}" + ) + + proc, client_env = start_grpc_server(port_offset) + try: + if yield_kind == "port": + yield int(client_env["CUOPT_REMOTE_PORT"]) + else: + yield client_env + finally: + stop_grpc_server(proc) diff --git a/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py b/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py index 4453d38bcc..8418563f47 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py @@ -722,7 +722,7 @@ def tls_env_with_server(self, tmp_path_factory): if server_bin is None: pytest.skip("cuopt_grpc_server not found") - port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + 800 + port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + 850 proc = subprocess.Popen( [ server_bin, diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py new file mode 100644 index 0000000000..5b0168ccbe --- /dev/null +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -0,0 +1,258 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
def _poll_until_complete(
    client, job_id, names, timeout=120, poll_interval=0.05
):
    """Poll ``client`` until ``job_id`` leaves QUEUED/PROCESSING.

    Returns the first status outside those two, or ``COMPLETED`` as soon as
    ``client.result`` yields a solution.  When ``timeout`` seconds elapse
    first, the status from one final query is returned.
    """
    pending = (JobStatus.QUEUED, JobStatus.PROCESSING)
    stop_at = time.monotonic() + timeout
    while True:
        if time.monotonic() >= stop_at:
            return client.status(job_id)
        current = client.status(job_id)
        if current not in pending:
            return current
        if client.result(job_id, names) is not None:
            return JobStatus.COMPLETED
        time.sleep(poll_interval)
def test_submit_status_result_delete(self, grpc_server):
    """Round-trip a small LP: submit, poll, wait, fetch the result, delete."""
    problem = _demo_lp_problem()
    settings = SolverSettings()
    client = Client("localhost", grpc_server)

    job_id = client.submit(problem, settings)
    assert job_id

    # The solve can finish before the first poll, so a result right after
    # submit is not an error.  It is only legitimate once the job is
    # reported COMPLETED (status is queried after the result fetch).
    early_result = client.result(job_id, _DEMO_LP_NAMES)
    early_status = client.status(job_id)
    assert early_status in (
        JobStatus.QUEUED,
        JobStatus.PROCESSING,
        JobStatus.COMPLETED,
    )
    if early_result is not None:
        assert early_status == JobStatus.COMPLETED

    terminal = client.wait(job_id, timeout=120)
    assert terminal == JobStatus.COMPLETED

    solution = client.result(job_id, _DEMO_LP_NAMES)
    assert solution is not None
    assert solution.get_primal_objective() == pytest.approx(0.36, rel=1e-3)
    vars_ = solution.get_vars()
    assert vars_["x"] == pytest.approx(1.8, rel=1e-3)
    assert vars_["y"] == pytest.approx(0.0, rel=1e-3)

    client.delete(job_id)
def test_invalid_job_id(self, grpc_server):
    """An unknown job id reports NOT_FOUND; result and delete raise GrpcError."""
    unknown_job = "00000000-0000-0000-0000-000000000000"
    client = Client("localhost", grpc_server)

    reported = client.status(unknown_job)
    assert reported == JobStatus.NOT_FOUND

    with pytest.raises(GrpcError):
        client.result(unknown_job)
    with pytest.raises(GrpcError):
        client.delete(unknown_job)
def test_cancel_job(self, grpc_server):
    """Cancel a queued or running MIP job and check it ends up CANCELLED.

    Skipped when the dataset is missing or the job has already left the
    queued/processing states by the time its status is first read.
    """
    if not os.path.isfile(_SWATH1_MPS):
        pytest.skip(f"dataset not found: {_SWATH1_MPS}")

    settings = SolverSettings()
    settings.set_parameter(CUOPT_TIME_LIMIT, 10)
    problem = Read(_SWATH1_MPS)

    client = Client("localhost", grpc_server)
    job_id = client.submit(problem, settings)

    still_active = client.status(job_id) in (
        JobStatus.QUEUED,
        JobStatus.PROCESSING,
    )
    if not still_active:
        client.delete(job_id)
        pytest.skip("Job completed before cancellation could be observed")

    client.cancel(job_id)
    final_status = client.wait(job_id, timeout=30)
    assert final_status == JobStatus.CANCELLED
    with pytest.raises(GrpcError):
        client.result(job_id)
    client.delete(job_id)
client.join_incumbent_stream(job_id) + + assert collector.entries + bulk = client.incumbents(job_id) + assert bulk + + solution = client.result(job_id, _MIP_NAMES) + assert solution is not None + client.delete(job_id) diff --git a/python/cuopt/pyproject.toml b/python/cuopt/pyproject.toml index f7493975a7..68216301d9 100644 --- a/python/cuopt/pyproject.toml +++ b/python/cuopt/pyproject.toml @@ -80,6 +80,9 @@ max_allowed_size_compressed = '55Mi' [tool.pytest.ini_options] testpaths = ["cuopt/tests"] +# Avoid prepending the project root onto sys.path, which would shadow the +# installed cuopt wheel (Cython extensions live in site-packages, not source). +addopts = "--import-mode=importlib" [tool.scikit-build] build-dir = "build/{wheel_tag}"