Skip to content

Commit 49c97f4

Browse files
authored
fix(pegboard): skip protocol version keys in runner pool backfill (#4620)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent c1b1e03 commit 49c97f4

3 files changed

Lines changed: 28 additions & 39 deletions

File tree

engine/packages/pegboard/src/keys/runner_config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,11 @@ impl TuplePack for ProtocolVersionKey {
268268

269269
impl<'de> TupleUnpack<'de> for ProtocolVersionKey {
270270
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
271-
let (input, (_, _, _, namespace_id, name, _)) =
271+
let (input, (_, _, _, namespace_id, name, data)) =
272272
<(usize, usize, usize, Id, String, usize)>::unpack(input, tuple_depth)?;
273+
if data != PROTOCOL_VERSION {
274+
return Err(PackError::Message("expected PROTOCOL_VERSION data".into()));
275+
}
273276

274277
let v = ProtocolVersionKey { namespace_id, name };
275278

engine/packages/pegboard/src/workflows/runner_pool_backfill.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ async fn backfill_chunk(ctx: &ActivityCtx, input: &BackfillChunkInput) -> Result
9999
};
100100

101101
new_last_key = [entry.key(), &[0xff]].concat();
102+
103+
if tx
104+
.unpack::<keys::runner_config::ProtocolVersionKey>(entry.key())
105+
.is_ok()
106+
{
107+
continue;
108+
}
109+
102110
entries.push(tx.read_entry::<keys::runner_config::DataKey>(&entry)?);
103111
}
104112

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
use std::ffi::{c_char, CStr, CString};
1+
use std::ffi::{CStr, CString, c_char};
22
use std::ptr;
33
use std::sync::{Arc, Mutex};
44

55
use async_trait::async_trait;
66
use libsqlite3_sys::{
7-
sqlite3, sqlite3_bind_blob, sqlite3_bind_double, sqlite3_bind_int64, sqlite3_bind_null,
8-
sqlite3_bind_text, sqlite3_changes, sqlite3_column_blob, sqlite3_column_bytes,
9-
sqlite3_column_count, sqlite3_column_double, sqlite3_column_int64, sqlite3_column_name,
10-
sqlite3_column_text, sqlite3_column_type, sqlite3_errmsg, sqlite3_finalize,
11-
sqlite3_prepare_v2, sqlite3_step, SQLITE_BLOB, SQLITE_DONE, SQLITE_FLOAT, SQLITE_INTEGER,
12-
SQLITE_NULL, SQLITE_OK, SQLITE_ROW, SQLITE_TEXT, SQLITE_TRANSIENT,
7+
SQLITE_BLOB, SQLITE_DONE, SQLITE_FLOAT, SQLITE_INTEGER, SQLITE_NULL, SQLITE_OK, SQLITE_ROW,
8+
SQLITE_TEXT, SQLITE_TRANSIENT, sqlite3, sqlite3_bind_blob, sqlite3_bind_double,
9+
sqlite3_bind_int64, sqlite3_bind_null, sqlite3_bind_text, sqlite3_changes, sqlite3_column_blob,
10+
sqlite3_column_bytes, sqlite3_column_count, sqlite3_column_double, sqlite3_column_int64,
11+
sqlite3_column_name, sqlite3_column_text, sqlite3_column_type, sqlite3_errmsg,
12+
sqlite3_finalize, sqlite3_prepare_v2, sqlite3_step,
1313
};
1414
use napi::bindgen_prelude::Buffer;
1515
use napi_derive::napi;
@@ -109,8 +109,7 @@ pub struct JsNativeDatabase {
109109

110110
impl JsNativeDatabase {
111111
pub fn as_ptr(&self) -> *mut libsqlite3_sys::sqlite3 {
112-
self
113-
.db
112+
self.db
114113
.lock()
115114
.ok()
116115
.and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr))
@@ -243,13 +242,7 @@ fn bind_params(
243242
let text = CString::new(param.text_value.clone().unwrap_or_default())
244243
.map_err(|err| napi::Error::from_reason(err.to_string()))?;
245244
unsafe {
246-
sqlite3_bind_text(
247-
stmt,
248-
bind_index,
249-
text.as_ptr(),
250-
-1,
251-
SQLITE_TRANSIENT(),
252-
)
245+
sqlite3_bind_text(stmt, bind_index, text.as_ptr(), -1, SQLITE_TRANSIENT())
253246
}
254247
}
255248
"blob" => {
@@ -291,26 +284,17 @@ fn collect_columns(stmt: *mut libsqlite3_sys::sqlite3_stmt) -> Vec<String> {
291284
if name_ptr.is_null() {
292285
String::new()
293286
} else {
294-
CStr::from_ptr(name_ptr)
295-
.to_string_lossy()
296-
.into_owned()
287+
CStr::from_ptr(name_ptr).to_string_lossy().into_owned()
297288
}
298289
})
299290
.collect()
300291
}
301292

302-
fn column_value(
303-
stmt: *mut libsqlite3_sys::sqlite3_stmt,
304-
index: i32,
305-
) -> serde_json::Value {
293+
fn column_value(stmt: *mut libsqlite3_sys::sqlite3_stmt, index: i32) -> serde_json::Value {
306294
match unsafe { sqlite3_column_type(stmt, index) } {
307295
SQLITE_NULL => serde_json::Value::Null,
308-
SQLITE_INTEGER => {
309-
serde_json::Value::from(unsafe { sqlite3_column_int64(stmt, index) })
310-
}
311-
SQLITE_FLOAT => {
312-
serde_json::Value::from(unsafe { sqlite3_column_double(stmt, index) })
313-
}
296+
SQLITE_INTEGER => serde_json::Value::from(unsafe { sqlite3_column_int64(stmt, index) }),
297+
SQLITE_FLOAT => serde_json::Value::from(unsafe { sqlite3_column_double(stmt, index) }),
314298
SQLITE_TEXT => {
315299
let text_ptr = unsafe { sqlite3_column_text(stmt, index) };
316300
if text_ptr.is_null() {
@@ -328,9 +312,7 @@ fn column_value(
328312
serde_json::Value::Null
329313
} else {
330314
let blob_len = unsafe { sqlite3_column_bytes(stmt, index) } as usize;
331-
let blob = unsafe {
332-
std::slice::from_raw_parts(blob_ptr as *const u8, blob_len)
333-
};
315+
let blob = unsafe { std::slice::from_raw_parts(blob_ptr as *const u8, blob_len) };
334316
serde_json::Value::Array(
335317
blob.iter()
336318
.map(|byte| serde_json::Value::from(*byte))
@@ -349,9 +331,7 @@ fn execute_statement(
349331
) -> napi::Result<ExecuteResult> {
350332
let c_sql = CString::new(sql).map_err(|err| napi::Error::from_reason(err.to_string()))?;
351333
let mut stmt = ptr::null_mut();
352-
let rc = unsafe {
353-
sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut())
354-
};
334+
let rc = unsafe { sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) };
355335
if rc != SQLITE_OK {
356336
return Err(sqlite_error(db, "failed to prepare sqlite statement"));
357337
}
@@ -393,9 +373,7 @@ fn query_statement(
393373
) -> napi::Result<QueryResult> {
394374
let c_sql = CString::new(sql).map_err(|err| napi::Error::from_reason(err.to_string()))?;
395375
let mut stmt = ptr::null_mut();
396-
let rc = unsafe {
397-
sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut())
398-
};
376+
let rc = unsafe { sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) };
399377
if rc != SQLITE_OK {
400378
return Err(sqlite_error(db, "failed to prepare sqlite query"));
401379
}

0 commit comments

Comments
 (0)