Skip to content

Commit 1812d90

Browse files
committed
Avoid Stripe Mutex lock contention for RWW
1 parent 52b1f35 commit 1812d90

8 files changed

Lines changed: 891 additions & 72 deletions

File tree

include/iocore/cache/CacheDefs.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ enum CacheEventType {
9595
CACHE_EVENT_SCAN_OPERATION_BLOCKED = CACHE_EVENT_EVENTS_START + 23,
9696
CACHE_EVENT_SCAN_OPERATION_FAILED = CACHE_EVENT_EVENTS_START + 24,
9797
CACHE_EVENT_SCAN_DONE = CACHE_EVENT_EVENTS_START + 25,
98+
CACHE_EVENT_OPEN_DIR_RETRY = CACHE_EVENT_EVENTS_START + 26,
9899
//////////////////////////
99100
// Internal error codes //
100101
//////////////////////////

include/tsutil/Bravo.h

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,4 +372,195 @@ template <typename T = std::shared_mutex, size_t SLOT_SIZE = 256, int SLOWDOWN_G
372372

373373
using shared_mutex = shared_mutex_impl<>;
374374

375+
/**
376+
ts::bravo::recursive_shared_mutex_impl
377+
378+
A recursive version of shared_mutex_impl that allows the same thread
379+
to acquire exclusive and shared locks multiple times.
380+
381+
Uses DenseThreadId for efficient per-thread state tracking without map overhead.
382+
Optimized to minimize expensive std::this_thread::get_id() calls by using
383+
DenseThreadId for ownership tracking.
384+
385+
Mixed lock semantics:
386+
- Upgrade prevention: A thread holding a shared lock cannot acquire an exclusive lock
387+
(would cause deadlock). try_lock() returns false, lock() asserts.
388+
- Downgrade allowed: A thread holding an exclusive lock can acquire a shared lock.
389+
*/
390+
template <typename T = shared_mutex_impl<>, size_t SLOT_SIZE = 256> class recursive_shared_mutex_impl
391+
{
392+
// Use a sentinel value for "no owner" - DenseThreadId values are 0 to SLOT_SIZE-1
393+
static constexpr size_t NO_OWNER = SLOT_SIZE;
394+
395+
public:
396+
recursive_shared_mutex_impl() = default;
397+
~recursive_shared_mutex_impl() = default;
398+
399+
// No copying or moving
400+
recursive_shared_mutex_impl(recursive_shared_mutex_impl const &) = delete;
401+
recursive_shared_mutex_impl &operator=(recursive_shared_mutex_impl const &) = delete;
402+
recursive_shared_mutex_impl(recursive_shared_mutex_impl &&) = delete;
403+
recursive_shared_mutex_impl &operator=(recursive_shared_mutex_impl &&) = delete;
404+
405+
////
406+
// Exclusive locking (recursive)
407+
//
408+
void
409+
lock()
410+
{
411+
size_t tid = DenseThreadId::self();
412+
// Fast path: check if we already own the lock
413+
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
414+
++_exclusive_count;
415+
return;
416+
}
417+
// Upgrade prevention: cannot acquire exclusive lock while holding shared lock
418+
ThreadState &state = _thread_states[tid];
419+
debug_assert(state.shared_count == 0);
420+
_mutex.lock();
421+
_exclusive_owner.store(tid, std::memory_order_relaxed);
422+
_exclusive_count = 1;
423+
}
424+
425+
bool
426+
try_lock()
427+
{
428+
size_t tid = DenseThreadId::self();
429+
// Fast path: check if we already own the lock
430+
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
431+
++_exclusive_count;
432+
return true;
433+
}
434+
// Upgrade prevention: cannot acquire exclusive lock while holding shared lock
435+
ThreadState &state = _thread_states[tid];
436+
if (state.shared_count > 0) {
437+
return false;
438+
}
439+
if (_mutex.try_lock()) {
440+
_exclusive_owner.store(tid, std::memory_order_relaxed);
441+
_exclusive_count = 1;
442+
return true;
443+
}
444+
return false;
445+
}
446+
447+
void
448+
unlock()
449+
{
450+
if (--_exclusive_count == 0) {
451+
_exclusive_owner.store(NO_OWNER, std::memory_order_relaxed);
452+
_mutex.unlock();
453+
}
454+
}
455+
456+
////
457+
// Shared locking (recursive)
458+
//
459+
void
460+
lock_shared(Token &token)
461+
{
462+
size_t tid = DenseThreadId::self();
463+
ThreadState &state = _thread_states[tid];
464+
465+
// Fast path: already holding shared lock - just increment count (most common case)
466+
size_t count = state.shared_count;
467+
if (count > 0) {
468+
state.shared_count = count + 1;
469+
token = state.cached_token;
470+
return;
471+
}
472+
473+
// Check for downgrade: if we hold exclusive lock, allow shared lock without acquiring underlying
474+
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
475+
state.shared_count = 1;
476+
token = 0; // Special token indicating we're under exclusive lock
477+
return;
478+
}
479+
480+
// Slow path: acquire underlying lock
481+
_mutex.lock_shared(state.cached_token);
482+
state.shared_count = 1;
483+
token = state.cached_token;
484+
}
485+
486+
bool
487+
try_lock_shared(Token &token)
488+
{
489+
size_t tid = DenseThreadId::self();
490+
ThreadState &state = _thread_states[tid];
491+
492+
// Fast path: already holding shared lock - just increment count (most common case)
493+
size_t count = state.shared_count;
494+
if (count > 0) {
495+
state.shared_count = count + 1;
496+
token = state.cached_token;
497+
return true;
498+
}
499+
500+
// Check for downgrade: if we hold exclusive lock, allow shared lock without acquiring underlying
501+
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
502+
state.shared_count = 1;
503+
token = 0; // Special token indicating we're under exclusive lock
504+
return true;
505+
}
506+
507+
// Slow path: try to acquire underlying lock
508+
if (_mutex.try_lock_shared(state.cached_token)) {
509+
state.shared_count = 1;
510+
token = state.cached_token;
511+
return true;
512+
}
513+
return false;
514+
}
515+
516+
void
517+
unlock_shared(const Token /* token */)
518+
{
519+
size_t tid = DenseThreadId::self();
520+
ThreadState &state = _thread_states[tid];
521+
if (--state.shared_count == 0) {
522+
// Only unlock underlying mutex if we're not holding exclusive lock
523+
if (_exclusive_owner.load(std::memory_order_relaxed) != tid) {
524+
_mutex.unlock_shared(state.cached_token);
525+
}
526+
state.cached_token = 0;
527+
}
528+
}
529+
530+
// Extensions to check
531+
bool
532+
has_unique_lock()
533+
{
534+
return _exclusive_owner.load(std::memory_order_relaxed) == DenseThreadId::self();
535+
}
536+
537+
bool
538+
has_shared_lock()
539+
{
540+
size_t tid = DenseThreadId::self();
541+
ThreadState &state = _thread_states[tid];
542+
543+
if (state.shared_count > 0) {
544+
return true;
545+
} else if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
546+
return true;
547+
} else {
548+
return false;
549+
}
550+
}
551+
552+
private:
553+
struct ThreadState {
554+
size_t shared_count{0};
555+
Token cached_token{0};
556+
};
557+
558+
T _mutex;
559+
std::atomic<size_t> _exclusive_owner{NO_OWNER};
560+
size_t _exclusive_count{0};
561+
std::array<ThreadState, SLOT_SIZE> _thread_states{};
562+
};
563+
564+
using recursive_shared_mutex = recursive_shared_mutex_impl<>;
565+
375566
} // namespace ts::bravo

