diff --git a/lib/thread/thread.c b/lib/thread/thread.c
index b4cd863271a..3a58d303076 100644
--- a/lib/thread/thread.c
+++ b/lib/thread/thread.c
@@ -136,8 +136,10 @@ struct spdk_thread {
 	struct spdk_ring		*messages;
 	uint8_t				num_pp_handlers;
 	int				msg_fd;
-	SLIST_HEAD(, spdk_msg)		msg_cache;
-	size_t				msg_cache_count;
+	/* Per-thread spdk_msg cache kept in two SPDK rings (both created as SPDK_RING_TYPE_MP_SC) */
+	struct spdk_ring		*msg_pool;		/* Messages this thread takes in spdk_thread_send_msg() */
+	struct spdk_ring		*msg_free_ring;		/* Messages freed from another thread; moved to msg_pool on refill */
+	uint32_t			msg_pool_size;		/* Set to SPDK_MSG_MEMPOOL_CACHE_SIZE at creation */
 	spdk_msg_fn			critical_msg;
 	uint64_t			id;
 	uint64_t			next_poller_id;
@@ -398,11 +400,25 @@ _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
 static void thread_interrupt_destroy(struct spdk_thread *thread);
 static int thread_interrupt_create(struct spdk_thread *thread);

+static void
+thread_drain_msg_ring(struct spdk_ring **ring)
+{
+	struct spdk_msg *msg;
+
+	if (*ring == NULL) {	/* ring was never created, or was already drained */
+		return;
+	}
+	while (spdk_ring_dequeue(*ring, (void **)&msg, 1) == 1) {	/* give every cached message back to the global mempool */
+		spdk_mempool_put(g_spdk_msg_mempool, msg);
+	}
+	spdk_ring_free(*ring);
+	*ring = NULL;	/* a second call on the same ring becomes a no-op */
+}
+
 static void
 _free_thread(struct spdk_thread *thread)
 {
 	struct spdk_io_channel *ch;
-	struct spdk_msg *msg;
 	struct spdk_poller *poller, *ptmp;

 	RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
@@ -440,18 +456,8 @@ _free_thread(struct spdk_thread *thread)
 	TAILQ_REMOVE(&g_threads, thread, tailq);
 	pthread_mutex_unlock(&g_devlist_mutex);

-	msg = SLIST_FIRST(&thread->msg_cache);
-	while (msg != NULL) {
-		SLIST_REMOVE_HEAD(&thread->msg_cache, link);
-
-		assert(thread->msg_cache_count > 0);
-		thread->msg_cache_count--;
-		spdk_mempool_put(g_spdk_msg_mempool, msg);
-
-		msg = SLIST_FIRST(&thread->msg_cache);
-	}
-
-	assert(thread->msg_cache_count == 0);
+	thread_drain_msg_ring(&thread->msg_pool);
+	thread_drain_msg_ring(&thread->msg_free_ring);

 	if (spdk_interrupt_mode_is_enabled()) {
 		thread_interrupt_destroy(thread);
@@ -551,8 +557,11 @@
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
 	TAILQ_INIT(&thread->active_pollers);
 	RB_INIT(&thread->timed_pollers);
 	TAILQ_INIT(&thread->paused_pollers);
-	SLIST_INIT(&thread->msg_cache);
-	thread->msg_cache_count = 0;
+
+	/* Both rings are created further down; start from NULL so cleanup can tell */
+	thread->msg_pool = NULL;
+	thread->msg_free_ring = NULL;
+	thread->msg_pool_size = SPDK_MSG_MEMPOOL_CACHE_SIZE;

 	thread->tsc_last = spdk_get_ticks();

@@ -568,14 +577,46 @@ spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
 		return NULL;
 	}

-	/* Fill the local message pool cache. */
+	/* Ring that holds this thread's cached messages */
+	thread->msg_pool = spdk_ring_create(SPDK_RING_TYPE_MP_SC, SPDK_MSG_MEMPOOL_CACHE_SIZE,
+					    SPDK_ENV_NUMA_ID_ANY);
+	if (!thread->msg_pool) {
+		SPDK_ERRLOG("Unable to allocate memory for message pool ring\n");
+		spdk_ring_free(thread->messages);
+		free(thread);
+		return NULL;
+	}
+
+	/* Ring that receives messages freed while running on a different thread */
+	thread->msg_free_ring = spdk_ring_create(SPDK_RING_TYPE_MP_SC, SPDK_MSG_MEMPOOL_CACHE_SIZE,
+				SPDK_ENV_NUMA_ID_ANY);
+	if (!thread->msg_free_ring) {
+		SPDK_ERRLOG("Unable to allocate memory for message free ring\n");
+		spdk_ring_free(thread->msg_pool);
+		spdk_ring_free(thread->messages);
+		free(thread);
+		return NULL;
+	}
+
+	/* Pre-populate the local message pool from the global mempool.
+	 *
+	 * A DPDK-backed ring created with count N stores at most N - 1 entries, so
+	 * fetch one message fewer than the ring size; asking for N would make the
+	 * last enqueue fail and log a warning on every thread creation. If the bulk
+	 * get fails the pool simply starts empty and fills as messages are freed.
+	 */
-	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
+	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE - 1);
 	if (rc == 0) {
-		/* If we can't populate the cache it's ok. The cache will get filled
-		 * up organically as messages are passed to the thread. */
+		/* Enqueue every fetched message into the thread-local pool */
-		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
+		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE - 1; i++) {
-			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
-			thread->msg_cache_count++;
+			rc = spdk_ring_enqueue(thread->msg_pool, (void **)&msgs[i], 1, NULL);
+			if (rc != 1) {
+				SPDK_WARNLOG("Failed to enqueue message %d to pool\n", i);
+				/* Hand the messages that did not fit back to the global pool */
+				for (; i < SPDK_MSG_MEMPOOL_CACHE_SIZE - 1; i++) {
+					spdk_mempool_put(g_spdk_msg_mempool, msgs[i]);
+				}
+				break;
+			}
 		}
 	}

