Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 140 additions & 42 deletions lib/thread/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ struct spdk_thread {
struct spdk_ring *messages;
uint8_t num_pp_handlers;
int msg_fd;
SLIST_HEAD(, spdk_msg) msg_cache;
size_t msg_cache_count;
/* Lock-free message pool using SPDK rings (MP/SC) */
struct spdk_ring *msg_pool; /* Thread-local pool for fast allocation */
struct spdk_ring *msg_free_ring; /* Cross-thread message returns */
uint32_t msg_pool_size; /* Pool capacity (SPDK_MSG_MEMPOOL_CACHE_SIZE) */
spdk_msg_fn critical_msg;
uint64_t id;
uint64_t next_poller_id;
Expand Down Expand Up @@ -398,11 +400,25 @@ _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

/*
 * Tear down one message ring.
 *
 * Every message still queued on *ring is handed back to g_spdk_msg_mempool,
 * then the ring itself is freed and *ring is reset to NULL. A NULL *ring is
 * ignored, so calling this again on the same slot does nothing.
 */
static void
thread_drain_msg_ring(struct spdk_ring **ring)
{
	struct spdk_ring *r = *ring;
	struct spdk_msg *leftover;

	if (r == NULL) {
		return;
	}

	/* Pull messages one at a time until the ring reports it is empty. */
	for (;;) {
		if (spdk_ring_dequeue(r, (void **)&leftover, 1) != 1) {
			break;
		}
		spdk_mempool_put(g_spdk_msg_mempool, leftover);
	}

	spdk_ring_free(r);
	*ring = NULL;
}

static void
_free_thread(struct spdk_thread *thread)
{
struct spdk_io_channel *ch;
struct spdk_msg *msg;
struct spdk_poller *poller, *ptmp;

RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
Expand Down Expand Up @@ -440,18 +456,8 @@ _free_thread(struct spdk_thread *thread)
TAILQ_REMOVE(&g_threads, thread, tailq);
pthread_mutex_unlock(&g_devlist_mutex);

msg = SLIST_FIRST(&thread->msg_cache);
while (msg != NULL) {
SLIST_REMOVE_HEAD(&thread->msg_cache, link);

assert(thread->msg_cache_count > 0);
thread->msg_cache_count--;
spdk_mempool_put(g_spdk_msg_mempool, msg);

msg = SLIST_FIRST(&thread->msg_cache);
}

assert(thread->msg_cache_count == 0);
thread_drain_msg_ring(&thread->msg_pool);
thread_drain_msg_ring(&thread->msg_free_ring);

if (spdk_interrupt_mode_is_enabled()) {
thread_interrupt_destroy(thread);
Expand Down Expand Up @@ -551,8 +557,11 @@ spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
TAILQ_INIT(&thread->active_pollers);
RB_INIT(&thread->timed_pollers);
TAILQ_INIT(&thread->paused_pollers);
SLIST_INIT(&thread->msg_cache);
thread->msg_cache_count = 0;

/* Initialize message pool fields */
thread->msg_pool = NULL;
thread->msg_free_ring = NULL;
thread->msg_pool_size = SPDK_MSG_MEMPOOL_CACHE_SIZE;

thread->tsc_last = spdk_get_ticks();

Expand All @@ -568,14 +577,41 @@ spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
return NULL;
}

/* Fill the local message pool cache. */
/* Create thread-local message pool */
thread->msg_pool = spdk_ring_create(SPDK_RING_TYPE_MP_SC, SPDK_MSG_MEMPOOL_CACHE_SIZE,
SPDK_ENV_NUMA_ID_ANY);
if (!thread->msg_pool) {
SPDK_ERRLOG("Unable to allocate memory for message pool ring\n");
spdk_ring_free(thread->messages);
free(thread);
return NULL;
}

/* Create cross-thread message return ring */
thread->msg_free_ring = spdk_ring_create(SPDK_RING_TYPE_MP_SC, SPDK_MSG_MEMPOOL_CACHE_SIZE,
SPDK_ENV_NUMA_ID_ANY);
if (!thread->msg_free_ring) {
SPDK_ERRLOG("Unable to allocate memory for message free ring\n");
spdk_ring_free(thread->msg_pool);
spdk_ring_free(thread->messages);
free(thread);
return NULL;
}

/* Pre-populate the local message pool from global mempool */
rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
if (rc == 0) {
/* If we can't populate the cache it's ok. The cache will get filled
* up organically as messages are passed to the thread. */
/* Enqueue all messages into the thread-local pool */
for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
thread->msg_cache_count++;
rc = spdk_ring_enqueue(thread->msg_pool, (void **)&msgs[i], 1, NULL);
if (rc != 1) {
SPDK_WARNLOG("Failed to enqueue message %d to pool\n", i);
/* Return unused messages to global pool */
for (; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
spdk_mempool_put(g_spdk_msg_mempool, msgs[i]);
}
break;
}
}
}

Expand Down Expand Up @@ -853,6 +889,9 @@ spdk_thread_get_from_ctx(void *ctx)
return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static void _thread_refill_msg_pool(struct spdk_thread *thread);
static void _thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg);

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
Expand Down Expand Up @@ -899,14 +938,8 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)

SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
/* Insert the messages at the head. We want to re-use the hot
* ones. */
SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
thread->msg_cache_count++;
} else {
spdk_mempool_put(g_spdk_msg_mempool, msg);
}
/* Return message to pool */
_thread_free_msg(thread, msg);
}

return count;
Expand Down Expand Up @@ -1411,6 +1444,64 @@ thread_send_msg_notification(const struct spdk_thread *target_thread)
}
}
}
/**
 * Move one batch of messages from a thread's msg_free_ring into its msg_pool.
 *
 * At most SPDK_MSG_BATCH_SIZE messages are dequeued from msg_free_ring per
 * call, so the ring is not necessarily left empty. Each dequeued message is
 * enqueued on msg_pool; a message the pool does not accept is returned to
 * g_spdk_msg_mempool instead, so nothing that was dequeued is lost.
 *
 * Does nothing when either ring has not been created.
 *
 * \param thread Thread whose msg_free_ring and msg_pool are used.
 */
static void
_thread_refill_msg_pool(struct spdk_thread *thread)
{
	struct spdk_msg *msgs[SPDK_MSG_BATCH_SIZE];
	size_t count, i;

	/* Both rings are required: one to take messages from, one to put them on. */
	if (thread->msg_free_ring == NULL || thread->msg_pool == NULL) {
		return;
	}

	count = spdk_ring_dequeue(thread->msg_free_ring, (void **)msgs, SPDK_MSG_BATCH_SIZE);

	for (i = 0; i < count; i++) {
		/* The enqueue result is an object count, compared directly rather
		 * than narrowed into an int. */
		if (spdk_ring_enqueue(thread->msg_pool, (void **)&msgs[i], 1, NULL) != 1) {
			/* Pool full - return to global mempool */
			spdk_mempool_put(g_spdk_msg_mempool, msgs[i]);
		}
	}
}

/**
 * Hand a finished message back to one of \p thread's rings.
 *
 * Destination selection:
 *  - the caller is running as \p thread and msg_pool exists: msg_pool;
 *  - otherwise, if msg_free_ring exists: msg_free_ring;
 *  - otherwise: no ring.
 *
 * Exactly one enqueue is attempted on the selected ring. If no ring was
 * selected, or that enqueue does not succeed, the message is returned to
 * g_spdk_msg_mempool.
 *
 * \param thread Thread whose rings receive the message.
 * \param msg Message to release.
 */
static void
_thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg)
{
	struct spdk_thread *caller = _get_thread();
	struct spdk_ring *dest = NULL;

	if (caller == thread && thread->msg_pool) {
		/* Same thread: use the local pool. */
		dest = thread->msg_pool;
	} else if (thread->msg_free_ring) {
		/* Different thread (or no local pool): use the free ring. */
		dest = thread->msg_free_ring;
	}

	if (dest != NULL && spdk_ring_enqueue(dest, (void **)&msg, 1, NULL) == 1) {
		return;
	}

	/* No ring took the message: give it back to the global pool. */
	spdk_mempool_put(g_spdk_msg_mempool, msg);
}


int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
Expand All @@ -1429,23 +1520,30 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx
local_thread = _get_thread();

