Skip to content

Commit 00467c6

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent 76fbfd6 commit 00467c6

4 files changed

Lines changed: 287 additions & 7 deletions

File tree

src/sentry_sync.c

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct sentry_bgworker_task_s;
125125
typedef struct sentry_bgworker_task_s {
126126
struct sentry_bgworker_task_s *next_task;
127127
long refcount;
128+
uint64_t execute_after;
128129
sentry_task_exec_func_t exec_func;
129130
void (*cleanup_func)(void *task_data);
130131
void *task_data;
@@ -225,7 +226,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225226
static bool
226227
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227228
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
229+
return (!bgw->first_task
230+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
231+
&& !sentry__atomic_fetch(&bgw->running);
229232
}
230233

231234
SENTRY_THREAD_FN
@@ -260,6 +263,16 @@ worker_thread(void *data)
260263
continue;
261264
}
262265

266+
// wait for a delayed task, wake up to new submissions
267+
{
268+
uint64_t now = sentry__monotonic_time();
269+
if (now < task->execute_after) {
270+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
271+
(uint32_t)(task->execute_after - now));
272+
continue;
273+
}
274+
}
275+
263276
sentry__task_incref(task);
264277
sentry__mutex_unlock(&bgw->task_lock);
265278

@@ -350,11 +363,23 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350363
sentry__cond_init(&flush_task->signal);
351364
sentry__mutex_init(&flush_task->lock);
352365

366+
// flush potential delayed tasks up until the timeout
367+
uint64_t delay_ms = 0;
368+
uint64_t before = sentry__monotonic_time();
369+
sentry__mutex_lock(&bgw->task_lock);
370+
if (bgw->last_task && bgw->last_task->execute_after > before) {
371+
delay_ms = bgw->last_task->execute_after - before;
372+
if (delay_ms > timeout) {
373+
delay_ms = timeout;
374+
}
375+
}
376+
sentry__mutex_unlock(&bgw->task_lock);
377+
353378
sentry__mutex_lock(&flush_task->lock);
354379

355380
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
381+
sentry__bgworker_submit_delayed(bgw, sentry__flush_task,
382+
(void (*)(void *))sentry__flush_task_decref, flush_task, delay_ms);
358383

359384
uint64_t started = sentry__monotonic_time();
360385
bool was_flushed = false;
@@ -396,6 +421,7 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396421

