Skip to content

Commit e661856

Browse files
committed
MB-20054: Regression test - bucket is deleted with DCPBackfill running
Regression test for MB-20054 - the following abort is encountered when a DCPBackfill task is still running when a bucket is deleted: Assertion failed: (engine), function verifyEngine, file ep-engine/src/objectregistry.cc, line 58. This issue occurs because the DCPBackfill object (and associated objects ActiveStream and importantly ActiveStreams' readyQ of Items) is not deleted earlier in the shutdown sequence (via EvpDestroy), as we use ref-counted pointers for it and there is still an outstanding reference by the AuxIO Thread which is running the task. Hence the DCPBackfill object is only deleted when we finally unregister the deleted bucket from the shared ExecutorPool - see the following backtrace: #1 0x00007f513b75a085 in abort () from /lib64/libc.so.6 couchbase#2 0x00007f51337034e2 in ObjectRegistry::onDeleteItem (pItem=<value optimized out>) at ep-engine/src/objectregistry.cc:157 couchbase#3 0x00007f5133652094 in ~Item (this=<value optimized out>) at ep-engine/src/item.h:352 couchbase#4 SingleThreadedRCPtr<Item>::~SingleThreadedRCPtr (this=<value optimized out>) at ep-engine/src/atomic.h:430 couchbase#5 0x00007f51336c7f47 in ~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275 #6 MutationResponse::~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275 #7 0x00007f51336d86aa in clear_UNLOCKED (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.cc:201 #8 ~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:178 #9 ActiveStream::~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:179 #10 0x00007f51336cc808 in RCPtr<Stream>::~RCPtr (this=0xb1823780) at ep-engine/src/atomic.h:348 #11 0x00007f51336d77c7 in ~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114 #12 DCPBackfill::~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114 #13 0x00007f513368d95f in ~SingleThreadedRCPtr (this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/atomic.h:430 #14 ExecutorPool::_stopTaskGroup 
(this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/executorpool.cc:532 #15 0x00007f513368dad3 in ExecutorPool::_unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:551 #16 0x00007f513368e143 in ExecutorPool::unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:602 #17 0x00007f5133655f82 in EventuallyPersistentStore::~EventuallyPersistentStore (this=0x59e6000) at ep-engine/src/ep.cc:365 #18 0x00007f5133672a25 in EventuallyPersistentEngine::~EventuallyPersistentEngine (this=0x59c4000) at ep-engine/src/ep_engine.cc:5791 #19 0x00007f5133672c95 in EvpDestroy (handle=0x59c4000, force=<value optimized out>) at ep-engine/src/ep_engine.cc:143 To actually reproduce the issue is somewhat involved - we need to orchestrate the world such that we delete the engine while a DCPBackfill task is still running. We spin up a separate thread which will run the DCPBackfill task concurrently with destroy - specifically DCPBackfill must start running (and add items to the readyQ) before destroy(), it must then continue running (stop after) _stopTaskGroup is invoked. To achieve this we use a couple of condition variables to synchronise between the two threads - the timeline needs to look like: auxIO thread: [------- DCPBackfill ----------] main thread: [--destroy()--] [ExecutorPool::_stopTaskGroup] --------------------------------------------------------> time Change-Id: Ic64c419cb8e4e0af2378efba9711b121aacee15b
1 parent 94e4c25 commit e661856

10 files changed

Lines changed: 238 additions & 14 deletions

File tree

src/connmap.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ void ConnNotifier::stop() {
109109
void ConnNotifier::notifyMutationEvent(void) {
110110
bool inverse = false;
111111
if (pendingNotification.compare_exchange_strong(inverse, true)) {
112-
cb_assert(task > 0);
113-
ExecutorPool::get()->wake(task);
112+
if (task > 0) {
113+
ExecutorPool::get()->wake(task);
114+
}
114115
}
115116
}
116117

@@ -754,7 +755,9 @@ bool TapConnMap::mapped(connection_t &tc) {
754755
void TapConnMap::shutdownAllConnections() {
755756
LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
756757

757-
connNotifier_->stop();
758+
if (connNotifier_ != NULL) {
759+
connNotifier_->stop();
760+
}
758761

759762
// Not safe to acquire both connsLock and releaseLock at the same time
760763
// (can trigger deadlock), so first acquire releaseLock to release all
@@ -1023,7 +1026,9 @@ DcpProducer *DcpConnMap::newProducer(const void* cookie,
10231026
void DcpConnMap::shutdownAllConnections() {
10241027
LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
10251028

1026-
connNotifier_->stop();
1029+
if (connNotifier_ != NULL) {
1030+
connNotifier_->stop();
1031+
}
10271032

10281033
// Not safe to acquire both connsLock and releaseLock at the same time
10291034
// (can trigger deadlock), so first acquire releaseLock to release all

src/connmap.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,10 @@ class ConnMap {
254254
class ConnNotifier {
255255
public:
256256
ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
257-
: notifier_type(ntype), connMap(cm), pendingNotification(false) { }
257+
: notifier_type(ntype),
258+
connMap(cm),
259+
task(0),
260+
pendingNotification(false) { }
258261

259262
void start();
260263

src/ep_engine.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5661,7 +5661,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
56615661
uint32_t opaque,
56625662
uint32_t seqno,
56635663
uint32_t flags,
5664-
void *stream_name,
5664+
const void *stream_name,
56655665
uint16_t nname)
56665666
{
56675667
(void) opaque;

src/ep_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
429429
uint32_t opaque,
430430
uint32_t seqno,
431431
uint32_t flags,
432-
void *stream_name,
432+
const void *stream_name,
433433
uint16_t nname);
434434

435435
ENGINE_ERROR_CODE dcpAddStream(const void* cookie,

src/executorthread.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ void ExecutorThread::run() {
7878
break;
7979
}
8080

81+
now = gethrtime();
8182
if (TaskQueue *q = manager->nextTask(*this, tick)) {
8283
EventuallyPersistentEngine *engine = currentTask->getEngine();
8384
ObjectRegistry::onSwitchThread(engine);
@@ -90,7 +91,6 @@ void ExecutorThread::run() {
9091

9192
// Measure scheduling overhead as difference between the time
9293
// that the task wanted to wake up and the current time
93-
now = gethrtime();
9494
hrtime_t woketime = currentTask->getWaketime();
9595
engine->getEpStore()->logQTime(currentTask->getTypeId(),
9696
now > woketime ? now - woketime

src/fakes/fake_executorpool.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,15 @@ class FakeExecutorThread : public ExecutorThread {
9393
void runCurrentTask() {
9494
// Only supports one-shot tasks
9595
EXPECT_FALSE(currentTask->run());
96+
completeCurrentTask();
97+
}
98+
99+
// 'completes' the current task; useful if the caller wants to separately
100+
// run() the current task and then tidy up afterwards.
101+
void completeCurrentTask() {
96102
manager->doneWork(curTaskType);
97103
manager->cancel(currentTask->getId(), true);
104+
currentTask.reset();
98105
}
99106

100107
ExTask& getCurrentTask() {

src/warmup.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ void LoadValueCallback::callback(CacheLookup &lookup)
361361

362362

363363
Warmup::Warmup(EventuallyPersistentStore *st) :
364-
state(), store(st), startTime(0), metadata(0), warmup(0),
364+
state(), store(st), taskId(0), startTime(0), metadata(0), warmup(0),
365365
threadtask_count(0),
366366
estimateTime(0), estimatedItemCount(std::numeric_limits<size_t>::max()),
367367
cleanShutdown(true), corruptAccessLog(false), warmupComplete(false),

tests/module_tests/ep_unit_tests_main.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
#include "configuration.h"
2929
#include "stored-value.h"
3030

31-
/* static storage for environment variable set by putenv(). */
32-
static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
3331

3432
int main(int argc, char **argv) {
3533
bool log_to_stderr = false;
@@ -52,8 +50,6 @@ int main(int argc, char **argv) {
5250
}
5351
}
5452

55-
putenv(allow_no_stats_env);
56-
5753
init_mock_server(log_to_stderr);
5854
get_mock_server_api()->log->set_level(EXTENSION_LOG_DEBUG);
5955

tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
#include "evp_store_test.h"
1919

2020
#include "fakes/fake_executorpool.h"
21+
#include "programs/engine_testapp/mock_server.h"
2122
#include "taskqueue.h"
2223

24+
#include <atomic>
25+
2326
/*
2427
* A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
2528
* which will not spawn ExecutorThreads and hence not run any tasks
@@ -31,3 +34,204 @@ class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
3134
EventuallyPersistentStoreTest::SetUp();
3235
}
3336
};
37+
38+
// Check that if onDeleteItem() is called during bucket deletion, we do not
39+
// abort due to not having a valid thread-local 'engine' pointer. This
40+
// has been observed when we have a DCPBackfill task which is deleted during
41+
// bucket shutdown, which has a non-zero number of Items which are destructed
42+
// (and call onDeleteItem).
43+
TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
44+
auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
45+
(ExecutorPool::get());
46+
47+
// Should start with no tasks registered on any queues.
48+
for (auto& queue : task_executor->getLpTaskQ()) {
49+
ASSERT_EQ(0, queue->getFutureQueueSize());
50+
ASSERT_EQ(0, queue->getReadyQueueSize());
51+
}
52+
53+
// [[1] Set our state to active. This should add a VBStatePersistTask to
54+
// the WRITER queue.
55+
EXPECT_EQ(ENGINE_SUCCESS,
56+
store->setVBucketState(vbid, vbucket_state_active, false));
57+
58+
auto& lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
59+
auto& lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
60+
61+
EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
62+
EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
63+
64+
// Use a FakeExecutorThread to fetch and run the persistTask.
65+
FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
66+
writer_thread.updateCurrentTime();
67+
EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
68+
EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
69+
writer_thread.getTaskName());
70+
EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
71+
EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
72+
writer_thread.runCurrentTask();
73+
74+
// Perform one SET, then close its checkpoint. This means that we no
75+
// longer have all sequence numbers in memory checkpoints, forcing the
76+
// DCP stream request to go to disk (backfill).
77+
store_item(vbid, "key", "value");
78+
79+
// Force a new checkpoint.
80+
auto vb = store->getVbMap().getBucket(vbid);
81+
auto& ckpt_mgr = vb->checkpointManager;
82+
ckpt_mgr.createNewCheckpoint();
83+
84+
EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
85+
EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
86+
87+
EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
88+
EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
89+
90+
// Directly flush the vbucket, ensuring data is on disk.
91+
// (This would normally also wake up the checkpoint remover task, but
92+
// as that task was never registered with the ExecutorPool in this test
93+
// environment, we need to manually remove the prev checkpoint).
94+
EXPECT_EQ(1, store->flushVBucket(vbid));
95+
96+
bool new_ckpt_created;
97+
EXPECT_EQ(1,
98+
ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
99+
100+
EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
101+
EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
102+
103+
// Create a DCP producer, and start a stream request.
104+
std::string name{"test_producer"};
105+
EXPECT_EQ(ENGINE_SUCCESS,
106+
engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
107+
DCP_OPEN_PRODUCER, name.data(), name.size()));
108+
109+
// Expect to have an ActiveStreamCheckpointProcessorTask, which is
110+
// initially snoozed (so we can't run it).
111+
EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
112+
EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
113+
114+
uint64_t rollbackSeqno;
115+
auto dummy_dcp_add_failover_cb = [](vbucket_failover_t* entry,
116+
size_t nentries, const void *cookie) {
117+
return ENGINE_SUCCESS;
118+
};
119+
120+
// Actual stream request method (EvpDcpStreamReq) is static, so access via
121+
// the engine_interface.
122+
EXPECT_EQ(ENGINE_SUCCESS,
123+
engine->dcp.stream_req(
124+
&engine->interface, cookie, /*flags*/0,
125+
/*opaque*/0, /*vbucket*/vbid, /*start_seqno*/0,
126+
/*end_seqno*/-1, /*vb_uuid*/0xabcd, /*snap_start*/0,
127+
/*snap_end*/0, &rollbackSeqno,
128+
dummy_dcp_add_failover_cb));
129+
130+
// FutureQ should now have an additional DCPBackfill task.
131+
EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
132+
EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
133+
134+
// Create an executor 'thread' to obtain shared ownership of the next
135+
// AuxIO task (which should be DCPBackfill). As long as this
136+
// object has its currentTask set to DCPBackfill, the DCPBackfill task
137+
// will not be deleted.
138+
// Essentially we are simulating a concurrent thread running this task.
139+
FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
140+
auxio_thread.updateCurrentTime();
141+
EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
142+
EXPECT_EQ("DCP backfill for vbucket 0", auxio_thread.getTaskName());
143+
144+
// This is the one action we really need to perform 'concurrently' - delete
145+
// the engine while a DCPBackfill task is still running. We spin up a
146+
// separate thread which will run the DCPBackfill task
147+
// concurrently with destroy - specifically DCPBackfill must start running
148+
// (and add items to the readyQ) before destroy(), it must then continue
149+
// running (stop after) _stopTaskGroup is invoked.
150+
// To achieve this we use a couple of condition variables to synchronise
151+
// between the two threads - the timeline needs to look like:
152+
//
153+
// auxIO thread: [------- DCPBackfill ----------]
154+
// main thread: [destroy()] [ExecutorPool::_stopTaskGroup]
155+
//
156+
// --------------------------------------------------------> time
157+
//
158+
bool backfill_done = false;
159+
std::condition_variable backfill_cv;
160+
std::mutex backfill_done_mutex;
161+
162+
bool destroy_done = false;
163+
std::condition_variable destroy_cv;
164+
std::mutex destroy_done_mutex;
165+
166+
auto concurrent_task_thread = std::thread{
167+
[&auxio_thread,
168+
&backfill_cv, &backfill_done, &backfill_done_mutex,
169+
&destroy_cv, &destroy_done, &destroy_done_mutex,
170+
&lpAuxioQ](EventuallyPersistentEngine* engine) {
171+
ObjectRegistry::onSwitchThread(engine);
172+
173+
// Run the DCPBackfill task to push items to readyQ. Should return
174+
// false (i.e. one-shot).
175+
EXPECT_FALSE(auxio_thread.getCurrentTask()->run());
176+
177+
// Notify the main thread that it can progress with destroying the
178+
// engine [A].
179+
{
180+
std::lock_guard<std::mutex> lk(backfill_done_mutex);
181+
backfill_done = true;
182+
backfill_cv.notify_one();
183+
}
184+
185+
// Now wait ourselves for destroy to be completed [B].
186+
std::unique_lock<std::mutex> lk(destroy_done_mutex);
187+
destroy_cv.wait(lk, [&destroy_done]{return destroy_done; });
188+
189+
// This is the only "hacky" part of the test - we need to somehow
190+
// keep the DCPBackfill task 'running' - i.e. not call
191+
// completeCurrentTask - until the main thread is in
192+
// ExecutorPool::_stopTaskGroup. However we have no way from the test
193+
// to properly signal that we are *inside* _stopTaskGroup -
194+
// called from EVPStore's destructor.
195+
// Best we can do is spin on waiting for the DCPBackfill task to be
196+
// set to 'dead' - and only then completeCurrentTask; which will
197+
// cancel the task.
198+
while (!auxio_thread.getCurrentTask()->isdead()) {
199+
// spin.
200+
}
201+
auxio_thread.completeCurrentTask();
202+
203+
// Cleanup - fetch the next (final) task -
204+
// ActiveStreamCheckpointProcessorTask - so it can be cancelled
205+
// and executorpool shut down.
206+
auxio_thread.updateCurrentTime();
207+
EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
208+
EXPECT_EQ("Process checkpoint(s) for DCP producer",
209+
auxio_thread.getTaskName());
210+
auxio_thread.runCurrentTask();
211+
212+
}, engine};
213+
214+
// [A] Wait for DCPBackfill to complete.
215+
std::unique_lock<std::mutex> lk(backfill_done_mutex);
216+
backfill_cv.wait(lk, [&backfill_done]{return backfill_done; });
217+
218+
// 'Destroy' the engine - this doesn't delete the object, just shuts down
219+
// connections, marks streams as dead etc.
220+
engine->destroy(/*force*/false);
221+
destroy_mock_event_callbacks();
222+
223+
{
224+
std::lock_guard<std::mutex> lk(destroy_done_mutex);
225+
destroy_done = true;
226+
destroy_cv.notify_one();
227+
}
228+
229+
// Need to have the current engine valid before deleting (this is what
230+
// EvpDestroy does normally; however we have a smart ptr to the engine
231+
// so must delete via that).
232+
ObjectRegistry::onSwitchThread(engine);
233+
delete engine;
234+
engine = NULL;
235+
236+
concurrent_task_thread.join();
237+
}

tests/module_tests/evp_store_test.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "connmap.h"
3333
#include "ep_engine.h"
3434
#include "flusher.h"
35+
#include "tapthrottle.h"
3536
#include "../mock/mock_dcp_producer.h"
3637

3738
#include "programs/engine_testapp/mock_server.h"
@@ -64,6 +65,12 @@ SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
6465

6566
// checkpointConfig is needed by CheckpointManager (via EPStore).
6667
checkpointConfig = new CheckpointConfig(*this);
68+
69+
// tapConfig is needed by doTapStats().
70+
tapConfig = new TapConfig(*this);
71+
72+
// tapThrottle is needed by doEngineStats().
73+
tapThrottle = new TapThrottle(configuration, stats);
6774
}
6875

6976
void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
@@ -132,7 +139,9 @@ void EventuallyPersistentStoreTest::SetUp() {
132139
void EventuallyPersistentStoreTest::TearDown() {
133140
destroy_mock_cookie(cookie);
134141
destroy_mock_event_callbacks();
135-
engine->getDcpConnMap().manageConnections();
142+
if (engine) {
143+
engine->getDcpConnMap().manageConnections();
144+
}
136145

137146
// Need to have the current engine valid before deleting (this is what
138147
// EvpDestroy does normally; however we have a smart ptr to the engine

0 commit comments

Comments
 (0)