From 22d7297ec79215831e07313a26399a0405c87e53 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 10:34:34 -0500 Subject: [PATCH 01/10] add a Python interface to the C++ grpc asyc client This lets Python use an async grpc client with Problem type in additon to existing blocking Problem.solve() with remote execution env vars set (zero-code-change remote solve) --- cpp/CMakeLists.txt | 2 + cpp/include/cuopt/grpc/cython_grpc_client.hpp | 152 ++++++ cpp/include/cuopt/grpc/grpc_client_env.hpp | 22 + cpp/src/grpc/client/cython_grpc_client.cpp | 257 +++++++++ cpp/src/grpc/client/grpc_client_env.cpp | 87 +++ cpp/src/grpc/client/solve_remote.cpp | 69 +-- python/cuopt/cuopt/CMakeLists.txt | 1 + python/cuopt/cuopt/__init__.py | 2 +- python/cuopt/cuopt/grpc/CMakeLists.txt | 20 + python/cuopt/cuopt/grpc/__init__.py | 6 + python/cuopt/cuopt/grpc/grpc_client.pxd | 87 +++ python/cuopt/cuopt/grpc/grpc_client.pyx | 505 ++++++++++++++++++ .../solver/solver_wrapper.pxd | 9 + .../solver/solver_wrapper.pyx | 40 +- .../tests/linear_programming/conftest.py | 5 + .../grpc_server_fixtures.py | 146 +++++ .../linear_programming/test_grpc_client.py | 168 ++++++ 17 files changed, 1508 insertions(+), 70 deletions(-) create mode 100644 cpp/include/cuopt/grpc/cython_grpc_client.hpp create mode 100644 cpp/include/cuopt/grpc/grpc_client_env.hpp create mode 100644 cpp/src/grpc/client/cython_grpc_client.cpp create mode 100644 cpp/src/grpc/client/grpc_client_env.cpp create mode 100644 python/cuopt/cuopt/grpc/CMakeLists.txt create mode 100644 python/cuopt/cuopt/grpc/__init__.py create mode 100644 python/cuopt/cuopt/grpc/grpc_client.pxd create mode 100644 python/cuopt/cuopt/grpc/grpc_client.pyx create mode 100644 python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd create mode 100644 python/cuopt/cuopt/tests/linear_programming/conftest.py create mode 100644 python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py create mode 100644 python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7e2dd099c1..e4296b683c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -461,6 +461,8 @@ if (NOT SKIP_GRPC_BUILD) src/grpc/grpc_settings_mapper.cpp src/grpc/grpc_service_mapper.cpp src/grpc/client/grpc_client.cpp + src/grpc/client/grpc_client_env.cpp + src/grpc/client/cython_grpc_client.cpp src/grpc/client/solve_remote.cpp ) list(APPEND CUOPT_SRC_FILES ${GRPC_INFRA_FILES}) diff --git a/cpp/include/cuopt/grpc/cython_grpc_client.hpp b/cpp/include/cuopt/grpc/cython_grpc_client.hpp new file mode 100644 index 0000000000..96f49f4879 --- /dev/null +++ b/cpp/include/cuopt/grpc/cython_grpc_client.hpp @@ -0,0 +1,152 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights + * reserved. SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +namespace cuopt::linear_programming { +template +class solver_settings_t; +namespace io { +template +class data_model_view_t; +} // namespace io +} // namespace cuopt::linear_programming + +namespace cuopt::cython { + +/** Mirrors cuopt::linear_programming::job_status_t for the Python bindings. */ +enum class grpc_job_status_t : int { + QUEUED = 0, + PROCESSING = 1, + COMPLETED = 2, + FAILED = 3, + CANCELLED = 4, + NOT_FOUND = 5, +}; + +struct grpc_submit_result_t { + bool success = false; + std::string error_message; + std::string job_id; + bool is_mip = false; +}; + +struct grpc_status_result_t { + bool success = false; + std::string error_message; + grpc_job_status_t status = grpc_job_status_t::NOT_FOUND; + std::string message; + int64_t result_size_bytes = 0; +}; + +struct grpc_result_outcome_t { + bool not_ready = false; + bool success = false; + std::string error_message; + std::unique_ptr solution; +}; + +struct grpc_logs_result_t { + bool success = false; + std::string error_message; + std::vector lines; +}; + +struct grpc_incumbent_entry_t { + int64_t index = 0; + double objective = 0.0; + std::vector assignment; +}; + +struct grpc_incumbents_result_t { + bool success = false; + std::string error_message; + std::vector incumbents; + int64_t next_index = 0; + bool job_complete = false; +}; + +/** + * C callback for streaming log lines from Cython (one line at a time). + * Uses C-compatible ``int`` fields so Cython ``bint`` function pointers match. + */ +using grpc_log_line_callback_t = int (*)(const char* line, + size_t line_len, + int job_complete, + void* user_data); + +/** + * @brief Owning wrapper around grpc_client_t for Cython. + */ +class grpc_python_client_t { + public: + grpc_python_client_t(const std::string& host, int port); + ~grpc_python_client_t(); + + grpc_python_client_t(const grpc_python_client_t&) = delete; + grpc_python_client_t& operator=(const grpc_python_client_t&) = delete; + + bool connect(std::string& error_out); + + grpc_submit_result_t submit( + cuopt::linear_programming::io::data_model_view_t* data_model, + cuopt::linear_programming::solver_settings_t* settings, + bool enable_incumbents = false); + + grpc_status_result_t status(const std::string& job_id); + + /** + * @param timeout_seconds 0 = block on WaitForCompletion; >0 = poll with timeout. + */ + grpc_status_result_t wait(const std::string& job_id, int timeout_seconds); + + bool cancel(const std::string& job_id, std::string& error_out); + + bool delete_job(const std::string& job_id, std::string& error_out); + + /** + * @param is_mip When true, fetch a MIP result; otherwise LP. + */ + grpc_result_outcome_t result(const std::string& job_id, bool is_mip); + + /** + * @brief Block until the job completes, collecting all solver log lines. + */ + grpc_logs_result_t fetch_logs(const std::string& job_id, int64_t from_byte = 0); + + /** + * @brief Stream solver log lines until the job completes. + * + * Invokes @p callback for each line. Return false from the callback to stop + * early. Blocks the calling thread for the lifetime of the stream. + */ + bool stream_logs(const std::string& job_id, + int64_t from_byte, + grpc_log_line_callback_t callback, + void* user_data); + + /** + * @brief Poll GetIncumbents until the job completes. + */ + grpc_incumbents_result_t fetch_incumbents(const std::string& job_id, + int64_t from_index = 0, + int32_t max_count = 0); + + std::string last_error() const; + + private: + struct impl_t; + std::unique_ptr impl_; +}; + +} // namespace cuopt::cython diff --git a/cpp/include/cuopt/grpc/grpc_client_env.hpp b/cpp/include/cuopt/grpc/grpc_client_env.hpp new file mode 100644 index 0000000000..16a93225b7 --- /dev/null +++ b/cpp/include/cuopt/grpc/grpc_client_env.hpp @@ -0,0 +1,22 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights + * reserved. SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "grpc_client.hpp" + +namespace cuopt::linear_programming { + +/** + * @brief Apply CUOPT_GRPC_* / CUOPT_TLS_* / CUOPT_CHUNK_SIZE env overrides. + */ +void apply_grpc_client_env_overrides(grpc_client_config_t& config); + +/** + * @brief Build grpc_client_config_t from host, port, and environment overrides. + */ +grpc_client_config_t make_grpc_client_config(const std::string& host, int port); + +} // namespace cuopt::linear_programming diff --git a/cpp/src/grpc/client/cython_grpc_client.cpp b/cpp/src/grpc/client/cython_grpc_client.cpp new file mode 100644 index 0000000000..afca1ae222 --- /dev/null +++ b/cpp/src/grpc/client/cython_grpc_client.cpp @@ -0,0 +1,257 @@ +/* clang-format off */ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +/* clang-format on */ + +#include +#include + +#include +#include +#include +#include + +#include "grpc_client.hpp" + +#include +#include +#include + +namespace cuopt::cython { + +namespace { + +grpc_job_status_t map_job_status(cuopt::linear_programming::job_status_t status) +{ + using js = cuopt::linear_programming::job_status_t; + switch (status) { + case js::QUEUED: return grpc_job_status_t::QUEUED; + case js::PROCESSING: return grpc_job_status_t::PROCESSING; + case js::COMPLETED: return grpc_job_status_t::COMPLETED; + case js::FAILED: return grpc_job_status_t::FAILED; + case js::CANCELLED: return grpc_job_status_t::CANCELLED; + case js::NOT_FOUND: return grpc_job_status_t::NOT_FOUND; + default: return grpc_job_status_t::NOT_FOUND; + } +} + +grpc_status_result_t map_status_result(const cuopt::linear_programming::job_status_result_t& in) +{ + grpc_status_result_t out; + out.success = in.success; + out.error_message = in.error_message; + out.status = map_job_status(in.status); + out.message = in.message; + out.result_size_bytes = in.result_size_bytes; + return out; +} + +bool is_in_flight(grpc_job_status_t status) +{ + return status == grpc_job_status_t::QUEUED || status == grpc_job_status_t::PROCESSING; +} + +} // namespace + +struct grpc_python_client_t::impl_t { + cuopt::linear_programming::grpc_client_t client; + explicit impl_t(cuopt::linear_programming::grpc_client_config_t config) + : client(std::move(config)) + { + } +}; + +grpc_python_client_t::grpc_python_client_t(const std::string& host, int port) + : impl_(std::make_unique(cuopt::linear_programming::make_grpc_client_config(host, port))) +{ +} + +grpc_python_client_t::~grpc_python_client_t() = default; + +bool grpc_python_client_t::connect(std::string& error_out) +{ + if (!impl_->client.connect()) { + error_out = impl_->client.get_last_error(); + return false; + } + return true; +} + +grpc_submit_result_t grpc_python_client_t::submit( + cuopt::linear_programming::io::data_model_view_t* data_model, + cuopt::linear_programming::solver_settings_t* settings, + bool enable_incumbents) +{ + grpc_submit_result_t out; + if (data_model == nullptr || settings == nullptr) { + out.error_message = "data_model and settings must not be null"; + return out; + } + + cuopt::linear_programming::cpu_optimization_problem_t cpu_problem; + cuopt::linear_programming::populate_from_data_model_view( + &cpu_problem, data_model, settings, nullptr); + + const bool is_mip = + cpu_problem.get_problem_category() == cuopt::linear_programming::problem_category_t::MIP || + cpu_problem.get_problem_category() == cuopt::linear_programming::problem_category_t::IP; + + if (is_mip) { + auto sub = + impl_->client.submit_mip(cpu_problem, settings->get_mip_settings(), enable_incumbents); + out.success = sub.success; + out.error_message = sub.error_message; + out.job_id = sub.job_id; + out.is_mip = true; + } else { + auto sub = impl_->client.submit_lp(cpu_problem, settings->get_pdlp_settings()); + out.success = sub.success; + out.error_message = sub.error_message; + out.job_id = sub.job_id; + out.is_mip = false; + } + + return out; +} + +grpc_status_result_t grpc_python_client_t::status(const std::string& job_id) +{ + return map_status_result(impl_->client.check_status(job_id)); +} + +grpc_status_result_t grpc_python_client_t::wait(const std::string& job_id, int timeout_seconds) +{ + if (timeout_seconds <= 0) { return map_status_result(impl_->client.wait_for_completion(job_id)); } + + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); + const int poll_ms = std::max(impl_->client.is_connected() ? 1000 : 1000, 1); + + while (std::chrono::steady_clock::now() < deadline) { + auto st = status(job_id); + if (!st.success) { return st; } + if (!is_in_flight(st.status)) { return st; } + std::this_thread::sleep_for(std::chrono::milliseconds(poll_ms)); + } + + grpc_status_result_t out; + out.success = false; + out.error_message = "Timeout waiting for job completion"; + out.status = status(job_id).status; + return out; +} + +bool grpc_python_client_t::cancel(const std::string& job_id, std::string& error_out) +{ + auto result = impl_->client.cancel_job(job_id); + if (!result.success) { + error_out = result.error_message.empty() ? result.message : result.error_message; + return false; + } + return true; +} + +bool grpc_python_client_t::delete_job(const std::string& job_id, std::string& error_out) +{ + if (!impl_->client.delete_job(job_id)) { + error_out = impl_->client.get_last_error(); + return false; + } + return true; +} + +grpc_result_outcome_t grpc_python_client_t::result(const std::string& job_id, bool is_mip) +{ + grpc_result_outcome_t out; + + auto st = status(job_id); + if (!st.success) { + out.error_message = st.error_message; + return out; + } + if (is_in_flight(st.status)) { + out.not_ready = true; + return out; + } + if (st.status != grpc_job_status_t::COMPLETED) { + out.error_message = st.message.empty() ? std::string("Job is not completed") : st.message; + return out; + } + + out.solution = std::make_unique(); + + if (is_mip) { + auto remote = impl_->client.get_mip_result(job_id); + if (!remote.success) { + out.error_message = remote.error_message; + out.solution.reset(); + return out; + } + out.solution->problem_type = cuopt::linear_programming::problem_category_t::MIP; + out.solution->mip_ret = remote.solution->to_cpu_mip_ret_t(); + } else { + auto remote = impl_->client.get_lp_result(job_id); + if (!remote.success) { + out.error_message = remote.error_message; + out.solution.reset(); + return out; + } + out.solution->problem_type = cuopt::linear_programming::problem_category_t::LP; + out.solution->lp_ret = remote.solution->to_cpu_linear_programming_ret_t(); + } + + out.success = true; + return out; +} + +grpc_logs_result_t grpc_python_client_t::fetch_logs(const std::string& job_id, int64_t from_byte) +{ + grpc_logs_result_t out; + out.success = impl_->client.stream_logs( + job_id, from_byte, [&out](const std::string& line, bool /*job_complete*/) -> bool { + if (!line.empty()) { out.lines.push_back(line); } + return true; + }); + if (!out.success) { out.error_message = impl_->client.get_last_error(); } + return out; +} + +bool grpc_python_client_t::stream_logs(const std::string& job_id, + int64_t from_byte, + grpc_log_line_callback_t callback, + void* user_data) +{ + if (callback == nullptr) { return false; } + return impl_->client.stream_logs( + job_id, from_byte, [callback, user_data](const std::string& line, bool job_complete) -> bool { + return callback(line.data(), line.size(), job_complete ? 1 : 0, user_data) != 0; + }); +} + +grpc_incumbents_result_t grpc_python_client_t::fetch_incumbents(const std::string& job_id, + int64_t from_index, + int32_t max_count) +{ + grpc_incumbents_result_t out; + auto result = impl_->client.get_incumbents(job_id, from_index, max_count); + if (!result.success) { + out.error_message = result.error_message; + return out; + } + out.success = true; + out.next_index = result.next_index; + out.job_complete = result.job_complete; + for (const auto& inc : result.incumbents) { + grpc_incumbent_entry_t entry; + entry.index = inc.index; + entry.objective = inc.objective; + entry.assignment = inc.assignment; + out.incumbents.push_back(std::move(entry)); + } + return out; +} + +std::string grpc_python_client_t::last_error() const { return impl_->client.get_last_error(); } + +} // namespace cuopt::cython diff --git a/cpp/src/grpc/client/grpc_client_env.cpp b/cpp/src/grpc/client/grpc_client_env.cpp new file mode 100644 index 0000000000..668837f968 --- /dev/null +++ b/cpp/src/grpc/client/grpc_client_env.cpp @@ -0,0 +1,87 @@ +/* clang-format off */ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +/* clang-format on */ + +#include + +#include "grpc_client.hpp" + +#include +#include +#include +#include + +namespace cuopt::linear_programming { + +namespace { + +int64_t parse_env_int64(const char* name, int64_t default_value) +{ + const char* val = std::getenv(name); + if (val == nullptr) return default_value; + try { + return std::stoll(val); + } catch (...) { + return default_value; + } +} + +std::string read_pem_file(const char* path) +{ + std::ifstream in(path, std::ios::binary); + if (!in.is_open()) { throw std::runtime_error(std::string("Cannot open TLS file: ") + path); } + std::ostringstream ss; + ss << in.rdbuf(); + return ss.str(); +} + +const char* get_env(const char* name) +{ + const char* v = std::getenv(name); + return (v && v[0] != '\0') ? v : nullptr; +} + +} // namespace + +void apply_grpc_client_env_overrides(grpc_client_config_t& config) +{ + constexpr int64_t kMinChunkSize = 4096; + constexpr int64_t kMaxChunkSize = 2LL * 1024 * 1024 * 1024; + constexpr int64_t kMinMessageSize = 4096; + constexpr int64_t kMaxMessageSize = 2LL * 1024 * 1024 * 1024; + + auto chunk = parse_env_int64("CUOPT_CHUNK_SIZE", config.chunk_size_bytes); + if (chunk >= kMinChunkSize && chunk <= kMaxChunkSize) { config.chunk_size_bytes = chunk; } + + auto msg = parse_env_int64("CUOPT_MAX_MESSAGE_BYTES", config.max_message_bytes); + if (msg >= kMinMessageSize && msg <= kMaxMessageSize) { config.max_message_bytes = msg; } + + config.enable_debug_log = (parse_env_int64("CUOPT_GRPC_DEBUG", 0) != 0); + + if (parse_env_int64("CUOPT_TLS_ENABLED", 0) != 0) { + config.enable_tls = true; + + const char* root_cert = get_env("CUOPT_TLS_ROOT_CERT"); + if (root_cert) { config.tls_root_certs = read_pem_file(root_cert); } + + const char* client_cert = get_env("CUOPT_TLS_CLIENT_CERT"); + const char* client_key = get_env("CUOPT_TLS_CLIENT_KEY"); + if (client_cert && client_key) { + config.tls_client_cert = read_pem_file(client_cert); + config.tls_client_key = read_pem_file(client_key); + } + } +} + +grpc_client_config_t make_grpc_client_config(const std::string& host, int port) +{ + grpc_client_config_t config; + config.server_address = host + ":" + std::to_string(port); + apply_grpc_client_env_overrides(config); + return config; +} + +} // namespace cuopt::linear_programming diff --git a/cpp/src/grpc/client/solve_remote.cpp b/cpp/src/grpc/client/solve_remote.cpp index fb39a6d184..68c2cc5382 100644 --- a/cpp/src/grpc/client/solve_remote.cpp +++ b/cpp/src/grpc/client/solve_remote.cpp @@ -5,6 +5,7 @@ */ /* clang-format on */ +#include #include #include #include @@ -14,7 +15,6 @@ #include #include -#include #include #include #include @@ -45,17 +45,6 @@ static std::string get_grpc_server_address() return std::string(host) + ":" + std::string(port); } -static int64_t parse_env_int64(const char* name, int64_t default_value) -{ - const char* val = std::getenv(name); - if (val == nullptr) return default_value; - try { - return std::stoll(val); - } catch (...) { - return default_value; - } -} - // Derive client-side polling timeout from the solver's time_limit. // Returns 0 (no limit) when the solver has no finite time_limit. template @@ -67,58 +56,6 @@ static int solver_timeout_seconds(f_t time_limit) return static_cast(std::ceil(secs)); } -static std::string read_pem_file(const char* path) -{ - std::ifstream in(path, std::ios::binary); - if (!in.is_open()) { throw std::runtime_error(std::string("Cannot open TLS file: ") + path); } - std::ostringstream ss; - ss << in.rdbuf(); - return ss.str(); -} - -static const char* get_env(const char* name) -{ - const char* v = std::getenv(name); - return (v && v[0] != '\0') ? v : nullptr; -} - -// Apply env-var overrides for transfer, debug, and TLS configuration. -static void apply_env_overrides(grpc_client_config_t& config) -{ - constexpr int64_t kMinChunkSize = 4096; - constexpr int64_t kMaxChunkSize = 2LL * 1024 * 1024 * 1024; // 2 GiB - constexpr int64_t kMinMessageSize = 4096; - constexpr int64_t kMaxMessageSize = 2LL * 1024 * 1024 * 1024; - - auto chunk = parse_env_int64("CUOPT_CHUNK_SIZE", config.chunk_size_bytes); - if (chunk >= kMinChunkSize && chunk <= kMaxChunkSize) { config.chunk_size_bytes = chunk; } - - auto msg = parse_env_int64("CUOPT_MAX_MESSAGE_BYTES", config.max_message_bytes); - if (msg >= kMinMessageSize && msg <= kMaxMessageSize) { config.max_message_bytes = msg; } - - config.enable_debug_log = (parse_env_int64("CUOPT_GRPC_DEBUG", 0) != 0); - - // TLS configuration from environment variables - if (parse_env_int64("CUOPT_TLS_ENABLED", 0) != 0) { - config.enable_tls = true; - - const char* root_cert = get_env("CUOPT_TLS_ROOT_CERT"); - if (root_cert) { config.tls_root_certs = read_pem_file(root_cert); } - - const char* client_cert = get_env("CUOPT_TLS_CLIENT_CERT"); - const char* client_key = get_env("CUOPT_TLS_CLIENT_KEY"); - if (client_cert && client_key) { - config.tls_client_cert = read_pem_file(client_cert); - config.tls_client_key = read_pem_file(client_key); - } - } - - CUOPT_LOG_DEBUG("gRPC client config: chunk_size=%lld max_message=%lld tls=%s", - static_cast(config.chunk_size_bytes), - static_cast(config.max_message_bytes), - config.enable_tls ? "on" : "off"); -} - // ============================================================================ // Remote execution via gRPC // ============================================================================ @@ -136,7 +73,7 @@ std::unique_ptr> solve_lp_remote( grpc_client_config_t config; config.server_address = get_grpc_server_address(); config.timeout_seconds = solver_timeout_seconds(settings.time_limit); - apply_env_overrides(config); + apply_grpc_client_env_overrides(config); // Stream the server's solver log to the client. The server already // filters by the requested log level, so we just pass lines through to @@ -191,7 +128,7 @@ std::unique_ptr> solve_mip_remote( grpc_client_config_t config; config.server_address = get_grpc_server_address(); config.timeout_seconds = solver_timeout_seconds(settings.time_limit); - apply_env_overrides(config); + apply_grpc_client_env_overrides(config); // Stream server log — same passthrough logic as the LP callback above. std::unique_ptr log_file_stream; diff --git a/python/cuopt/cuopt/CMakeLists.txt b/python/cuopt/cuopt/CMakeLists.txt index 8d3dc46a42..0c12cfb04d 100644 --- a/python/cuopt/cuopt/CMakeLists.txt +++ b/python/cuopt/cuopt/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(linear_programming/data_model) add_subdirectory(linear_programming/io) add_subdirectory(linear_programming/solver) add_subdirectory(linear_programming/solver_settings) +add_subdirectory(grpc) add_subdirectory(routing) diff --git a/python/cuopt/cuopt/__init__.py b/python/cuopt/cuopt/__init__.py index 08f36edcb9..ecb25a9185 100644 --- a/python/cuopt/cuopt/__init__.py +++ b/python/cuopt/cuopt/__init__.py @@ -13,7 +13,7 @@ # Lazy imports for linear_programming, routing, and distance_engine modules # This allows cuopt to be imported on CPU-only hosts when remote solve is configured -_submodules = ["linear_programming", "routing", "distance_engine"] +_submodules = ["linear_programming", "routing", "distance_engine", "grpc"] def __getattr__(name): diff --git a/python/cuopt/cuopt/grpc/CMakeLists.txt b/python/cuopt/cuopt/grpc/CMakeLists.txt new file mode 100644 index 0000000000..ac5755d7ea --- /dev/null +++ b/python/cuopt/cuopt/grpc/CMakeLists.txt @@ -0,0 +1,20 @@ +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on + +set(cython_sources grpc_client.pyx) +set(linked_libraries cuopt::cuopt) + +rapids_cython_create_modules( + CXX + SOURCE_FILES "${cython_sources}" + LINKED_LIBRARIES "${linked_libraries}" + MODULE_PREFIX grpc_ + ASSOCIATED_TARGETS cuopt +) + +foreach(_cython_target IN LISTS RAPIDS_CYTHON_CREATED_TARGETS) + target_include_directories( + ${_cython_target} PRIVATE "${CMAKE_CURRENT_LIST_DIR}/../linear_programming/solver") +endforeach() diff --git a/python/cuopt/cuopt/grpc/__init__.py b/python/cuopt/cuopt/grpc/__init__.py new file mode 100644 index 0000000000..7c83020067 --- /dev/null +++ b/python/cuopt/cuopt/grpc/__init__.py @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from cuopt.grpc.grpc_client import Client, GrpcError, JobNotReadyError, JobStatus + +__all__ = ["Client", "GrpcError", "JobNotReadyError", "JobStatus"] diff --git a/python/cuopt/cuopt/grpc/grpc_client.pxd b/python/cuopt/cuopt/grpc/grpc_client.pxd new file mode 100644 index 0000000000..ec4aaa037b --- /dev/null +++ b/python/cuopt/cuopt/grpc/grpc_client.pxd @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from libc.stdint cimport int64_t +from libcpp.memory cimport unique_ptr +from libcpp.string cimport string +from libcpp.vector cimport vector + +from cuopt.linear_programming.data_model.data_model cimport data_model_view_t +from cuopt.linear_programming.solver.solver cimport solver_ret_t +from cuopt.linear_programming.solver_settings.solver_settings cimport ( + solver_settings_t, +) + +cdef extern from "cuopt/grpc/cython_grpc_client.hpp" namespace "cuopt::cython": + ctypedef enum grpc_job_status_t "cuopt::cython::grpc_job_status_t": + QUEUED "cuopt::cython::grpc_job_status_t::QUEUED" + PROCESSING "cuopt::cython::grpc_job_status_t::PROCESSING" + COMPLETED "cuopt::cython::grpc_job_status_t::COMPLETED" + FAILED "cuopt::cython::grpc_job_status_t::FAILED" + CANCELLED "cuopt::cython::grpc_job_status_t::CANCELLED" + NOT_FOUND "cuopt::cython::grpc_job_status_t::NOT_FOUND" + + cdef cppclass grpc_submit_result_t: + bint success + string error_message + string job_id + bint is_mip + + cdef cppclass grpc_status_result_t: + bint success + string error_message + grpc_job_status_t status + string message + long long result_size_bytes + + cdef cppclass grpc_result_outcome_t: + bint not_ready + bint success + string error_message + unique_ptr[solver_ret_t] solution + + cdef cppclass grpc_logs_result_t: + bint success + string error_message + vector[string] lines + + cdef cppclass grpc_incumbent_entry_t: + int64_t index + double objective + vector[double] assignment + + cdef cppclass grpc_incumbents_result_t: + bint success + string error_message + vector[grpc_incumbent_entry_t] incumbents + int64_t next_index + bint job_complete + + ctypedef int (*grpc_log_line_callback_t)( + const char* line, size_t line_len, int job_complete, void* user_data + ) noexcept nogil + + cdef cppclass grpc_python_client_t: + grpc_python_client_t(const string& host, int port) except + + bint connect(string& error_out) + grpc_submit_result_t submit( + data_model_view_t[int, double]* data_model, + solver_settings_t[int, double]* settings, + bint enable_incumbents, + ) except + + grpc_status_result_t status(const string& job_id) except + + grpc_status_result_t wait(const string& job_id, int timeout_seconds) except + + bint cancel(const string& job_id, string& error_out) except + + bint delete_job(const string& job_id, string& error_out) except + + grpc_result_outcome_t result(const string& job_id, bint is_mip) except + + grpc_logs_result_t fetch_logs(const string& job_id, long long from_byte) except + + bint stream_logs( + const string& job_id, + long long from_byte, + grpc_log_line_callback_t callback, + void* user_data, + ) except + + grpc_incumbents_result_t fetch_incumbents( + const string& job_id, int64_t from_index, int max_count + ) except + + string last_error() diff --git a/python/cuopt/cuopt/grpc/grpc_client.pyx b/python/cuopt/cuopt/grpc/grpc_client.pyx new file mode 100644 index 0000000000..d7cda5b121 --- /dev/null +++ b/python/cuopt/cuopt/grpc/grpc_client.pyx @@ -0,0 +1,505 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True +# cython: language_level = 3 + +from cuopt.grpc.grpc_client cimport ( + grpc_incumbents_result_t, + grpc_job_status_t, + grpc_logs_result_t, + grpc_log_line_callback_t, + grpc_python_client_t, + grpc_result_outcome_t, + grpc_status_result_t, + grpc_submit_result_t, +) +from cuopt.linear_programming.data_model.data_model_wrapper cimport DataModel +from cuopt.linear_programming.solver.solver cimport solver_ret_t +from cuopt.linear_programming.solver.solver_wrapper cimport ( + build_solution_from_unique_ptr, +) +from cuopt.linear_programming.solver.solver_wrapper import ( + prepare_solver_settings, + type_cast, +) +from cuopt.linear_programming.solver_settings.solver_settings cimport ( + SolverSettings, +) + +from enum import IntEnum +import math +from libc.stdint cimport int64_t +from libc.stddef cimport size_t +from libcpp.memory cimport unique_ptr +from libcpp.string cimport string +from libcpp.utility cimport move +import threading +import time + +import numpy as np + + +class JobStatus(IntEnum): + QUEUED = grpc_job_status_t.QUEUED + PROCESSING = grpc_job_status_t.PROCESSING + COMPLETED = grpc_job_status_t.COMPLETED + FAILED = grpc_job_status_t.FAILED + CANCELLED = grpc_job_status_t.CANCELLED + NOT_FOUND = grpc_job_status_t.NOT_FOUND + + +class GrpcError(RuntimeError): + pass + + +class JobNotReadyError(GrpcError): + pass + + +cdef int _invoke_log_callback( + const char* line, + size_t line_len, + int job_complete, + void* userdata, +) noexcept nogil: + with gil: + try: + callback = userdata + text = line[:line_len].decode("utf-8") if line_len > 0 else "" + # Only an explicit False stops the stream. print()/append() return + # None and must not be treated as a stop signal. + if _call_log_callback(callback, text, bool(job_complete)) is False: + return 0 + return 1 + except Exception: + return 0 + + +def _call_log_callback(callback, line, job_complete): + """Invoke a log callback; accept both ``(line,)`` and ``(line, done)`` forms.""" + try: + return callback(line, job_complete) + except TypeError: + return callback(line) + + +def _call_incumbent_callback(callback, index, objective, assignment, job_complete): + try: + return callback(index, objective, assignment, job_complete) + except TypeError: + return callback(index, objective, assignment) + + +def _forward_incumbent_to_settings(settings, index, objective, assignment, job_complete): + from cuopt.linear_programming.internals import GetSolutionCallback + + if job_complete: + return True + for mip_callback in settings.get_mip_callbacks(): + if mip_callback is None: + continue + if isinstance(mip_callback, GetSolutionCallback): + solution = np.asarray(assignment, dtype=np.float64) + cost = np.array([objective], dtype=np.float64) + bound = np.array([math.nan], dtype=np.float64) + mip_callback.get_solution( + solution, cost, bound, mip_callback.user_data + ) + return True + + +cdef class Client: + cdef unique_ptr[grpc_python_client_t] _client + cdef dict _job_is_mip + cdef dict _log_threads + cdef dict _log_thread_errors + cdef dict _log_stream_state + cdef dict _incumbent_threads + cdef dict _incumbent_thread_errors + cdef str _host + cdef int _port + + def __init__(self, str host, int port): + cdef string host_cpp = host.encode("utf-8") + self._client.reset(new grpc_python_client_t(host_cpp, port)) + self._job_is_mip = {} + self._log_threads = {} + self._log_thread_errors = {} + self._log_stream_state = {} + self._incumbent_threads = {} + self._incumbent_thread_errors = {} + self._host = host + self._port = port + cdef string error_out + if not self._client.get().connect(error_out): + raise GrpcError(error_out.decode("utf-8")) + + def submit(self, problem, SolverSettings settings not None): + cdef DataModel data_model + cdef grpc_submit_result_t submit_result + cdef string job_id + cdef bint mip + + data_model = self._as_data_model(problem) + data_model.variable_types = type_cast( + data_model.variable_types, "S1", "variable_types" + ) + mip = _is_mip(data_model.get_variable_types()) + prepare_solver_settings(settings, data_model, mip) + data_model.set_data_model_view() + cdef bint enable_incumbents = False + if mip and settings.get_mip_callbacks(): + enable_incumbents = True + submit_result = self._client.get().submit( + data_model.c_data_model_view.get(), + settings.c_solver_settings.get(), + enable_incumbents, + ) + if not submit_result.success: + raise GrpcError(submit_result.error_message.decode("utf-8")) + job_id = submit_result.job_id + self._job_is_mip[job_id.decode("utf-8")] = bool(submit_result.is_mip) + return job_id.decode("utf-8") + + def status(self, str job_id): + cdef grpc_status_result_t status_result = self._client.get().status( + job_id.encode("utf-8") + ) + if not status_result.success: + raise GrpcError(status_result.error_message.decode("utf-8")) + return JobStatus(status_result.status) + + def wait(self, str job_id, timeout=None): + cdef int timeout_seconds = 0 if timeout is None else int(timeout) + cdef grpc_status_result_t wait_result = self._client.get().wait( + job_id.encode("utf-8"), timeout_seconds + ) + if not wait_result.success: + raise GrpcError(wait_result.error_message.decode("utf-8")) + return JobStatus(wait_result.status) + + def cancel(self, str job_id): + cdef string error_out + if not self._client.get().cancel(job_id.encode("utf-8"), error_out): + raise GrpcError(error_out.decode("utf-8")) + + def delete(self, str job_id): + if job_id in self._incumbent_threads: + self.join_incumbent_stream(job_id) + cdef string error_out + if not self._client.get().delete_job(job_id.encode("utf-8"), error_out): + raise GrpcError(error_out.decode("utf-8")) + self._job_is_mip.pop(job_id, None) + + def result(self, str job_id, variable_names=None, is_mip=None): + cdef grpc_result_outcome_t outcome + cdef bint fetch_mip + cdef unique_ptr[solver_ret_t] sol_ret + + if is_mip is not None: + fetch_mip = bool(is_mip) + else: + fetch_mip = self._job_is_mip.get(job_id, False) + + outcome = self._client.get().result(job_id.encode("utf-8"), fetch_mip) + if outcome.not_ready: + return None + if not outcome.success: + raise GrpcError(outcome.error_message.decode("utf-8")) + sol_ret = move(outcome.solution) + return build_solution_from_unique_ptr(move(sol_ret), variable_names) + + def logs(self, str job_id, from_byte=0): + """ + Return all solver log lines for a job that has finished. + + Raises :class:`JobNotReadyError` if the job is still queued or + running. For live output during the solve, use + :meth:`start_log_stream`. + """ + status = self.status(job_id) + if status in (JobStatus.QUEUED, JobStatus.PROCESSING): + raise JobNotReadyError( + f"job {job_id} is not complete ({status.name})" + ) + + cdef grpc_logs_result_t outcome = self._client.get().fetch_logs( + job_id.encode("utf-8"), from_byte + ) + if not outcome.success: + msg = outcome.error_message.decode("utf-8") + if not msg: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "failed to fetch logs") + return [line.decode("utf-8") for line in outcome.lines] + + def start_log_stream(self, str job_id, callback=print, from_byte=0): + """ + Stream solver logs on a background thread until the job completes. + + ``callback`` is invoked as ``callback(line, job_complete)`` for each + line. Return ``False`` explicitly to stop early; other return values + (including ``None`` from ``print``) keep the stream open. + + Call :meth:`join_log_stream` before :meth:`delete` to ensure all log + lines were received. To collect lines in memory:: + + lines = [] + client.start_log_stream(job_id, lines.append) + """ + if job_id in self._log_threads: + raise GrpcError(f"log stream already running for job {job_id}") + + state = { + "lines": [], + "callback": callback, + "from_byte": from_byte, + "live_lines": 0, + "backfilled": False, + } + self._log_stream_state[job_id] = state + + def wrapped(line, job_complete): + state["lines"].append(line) + state["live_lines"] += 1 + _call_log_callback(callback, line, job_complete) + + # Use a dedicated connection so StreamLogs can run concurrently with + # status/result polling on this client. + log_client = Client(self._host, self._port) + thread = threading.Thread( + target=self._run_log_stream, + args=(log_client, job_id, wrapped, from_byte), + daemon=True, + ) + self._log_threads[job_id] = thread + thread.start() + return thread + + def join_log_stream(self, str job_id, timeout=None): + """Wait for a background log stream started by :meth:`start_log_stream`. + + Returns a dict with stream stats (``live_lines``, ``lines``, ``backfilled``) + when a stream was started for ``job_id``, else ``None``. + """ + thread = self._log_threads.pop(job_id, None) + if thread is not None: + thread.join(timeout) + exc = self._log_thread_errors.pop(job_id, None) + if exc is not None: + raise exc + + state = self._log_stream_state.pop(job_id, None) + if state is not None and state["live_lines"] == 0: + self._backfill_log_stream(job_id, state) + return state + + def _backfill_log_stream(self, str job_id, state): + """Fetch logs after live streaming missed output (status/file races).""" + cdef int attempt + for attempt in range(6): + try: + bulk = self.logs(job_id, state["from_byte"]) + except JobNotReadyError: + if attempt == 0: + self.wait(job_id, timeout=120) + continue + time.sleep(0.2) + continue + if bulk: + for line in bulk: + state["lines"].append(line) + _call_log_callback(state["callback"], line, True) + state["backfilled"] = True + return + if attempt < 5: + time.sleep(0.2) + + def _run_log_stream(self, log_client, str job_id, callback, from_byte=0): + try: + log_client._stream_logs(job_id, callback, from_byte) + except Exception as exc: + self._log_thread_errors[job_id] = exc + + def _stream_logs(self, str job_id, callback, from_byte=0): + cdef bint ok = self._client.get().stream_logs( + job_id.encode("utf-8"), + from_byte, + _invoke_log_callback, + callback, + ) + if not ok: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "log stream failed") + + def incumbents(self, str job_id, from_index=0): + """ + Return incumbent solutions collected so far (or all remaining). + + Works while the job is running or after it completes. Each entry is a + dict with ``index``, ``objective``, and ``assignment`` (list of floats). + """ + cdef grpc_incumbents_result_t outcome = self._client.get().fetch_incumbents( + job_id.encode("utf-8"), from_index, 0 + ) + if not outcome.success: + msg = outcome.error_message.decode("utf-8") + if not msg: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "failed to fetch incumbents") + return [ + { + "index": entry.index, + "objective": entry.objective, + "assignment": [v for v in entry.assignment], + } + for entry in outcome.incumbents + ] + + def start_incumbent_stream( + self, + str job_id, + callback=None, + settings=None, + from_index=0, + poll_interval_ms=1000, + ): + """ + Poll for MIP incumbent solutions on a background thread until the job + completes. + + ``callback`` is invoked as ``callback(index, objective, assignment, + job_complete)``. Return ``False`` to cancel the job. ``assignment`` is + a list of variable values. + + Alternatively pass ``settings`` with :meth:`SolverSettings.set_mip_callback` + registered :class:`GetSolutionCallback` instances (same as local solve). + + Call :meth:`join_incumbent_stream` before :meth:`delete`. + """ + if job_id in self._incumbent_threads: + raise GrpcError(f"incumbent stream already running for job {job_id}") + if callback is None and settings is None: + raise GrpcError("callback or settings is required") + + def combined(index, objective, assignment, job_complete): + if settings is not None: + if _forward_incumbent_to_settings( + settings, index, objective, assignment, job_complete + ) is False: + return False + if callback is not None: + return _call_incumbent_callback( + callback, index, objective, assignment, job_complete + ) + return True + + incumbent_client = Client(self._host, self._port) + thread = threading.Thread( + target=self._run_incumbent_stream, + args=( + incumbent_client, + job_id, + combined, + from_index, + poll_interval_ms, + ), + daemon=True, + ) + self._incumbent_threads[job_id] = thread + thread.start() + return thread + + def join_incumbent_stream(self, str job_id, timeout=None): + """Wait for a background incumbent poll started by :meth:`start_incumbent_stream`.""" + thread = self._incumbent_threads.pop(job_id, None) + if thread is not None: + thread.join(timeout) + exc = self._incumbent_thread_errors.pop(job_id, None) + if exc is not None: + raise exc + + def _run_incumbent_stream( + self, + incumbent_client, + str job_id, + callback, + from_index, + poll_interval_ms, + ): + try: + incumbent_client._poll_incumbents( + job_id, callback, from_index, poll_interval_ms + ) + except Exception as exc: + self._incumbent_thread_errors[job_id] = exc + + def _poll_incumbents( + self, str job_id, callback, from_index=0, poll_interval_ms=1000 + ): + cdef grpc_incumbents_result_t outcome + cdef int64_t next_index = from_index + cdef bint job_complete = False + cdef double objective + cdef list assignment + cdef size_t i + poll_seconds = max(poll_interval_ms, 1) / 1000.0 + + while not job_complete: + outcome = self._client.get().fetch_incumbents( + job_id.encode("utf-8"), next_index, 0 + ) + if not outcome.success: + msg = outcome.error_message.decode("utf-8") + if not msg: + msg = self._client.get().last_error().decode("utf-8") + raise GrpcError(msg or "incumbent poll failed") + + for entry in outcome.incumbents: + assignment = [] + for i in range(entry.assignment.size()): + assignment.append(entry.assignment[i]) + if _call_incumbent_callback( + callback, entry.index, entry.objective, assignment, False + ) is False: + self.cancel(job_id) + return + + next_index = outcome.next_index + job_complete = outcome.job_complete + if job_complete: + _call_incumbent_callback(callback, 0, 0.0, [], True) + return + + time.sleep(poll_seconds) + + cdef DataModel _as_data_model(self, problem): + from cuopt.linear_programming.data_model import DataModel as PyDataModel + from cuopt.linear_programming.problem import Problem + + if isinstance(problem, PyDataModel): + return problem + if isinstance(problem, Problem): + if problem.model is None: + problem._to_data_model() + return problem.model + raise TypeError( + "submit() expects a Problem or DataModel, got " + f"{type(problem).__name__}" + ) + + +def _is_mip(var_types): + if len(var_types) == 0: + return False + if len(set(map(type, var_types))) == 1: + if isinstance(var_types[0], bytes): + return b"I" in var_types or b"S" in var_types + return "I" in var_types or "S" in var_types + return any( + vt == "I" or vt == b"I" or vt == "S" or vt == b"S" + for vt in var_types + ) diff --git a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd new file mode 100644 index 0000000000..703fe38298 --- /dev/null +++ b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pxd @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from libcpp.memory cimport unique_ptr + +from cuopt.linear_programming.solver.solver cimport solver_ret_t +cdef object build_solution_from_unique_ptr( + unique_ptr[solver_ret_t] sol_ret_ptr, + object variable_names) diff --git a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx index 925dd8797b..fd67f8e89c 100644 --- a/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx +++ b/python/cuopt/cuopt/linear_programming/solver/solver_wrapper.pyx @@ -129,6 +129,21 @@ cdef object _vector_to_numpy(const vector[double]& vec): return np.asarray( data_ptr, dtype=np.float64).copy() +def _vars_dict(variable_names, primal_solution): + if len(primal_solution) == 0: + # Unbounded/infeasible solves may return no primal vector; match legacy + # dict(zip(names, primal)) behavior (empty dict, not a length error). + return {} + if variable_names is None or len(variable_names) == 0: + return {f"x{i}": v for i, v in enumerate(primal_solution)} + if len(variable_names) != len(primal_solution): + raise ValueError( + f"variable_names length ({len(variable_names)}) does not match " + f"solution size ({len(primal_solution)})" + ) + return dict(zip(variable_names, primal_solution)) + + def type_cast(cudf_obj, np_type, name): if isinstance(cudf_obj, cudf.Series): cudf_type = cudf_obj.dtype @@ -234,6 +249,14 @@ cdef set_solver_setting( cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, DataModel data_model_obj, is_batch=False): + return create_solution_with_names( + move(sol_ret_ptr), data_model_obj.get_variable_names(), is_batch + ) + + +cdef create_solution_with_names(unique_ptr[solver_ret_t] sol_ret_ptr, + object variable_names, + bint is_batch=False): from cuopt.linear_programming.solution.solution import Solution @@ -256,7 +279,7 @@ cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, return Solution( ProblemCategory(sol_ret.problem_type), - dict(zip(data_model_obj.get_variable_names(), solution)), + _vars_dict(variable_names, solution), mip_ptr.total_solve_time_, primal_solution=solution, termination_status=MILPTerminationStatus(mip_ptr.termination_status_), @@ -361,7 +384,7 @@ cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, if not is_batch: return Solution( ProblemCategory(sol_ret.problem_type), - dict(zip(data_model_obj.get_variable_names(), primal_solution)), + _vars_dict(variable_names, primal_solution), lp_ptr.solve_time_, primal_solution, dual_solution, @@ -397,7 +420,7 @@ cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, else: return Solution( problem_category=ProblemCategory(sol_ret.problem_type), - vars=dict(zip(data_model_obj.get_variable_names(), primal_solution)), + vars=_vars_dict(variable_names, primal_solution), solve_time=lp_ptr.solve_time_, primal_solution=primal_solution, dual_solution=dual_solution, @@ -415,6 +438,17 @@ cdef create_solution(unique_ptr[solver_ret_t] sol_ret_ptr, ) +cdef object build_solution_from_unique_ptr( + unique_ptr[solver_ret_t] sol_ret_ptr, + object variable_names): + return create_solution_with_names(move(sol_ret_ptr), variable_names, False) + + +def prepare_solver_settings(SolverSettings settings, data_model=None, mip=False): + """Populate C++ solver settings from Python state for the next solve/submit.""" + set_solver_setting(settings, data_model, mip) + + def Solve(py_data_model_obj, SolverSettings settings, mip=False): cdef DataModel data_model_obj = py_data_model_obj diff --git a/python/cuopt/cuopt/tests/linear_programming/conftest.py b/python/cuopt/cuopt/tests/linear_programming/conftest.py new file mode 100644 index 0000000000..71eb89affd --- /dev/null +++ b/python/cuopt/cuopt/tests/linear_programming/conftest.py @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# gRPC server fixtures are defined in grpc_server_fixtures.py (see that module). +pytest_plugins = ["grpc_server_fixtures"] diff --git a/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py b/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py new file mode 100644 index 0000000000..891cb1ccbb --- /dev/null +++ b/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py @@ -0,0 +1,146 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Shared cuopt_grpc_server helpers and pytest fixtures for LP tests. + +Import this module in test files that need a live server:: + + pytest_plugins = ["grpc_server_fixtures"] + +Class-scoped fixtures start one server per test class and tear it down after +the class finishes. Port offsets are added to ``CUOPT_TEST_PORT_BASE`` (default +18000) so parallel test classes do not collide. +""" + +import os +import shutil +import signal +import socket +import subprocess +import time + +import pytest + +# Port offsets (added to CUOPT_TEST_PORT_BASE). Keep unique per fixture/class. +GRPC_PORT_OFFSET_CPU_ONLY = 600 +GRPC_PORT_OFFSET_CLI = 700 +GRPC_PORT_OFFSET_CLIENT = 800 +GRPC_PORT_OFFSET_TLS = 800 +GRPC_PORT_OFFSET_MTLS = 900 + + +def find_grpc_server(): + """Locate cuopt_grpc_server binary.""" + env_path = os.environ.get("CUOPT_GRPC_SERVER_PATH") + if env_path and os.path.isfile(env_path) and os.access(env_path, os.X_OK): + return env_path + + found = shutil.which("cuopt_grpc_server") + if found: + return found + + for candidate in [ + "./cuopt_grpc_server", + "../cpp/build/cuopt_grpc_server", + "../../cpp/build/cuopt_grpc_server", + ]: + if os.path.isfile(candidate) and os.access(candidate, os.X_OK): + return os.path.abspath(candidate) + + conda_prefix = os.environ.get("CONDA_PREFIX", "") + if conda_prefix: + p = os.path.join(conda_prefix, "bin", "cuopt_grpc_server") + if os.path.isfile(p) and os.access(p, os.X_OK): + return p + return None + + +def wait_for_port(port, timeout=15): + """Block until TCP port accepts connections or timeout expires.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with socket.create_connection(("127.0.0.1", port), timeout=1): + return True + except OSError: + time.sleep(0.2) + return False + + +def cpu_only_env(port): + """Return an env dict that hides all GPUs and enables remote mode.""" + env = os.environ.copy() + for key in [k for k in env if k.startswith("CUOPT_TLS_")]: + env.pop(key) + env["CUDA_VISIBLE_DEVICES"] = "" + env["CUOPT_REMOTE_HOST"] = "localhost" + env["CUOPT_REMOTE_PORT"] = str(port) + return env + + +def start_grpc_server(port_offset): + """Locate the server, start it on BASE + port_offset, return (proc, env).""" + server_bin = find_grpc_server() + if server_bin is None: + pytest.skip("cuopt_grpc_server not found") + + port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + port_offset + proc = subprocess.Popen( + [ + server_bin, + "--port", + str(port), + "--workers", + "1", + "--log-to-console", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(0.5) + if proc.poll() is not None: + pytest.skip( + f"cuopt_grpc_server exited immediately (rc={proc.returncode}), " + "binary may be unable to load shared libraries in this environment" + ) + if not wait_for_port(port, timeout=15): + proc.kill() + proc.wait() + pytest.fail("cuopt_grpc_server failed to start within 15s") + + return proc, cpu_only_env(port) + + +def stop_grpc_server(proc): + """Gracefully shut down a server process.""" + proc.send_signal(signal.SIGTERM) + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + +@pytest.fixture(scope="class") +def grpc_cpu_only_env(): + """CPU-only remote env (``CUOPT_REMOTE_*``) with server on offset 600.""" + proc, env = start_grpc_server(GRPC_PORT_OFFSET_CPU_ONLY) + yield env + stop_grpc_server(proc) + + +@pytest.fixture(scope="class") +def grpc_cli_cpu_only_env(): + """CPU-only remote env with server on offset 700 (cuopt_cli tests).""" + proc, env = start_grpc_server(GRPC_PORT_OFFSET_CLI) + yield env + stop_grpc_server(proc) + + +@pytest.fixture(scope="class") +def grpc_client_port(): + """Listening port for ``cuopt.grpc.Client`` tests (offset 800).""" + proc, env = start_grpc_server(GRPC_PORT_OFFSET_CLIENT) + yield int(env["CUOPT_REMOTE_PORT"]) + stop_grpc_server(proc) diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py new file mode 100644 index 0000000000..30e497d256 --- /dev/null +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -0,0 +1,168 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import time + +import pytest + +from cuopt.grpc import Client, JobNotReadyError, JobStatus +from cuopt.linear_programming import SolverSettings +from cuopt.linear_programming.internals import GetSolutionCallback +from cuopt.linear_programming.problem import INTEGER, MAXIMIZE, Problem + +_DEMO_LP_NAMES = ["x", "y"] +_MIP_NAMES = ["x", "y"] + + +def _demo_lp_problem(): + problem = Problem("grpc_demo") + x = problem.addVariable(lb=0.0, ub=2.0, name="x") + y = problem.addVariable(lb=0.0, name="y") + problem.addConstraint(3 * x + 4 * y <= 5.4, name="c1") + problem.addConstraint(2.7 * x + 10.1 * y <= 4.9, name="c2") + problem.setObjective(0.2 * x + 0.1 * y, sense=MAXIMIZE) + return problem + + +def _poll_until_complete( + client, job_id, names, timeout=120, poll_interval=0.05 +): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + status = client.status(job_id) + if status not in (JobStatus.QUEUED, JobStatus.PROCESSING): + return status + if client.result(job_id, names) is not None: + return JobStatus.COMPLETED + time.sleep(poll_interval) + return client.status(job_id) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +class TestGrpcClient: + def test_submit_status_result_delete(self, grpc_client_port): + problem = _demo_lp_problem() + settings = SolverSettings() + client = Client("localhost", grpc_client_port) + + job_id = client.submit(problem, settings) + assert job_id + + assert client.result(job_id, _DEMO_LP_NAMES) is None + assert client.status(job_id) in ( + JobStatus.QUEUED, + JobStatus.PROCESSING, + JobStatus.COMPLETED, + ) + + terminal = client.wait(job_id, timeout=120) + assert terminal == JobStatus.COMPLETED + + solution = client.result(job_id, _DEMO_LP_NAMES) + assert solution is not None + assert solution.get_primal_objective() == pytest.approx(0.36, rel=1e-3) + vars_ = solution.get_vars() + assert vars_["x"] == pytest.approx(1.8, rel=1e-3) + assert vars_["y"] == pytest.approx(0.0, rel=1e-3) + + client.delete(job_id) + + def test_submit_with_log_stream(self, grpc_client_port): + problem = _demo_lp_problem() + settings = SolverSettings() + + client = Client("localhost", grpc_client_port) + job_id = client.submit(problem, settings) + + received = [] + client.start_log_stream(job_id, callback=received.append) + + terminal = _poll_until_complete(client, job_id, _DEMO_LP_NAMES) + assert terminal == JobStatus.COMPLETED + client.join_log_stream(job_id) + + solution = client.result(job_id, _DEMO_LP_NAMES) + assert solution is not None + + bulk_logs = client.logs(job_id) + assert bulk_logs + assert received + assert len(received) == len(bulk_logs) + + client.delete(job_id) + + def test_logs_not_ready(self, grpc_client_port): + problem = _demo_lp_problem() + settings = SolverSettings() + + client = Client("localhost", grpc_client_port) + job_id = client.submit(problem, settings) + + with pytest.raises(JobNotReadyError): + client.logs(job_id) + + assert client.wait(job_id, timeout=120) == JobStatus.COMPLETED + assert client.logs(job_id) + + client.delete(job_id) + + def test_mip_submit_and_result(self, grpc_client_port): + problem = Problem("grpc_mip") + x = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="x") + y = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="y") + problem.addConstraint(x + y <= 10, name="c1") + problem.addConstraint(x - y >= 0, name="c2") + problem.setObjective(x + 2 * y, sense=MAXIMIZE) + + client = Client("localhost", grpc_client_port) + job_id = client.submit(problem, SolverSettings()) + assert client.wait(job_id, timeout=120) == JobStatus.COMPLETED + + solution = client.result(job_id, _MIP_NAMES) + assert solution is not None + assert solution.get_primal_objective() == pytest.approx(15.0, rel=1e-3) + client.delete(job_id) + + def test_mip_incumbent_stream(self, grpc_client_port): + class IncumbentCollector(GetSolutionCallback): + def __init__(self): + super().__init__() + self.entries = [] + + def get_solution( + self, solution, solution_cost, solution_bound, user_data + ): + self.entries.append( + { + "solution": solution.tolist(), + "cost": float(solution_cost[0]), + } + ) + + problem = Problem("grpc_mip_incumbent") + x = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="x") + y = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="y") + problem.addConstraint(x + y <= 10, name="c1") + problem.addConstraint(x - y >= 0, name="c2") + problem.setObjective(x + 2 * y, sense=MAXIMIZE) + + collector = IncumbentCollector() + settings = SolverSettings() + settings.set_mip_callback(collector, None) + settings.set_parameter("time_limit", 30) + + client = Client("localhost", grpc_client_port) + job_id = client.submit(problem, settings) + client.start_incumbent_stream(job_id, settings=settings) + + terminal = _poll_until_complete(client, job_id, _MIP_NAMES) + assert terminal == JobStatus.COMPLETED + client.join_incumbent_stream(job_id) + + assert collector.entries + bulk = client.incumbents(job_id) + assert bulk + + solution = client.result(job_id, _MIP_NAMES) + assert solution is not None + client.delete(job_id) From 423ef7e5d229895b95dd821f22f57c5c86b2fbfd Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 11:25:24 -0500 Subject: [PATCH 02/10] grpc server tests unify server test fixture --- .../grpc_server_fixtures.py | 62 ++++++++++++------- .../linear_programming/test_grpc_client.py | 25 +++++--- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py b/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py index 891cb1ccbb..08d108d77f 100644 --- a/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py +++ b/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py @@ -8,9 +8,18 @@ pytest_plugins = ["grpc_server_fixtures"] -Class-scoped fixtures start one server per test class and tear it down after -the class finishes. Port offsets are added to ``CUOPT_TEST_PORT_BASE`` (default -18000) so parallel test classes do not collide. +Class-scoped ``grpc_server`` starts one server per test class. Configure it on +the test class:: + + class TestMyGrpcFeature: + grpc_port_offset = GRPC_PORT_OFFSET_CLIENT + grpc_server_yield = "port" # or "env" (default) + + def test_foo(self, grpc_server): + ... + +Port offsets are added to ``CUOPT_TEST_PORT_BASE`` (default 18000) so parallel +test classes do not collide. """ import os @@ -22,7 +31,7 @@ import pytest -# Port offsets (added to CUOPT_TEST_PORT_BASE). Keep unique per fixture/class. +# Port offsets (added to CUOPT_TEST_PORT_BASE). Keep unique per test class. GRPC_PORT_OFFSET_CPU_ONLY = 600 GRPC_PORT_OFFSET_CLI = 700 GRPC_PORT_OFFSET_CLIENT = 800 @@ -123,24 +132,31 @@ def stop_grpc_server(proc): @pytest.fixture(scope="class") -def grpc_cpu_only_env(): - """CPU-only remote env (``CUOPT_REMOTE_*``) with server on offset 600.""" - proc, env = start_grpc_server(GRPC_PORT_OFFSET_CPU_ONLY) - yield env - stop_grpc_server(proc) - - -@pytest.fixture(scope="class") -def grpc_cli_cpu_only_env(): - """CPU-only remote env with server on offset 700 (cuopt_cli tests).""" - proc, env = start_grpc_server(GRPC_PORT_OFFSET_CLI) - yield env - stop_grpc_server(proc) +def grpc_server(request): + """Class-scoped server; see module docstring for configuration.""" + cls = request.cls + if cls is None: + pytest.fail("grpc_server requires a class-scoped test class") + + port_offset = getattr(cls, "grpc_port_offset", None) + if port_offset is None: + pytest.fail( + f"{cls.__name__} must set grpc_port_offset " + f"(e.g. GRPC_PORT_OFFSET_CLIENT)" + ) + yield_kind = getattr(cls, "grpc_server_yield", "env") + if yield_kind not in ("env", "port"): + pytest.fail( + f"{cls.__name__}.grpc_server_yield must be 'env' or 'port', " + f"got {yield_kind!r}" + ) -@pytest.fixture(scope="class") -def grpc_client_port(): - """Listening port for ``cuopt.grpc.Client`` tests (offset 800).""" - proc, env = start_grpc_server(GRPC_PORT_OFFSET_CLIENT) - yield int(env["CUOPT_REMOTE_PORT"]) - stop_grpc_server(proc) + proc, env = start_grpc_server(port_offset) + try: + if yield_kind == "port": + yield int(env["CUOPT_REMOTE_PORT"]) + else: + yield env + finally: + stop_grpc_server(proc) diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py index 30e497d256..d7fa0473a4 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -10,6 +10,8 @@ from cuopt.linear_programming.internals import GetSolutionCallback from cuopt.linear_programming.problem import INTEGER, MAXIMIZE, Problem +from grpc_server_fixtures import GRPC_PORT_OFFSET_CLIENT + _DEMO_LP_NAMES = ["x", "y"] _MIP_NAMES = ["x", "y"] @@ -40,10 +42,13 @@ def _poll_until_complete( @pytest.mark.filterwarnings("ignore::DeprecationWarning") class TestGrpcClient: - def test_submit_status_result_delete(self, grpc_client_port): + grpc_port_offset = GRPC_PORT_OFFSET_CLIENT + grpc_server_yield = "port" + + def test_submit_status_result_delete(self, grpc_server): problem = _demo_lp_problem() settings = SolverSettings() - client = Client("localhost", grpc_client_port) + client = Client("localhost", grpc_server) job_id = client.submit(problem, settings) assert job_id @@ -67,11 +72,11 @@ def test_submit_status_result_delete(self, grpc_client_port): client.delete(job_id) - def test_submit_with_log_stream(self, grpc_client_port): + def test_submit_with_log_stream(self, grpc_server): problem = _demo_lp_problem() settings = SolverSettings() - client = Client("localhost", grpc_client_port) + client = Client("localhost", grpc_server) job_id = client.submit(problem, settings) received = [] @@ -91,11 +96,11 @@ def test_submit_with_log_stream(self, grpc_client_port): client.delete(job_id) - def test_logs_not_ready(self, grpc_client_port): + def test_logs_not_ready(self, grpc_server): problem = _demo_lp_problem() settings = SolverSettings() - client = Client("localhost", grpc_client_port) + client = Client("localhost", grpc_server) job_id = client.submit(problem, settings) with pytest.raises(JobNotReadyError): @@ -106,7 +111,7 @@ def test_logs_not_ready(self, grpc_client_port): client.delete(job_id) - def test_mip_submit_and_result(self, grpc_client_port): + def test_mip_submit_and_result(self, grpc_server): problem = Problem("grpc_mip") x = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="x") y = problem.addVariable(lb=0, ub=10, vtype=INTEGER, name="y") @@ -114,7 +119,7 @@ def test_mip_submit_and_result(self, grpc_client_port): problem.addConstraint(x - y >= 0, name="c2") problem.setObjective(x + 2 * y, sense=MAXIMIZE) - client = Client("localhost", grpc_client_port) + client = Client("localhost", grpc_server) job_id = client.submit(problem, SolverSettings()) assert client.wait(job_id, timeout=120) == JobStatus.COMPLETED @@ -123,7 +128,7 @@ def test_mip_submit_and_result(self, grpc_client_port): assert solution.get_primal_objective() == pytest.approx(15.0, rel=1e-3) client.delete(job_id) - def test_mip_incumbent_stream(self, grpc_client_port): + def test_mip_incumbent_stream(self, grpc_server): class IncumbentCollector(GetSolutionCallback): def __init__(self): super().__init__() @@ -151,7 +156,7 @@ def get_solution( settings.set_mip_callback(collector, None) settings.set_parameter("time_limit", 30) - client = Client("localhost", grpc_client_port) + client = Client("localhost", grpc_server) job_id = client.submit(problem, settings) client.start_incumbent_stream(job_id, settings=settings) From b8c9ff0b5596eb28479e64ef8096fb55d9350fdc Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 12:12:15 -0500 Subject: [PATCH 03/10] fix CI error on conftest.py placement --- .../{cuopt/tests/linear_programming => }/conftest.py | 4 ++-- .../linear_programming => }/grpc_server_fixtures.py | 12 ++++++++---- .../tests/linear_programming/test_grpc_client.py | 8 ++++++-- 3 files changed, 16 insertions(+), 8 deletions(-) rename python/cuopt/{cuopt/tests/linear_programming => }/conftest.py (52%) rename python/cuopt/cuopt/{tests/linear_programming => }/grpc_server_fixtures.py (95%) diff --git a/python/cuopt/cuopt/tests/linear_programming/conftest.py b/python/cuopt/conftest.py similarity index 52% rename from python/cuopt/cuopt/tests/linear_programming/conftest.py rename to python/cuopt/conftest.py index 71eb89affd..f38b4269a9 100644 --- a/python/cuopt/cuopt/tests/linear_programming/conftest.py +++ b/python/cuopt/conftest.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# gRPC server fixtures are defined in grpc_server_fixtures.py (see that module). -pytest_plugins = ["grpc_server_fixtures"] +# pytest_plugins must live in a top-level conftest (see pytest 8+ deprecation). +pytest_plugins = ["cuopt.grpc_server_fixtures"] diff --git a/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py b/python/cuopt/cuopt/grpc_server_fixtures.py similarity index 95% rename from python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py rename to python/cuopt/cuopt/grpc_server_fixtures.py index 08d108d77f..812fae18ba 100644 --- a/python/cuopt/cuopt/tests/linear_programming/grpc_server_fixtures.py +++ b/python/cuopt/cuopt/grpc_server_fixtures.py @@ -4,9 +4,7 @@ """ Shared cuopt_grpc_server helpers and pytest fixtures for LP tests. -Import this module in test files that need a live server:: - - pytest_plugins = ["grpc_server_fixtures"] +Registered via ``pytest_plugins`` in ``python/cuopt/conftest.py``. Class-scoped ``grpc_server`` starts one server per test class. Configure it on the test class:: @@ -95,6 +93,7 @@ def start_grpc_server(port_offset): pytest.skip("cuopt_grpc_server not found") port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + port_offset + env = cpu_only_env(port) proc = subprocess.Popen( [ server_bin, @@ -106,6 +105,7 @@ def start_grpc_server(port_offset): ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + env=env, ) time.sleep(0.5) if proc.poll() is not None: @@ -118,11 +118,15 @@ def start_grpc_server(port_offset): proc.wait() pytest.fail("cuopt_grpc_server failed to start within 15s") - return proc, cpu_only_env(port) + return proc, env def stop_grpc_server(proc): """Gracefully shut down a server process.""" + if proc.poll() is not None: + proc.wait() + return + proc.send_signal(signal.SIGTERM) try: proc.wait(timeout=5) diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py index d7fa0473a4..9b4c004755 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -10,7 +10,7 @@ from cuopt.linear_programming.internals import GetSolutionCallback from cuopt.linear_programming.problem import INTEGER, MAXIMIZE, Problem -from grpc_server_fixtures import GRPC_PORT_OFFSET_CLIENT +from cuopt.grpc_server_fixtures import GRPC_PORT_OFFSET_CLIENT _DEMO_LP_NAMES = ["x", "y"] _MIP_NAMES = ["x", "y"] @@ -84,7 +84,11 @@ def test_submit_with_log_stream(self, grpc_server): terminal = _poll_until_complete(client, job_id, _DEMO_LP_NAMES) assert terminal == JobStatus.COMPLETED - client.join_log_stream(job_id) + state = client.join_log_stream(job_id) + assert state is not None + assert state["live_lines"] > 0, ( + "Log streaming failed; only backfill worked" + ) solution = client.result(job_id, _DEMO_LP_NAMES) assert solution is not None From a023c81d41d246747c794cc96a67575bca2b722d Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 12:13:01 -0500 Subject: [PATCH 04/10] grpc async Python API coderabbit fixes --- cpp/src/grpc/client/cython_grpc_client.cpp | 12 +++++- cpp/src/grpc/client/grpc_client_env.cpp | 11 ++++- python/cuopt/cuopt/grpc/grpc_client.pyx | 50 ++++++++++++++++++---- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/cpp/src/grpc/client/cython_grpc_client.cpp b/cpp/src/grpc/client/cython_grpc_client.cpp index afca1ae222..40726b0f11 100644 --- a/cpp/src/grpc/client/cython_grpc_client.cpp +++ b/cpp/src/grpc/client/cython_grpc_client.cpp @@ -123,10 +123,18 @@ grpc_status_result_t grpc_python_client_t::status(const std::string& job_id) grpc_status_result_t grpc_python_client_t::wait(const std::string& job_id, int timeout_seconds) { - if (timeout_seconds <= 0) { return map_status_result(impl_->client.wait_for_completion(job_id)); } + if (timeout_seconds < 0) { + grpc_status_result_t out; + out.success = false; + out.error_message = "timeout_seconds must be non-negative"; + out.status = grpc_job_status_t::NOT_FOUND; + return out; + } + + if (timeout_seconds == 0) { return map_status_result(impl_->client.wait_for_completion(job_id)); } const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); - const int poll_ms = std::max(impl_->client.is_connected() ? 1000 : 1000, 1); + const int poll_ms = 1000; while (std::chrono::steady_clock::now() < deadline) { auto st = status(job_id); diff --git a/cpp/src/grpc/client/grpc_client_env.cpp b/cpp/src/grpc/client/grpc_client_env.cpp index 668837f968..bdefe44086 100644 --- a/cpp/src/grpc/client/grpc_client_env.cpp +++ b/cpp/src/grpc/client/grpc_client_env.cpp @@ -69,7 +69,11 @@ void apply_grpc_client_env_overrides(grpc_client_config_t& config) const char* client_cert = get_env("CUOPT_TLS_CLIENT_CERT"); const char* client_key = get_env("CUOPT_TLS_CLIENT_KEY"); - if (client_cert && client_key) { + if (client_cert != nullptr || client_key != nullptr) { + if (client_cert == nullptr || client_key == nullptr) { + throw std::invalid_argument( + "CUOPT_TLS_CLIENT_CERT and CUOPT_TLS_CLIENT_KEY must both be set for mTLS"); + } config.tls_client_cert = read_pem_file(client_cert); config.tls_client_key = read_pem_file(client_key); } @@ -78,6 +82,11 @@ void apply_grpc_client_env_overrides(grpc_client_config_t& config) grpc_client_config_t make_grpc_client_config(const std::string& host, int port) { + if (host.empty()) { throw std::invalid_argument("gRPC host must not be empty"); } + if (port <= 0 || port > 65535) { + throw std::invalid_argument("gRPC port must be between 1 and 65535"); + } + grpc_client_config_t config; config.server_address = host + ":" + std::to_string(port); apply_grpc_client_env_overrides(config); diff --git a/python/cuopt/cuopt/grpc/grpc_client.pyx b/python/cuopt/cuopt/grpc/grpc_client.pyx index d7cda5b121..19ab3d9b91 100644 --- a/python/cuopt/cuopt/grpc/grpc_client.pyx +++ b/python/cuopt/cuopt/grpc/grpc_client.pyx @@ -74,7 +74,11 @@ cdef int _invoke_log_callback( if _call_log_callback(callback, text, bool(job_complete)) is False: return 0 return 1 - except Exception: + except Exception as exc: + cb = userdata + state = getattr(cb, "state", None) + if state is not None: + state["error"] = exc return 0 @@ -86,6 +90,25 @@ def _call_log_callback(callback, line, job_complete): return callback(line) +class _LogStreamHandler: + """Bridge user callback with stream state for C log streaming.""" + + __slots__ = ("state", "callback") + + def __init__(self, state, callback): + self.state = state + self.callback = callback + + def __call__(self, line, job_complete): + self.state["lines"].append(line) + self.state["live_lines"] += 1 + try: + return _call_log_callback(self.callback, line, job_complete) + except Exception as exc: + self.state["error"] = exc + raise + + def _call_incumbent_callback(callback, index, objective, assignment, job_complete): try: return callback(index, objective, assignment, job_complete) @@ -259,20 +282,18 @@ cdef class Client: "from_byte": from_byte, "live_lines": 0, "backfilled": False, + "error": None, } self._log_stream_state[job_id] = state - def wrapped(line, job_complete): - state["lines"].append(line) - state["live_lines"] += 1 - _call_log_callback(callback, line, job_complete) + handler = _LogStreamHandler(state, callback) # Use a dedicated connection so StreamLogs can run concurrently with # status/result polling on this client. log_client = Client(self._host, self._port) thread = threading.Thread( target=self._run_log_stream, - args=(log_client, job_id, wrapped, from_byte), + args=(log_client, job_id, handler, from_byte), daemon=True, ) self._log_threads[job_id] = thread @@ -285,15 +306,28 @@ cdef class Client: Returns a dict with stream stats (``live_lines``, ``lines``, ``backfilled``) when a stream was started for ``job_id``, else ``None``. """ - thread = self._log_threads.pop(job_id, None) + thread = self._log_threads.get(job_id) if thread is not None: thread.join(timeout) + if thread.is_alive(): + exc = self._log_thread_errors.get(job_id) + if exc is not None: + raise exc + return self._log_stream_state.get(job_id) + self._log_threads.pop(job_id, None) + exc = self._log_thread_errors.pop(job_id, None) if exc is not None: raise exc state = self._log_stream_state.pop(job_id, None) - if state is not None and state["live_lines"] == 0: + if state is None: + return None + + if state.get("error") is not None: + raise state["error"] + + if state["live_lines"] == 0: self._backfill_log_stream(job_id, state) return state From 081e19372838c0b4444db04942fd1ab46e90b817 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 13:30:11 -0500 Subject: [PATCH 05/10] grpc fix test fixtures and pytest config (path shadows libs) --- python/cuopt/cuopt/grpc_server_fixtures.py | 31 +++++++++++++++------- python/cuopt/pyproject.toml | 3 +++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/python/cuopt/cuopt/grpc_server_fixtures.py b/python/cuopt/cuopt/grpc_server_fixtures.py index 812fae18ba..82f44c4c7a 100644 --- a/python/cuopt/cuopt/grpc_server_fixtures.py +++ b/python/cuopt/cuopt/grpc_server_fixtures.py @@ -75,8 +75,8 @@ def wait_for_port(port, timeout=15): return False -def cpu_only_env(port): - """Return an env dict that hides all GPUs and enables remote mode.""" +def client_remote_env(port): + """Env for a CPU-only client process talking to a remote gRPC server.""" env = os.environ.copy() for key in [k for k in env if k.startswith("CUOPT_TLS_")]: env.pop(key) @@ -86,14 +86,27 @@ def cpu_only_env(port): return env +def server_env(): + """Env for ``cuopt_grpc_server`` — keep GPU access; drop client-only vars.""" + env = os.environ.copy() + for key in list(env): + if key.startswith("CUOPT_TLS_") or key.startswith("CUOPT_REMOTE_"): + env.pop(key) + return env + + +# Backward-compatible alias used by tests that yield client env dicts. +cpu_only_env = client_remote_env + + def start_grpc_server(port_offset): - """Locate the server, start it on BASE + port_offset, return (proc, env).""" + """Locate the server, start it on BASE + port_offset, return (proc, client_env).""" server_bin = find_grpc_server() if server_bin is None: pytest.skip("cuopt_grpc_server not found") port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + port_offset - env = cpu_only_env(port) + client_env = client_remote_env(port) proc = subprocess.Popen( [ server_bin, @@ -105,7 +118,7 @@ def start_grpc_server(port_offset): ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - env=env, + env=server_env(), ) time.sleep(0.5) if proc.poll() is not None: @@ -118,7 +131,7 @@ def start_grpc_server(port_offset): proc.wait() pytest.fail("cuopt_grpc_server failed to start within 15s") - return proc, env + return proc, client_env def stop_grpc_server(proc): @@ -156,11 +169,11 @@ def grpc_server(request): f"got {yield_kind!r}" ) - proc, env = start_grpc_server(port_offset) + proc, client_env = start_grpc_server(port_offset) try: if yield_kind == "port": - yield int(env["CUOPT_REMOTE_PORT"]) + yield int(client_env["CUOPT_REMOTE_PORT"]) else: - yield env + yield client_env finally: stop_grpc_server(proc) diff --git a/python/cuopt/pyproject.toml b/python/cuopt/pyproject.toml index f7493975a7..68216301d9 100644 --- a/python/cuopt/pyproject.toml +++ b/python/cuopt/pyproject.toml @@ -80,6 +80,9 @@ max_allowed_size_compressed = '55Mi' [tool.pytest.ini_options] testpaths = ["cuopt/tests"] +# Avoid prepending the project root onto sys.path, which would shadow the +# installed cuopt wheel (Cython extensions live in site-packages, not source). +addopts = "--import-mode=importlib" [tool.scikit-build] build-dir = "build/{wheel_tag}" From 351e19412651c9f70c0cab4346a8ab84454ea6e1 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 12 Jun 2026 17:38:36 -0500 Subject: [PATCH 06/10] grpc tests adjust server ports and connection check --- python/cuopt/cuopt/grpc_server_fixtures.py | 30 +++++++++++++++++-- .../test_cpu_only_execution.py | 2 +- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/python/cuopt/cuopt/grpc_server_fixtures.py b/python/cuopt/cuopt/grpc_server_fixtures.py index 82f44c4c7a..ad4e25a0b7 100644 --- a/python/cuopt/cuopt/grpc_server_fixtures.py +++ b/python/cuopt/cuopt/grpc_server_fixtures.py @@ -33,7 +33,7 @@ def test_foo(self, grpc_server): GRPC_PORT_OFFSET_CPU_ONLY = 600 GRPC_PORT_OFFSET_CLI = 700 GRPC_PORT_OFFSET_CLIENT = 800 -GRPC_PORT_OFFSET_TLS = 800 +GRPC_PORT_OFFSET_TLS = 850 GRPC_PORT_OFFSET_MTLS = 900 @@ -75,6 +75,27 @@ def wait_for_port(port, timeout=15): return False +def wait_for_grpc_client(port, timeout=30): + """Block until cuopt.grpc.Client can connect (TCP up is not enough).""" + from cuopt.grpc import Client, GrpcError + + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if not wait_for_port(port, timeout=1): + time.sleep(0.2) + continue + try: + try: + client = Client("localhost", port, tls=False) + except TypeError: + client = Client("localhost", port) + del client + return True + except GrpcError: + time.sleep(0.2) + return False + + def client_remote_env(port): """Env for a CPU-only client process talking to a remote gRPC server.""" env = os.environ.copy() @@ -126,10 +147,13 @@ def start_grpc_server(port_offset): f"cuopt_grpc_server exited immediately (rc={proc.returncode}), " "binary may be unable to load shared libraries in this environment" ) - if not wait_for_port(port, timeout=15): + if not wait_for_grpc_client(port, timeout=30): proc.kill() proc.wait() - pytest.fail("cuopt_grpc_server failed to start within 15s") + pytest.fail( + "cuopt_grpc_server TCP port opened but gRPC client could not connect " + "within 30s" + ) return proc, client_env diff --git a/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py b/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py index 4453d38bcc..8418563f47 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_cpu_only_execution.py @@ -722,7 +722,7 @@ def tls_env_with_server(self, tmp_path_factory): if server_bin is None: pytest.skip("cuopt_grpc_server not found") - port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + 800 + port = int(os.environ.get("CUOPT_TEST_PORT_BASE", "18000")) + 850 proc = subprocess.Popen( [ server_bin, From 671f1da9d1cd0f6284c07963b20680437803140a Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Mon, 22 Jun 2026 21:21:25 -0400 Subject: [PATCH 07/10] split grpc Python client into submodules also move grpc server fixtures under test --- python/cuopt/conftest.py | 4 +- python/cuopt/cuopt/grpc/CMakeLists.txt | 16 +------ python/cuopt/cuopt/grpc/__init__.py | 13 +++++- .../cuopt/cuopt/grpc/numerical/CMakeLists.txt | 20 +++++++++ python/cuopt/cuopt/grpc/numerical/__init__.py | 11 +++++ .../grpc/{ => numerical}/grpc_client.pxd | 0 .../grpc/{ => numerical}/grpc_client.pyx | 2 +- python/cuopt/cuopt/tests/fixtures/__init__.py | 2 + .../fixtures}/grpc_server_fixtures.py | 7 ++-- .../linear_programming/test_grpc_client.py | 42 +++++++++++++++++-- 10 files changed, 91 insertions(+), 26 deletions(-) create mode 100644 python/cuopt/cuopt/grpc/numerical/CMakeLists.txt create mode 100644 python/cuopt/cuopt/grpc/numerical/__init__.py rename python/cuopt/cuopt/grpc/{ => numerical}/grpc_client.pxd (100%) rename python/cuopt/cuopt/grpc/{ => numerical}/grpc_client.pyx (99%) create mode 100644 python/cuopt/cuopt/tests/fixtures/__init__.py rename python/cuopt/cuopt/{ => tests/fixtures}/grpc_server_fixtures.py (96%) diff --git a/python/cuopt/conftest.py b/python/cuopt/conftest.py index f38b4269a9..3507342945 100644 --- a/python/cuopt/conftest.py +++ b/python/cuopt/conftest.py @@ -1,5 +1,5 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # pytest_plugins must live in a top-level conftest (see pytest 8+ deprecation). -pytest_plugins = ["cuopt.grpc_server_fixtures"] +pytest_plugins = ["cuopt.tests.fixtures.grpc_server_fixtures"] diff --git a/python/cuopt/cuopt/grpc/CMakeLists.txt b/python/cuopt/cuopt/grpc/CMakeLists.txt index ac5755d7ea..1d8c956888 100644 --- a/python/cuopt/cuopt/grpc/CMakeLists.txt +++ b/python/cuopt/cuopt/grpc/CMakeLists.txt @@ -3,18 +3,4 @@ # SPDX-License-Identifier: Apache-2.0 # cmake-format: on -set(cython_sources grpc_client.pyx) -set(linked_libraries cuopt::cuopt) - -rapids_cython_create_modules( - CXX - SOURCE_FILES "${cython_sources}" - LINKED_LIBRARIES "${linked_libraries}" - MODULE_PREFIX grpc_ - ASSOCIATED_TARGETS cuopt -) - -foreach(_cython_target IN LISTS RAPIDS_CYTHON_CREATED_TARGETS) - target_include_directories( - ${_cython_target} PRIVATE "${CMAKE_CURRENT_LIST_DIR}/../linear_programming/solver") -endforeach() +add_subdirectory(numerical) diff --git a/python/cuopt/cuopt/grpc/__init__.py b/python/cuopt/cuopt/grpc/__init__.py index 7c83020067..555cfb15b7 100644 --- a/python/cuopt/cuopt/grpc/__init__.py +++ b/python/cuopt/cuopt/grpc/__init__.py @@ -1,6 +1,15 @@ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -from cuopt.grpc.grpc_client import Client, GrpcError, JobNotReadyError, JobStatus +"""gRPC clients for remote cuOpt execution. -__all__ = ["Client", "GrpcError", "JobNotReadyError", "JobStatus"] +This package is the namespace for domain-specific async clients: + +- :mod:`cuopt.grpc.numerical` — LP/MILP (submit, result, incumbents) +- :mod:`cuopt.grpc.routing` — VRP/TSP/PDP (future) + +Longer term, shared job lifecycle (connect, status, wait, cancel, delete, +logs) may live here as a base client type that domain clients extend or +compose. Import domain clients explicitly, e.g. +``from cuopt.grpc.numerical import Client``. +""" diff --git a/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt b/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt new file mode 100644 index 0000000000..0f60a2afed --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/CMakeLists.txt @@ -0,0 +1,20 @@ +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on + +set(cython_sources grpc_client.pyx) +set(linked_libraries cuopt::cuopt) + +rapids_cython_create_modules( + CXX + SOURCE_FILES "${cython_sources}" + LINKED_LIBRARIES "${linked_libraries}" + MODULE_PREFIX grpc_ + ASSOCIATED_TARGETS cuopt +) + +foreach(_cython_target IN LISTS RAPIDS_CYTHON_CREATED_TARGETS) + target_include_directories( + ${_cython_target} PRIVATE "${CMAKE_CURRENT_LIST_DIR}/../../linear_programming/solver") +endforeach() diff --git a/python/cuopt/cuopt/grpc/numerical/__init__.py b/python/cuopt/cuopt/grpc/numerical/__init__.py new file mode 100644 index 0000000000..696a41f13d --- /dev/null +++ b/python/cuopt/cuopt/grpc/numerical/__init__.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from cuopt.grpc.numerical.grpc_client import ( + Client, + GrpcError, + JobNotReadyError, + JobStatus, +) + +__all__ = ["Client", "GrpcError", "JobNotReadyError", "JobStatus"] diff --git a/python/cuopt/cuopt/grpc/grpc_client.pxd b/python/cuopt/cuopt/grpc/numerical/grpc_client.pxd similarity index 100% rename from python/cuopt/cuopt/grpc/grpc_client.pxd rename to python/cuopt/cuopt/grpc/numerical/grpc_client.pxd diff --git a/python/cuopt/cuopt/grpc/grpc_client.pyx b/python/cuopt/cuopt/grpc/numerical/grpc_client.pyx similarity index 99% rename from python/cuopt/cuopt/grpc/grpc_client.pyx rename to python/cuopt/cuopt/grpc/numerical/grpc_client.pyx index 19ab3d9b91..b4c2fd4ba4 100644 --- a/python/cuopt/cuopt/grpc/grpc_client.pyx +++ b/python/cuopt/cuopt/grpc/numerical/grpc_client.pyx @@ -6,7 +6,7 @@ # cython: embedsignature = True # cython: language_level = 3 -from cuopt.grpc.grpc_client cimport ( +from cuopt.grpc.numerical.grpc_client cimport ( grpc_incumbents_result_t, grpc_job_status_t, grpc_logs_result_t, diff --git a/python/cuopt/cuopt/tests/fixtures/__init__.py b/python/cuopt/cuopt/tests/fixtures/__init__.py new file mode 100644 index 0000000000..d51c4fe1e0 --- /dev/null +++ b/python/cuopt/cuopt/tests/fixtures/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 diff --git a/python/cuopt/cuopt/grpc_server_fixtures.py b/python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py similarity index 96% rename from python/cuopt/cuopt/grpc_server_fixtures.py rename to python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py index ad4e25a0b7..4e325a682d 100644 --- a/python/cuopt/cuopt/grpc_server_fixtures.py +++ b/python/cuopt/cuopt/tests/fixtures/grpc_server_fixtures.py @@ -4,7 +4,8 @@ """ Shared cuopt_grpc_server helpers and pytest fixtures for LP tests. -Registered via ``pytest_plugins`` in ``python/cuopt/conftest.py``. +Registered via ``pytest_plugins`` in ``python/cuopt/conftest.py`` as +``cuopt.tests.fixtures.grpc_server_fixtures``. Class-scoped ``grpc_server`` starts one server per test class. Configure it on the test class:: @@ -76,8 +77,8 @@ def wait_for_port(port, timeout=15): def wait_for_grpc_client(port, timeout=30): - """Block until cuopt.grpc.Client can connect (TCP up is not enough).""" - from cuopt.grpc import Client, GrpcError + """Block until cuopt.grpc.numerical.Client can connect (TCP up is not enough).""" + from cuopt.grpc.numerical import Client, GrpcError deadline = time.monotonic() + timeout while time.monotonic() < deadline: diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py index 9b4c004755..35ab487766 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -1,16 +1,27 @@ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import os + +import cuopt.grpc as grpc_pkg import time import pytest -from cuopt.grpc import Client, JobNotReadyError, JobStatus -from cuopt.linear_programming import SolverSettings +from cuopt.grpc.numerical import Client, GrpcError, JobNotReadyError, JobStatus +from cuopt.linear_programming import Read, SolverSettings from cuopt.linear_programming.internals import GetSolutionCallback from cuopt.linear_programming.problem import INTEGER, MAXIMIZE, Problem +from cuopt.linear_programming.solver.solver_parameters import CUOPT_TIME_LIMIT + +from cuopt.tests.fixtures.grpc_server_fixtures import GRPC_PORT_OFFSET_CLIENT + +RAPIDS_DATASET_ROOT_DIR = os.getenv("RAPIDS_DATASET_ROOT_DIR") +if RAPIDS_DATASET_ROOT_DIR is None: + RAPIDS_DATASET_ROOT_DIR = os.getcwd() + RAPIDS_DATASET_ROOT_DIR = os.path.join(RAPIDS_DATASET_ROOT_DIR, "datasets") -from cuopt.grpc_server_fixtures import GRPC_PORT_OFFSET_CLIENT +_SWATH1_MPS = os.path.join(RAPIDS_DATASET_ROOT_DIR, "mip", "swath1.mps") _DEMO_LP_NAMES = ["x", "y"] _MIP_NAMES = ["x", "y"] @@ -40,6 +51,12 @@ def _poll_until_complete( return client.status(job_id) +def test_grpc_package_is_namespace_only(): + """cuopt.grpc is a namespace; domain clients live in subpackages.""" + assert "Client" not in grpc_pkg.__dict__ + assert "numerical" in (grpc_pkg.__doc__ or "") + + @pytest.mark.filterwarnings("ignore::DeprecationWarning") class TestGrpcClient: grpc_port_offset = GRPC_PORT_OFFSET_CLIENT @@ -132,6 +149,25 @@ def test_mip_submit_and_result(self, grpc_server): assert solution.get_primal_objective() == pytest.approx(15.0, rel=1e-3) client.delete(job_id) + def test_cancel_job(self, grpc_server): + problem = Read(_SWATH1_MPS) + settings = SolverSettings() + settings.set_parameter(CUOPT_TIME_LIMIT, 10) + + client = Client("localhost", grpc_server) + job_id = client.submit(problem, settings) + + assert client.status(job_id) in ( + JobStatus.QUEUED, + JobStatus.PROCESSING, + ) + client.cancel(job_id) + + assert client.status(job_id) == JobStatus.CANCELLED + with pytest.raises(GrpcError): + client.result(job_id) + client.delete(job_id) + def test_mip_incumbent_stream(self, grpc_server): class IncumbentCollector(GetSolutionCallback): def __init__(self): From 6c0e41eb57f77f23edf3d10ff7e5e2024d515804 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Tue, 23 Jun 2026 13:13:58 -0400 Subject: [PATCH 08/10] grpc tests apply coderabbit advice --- .../linear_programming/test_grpc_client.py | 63 ++++++++++++++++--- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py index 35ab487766..5b0168ccbe 100644 --- a/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py +++ b/python/cuopt/cuopt/tests/linear_programming/test_grpc_client.py @@ -1,11 +1,11 @@ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import importlib.util import os - -import cuopt.grpc as grpc_pkg import time +import cuopt.grpc as grpc_pkg import pytest from cuopt.grpc.numerical import Client, GrpcError, JobNotReadyError, JobStatus @@ -51,10 +51,19 @@ def _poll_until_complete( return client.status(job_id) +def _infeasible_lp_problem(): + problem = Problem("grpc_infeasible") + x = problem.addVariable(lb=0.0, name="x") + problem.addConstraint(x >= 5, name="c1") + problem.addConstraint(x <= 1, name="c2") + problem.setObjective(x, sense=MAXIMIZE) + return problem + + def test_grpc_package_is_namespace_only(): """cuopt.grpc is a namespace; domain clients live in subpackages.""" assert "Client" not in grpc_pkg.__dict__ - assert "numerical" in (grpc_pkg.__doc__ or "") + assert importlib.util.find_spec("cuopt.grpc.numerical") is not None @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -149,7 +158,43 @@ def test_mip_submit_and_result(self, grpc_server): assert solution.get_primal_objective() == pytest.approx(15.0, rel=1e-3) client.delete(job_id) + def test_invalid_job_id(self, grpc_server): + client = Client("localhost", grpc_server) + assert ( + client.status("00000000-0000-0000-0000-000000000000") + == JobStatus.NOT_FOUND + ) + with pytest.raises(GrpcError): + client.result("00000000-0000-0000-0000-000000000000") + with pytest.raises(GrpcError): + client.delete("00000000-0000-0000-0000-000000000000") + + def test_result_after_delete(self, grpc_server): + problem = _demo_lp_problem() + client = Client("localhost", grpc_server) + job_id = client.submit(problem, SolverSettings()) + assert client.wait(job_id, timeout=120) == JobStatus.COMPLETED + client.delete(job_id) + with pytest.raises(GrpcError): + client.result(job_id, _DEMO_LP_NAMES) + + def test_infeasible_lp_result(self, grpc_server): + client = Client("localhost", grpc_server) + job_id = client.submit(_infeasible_lp_problem(), SolverSettings()) + terminal = client.wait(job_id, timeout=120) + if terminal != JobStatus.FAILED: + client.delete(job_id) + pytest.skip( + f"expected FAILED for infeasible LP, got {terminal.name}" + ) + with pytest.raises(GrpcError): + client.result(job_id, ["x"]) + client.delete(job_id) + def test_cancel_job(self, grpc_server): + if not os.path.isfile(_SWATH1_MPS): + pytest.skip(f"dataset not found: {_SWATH1_MPS}") + problem = Read(_SWATH1_MPS) settings = SolverSettings() settings.set_parameter(CUOPT_TIME_LIMIT, 10) @@ -157,13 +202,13 @@ def test_cancel_job(self, grpc_server): client = Client("localhost", grpc_server) job_id = client.submit(problem, settings) - assert client.status(job_id) in ( - JobStatus.QUEUED, - JobStatus.PROCESSING, - ) - client.cancel(job_id) + status = client.status(job_id) + if status not in (JobStatus.QUEUED, JobStatus.PROCESSING): + client.delete(job_id) + pytest.skip("Job completed before cancellation could be observed") - assert client.status(job_id) == JobStatus.CANCELLED + client.cancel(job_id) + assert client.wait(job_id, timeout=30) == JobStatus.CANCELLED with pytest.raises(GrpcError): client.result(job_id) client.delete(job_id) From 5cc8d7ee9baa8b1b79738496911682ae31f6f8e6 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 26 Jun 2026 12:21:23 -0400 Subject: [PATCH 09/10] Update copyright cython_grpc_client.hpp Co-authored-by: Ramakrishna Prabhu <42624703+ramakrishnap-nv@users.noreply.github.com> --- cpp/include/cuopt/grpc/cython_grpc_client.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cuopt/grpc/cython_grpc_client.hpp b/cpp/include/cuopt/grpc/cython_grpc_client.hpp index 96f49f4879..074bd0065f 100644 --- a/cpp/include/cuopt/grpc/cython_grpc_client.hpp +++ b/cpp/include/cuopt/grpc/cython_grpc_client.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights * reserved. SPDX-License-Identifier: Apache-2.0 */ From b4793e14426dfead36d875bccaea51e98ee2e6b8 Mon Sep 17 00:00:00 2001 From: Trevor McKay Date: Fri, 26 Jun 2026 12:22:06 -0400 Subject: [PATCH 10/10] Update copyright grpc_client_env.hpp Co-authored-by: Ramakrishna Prabhu <42624703+ramakrishnap-nv@users.noreply.github.com> --- cpp/include/cuopt/grpc/grpc_client_env.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cuopt/grpc/grpc_client_env.hpp b/cpp/include/cuopt/grpc/grpc_client_env.hpp index 16a93225b7..bdd99dd466 100644 --- a/cpp/include/cuopt/grpc/grpc_client_env.hpp +++ b/cpp/include/cuopt/grpc/grpc_client_env.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights * reserved. SPDX-License-Identifier: Apache-2.0 */