src/iocore/cache/Cache.cc

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,28 @@ DbgCtl dbg_ctl_cache_init{"cache_init"};
109109
DbgCtl dbg_ctl_cache_hosting{"cache_hosting"};
110110
DbgCtl dbg_ctl_cache_update{"cache_update"};
111111

112+
/// Construct a CacheVC configured to start an HTTP read operation.
///
/// @param cont    Client continuation to be notified of cache events.
/// @param key     Cache key of the requested object.
/// @param request Client request header; shallow-copied into the VC.
/// @param params  HTTP configuration accessor for this transaction.
/// @param stripe  Stripe the object resides in.
/// @return A newly allocated CacheVC ready to be driven through the read
///         state machine. Active-read metrics are incremented here.
CacheVC *
new_CacheVC_for_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params,
                     StripeSM *stripe)
{
  CacheVC *vc = new_CacheVC(cont);

  // A fresh read starts with all three key fields identical.
  vc->first_key    = *key;
  vc->key          = *key;
  vc->earliest_key = *key;

  vc->stripe    = stripe;
  vc->vio.op    = VIO::READ;
  vc->op_type   = static_cast<int>(CacheOpType::Read);
  vc->frag_type = CACHE_FRAG_TYPE_HTTP;
  vc->params    = params;
  vc->request.copy_shallow(request);

  // Record the newly active read in both the global and per-volume gauges.
  ts::Metrics::Gauge::increment(cache_rsb.status[vc->op_type].active);
  ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[vc->op_type].active);

  return vc;
}
133+
112134
} // end anonymous namespace
113135

114136
// Global list of the volumes created
@@ -543,20 +565,24 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
543565
OpenDirEntry *od = nullptr;
544566
CacheVC *c = nullptr;
545567

568+
// Read-While-Writer
569+
// This OpenDirEntry lookup doesn't need stripe mutex lock because OpenDir has own reader-writer lock
570+
od = stripe->open_read(key);
571+
if (od != nullptr) {
572+
c = new_CacheVC_for_read(cont, key, request, params, stripe);
573+
c->od = od;
574+
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
575+
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
576+
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
577+
return ACTION_RESULT_DONE;
578+
}
579+
return &c->_action;
580+
}
581+
546582
{
547583
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
548-
if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) {
549-
c = new_CacheVC(cont);
550-
c->first_key = c->key = c->earliest_key = *key;
551-
c->stripe = stripe;
552-
c->vio.op = VIO::READ;
553-
c->op_type = static_cast<int>(CacheOpType::Read);
554-
ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
555-
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
556-
c->request.copy_shallow(request);
557-
c->frag_type = CACHE_FRAG_TYPE_HTTP;
558-
c->params = params;
559-
c->od = od;
584+
if (!lock.is_locked() || stripe->directory.probe(key, stripe, &result, &last_collision)) {
585+
c = new_CacheVC_for_read(cont, key, request, params, stripe);
560586
}
561587
if (!lock.is_locked()) {
562588
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -566,9 +592,7 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
566592
if (!c) {
567593
goto Lmiss;
568594
}
569-
if (c->od) {
570-
goto Lwriter;
571-
}
595+
572596
// hit
573597
c->dir = c->first_dir = result;
574598
c->last_collision = last_collision;
@@ -587,13 +611,6 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
587611
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Read)].failure);
588612
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC));
589613
return ACTION_RESULT_DONE;
590-
Lwriter:
591-
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
592-
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
593-
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
594-
return ACTION_RESULT_DONE;
595-
}
596-
return &c->_action;
597614
Lcallreturn:
598615
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
599616
return ACTION_RESULT_DONE;

0 commit comments

Comments
 (0)