@@ -853,6 +889,9 @@ spdk_thread_get_from_ctx(void *ctx)
 	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
 }

+static void _thread_refill_msg_pool(struct spdk_thread *thread);
+static void _thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg);
+
 static inline uint32_t
 msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
 {
@@ -899,14 +938,8 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)

 		SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

-		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
-			/* Insert the messages at the head. We want to re-use the hot
-			 * ones. */
-			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
-			thread->msg_cache_count++;
-		} else {
-			spdk_mempool_put(g_spdk_msg_mempool, msg);
-		}
+		/* Return the message to this thread's cache (global mempool if the ring is full) */
+		_thread_free_msg(thread, msg);
 	}

 	return count;
@@ -1411,6 +1444,64 @@ thread_send_msg_notification(const struct spdk_thread *target_thread)
 		}
 	}
 }
+/**
+ * Refill thread-local message pool from cross-thread returns.
+ * Moves at most one batch (SPDK_MSG_BATCH_SIZE) of messages from msg_free_ring into msg_pool.
+ */
+static void
+_thread_refill_msg_pool(struct spdk_thread *thread)
+{
+	struct spdk_msg *msgs[SPDK_MSG_BATCH_SIZE];
+	size_t count, i;
+	int rc;
+
+	if (!thread->msg_free_ring) {
+		return;
+	}
+
+	/* Take at most one batch from the cross-thread free ring */
+	count = spdk_ring_dequeue(thread->msg_free_ring, (void **)msgs, SPDK_MSG_BATCH_SIZE);
+
+	if (count > 0) {
+		/* Move each message into the local pool, one enqueue per message */
+		for (i = 0; i < count; i++) {
+			rc = spdk_ring_enqueue(thread->msg_pool, (void **)&msgs[i], 1, NULL);
+			if (rc != 1) {
+				/* Enqueue failed (pool full) - return to global mempool */
+				spdk_mempool_put(g_spdk_msg_mempool, msgs[i]);
+			}
+		}
+	}
+}
+
+/**
+ * Return a message to the cache of \c thread; the caller in this patch passes the thread that just ran it.
+ * Uses msg_pool when running on \c thread, msg_free_ring otherwise, and the global mempool if the enqueue fails.
+ */
+static void
+_thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg)
+{
+	struct spdk_thread *local_thread = _get_thread();
+	int rc;
+
+	if (local_thread == thread && thread->msg_pool) {
+		/* FAST PATH: Same thread - return to local pool */
+		rc = spdk_ring_enqueue(thread->msg_pool, (void **)&msg, 1, NULL);
+		if (rc == 1) {
+			return;
+		}
+	} else if (thread->msg_free_ring) {
+		/* CROSS-THREAD: Return to the target thread's free ring */
+		rc = spdk_ring_enqueue(thread->msg_free_ring, (void **)&msg, 1, NULL);
+		if (rc == 1) {
+			return;
+		}
+	}
+
+	/* SLOW PATH: Return to global pool (ring missing or enqueue failed) */
+	spdk_mempool_put(g_spdk_msg_mempool, msg);
+}
+

 int
 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
