Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
## 2026-04-08 - [Performance: Defer Allocation during Traversal]
**Learning:** During DAG traversals, creating owned variants of identifiers (like `file.to_path_buf()`) *before* checking `visited` HashSets results in heap allocations (O(E)) for every edge instead of every visited node (O(V)). By moving the `&PathBuf` allocation strictly *after* all HashSet `contains` checks using the borrowed reference (`&Path`), we drastically reduce memory churn.
**Action:** Always check `HashSet::contains` with a borrowed reference *before* creating the owned version required by `HashSet::insert`, especially in performance-critical graph traversal paths.

## 2024-05-18 - [Performance: Optimizing SQL string generation]
**Learning:** Generating SQL statements dynamically in `D1ExportContext` by using intermediate string allocations and vectors (e.g. `vec![]` then `join(", ")`) incurs significant overhead. Direct formatted writes to a pre-allocated `String` using `std::fmt::Write` reduces allocations and drastically improves statement generation performance.
**Action:** Replace intermediate vectors and string slices with single, pre-allocated strings (`String::with_capacity()`) and `write!()` macros in performance critical SQL statement construction logic.
75 changes: 45 additions & 30 deletions crates/flow/src/targets/d1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,40 +300,52 @@ impl D1ExportContext {
key: &KeyValue,
values: &FieldValues,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut columns = vec![];
let mut placeholders = vec![];
let mut params = vec![];
let mut update_clauses = vec![];
use std::fmt::Write;

// ⚡ Bolt: Pre-allocate capacity and avoid intermediate vectors
let num_keys = key.0.len().min(self.key_fields_schema.len());
let num_values = values.fields.len().min(self.value_fields_schema.len());
let total_cols = num_keys + num_values;

let mut params = Vec::with_capacity(total_cols);
let mut sql = String::with_capacity(64 + total_cols * 15 + num_values * 30);

write!(sql, "INSERT INTO {} (", self.table_name).unwrap();

// Extract key parts - KeyValue is a wrapper around Box<[KeyPart]>
let mut first = true;
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
columns.push(self.key_fields_schema[idx].name.clone());
placeholders.push("?".to_string());
if !first { sql.push_str(", "); }
sql.push_str(&self.key_fields_schema[idx].name);
params.push(key_part_to_json(key_part)?);
first = false;
}
}

// Add value fields
for (idx, value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
columns.push(value_field.name.clone());
placeholders.push("?".to_string());
if !first { sql.push_str(", "); }
sql.push_str(&value_field.name);
params.push(value_to_json(value)?);
update_clauses.push(format!(
"{} = excluded.{}",
value_field.name, value_field.name
));
first = false;
}
}

let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
self.table_name,
columns.join(", "),
placeholders.join(", "),
update_clauses.join(", ")
);
sql.push_str(") VALUES (");
for i in 0..params.len() {
if i > 0 { sql.push_str(", "); }
sql.push('?');
}

sql.push_str(") ON CONFLICT DO UPDATE SET ");
let mut first_update = true;
for (idx, _value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
if !first_update { sql.push_str(", "); }
write!(sql, "{0} = excluded.{0}", value_field.name).unwrap();
first_update = false;
}
}

Ok((sql, params))
}
Expand All @@ -342,22 +354,25 @@ impl D1ExportContext {
&self,
key: &KeyValue,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut where_clauses = vec![];
let mut params = vec![];
use std::fmt::Write;

// ⚡ Bolt: Pre-allocate capacity and avoid intermediate vectors
let num_keys = key.0.len().min(self.key_fields_schema.len());
let mut params = Vec::with_capacity(num_keys);

let mut sql = String::with_capacity(32 + num_keys * 20);
write!(sql, "DELETE FROM {} WHERE ", self.table_name).unwrap();

let mut first = true;
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
if !first { sql.push_str(" AND "); }
write!(sql, "{} = ?", self.key_fields_schema[idx].name).unwrap();
params.push(key_part_to_json(key_part)?);
first = false;
}
}

let sql = format!(
"DELETE FROM {} WHERE {}",
self.table_name,
where_clauses.join(" AND ")
);

Ok((sql, params))
}

Expand Down