-
-
Notifications
You must be signed in to change notification settings - Fork 201
feat(sync): add delayed task submission for throttling #1506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5f48f12
6d98462
a399d2a
b97d337
c28a807
3e19e39
90295b8
ba36aa7
b827096
6ee3c9f
bfbfe36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,10 +121,17 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name) | |
| * `done` *from* the worker signaling that it will close down and can be joined. | ||
| */ | ||
|
|
||
| static uint64_t | ||
| add_saturate(uint64_t a, uint64_t b) | ||
| { | ||
| return b <= UINT64_MAX - a ? a + b : UINT64_MAX; | ||
| } | ||
|
|
||
| struct sentry_bgworker_task_s; | ||
| typedef struct sentry_bgworker_task_s { | ||
| struct sentry_bgworker_task_s *next_task; | ||
| long refcount; | ||
| uint64_t execute_after; | ||
| sentry_task_exec_func_t exec_func; | ||
| void (*cleanup_func)(void *task_data); | ||
| void *task_data; | ||
|
|
@@ -155,6 +162,7 @@ struct sentry_bgworker_s { | |
| sentry_mutex_t task_lock; | ||
| sentry_bgworker_task_t *first_task; | ||
| sentry_bgworker_task_t *last_task; | ||
| sentry_bgworker_task_t *current_task; | ||
| void *state; | ||
| void (*free_state)(void *state); | ||
| long refcount; | ||
|
|
@@ -225,7 +233,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw) | |
| static bool | ||
| sentry__bgworker_is_done(sentry_bgworker_t *bgw) | ||
| { | ||
| return !bgw->first_task && !sentry__atomic_fetch(&bgw->running); | ||
| return (!bgw->first_task | ||
| || sentry__monotonic_time() < bgw->first_task->execute_after) | ||
| && !sentry__atomic_fetch(&bgw->running); | ||
| } | ||
|
|
||
| SENTRY_THREAD_FN | ||
|
|
@@ -260,7 +270,18 @@ worker_thread(void *data) | |
| continue; | ||
| } | ||
|
|
||
| // wait for a delayed task, wake up to new submissions | ||
| { | ||
| uint64_t now = sentry__monotonic_time(); | ||
| if (now < task->execute_after) { | ||
| sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock, | ||
| (uint32_t)(task->execute_after - now)); | ||
| continue; | ||
| } | ||
| } | ||
|
|
||
| sentry__task_incref(task); | ||
| bgw->current_task = task; | ||
| sentry__mutex_unlock(&bgw->task_lock); | ||
|
|
||
| SENTRY_DEBUG("executing task on worker thread"); | ||
|
|
@@ -274,6 +295,7 @@ worker_thread(void *data) | |
| // if not, we pop it and `decref` again, removing the _is inside | ||
| // list_ refcount. | ||
| sentry__mutex_lock(&bgw->task_lock); | ||
| bgw->current_task = NULL; | ||
| if (bgw->first_task == task) { | ||
| bgw->first_task = task->next_task; | ||
| if (task == bgw->last_task) { | ||
|
|
@@ -350,11 +372,25 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout) | |
| sentry__cond_init(&flush_task->signal); | ||
| sentry__mutex_init(&flush_task->lock); | ||
|
|
||
| // place the flush sentinel after the last task due within the timeout; | ||
| // tasks delayed beyond the timeout cannot complete in time anyway | ||
| uint64_t before = sentry__monotonic_time(); | ||
| uint64_t deadline = add_saturate(before, timeout); | ||
| uint64_t execute_after = before; | ||
| sentry__mutex_lock(&bgw->task_lock); | ||
| for (sentry_bgworker_task_t *t = bgw->first_task; | ||
| t && t->execute_after <= deadline; t = t->next_task) { | ||
| if (t->execute_after > execute_after) { | ||
| execute_after = t->execute_after; | ||
| } | ||
| } | ||
| sentry__mutex_unlock(&bgw->task_lock); | ||
|
|
||
| sentry__mutex_lock(&flush_task->lock); | ||
|
|
||
| /* submit the task that triggers our condvar once it runs */ | ||
| sentry__bgworker_submit(bgw, sentry__flush_task, | ||
| (void (*)(void *))sentry__flush_task_decref, flush_task); | ||
| sentry__bgworker_submit_at(bgw, sentry__flush_task, | ||
| (void (*)(void *))sentry__flush_task_decref, flush_task, execute_after); | ||
|
|
||
| uint64_t started = sentry__monotonic_time(); | ||
| bool was_flushed = false; | ||
|
|
@@ -422,6 +458,29 @@ int | |
| sentry__bgworker_submit(sentry_bgworker_t *bgw, | ||
| sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data), | ||
| void *task_data) | ||
| { | ||
| SENTRY_DEBUG("submitting task to background worker thread"); | ||
| return sentry__bgworker_submit_at( | ||
| bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time()); | ||
| } | ||
|
|
||
| int | ||
| sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw, | ||
| sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data), | ||
| void *task_data, uint64_t delay_ms) | ||
| { | ||
| SENTRY_DEBUGF("submitting %" PRIu64 | ||
| " ms delayed task to background worker thread", | ||
| delay_ms); | ||
| uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms); | ||
| return sentry__bgworker_submit_at( | ||
| bgw, exec_func, cleanup_func, task_data, execute_after); | ||
| } | ||
|
|
||
| int | ||
| sentry__bgworker_submit_at(sentry_bgworker_t *bgw, | ||
| sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data), | ||
| void *task_data, uint64_t execute_after) | ||
| { | ||
| sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t); | ||
| if (!task) { | ||
|
|
@@ -432,19 +491,42 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw, | |
| } | ||
| task->next_task = NULL; | ||
| task->refcount = 1; | ||
| task->execute_after = execute_after; | ||
| task->exec_func = exec_func; | ||
| task->cleanup_func = cleanup_func; | ||
| task->task_data = task_data; | ||
|
|
||
| SENTRY_DEBUG("submitting task to background worker thread"); | ||
| sentry__mutex_lock(&bgw->task_lock); | ||
|
|
||
| if (!bgw->first_task) { | ||
| // empty queue | ||
| bgw->first_task = task; | ||
| } | ||
| if (bgw->last_task) { | ||
| bgw->last_task = task; | ||
| } else if (bgw->last_task->execute_after <= task->execute_after) { | ||
| // append last (common fast path for FIFO immediates) | ||
| bgw->last_task->next_task = task; | ||
| bgw->last_task = task; | ||
| } else { | ||
| // insert sorted by execute_after; skip past current_task which | ||
| // may be executing without the lock held | ||
| sentry_bgworker_task_t *prev = bgw->current_task; | ||
| sentry_bgworker_task_t *cur = prev ? prev->next_task : bgw->first_task; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use-after-free accessing freed task pointerHigh Severity The insertion logic at line 513 accesses |
||
| while (cur && cur->execute_after <= task->execute_after) { | ||
| prev = cur; | ||
| cur = cur->next_task; | ||
| } | ||
|
|
||
| task->next_task = cur; | ||
| if (prev) { | ||
| prev->next_task = task; | ||
| } else { | ||
| bgw->first_task = task; | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (!task->next_task) { | ||
| bgw->last_task = task; | ||
| } | ||
| } | ||
| bgw->last_task = task; | ||
|
|
||
| sentry__cond_wake(&bgw->submit_signal); | ||
| sentry__mutex_unlock(&bgw->task_lock); | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.