msg = NULL;
if (local_thread != NULL) {
if (local_thread->msg_cache_count > 0) {
msg = SLIST_FIRST(&local_thread->msg_cache);
assert(msg != NULL);
SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
local_thread->msg_cache_count--;
if (local_thread != NULL && local_thread->msg_pool) {
/* FAST PATH: Try thread-local pool */
if (spdk_ring_dequeue(local_thread->msg_pool, (void **)&msg, 1) == 1) {
goto got_msg;
}
}

if (msg == NULL) {
msg = spdk_mempool_get(g_spdk_msg_mempool);
if (!msg) {
SPDK_ERRLOG("msg could not be allocated\n");
abort();
/* Pool empty - try to refill from cross-thread returns */
_thread_refill_msg_pool(local_thread);

/* Try again after refill */
if (spdk_ring_dequeue(local_thread->msg_pool, (void **)&msg, 1) == 1) {
goto got_msg;
}
}

/* SLOW PATH: Allocate from global pool (rare - only when local pool exhausted) */
msg = spdk_mempool_get(g_spdk_msg_mempool);
if (!msg) {
SPDK_ERRLOG("msg could not be allocated\n");
abort();
}

got_msg:

msg->fn = fn;
msg->arg = ctx;

Expand Down
2 changes: 1 addition & 1 deletion mk/spdk.modules.mk
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ endif

ifeq ($(CONFIG_RBD),y)
BLOCKDEV_MODULES_LIST += bdev_rbd
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd -lstdc++
endif

ifeq ($(CONFIG_DAOS),y)
Expand Down
1 change: 1 addition & 0 deletions module/bdev/rbd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SO_VER := 8
SO_MINOR := 0

C_SRCS = bdev_rbd.c bdev_rbd_rpc.c
CXX_SRCS = bdev_rbd_spdk_context_wq.cpp
LIBNAME = bdev_rbd

SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
Expand Down
47 changes: 45 additions & 2 deletions module/bdev/rbd/bdev_rbd.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "spdk/stdinc.h"

#include "bdev_rbd.h"
#include "bdev_rbd_spdk_context_wq.h"

#include <rbd/librbd.h>
#include <rados/librados.h>
Expand All @@ -28,6 +29,7 @@ static int bdev_rbd_count = 0;
* global parameter to control CRC32C usage in RBD write operations.
*/
static bool g_rbd_with_crc32c = false;
static bool g_rbd_with_spdk_wq = false;

struct bdev_rbd_pool_ctx {
rados_t *cluster_p;
Expand Down Expand Up @@ -78,6 +80,9 @@ struct bdev_rbd {
int (*reservation_fn_cbk)(void *ns);
char cluster_fsid[37];

/* SPDK ContextWQ for this bdev */
struct bdev_rbd_spdk_context_wq *spdk_context_wq;

};

struct bdev_rbd_io_channel {
Expand Down Expand Up @@ -229,6 +234,16 @@ bdev_rbd_free(struct bdev_rbd *rbd)
rbd_close(rbd->image);
}

/* Clean up SPDK ContextWQ after RBD image is closed.
* This ensures no new I/O completions can occur after ContextWQ is destroyed.
* The drain() function in the destructor will wait for any pending messages
* to complete before the ContextWQ is actually destroyed.
*/
if (rbd->spdk_context_wq != NULL) {
bdev_rbd_spdk_context_wq_destroy(rbd->spdk_context_wq);
rbd->spdk_context_wq = NULL;
}

free(rbd->disk.name);
free(rbd->rbd_name);
free(rbd->user_id);
Expand Down Expand Up @@ -563,11 +578,26 @@ bdev_rbd_init_context(void *arg)
}

assert(io_ctx != NULL);
if (g_rbd_with_spdk_wq) {
/* Find reactor thread, create SpdkContextWQ if available, then open with context_wq (NULL uses default AsioContextWQ) */
struct spdk_thread *reactor_thread = bdev_rbd_find_reactor_thread();
rbd->spdk_context_wq = bdev_rbd_spdk_context_wq_create_from_ioctx(
*io_ctx, reactor_thread);
if (rbd->spdk_context_wq == NULL) {
SPDK_NOTICELOG("rbd_with_spdk_wq is enabled but SpdkContextWQ was not created (no reactor thread or allocation failure); "
"falling back to default AsioContextWQ for RBD image %s/%s\n",
rbd->pool_name, rbd->rbd_name);
}
} else {
rbd->spdk_context_wq = NULL;
SPDK_NOTICELOG("rbd_with_spdk_wq is disabled, using AsioContextWQ for RBD image %s/%s\n",
rbd->pool_name, rbd->rbd_name);
}
if (rbd->rbd_read_only) {
SPDK_DEBUGLOG(bdev_rbd, "Will open RBD image %s/%s as read-only\n", rbd->pool_name, rbd->rbd_name);
rc = rbd_open_read_only(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
rc = rbd_open_read_only_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
} else {
rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
rc = rbd_open_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
}
if (rc < 0) {
SPDK_ERRLOG("Failed to open specified rbd device\n");
Expand Down Expand Up @@ -2111,3 +2141,16 @@ bdev_rbd_set_with_crc32c(bool enable)
{
g_rbd_with_crc32c = enable;
}

/**
 * Report whether the SPDK ContextWQ option is currently enabled.
 *
 * \return the current value of the file-scope g_rbd_with_spdk_wq flag.
 */
bool
bdev_rbd_get_with_spdk_wq(void)
{
return g_rbd_with_spdk_wq;
}

/**
 * Enable or disable the SPDK ContextWQ for RBD operations.
 *
 * Only stores the value in the file-scope g_rbd_with_spdk_wq flag. The flag
 * is checked at image-open time to decide whether a SpdkContextWQ is created
 * or the default AsioContextWQ is used.
 *
 * \param enable true to enable the SPDK ContextWQ, false to disable it.
 */
void
bdev_rbd_set_with_spdk_wq(bool enable)
{
g_rbd_with_spdk_wq = enable;
}
15 changes: 15 additions & 0 deletions module/bdev/rbd/bdev_rbd.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,19 @@ bool bdev_rbd_get_with_crc32c(void);
*/
void bdev_rbd_set_with_crc32c(bool enable);

/**
* Get the current rbd_with_spdk_wq setting.
*
* \return true if SPDK ContextWQ is enabled, false otherwise
*/
bool bdev_rbd_get_with_spdk_wq(void);

/**
* Set the rbd_with_spdk_wq parameter to enable/disable SPDK ContextWQ
* for RBD operations.
*
* \param enable true to enable SPDK ContextWQ, false to disable (uses AsioContextWQ)
*/
void bdev_rbd_set_with_spdk_wq(bool enable);

#endif /* SPDK_BDEV_RBD_H */
Loading