@@ -1429,23 +1520,30 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
 	local_thread = _get_thread();

 	msg = NULL;
-	if (local_thread != NULL) {
-		if (local_thread->msg_cache_count > 0) {
-			msg = SLIST_FIRST(&local_thread->msg_cache);
-			assert(msg != NULL);
-			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
-			local_thread->msg_cache_count--;
+	if (local_thread != NULL && local_thread->msg_pool) {
+		/* FAST PATH: Try
thread-local pool */
+		if (spdk_ring_dequeue(local_thread->msg_pool, (void **)&msg, 1) == 1) {
+			goto got_msg;
 		}
-	}

-	if (msg == NULL) {
-		msg = spdk_mempool_get(g_spdk_msg_mempool);
-		if (!msg) {
-			SPDK_ERRLOG("msg could not be allocated\n");
-			abort();
+		/* Pool empty - pull one batch over from the cross-thread free ring */
+		_thread_refill_msg_pool(local_thread);
+
+		/* Try again after refill */
+		if (spdk_ring_dequeue(local_thread->msg_pool, (void **)&msg, 1) == 1) {
+			goto got_msg;
 		}
 	}

+	/* SLOW PATH: Allocate from global pool (no SPDK thread, or local pool still empty) */
+	msg = spdk_mempool_get(g_spdk_msg_mempool);
+	if (!msg) {
+		SPDK_ERRLOG("msg could not be allocated\n");
+		abort();
+	}
+
+got_msg:
+
 	msg->fn = fn;
 	msg->arg = ctx;

diff --git a/mk/spdk.modules.mk b/mk/spdk.modules.mk
index 1a9c1d1bdb7..a9c426c1a5a 100644
--- a/mk/spdk.modules.mk
+++ b/mk/spdk.modules.mk
@@ -74,7 +74,7 @@ endif

 ifeq ($(CONFIG_RBD),y)
 BLOCKDEV_MODULES_LIST += bdev_rbd
-BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd
+BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd -lstdc++
 endif

 ifeq ($(CONFIG_DAOS),y)
diff --git a/module/bdev/rbd/Makefile b/module/bdev/rbd/Makefile
index 375dfaba213..782e69496b4 100644
--- a/module/bdev/rbd/Makefile
+++ b/module/bdev/rbd/Makefile
@@ -10,6 +10,7 @@ SO_VER := 8
 SO_MINOR := 0

 C_SRCS = bdev_rbd.c bdev_rbd_rpc.c
+CXX_SRCS = bdev_rbd_spdk_context_wq.cpp
 LIBNAME = bdev_rbd

 SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
diff --git a/module/bdev/rbd/bdev_rbd.c b/module/bdev/rbd/bdev_rbd.c
index 3a92dc74011..65d15a11b2c 100644
--- a/module/bdev/rbd/bdev_rbd.c
+++ b/module/bdev/rbd/bdev_rbd.c
@@ -6,6 +6,7 @@
 #include "spdk/stdinc.h"

 #include "bdev_rbd.h"
+#include "bdev_rbd_spdk_context_wq.h"

 #include <rbd/librbd.h>
 #include <rados/librados.h>
@@ -28,6 +29,7 @@ static int bdev_rbd_count = 0;
  * global parameter to control CRC32C usage in RBD write operations.
 */
 static bool g_rbd_with_crc32c = false;
+static bool g_rbd_with_spdk_wq = false;

 struct bdev_rbd_pool_ctx {
 	rados_t *cluster_p;
@@ -78,6 +80,9 @@ struct bdev_rbd {
 	int (*reservation_fn_cbk)(void *ns);
 	char cluster_fsid[37];

+	/* Opaque SpdkContextWQ handle passed to rbd_open*_with_context_wq(); NULL selects the default work queue */
+	struct bdev_rbd_spdk_context_wq *spdk_context_wq;
+
 };

 struct bdev_rbd_io_channel {
@@ -229,6 +234,16 @@ bdev_rbd_free(struct bdev_rbd *rbd)
 		rbd_close(rbd->image);
 	}

+	/* Destroy the SPDK ContextWQ only after the image (if open) has been closed.
+	 * bdev_rbd_spdk_context_wq_destroy() deletes the C++ object; per
+	 * bdev_rbd_spdk_context_wq.cpp its destructor calls drain(), which polls
+	 * for outstanding work for a bounded time before the object goes away.
+	 */
+	if (rbd->spdk_context_wq != NULL) {
+		bdev_rbd_spdk_context_wq_destroy(rbd->spdk_context_wq);
+		rbd->spdk_context_wq = NULL;
+	}
+
 	free(rbd->disk.name);
 	free(rbd->rbd_name);
 	free(rbd->user_id);
@@ -563,11 +578,26 @@ bdev_rbd_init_context(void *arg)
 	}

 	assert(io_ctx != NULL);
+	if (g_rbd_with_spdk_wq) {
+		/* Pick the next reactor thread and build a SpdkContextWQ on it; a NULL result is passed on to the open call as-is (NULL means the default AsioContextWQ) */
+		struct spdk_thread *reactor_thread = bdev_rbd_find_reactor_thread();
+		rbd->spdk_context_wq = bdev_rbd_spdk_context_wq_create_from_ioctx(
+					       *io_ctx, reactor_thread);
+		if (rbd->spdk_context_wq == NULL) {
+			SPDK_NOTICELOG("rbd_with_spdk_wq is enabled but SpdkContextWQ was not created (no reactor thread or allocation failure); "
+				       "falling back to default AsioContextWQ for RBD image %s/%s\n",
+				       rbd->pool_name, rbd->rbd_name);
+		}
+	} else {
+		rbd->spdk_context_wq = NULL;
+		SPDK_NOTICELOG("rbd_with_spdk_wq is disabled, using AsioContextWQ for RBD image %s/%s\n",
+			       rbd->pool_name, rbd->rbd_name);
+	}
 	if (rbd->rbd_read_only) {
 		SPDK_DEBUGLOG(bdev_rbd, "Will open RBD image %s/%s as read-only\n", rbd->pool_name, rbd->rbd_name);
-		rc = rbd_open_read_only(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
+		rc = rbd_open_read_only_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
 	} else {
-		rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
+		rc = rbd_open_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
 	}
 	if (rc < 0) {
 		SPDK_ERRLOG("Failed to open specified rbd device\n");
@@ -2111,3 +2141,16 @@ bdev_rbd_set_with_crc32c(bool enable)
 {
 	g_rbd_with_crc32c = enable;
 }
+
+bool
+bdev_rbd_get_with_spdk_wq(void)
+{
+	return g_rbd_with_spdk_wq;
+}
+
+/** Enable or disable SPDK ContextWQ; the flag is read when an RBD image is opened in bdev_rbd_init_context(). */
+void
+bdev_rbd_set_with_spdk_wq(bool enable)
+{
+	g_rbd_with_spdk_wq = enable;
+}
diff --git a/module/bdev/rbd/bdev_rbd.h b/module/bdev/rbd/bdev_rbd.h
index 768754aec14..2849cb2ae01 100644
--- a/module/bdev/rbd/bdev_rbd.h
+++ b/module/bdev/rbd/bdev_rbd.h
@@ -98,4 +98,19 @@ bool bdev_rbd_get_with_crc32c(void);
  */
 void bdev_rbd_set_with_crc32c(bool enable);

+/**
+ * Get the current rbd_with_spdk_wq setting.
+ *
+ * \return true if SPDK ContextWQ is enabled, false otherwise
+ */
+bool bdev_rbd_get_with_spdk_wq(void);
+
+/**
+ * Set the rbd_with_spdk_wq parameter to enable/disable SPDK ContextWQ
+ * for RBD operations.
+ *
+ * \param enable true to enable SPDK ContextWQ, false to disable (uses AsioContextWQ)
+ */
+void bdev_rbd_set_with_spdk_wq(bool enable);
+
 #endif /* SPDK_BDEV_RBD_H */
diff --git a/module/bdev/rbd/bdev_rbd_rpc.c b/module/bdev/rbd/bdev_rbd_rpc.c
index ee0489b0433..7d84e7f87d8 100644
--- a/module/bdev/rbd/bdev_rbd_rpc.c
+++ b/module/bdev/rbd/bdev_rbd_rpc.c
@@ -487,3 +487,53 @@ SPDK_RPC_REGISTER("bdev_rbd_wait_for_latest_osdmap", rpc_bdev_rbd_wait_for_lates

 SPDK_RPC_REGISTER("bdev_rbd_get_with_crc32c", rpc_bdev_rbd_get_with_crc32c, SPDK_RPC_RUNTIME)
 SPDK_RPC_REGISTER("bdev_rbd_set_with_crc32c", rpc_bdev_rbd_set_with_crc32c, SPDK_RPC_STARTUP)
+
+/**
+ * RPC handler: replies with the current rbd_with_spdk_wq flag as a JSON boolean; params is not inspected.
+ */
+static void
+rpc_bdev_rbd_get_with_spdk_wq(struct spdk_jsonrpc_request *request,
+			      const struct spdk_json_val *params)
+{
+	struct spdk_json_write_ctx *w;
+
+	w = spdk_jsonrpc_begin_result(request);
+	spdk_json_write_bool(w, bdev_rbd_get_with_spdk_wq());
+	spdk_jsonrpc_end_result(request, w);
+}
+
+/**
+ * Decoded parameters of the bdev_rbd_set_with_spdk_wq RPC: a single boolean "enable".
+ */
+struct rpc_bdev_rbd_set_with_spdk_wq {
+	bool enable;
+};
+
+static const struct spdk_json_object_decoder rpc_bdev_rbd_set_with_spdk_wq_decoders[] = {
+	{"enable", offsetof(struct rpc_bdev_rbd_set_with_spdk_wq, enable), spdk_json_decode_bool},
+};
+
+static void
+rpc_bdev_rbd_set_with_spdk_wq(struct spdk_jsonrpc_request *request,
+			      const struct spdk_json_val *params)
+{
+	struct rpc_bdev_rbd_set_with_spdk_wq req = {};
+	struct spdk_json_write_ctx *w;
+
+	if (spdk_json_decode_object(params, rpc_bdev_rbd_set_with_spdk_wq_decoders,
+				    SPDK_COUNTOF(rpc_bdev_rbd_set_with_spdk_wq_decoders),
+				    &req)) {
+		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
+						 "Missing or invalid enable parameter");
+		return;
+	}
+
+	bdev_rbd_set_with_spdk_wq(req.enable);	/* store the flag, then echo back what the getter now reports */
+
+	w = spdk_jsonrpc_begin_result(request);
+	spdk_json_write_bool(w, bdev_rbd_get_with_spdk_wq());
+	
spdk_jsonrpc_end_result(request, w);
+}
+
+SPDK_RPC_REGISTER("bdev_rbd_get_with_spdk_wq", rpc_bdev_rbd_get_with_spdk_wq, SPDK_RPC_RUNTIME)
+SPDK_RPC_REGISTER("bdev_rbd_set_with_spdk_wq", rpc_bdev_rbd_set_with_spdk_wq, SPDK_RPC_STARTUP)
diff --git a/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp b/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp
new file mode 100644
index 00000000000..1c792330269
--- /dev/null
+++ b/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp
@@ -0,0 +1,240 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (C) 2025,2026 IBM, Inc.
+ * All rights reserved.
+ */
+
+/* NOTE(review): the five system header names below were missing from the patch text and are inferred from the identifiers this file uses - confirm. */
+#include <atomic>
+#include <cassert>
+#include <utility>
+#include <vector>
+#include <rados/librados.hpp>
+
+#include "bdev_rbd_spdk_context_wq.h"
+
+extern "C" {
+#include "spdk/stdinc.h"
+#include "spdk/thread.h"
+#include "spdk/log.h"
+#include "spdk/env.h"
+#include "spdk_internal/event.h"
+}
+
+/**
+ * Caches one spdk_thread per reactor (the first in each reactor's list, skipping the app thread) and hands them out round-robin.
+ */
+class ReactorThreadPool {
+public:
+	/** Builds and publishes the reactor list once (a CAS picks the winner); an empty scan is not cached. */
+	static void ensure_discovered() {
+		if (g_reactor_list.load(std::memory_order_acquire) != nullptr) {
+			return;
+		}
+		auto *p = new std::vector<struct spdk_thread *>();
+		discover_into(p);
+		if (p->empty()) {
+			/* Nothing found yet: do not publish an empty list, so a later call can scan again. */
+			delete p;
+			return;
+		}
+		std::vector<struct spdk_thread *> *expected = nullptr;
+		if (!g_reactor_list.compare_exchange_strong(expected, p, std::memory_order_release)) {
+			delete p;
+		}
+	}
+
+	static struct spdk_thread *get_next() {
+		std::vector<struct spdk_thread *> *list =
+			g_reactor_list.load(std::memory_order_acquire);
+		if (list == nullptr || list->empty()) {
+			SPDK_ERRLOG("bdev_rbd: reactor thread pool is empty, no reactor threads available for SpdkContextWQ\n");
+			return NULL;
+		}
+		size_t n = list->size();
+		size_t idx = g_reactor_next.fetch_add(1, std::memory_order_relaxed) % n;
+		struct spdk_thread *t = (*list)[idx];
+		const char *name = spdk_thread_get_name(t);
+		SPDK_NOTICELOG("bdev_rbd: next reactor thread=%p (id=%" PRIu64 ", name=%s, index=%zu/%zu)\n",
+			       t, spdk_thread_get_id(t), name ? name : "NULL", idx, n);
+		return t;
+	}
+
+private:
+	static void discover_into(std::vector<struct spdk_thread *> *vec) {
+		uint32_t lcore;
+		SPDK_ENV_FOREACH_CORE(lcore) {
+			struct spdk_reactor *reactor = spdk_reactor_get(lcore);
+			if (reactor == NULL || !reactor->flags.is_valid) {
+				continue;
+			}
+			if (reactor->thread_count == 0) {
+				continue;
+			}
+			struct spdk_lw_thread *lw_thread = TAILQ_FIRST(&reactor->threads);
+			if (lw_thread == NULL) {
+				continue;
+			}
+			struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
+			if (thread == NULL || spdk_thread_is_app_thread(thread)) {
+				continue;
+			}
+			const char *name = spdk_thread_get_name(thread);
+			SPDK_NOTICELOG("bdev_rbd: discovered reactor thread=%p (id=%" PRIu64 ", name=%s, index=%zu)\n",
+				       thread, spdk_thread_get_id(thread), name ? name : "NULL", vec->size());
+			vec->push_back(thread);
+		}
+		if (!vec->empty()) {
+			SPDK_NOTICELOG("bdev_rbd: reactor thread pool: discovered %zu reactor(s) for round-robin\n",
+				       vec->size());
+		}
+	}
+
+	static std::atomic<std::vector<struct spdk_thread *> *> g_reactor_list;
+	static std::atomic<size_t> g_reactor_next;
+};
+
+std::atomic<std::vector<struct spdk_thread *> *> ReactorThreadPool::g_reactor_list{nullptr};
+std::atomic<size_t> ReactorThreadPool::g_reactor_next{0};
+
+namespace librbd {
+namespace asio {
+
+SpdkContextWQ::SpdkContextWQ(void* cct, struct spdk_thread* reactor_thread)
+	: ContextWQ(cct), m_reactor_thread(reactor_thread) {
+	assert(reactor_thread != nullptr);
+}
+
+SpdkContextWQ::~SpdkContextWQ() {
+	// Set shutdown flag (NOTE(review): nothing in this file reads m_shutdown; presumably the ContextWQ base does - confirm)
+	m_shutdown.store(true, std::memory_order_release);
+
+	// Wait (bounded) for pending messages to complete
+	drain();
+
+	// Report anything still outstanding after the bounded wait
+	uint64_t queued = m_queued_ops.load(std::memory_order_acquire);
+	if (queued > 0) {
+		SPDK_ERRLOG("SpdkContextWQ::~SpdkContextWQ: Warning: %" PRIu64 " operations still pending during destruction\n", queued);
+	}
+}
+
+void SpdkContextWQ::spdk_fn_handler(void *arg) {
+	auto *m = static_cast<SpdkFnMsg *>(arg);
+	if (m == nullptr) {
+		SPDK_ERRLOG("SpdkContextWQ::spdk_fn_handler: msg is nullptr\n");
+		return;
+	}
+	m->fn();
+	delete m;
+}
+
+void SpdkContextWQ::send_fn(Work fn) {
+	auto *msg = new SpdkFnMsg{std::move(fn)};
+	int rc = spdk_thread_send_msg(m_reactor_thread, spdk_fn_handler, msg);
+	if (rc != 0) {
+		/* NOTE(review): the work item is dropped here, so whatever it was meant to complete never runs. */
+		delete msg;
+		SPDK_ERRLOG("SpdkContextWQ::send_fn: spdk_thread_send_msg failed rc=%d\n", rc);
+	}
+}
+
+void SpdkContextWQ::post(Work fn) {
+	send_fn(std::move(fn));
+}
+
+void SpdkContextWQ::dispatch(Work fn) {
+	if (spdk_get_thread() == m_reactor_thread) {
+		fn();
+	} else {
+		send_fn(std::move(fn));
+	}
+}
+
+void SpdkContextWQ::post_serial(Work fn) {
+	send_fn(std::move(fn));
+}
+
+void SpdkContextWQ::dispatch_serial(Work fn) {
+	dispatch(std::move(fn));
+}
+
+void SpdkContextWQ::drain() {
+	// Poll until m_queued_ops reaches zero or the iteration budget is spent.
+	// Note: This relies on the SPDK reactor thread to be actively polling.
+	// NOTE(review): this file never modifies m_queued_ops; confirm the ContextWQ base class maintains it.
+	// TODO: conf parameter, non busy wait implementation
+	const int max_iterations = 100000;  // 10 seconds at 100us per iteration
+	int iterations = 0;
+
+	// Wait for all queued operations to complete
+	while (m_queued_ops.load(std::memory_order_acquire) > 0 &&
+	       iterations < max_iterations) {
+		// Busy-wait 100us between checks
+		spdk_delay_us(100);
+		++iterations;
+	}
+
+	uint64_t queued = m_queued_ops.load(std::memory_order_acquire);
+	if (queued > 0) {
+		SPDK_ERRLOG("SpdkContextWQ::drain: Incomplete drain - queued_ops=%" PRIu64 " after %d iterations\n",
+			    queued, iterations);
+	}
+}
+
+} // namespace asio
+} // namespace librbd
+
+// C API implementation
+extern "C" {
+
+struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rados_ioctx_t io_ctx, struct spdk_thread* reactor_thread)
+{
+	if (io_ctx == NULL || reactor_thread == NULL) {
+		return NULL;
+	}
+
+	// Convert rados_ioctx_t to librados::IoCtx to get CephContext
+	librados::IoCtx ioctx;
+	librados::IoCtx::from_rados_ioctx_t(io_ctx, ioctx);
+	void* cct_ptr = ioctx.cct();
+
+	if (cct_ptr == NULL) {
+		SPDK_ERRLOG("Failed to get CephContext from rados_ioctx_t\n");
+		return NULL;
+	}
+
+	// Create SpdkContextWQ
+	uint64_t thread_id = spdk_thread_get_id(reactor_thread);
+	const char *thread_name = spdk_thread_get_name(reactor_thread);
+	try {
+		auto wq = new librbd::asio::SpdkContextWQ(cct_ptr, reactor_thread);
+		// Cast to opaque struct pointer for type safety
+		struct bdev_rbd_spdk_context_wq* result = reinterpret_cast<struct bdev_rbd_spdk_context_wq*>(wq);
+		SPDK_NOTICELOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Successfully created SpdkContextWQ=%p with reactor thread=%p (id=%" PRIu64 ", name=%s)\n",
+			       result, reactor_thread, thread_id, thread_name ? thread_name : "NULL");
+		return result;
+	} catch (...) {
+		SPDK_ERRLOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Failed to create SpdkContextWQ with reactor thread=%p (id=%" PRIu64 ", name=%s)\n",
+			    reactor_thread, thread_id, thread_name ? thread_name : "NULL");
+		return NULL;
+	}
+}
+
+void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq)
+{
+	if (context_wq == NULL) {
+		return;
+	}
+
+	// Cast back to SpdkContextWQ and delete
+	auto wq = reinterpret_cast<librbd::asio::SpdkContextWQ*>(context_wq);
+	delete wq;
+}
+
+struct spdk_thread* bdev_rbd_find_reactor_thread(void)
+{
+	ReactorThreadPool::ensure_discovered();
+	return ReactorThreadPool::get_next();
+}
+
+} // extern "C"
diff --git a/module/bdev/rbd/bdev_rbd_spdk_context_wq.h b/module/bdev/rbd/bdev_rbd_spdk_context_wq.h
new file mode 100644
index 00000000000..6b3a797e6cb
--- /dev/null
+++ b/module/bdev/rbd/bdev_rbd_spdk_context_wq.h
@@ -0,0 +1,99 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (C) 2025,2026 IBM, Inc.
+ * All rights reserved.
+ */
+
+#ifndef SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H
+#define SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H
+
+// Forward declaration for SPDK thread
+struct spdk_thread;
+
+// Forward declaration for rbd_image_t (defined in <rbd/librbd.h>)
+// We use void* here to avoid including librbd.h in the header; the #ifndef tests a typedef name, not a macro, so it never skips the typedef
+#ifndef rbd_image_t
+typedef void* rbd_image_t;
+#endif
+
+// Forward declaration for rados_ioctx_t (defined in <rados/librados.h>); same caveat about the #ifndef applies
+#ifndef rados_ioctx_t
+typedef void* rados_ioctx_t;
+#endif
+
+// Opaque type for SpdkContextWQ - provides type safety in C code
+// The actual implementation is C++ and is hidden behind this opaque pointer
+struct bdev_rbd_spdk_context_wq;
+
+// C API for creating SpdkContextWQ from C code (bdev_rbd.c)
+// These declarations are available to both C and C++ code
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Create a SpdkContextWQ from rados_ioctx_t and SPDK reactor thread.
+ * The returned pointer must be freed by calling bdev_rbd_spdk_context_wq_destroy().
+ *
+ * @param io_ctx RADOS I/O context (rados_ioctx_t); NULL makes the call return NULL
+ * @param reactor_thread SPDK thread that will run the queued work; NULL makes the call return NULL
+ * @return Pointer to SpdkContextWQ, or NULL on error
+ */
+struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rados_ioctx_t io_ctx, struct spdk_thread* reactor_thread);
+
+/**
+ * Destroy a SpdkContextWQ created by bdev_rbd_spdk_context_wq_create_from_ioctx().
+ *
+ * @param context_wq Pointer to SpdkContextWQ; NULL is accepted and ignored
+ */
+void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq);
+
+/**
+ * Return the next reactor thread for SpdkContextWQ (round-robin).
+ * Lazy-initializes the reactor list on first call (lock-free, atomic CAS).
+ * Successive calls step through the discovered reactors in order, wrapping around at the end.
+ * Returns NULL if no reactor threads were discovered (error is logged).
+ */
+struct spdk_thread* bdev_rbd_find_reactor_thread(void);
+
+#ifdef __cplusplus
+}
+
+// C++ class definition - only available when compiling C++ code
+#include <librbd/asio/ContextWQ.h> // NOTE(review): header name was missing from the patch text; this path is a guess - confirm
+
+namespace librbd {
+namespace asio {
+
+/**
+ * ContextWQ implementation that runs each work item as a message on one SPDK thread
+ */
+class SpdkContextWQ : public ContextWQ {
+public:
+  explicit SpdkContextWQ(void* cct, struct spdk_thread* reactor_thread);
+  ~SpdkContextWQ();
+
+  void drain() override;
+
+  void post(Work fn) override;
+  void dispatch(Work fn) override;
+  void post_serial(Work fn) override;
+  void dispatch_serial(Work fn) override;
+
+private:
+  struct spdk_thread* m_reactor_thread; // target of every spdk_thread_send_msg() issued by send_fn()
+
+  static void spdk_fn_handler(void *arg);
+
+  struct SpdkFnMsg {
+    Work fn;
+  };
+
+  void send_fn(Work fn);
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // __cplusplus
+
+#endif /* SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H */