397422
uint64_t started = sentry__monotonic_time();
398423
sentry__mutex_lock(&bgw->task_lock);
424+
399425
while (true) {
400426
if (sentry__bgworker_is_done(bgw)) {
401427
sentry__mutex_unlock(&bgw->task_lock);
@@ -422,6 +448,28 @@ int
422448
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423449
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424450
void *task_data)
451+
{
452+
SENTRY_DEBUG("submitting task to background worker thread");
453+
return sentry__bgworker_submit_at(
454+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
455+
}
456+
457+
int
458+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
459+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
460+
void *task_data, uint64_t delay_ms)
461+
{
462+
SENTRY_DEBUGF("submitting %" PRIu64
463+
" ms delayed task to background worker thread",
464+
delay_ms);
465+
return sentry__bgworker_submit_at(bgw, exec_func, cleanup_func, task_data,
466+
sentry__monotonic_time() + delay_ms);
467+
}
468+
469+
int
470+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
471+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
472+
void *task_data, uint64_t execute_after)
425473
{
426474
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427475
if (!task) {
@@ -432,19 +480,41 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432480
}
433481
task->next_task = NULL;
434482
task->refcount = 1;
483+
task->execute_after = execute_after;
435484
task->exec_func = exec_func;
436485
task->cleanup_func = cleanup_func;
437486
task->task_data = task_data;
438487

439-
SENTRY_DEBUG("submitting task to background worker thread");
440488
sentry__mutex_lock(&bgw->task_lock);
489+
441490
if (!bgw->first_task) {
491+
// empty queue
442492
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
493+
bgw->last_task = task;
494+
} else if (bgw->last_task->execute_after <= task->execute_after) {
495+
// append last (common fast path for FIFO immediates)
445496
bgw->last_task->next_task = task;
497+
bgw->last_task = task;
498+
} else {
499+
// insert sorted by execute_after
500+
sentry_bgworker_task_t *prev = NULL;
501+
sentry_bgworker_task_t *cur = bgw->first_task;
502+
while (cur && cur->execute_after <= task->execute_after) {
503+
prev = cur;
504+
cur = cur->next_task;
505+
}
506+
507+
task->next_task = cur;
508+
if (prev) {
509+
prev->next_task = task;
510+
} else {
511+
bgw->first_task = task;
512+
}
513+
if (!task->next_task) {
514+
bgw->last_task = task;
515+
}
446516
}
447-
bgw->last_task = task;
517+
448518
sentry__cond_wake(&bgw->submit_signal);
449519
sentry__mutex_unlock(&bgw->task_lock);
450520

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

tests/unit/test_sync.c

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
#include "sentry_core.h"
22
#include "sentry_sync.h"
33
#include "sentry_testsupport.h"
4+
#include "sentry_utils.h"
45

56
#ifdef SENTRY_PLATFORM_WINDOWS
67
# include <windows.h>
78
# define sleep_s(SECONDS) Sleep((SECONDS) * 1000)
9+
# define sleep_ms(MS) Sleep(MS)
810
#else
911
# include <unistd.h>
1012
# define sleep_s(SECONDS) sleep(SECONDS)
13+
# define sleep_ms(MS) usleep((MS) * 1000)
1114
#endif
1215

1316
struct task_state {
@@ -167,3 +170,195 @@ SENTRY_TEST(bgworker_flush)
167170
TEST_CHECK_INT_EQUAL(shutdown, 0);
168171
sentry__bgworker_decref(bgw);
169172
}
173+
174+
static sentry_cond_t blocker_signal;
175+
#ifdef SENTRY__MUTEX_INIT_DYN
176+
SENTRY__MUTEX_INIT_DYN(blocker_lock)
177+
#else
178+
static sentry_mutex_t blocker_lock = SENTRY__MUTEX_INIT;
179+
#endif
180+
static bool blocker_released;
181+
182+
static void
183+
blocker_task(void *UNUSED(data), void *UNUSED(state))
184+
{
185+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
186+
sentry__mutex_lock(&blocker_lock);
187+
while (!blocker_released) {
188+
sentry__cond_wait_timeout(&blocker_signal, &blocker_lock, 100);
189+
}
190+
sentry__mutex_unlock(&blocker_lock);
191+
}
192+
193+
struct order_state {
194+
int order[10];
195+
int count;
196+
};
197+
198+
static void
199+
record_order_task(void *data, void *_state)
200+
{
201+
struct order_state *state = (struct order_state *)_state;
202+
state->order[state->count++] = (int)(size_t)data;
203+
}
204+
205+
SENTRY_TEST(bgworker_task_delay)
206+
{
207+
struct order_state os;
208+
os.count = 0;
209+
210+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
211+
TEST_ASSERT(!!bgw);
212+
213+
uint64_t before = sentry__monotonic_time();
214+
sentry__bgworker_submit_delayed(
215+
bgw, record_order_task, NULL, (void *)1, 50);
216+
217+
sentry__bgworker_start(bgw);
218+
TEST_CHECK_INT_EQUAL(sentry__bgworker_flush(bgw, 500), 0);
219+
uint64_t after = sentry__monotonic_time();
220+
221+
TEST_CHECK_INT_EQUAL(os.count, 1);
222+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
223+
TEST_CHECK(after - before >= 50);
224+
225+
sentry__bgworker_shutdown(bgw, 500);
226+
sentry__bgworker_decref(bgw);
227+
}
228+
229+
SENTRY_TEST(bgworker_delayed_tasks)
230+
{
231+
struct order_state os;
232+
os.count = 0;
233+
234+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
235+
TEST_ASSERT(!!bgw);
236+
237+
// submit_at with a fixed base so ordering is deterministic regardless
238+
// of OS preemption between submissions (submit_delayed reads the clock
239+
// per call, so a pause between calls could shift execute_after values
240+
// and change the expected sort order)
241+
uint64_t base = sentry__monotonic_time();
242+
243+
// all tasks sorted by execute_after: immediate (0) first, then delayed
244+
// by deadline
245+
//
246+
// queue after each submit:
247+
// i(1)
248+
// i(1) d100(3)
249+
// i(1) i(6) d100(3)
250+
// i(1) i(6) d50(2) d100(3)
251+
// i(1) i(6) i(7) d50(2) d100(3)
252+
// i(1) i(6) i(7) d50(2) d100(3) d200(5)
253+
// i(1) i(6) i(7) d50(2) d100(3) d150(4) d200(5)
254+
// i(1) i(6) i(7) i(8) d50(2) d100(3) d150(4) d200(5)
255+
// i(1) i(6) i(7) i(8) d50(2) d75(9) d100(3) d150(4) d200(5)
256+
// i(1) i(6) i(7) i(8) i(10) d50(2) d75(9) d100(3) d150(4) d200(5)
257+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)1, base);
258+
sentry__bgworker_submit_at(
259+
bgw, record_order_task, NULL, (void *)3, base + 100);
260+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)6, base);
261+
sentry__bgworker_submit_at(
262+
bgw, record_order_task, NULL, (void *)2, base + 50);
263+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)7, base);
264+
sentry__bgworker_submit_at(
265+
bgw, record_order_task, NULL, (void *)5, base + 200);
266+
sentry__bgworker_submit_at(
267+
bgw, record_order_task, NULL, (void *)4, base + 150);
268+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)8, base);
269+
sentry__bgworker_submit_at(
270+
bgw, record_order_task, NULL, (void *)9, base + 75);
271+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)10, base);
272+
273+
sentry__bgworker_start(bgw);
274+
TEST_CHECK_INT_EQUAL(sentry__bgworker_flush(bgw, 5000), 0);
275+
276+
// all tasks execute: immediate first, then delayed in deadline order
277+
TEST_CHECK_INT_EQUAL(os.count, 10);
278+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
279+
TEST_CHECK_INT_EQUAL(os.order[1], 6);
280+
TEST_CHECK_INT_EQUAL(os.order[2], 7);
281+
TEST_CHECK_INT_EQUAL(os.order[3], 8);
282+
TEST_CHECK_INT_EQUAL(os.order[4], 10);
283+
TEST_CHECK_INT_EQUAL(os.order[5], 2);
284+
TEST_CHECK_INT_EQUAL(os.order[6], 9);
285+
TEST_CHECK_INT_EQUAL(os.order[7], 3);
286+
TEST_CHECK_INT_EQUAL(os.order[8], 4);
287+
TEST_CHECK_INT_EQUAL(os.order[9], 5);
288+
289+
sentry__bgworker_shutdown(bgw, 500);
290+
sentry__bgworker_decref(bgw);
291+
}
292+
293+
SENTRY_TEST(bgworker_delayed_priority)
294+
{
295+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
296+
sentry__cond_init(&blocker_signal);
297+
blocker_released = false;
298+
299+
struct order_state os;
300+
os.count = 0;
301+
302+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
303+
TEST_ASSERT(!!bgw);
304+
305+
// blocker holds the worker busy
306+
sentry__bgworker_submit(bgw, blocker_task, NULL, NULL);
307+
// delayed task queued behind the blocker
308+
sentry__bgworker_submit_delayed(
309+
bgw, record_order_task, NULL, (void *)1, 50);
310+
311+
sentry__bgworker_start(bgw);
312+
313+
// wait for the delayed task to become ready
314+
sleep_ms(100);
315+
316+
// submit an immediate task — should NOT bypass the ready delayed task
317+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
318+
319+
// release the blocker
320+
sentry__mutex_lock(&blocker_lock);
321+
blocker_released = true;
322+
sentry__cond_wake(&blocker_signal);
323+
sentry__mutex_unlock(&blocker_lock);
324+
325+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 5000), 0);
326+
327+
TEST_CHECK_INT_EQUAL(os.count, 2);
328+
TEST_CHECK_INT_EQUAL(os.order[0], 1); // delayed (was ready first)
329+
TEST_CHECK_INT_EQUAL(os.order[1], 2); // immediate (submitted later)
330+
331+
sentry__bgworker_decref(bgw);
332+
}
333+
334+
SENTRY_TEST(bgworker_delayed_shutdown)
335+
{
336+
struct order_state os;
337+
os.count = 0;
338+
339+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
340+
TEST_ASSERT(!!bgw);
341+
342+
// immediate tasks
343+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)1);
344+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
345+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)3);
346+
347+
// pending delayed tasks are discarded on shutdown
348+
sentry__bgworker_submit_at(
349+
bgw, record_order_task, NULL, (void *)4, UINT64_MAX);
350+
sentry__bgworker_submit_at(
351+
bgw, record_order_task, NULL, (void *)5, UINT64_MAX);
352+
sentry__bgworker_submit_at(
353+
bgw, record_order_task, NULL, (void *)6, UINT64_MAX);
354+
355+
sentry__bgworker_start(bgw);
356+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 1000), 0);
357+
358+
TEST_CHECK_INT_EQUAL(os.count, 3);
359+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
360+
TEST_CHECK_INT_EQUAL(os.order[1], 2);
361+
TEST_CHECK_INT_EQUAL(os.order[2], 3);
362+
363+
sentry__bgworker_decref(bgw);
364+
}

tests/unit/tests.inc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ XX(basic_tracing_context)
2424
XX(basic_transaction)
2525
XX(basic_transport_thread_name)
2626
XX(basic_write_envelope_to_file)
27+
XX(bgworker_delayed_priority)
28+
XX(bgworker_delayed_shutdown)
29+
XX(bgworker_delayed_tasks)
2730
XX(bgworker_flush)
31+
XX(bgworker_task_delay)
2832
XX(breadcrumb_without_type_or_message_still_valid)
2933
XX(build_id_parser)
3034
XX(cache_keep)

0 commit comments

Comments
 (0)