diff --git a/dist/src/memory-compactor.js b/dist/src/memory-compactor.js index 0954063d..03a41d5f 100644 --- a/dist/src/memory-compactor.js +++ b/dist/src/memory-compactor.js @@ -149,6 +149,22 @@ export function buildMergedEntry(members) { }); return { text, importance, category, scope, metadata }; } +const MAX_CLUSTER_COMPACTION_CONCURRENCY = 3; +const MAX_CLUSTER_DELETE_CONCURRENCY = 8; +async function mapWithConcurrency(items, concurrency, mapper) { + if (items.length === 0) + return []; + const results = new Array(items.length); + let nextIndex = 0; + const workerCount = Math.min(Math.max(1, concurrency), items.length); + await Promise.all(Array.from({ length: workerCount }, async () => { + while (nextIndex < items.length) { + const currentIndex = nextIndex++; + results[currentIndex] = await mapper(items[currentIndex], currentIndex); + } + })); + return results; +} // ============================================================================ // Main runner // ============================================================================ @@ -186,9 +202,7 @@ export async function runCompaction(store, embedder, config, scopes, logger) { dryRun: true, }; } - let memoriesDeleted = 0; - let memoriesCreated = 0; - for (const plan of plans) { + const outcomes = await mapWithConcurrency(plans, MAX_CLUSTER_COMPACTION_CONCURRENCY, async (plan) => { const members = plan.memberIndices.map((i) => valid[i]); try { // Embed the merged text @@ -202,18 +216,28 @@ export async function runCompaction(store, embedder, config, scopes, logger) { scope: plan.merged.scope, metadata: plan.merged.metadata, }); - memoriesCreated++; // Delete source entries - for (const m of members) { - const deleted = await store.delete(m.id); - if (deleted) - memoriesDeleted++; - } + const deleteResults = await mapWithConcurrency(members, MAX_CLUSTER_DELETE_CONCURRENCY, async (m) => { + try { + return (await store.delete(m.id)) ? 1 : 0; + } + catch (err) { + logger?.warn(`memory-compactor: failed to delete source memory ${m.id}: ${String(err)}`); + return 0; + } + }); + return { + memoriesDeleted: deleteResults.reduce((sum, deleted) => sum + deleted, 0), + memoriesCreated: 1, + }; } catch (err) { logger?.warn(`memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`); + return { memoriesDeleted: 0, memoriesCreated: 0 }; } - } + }); + const memoriesDeleted = outcomes.reduce((sum, outcome) => sum + outcome.memoriesDeleted, 0); + const memoriesCreated = outcomes.reduce((sum, outcome) => sum + outcome.memoriesCreated, 0); logger?.info(`memory-compactor: scanned=${valid.length} clusters=${plans.length} ` + `deleted=${memoriesDeleted} created=${memoriesCreated}`); return { diff --git a/dist/src/self-improvement-files.js b/dist/src/self-improvement-files.js index 39f4eeba..24b89c54 100644 --- a/dist/src/self-improvement-files.js +++ b/dist/src/self-improvement-files.js @@ -66,8 +66,10 @@ export async function ensureSelfImprovementLearningFiles(baseDir) { } await writeFile(filePath, `${content.trim()}\n`, "utf-8"); }; - await ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE); - await ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE); + await Promise.all([ + ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE), + ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE), + ]); } export async function appendSelfImprovementEntry(params) { const { baseDir, type, summary, details = "", suggestedAction = "", category = "best_practice", area = "config", priority = "medium", status = "pending", source = "memory-lancedb-pro/self_improvement_log", maxEntries, } = params; diff --git a/src/memory-compactor.ts b/src/memory-compactor.ts index 330ac3b4..6937030f 100644 --- a/src/memory-compactor.ts +++ b/src/memory-compactor.ts @@ -262,6 +262,32 @@ export interface CompactorLogger { warn(msg: string): void; } +const MAX_CLUSTER_COMPACTION_CONCURRENCY = 3; +const MAX_CLUSTER_DELETE_CONCURRENCY = 8; + +async function mapWithConcurrency( + items: T[], + concurrency: number, + mapper: (item: T, index: number) => Promise, +): Promise { + if (items.length === 0) return []; + + const results = new Array(items.length); + let nextIndex = 0; + const workerCount = Math.min(Math.max(1, concurrency), items.length); + + await Promise.all( + Array.from({ length: workerCount }, async () => { + while (nextIndex < items.length) { + const currentIndex = nextIndex++; + results[currentIndex] = await mapper(items[currentIndex], currentIndex); + } + }), + ); + + return results; +} + // ============================================================================ // Main runner // ============================================================================ @@ -322,38 +348,63 @@ export async function runCompaction( }; } - let memoriesDeleted = 0; - let memoriesCreated = 0; - - for (const plan of plans) { - const members = plan.memberIndices.map((i) => valid[i]); - - try { - // Embed the merged text - const vector = await embedder.embedPassage(plan.merged.text); - - // Store merged entry - await store.store({ - text: plan.merged.text, - vector, - importance: plan.merged.importance, - category: plan.merged.category, - scope: plan.merged.scope, - metadata: plan.merged.metadata, - }); - memoriesCreated++; - - // Delete source entries - for (const m of members) { - const deleted = await store.delete(m.id); - if (deleted) memoriesDeleted++; + const outcomes = await mapWithConcurrency( + plans, + MAX_CLUSTER_COMPACTION_CONCURRENCY, + async (plan) => { + const members = plan.memberIndices.map((i) => valid[i]); + + try { + // Embed the merged text + const vector = await embedder.embedPassage(plan.merged.text); + + // Store merged entry + await store.store({ + text: plan.merged.text, + vector, + importance: plan.merged.importance, + category: plan.merged.category, + scope: plan.merged.scope, + metadata: plan.merged.metadata, + }); + + // Delete source entries + const deleteResults = await mapWithConcurrency( + members, + MAX_CLUSTER_DELETE_CONCURRENCY, + async (m) => { + try { + return (await store.delete(m.id)) ? 1 : 0; + } catch (err) { + logger?.warn( + `memory-compactor: failed to delete source memory ${m.id}: ${String(err)}`, + ); + return 0; + } + }, + ); + + return { + memoriesDeleted: deleteResults.reduce((sum, deleted) => sum + deleted, 0), + memoriesCreated: 1, + }; + } catch (err) { + logger?.warn( + `memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`, + ); + return { memoriesDeleted: 0, memoriesCreated: 0 }; } - } catch (err) { - logger?.warn( - `memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`, - ); - } - } + }, + ); + + const memoriesDeleted = outcomes.reduce( + (sum, outcome) => sum + outcome.memoriesDeleted, + 0, + ); + const memoriesCreated = outcomes.reduce( + (sum, outcome) => sum + outcome.memoriesCreated, + 0, + ); logger?.info( `memory-compactor: scanned=${valid.length} clusters=${plans.length} ` + diff --git a/src/self-improvement-files.ts b/src/self-improvement-files.ts index ed75579f..f51cefd8 100644 --- a/src/self-improvement-files.ts +++ b/src/self-improvement-files.ts @@ -75,8 +75,10 @@ export async function ensureSelfImprovementLearningFiles(baseDir: string): Promi await writeFile(filePath, `${content.trim()}\n`, "utf-8"); }; - await ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE); - await ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE); + await Promise.all([ + ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE), + ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE), + ]); } export interface AppendSelfImprovementEntryParams { diff --git a/test/memory-compactor.test.mjs b/test/memory-compactor.test.mjs index 42b9a3a8..642e30e7 100644 --- a/test/memory-compactor.test.mjs +++ b/test/memory-compactor.test.mjs @@ -67,6 +67,10 @@ function makeEmbedder(dim = 4) { }; } +function delay(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + const defaultConfig = { enabled: true, minAgeDays: 7, @@ -289,4 +293,108 @@ describe("runCompaction", () => { assert.ok(result.scanned <= 3); }); + + it("processes independent clusters with bounded parallelism", async () => { + const entries = [ + entry({ id: "a1", text: "alpha 1", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }), + entry({ id: "a2", text: "alpha 2", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }), + entry({ id: "b1", text: "bravo 1", vector: vec(4, 0, 1, 0.01, 0), importance: 0.7 }), + entry({ id: "b2", text: "bravo 2", vector: vec(4, 0, 1, 0.02, 0), importance: 0.6 }), + entry({ id: "c1", text: "charlie 1", vector: vec(4, 0, 0, 1, 0.01), importance: 0.5 }), + entry({ id: "c2", text: "charlie 2", vector: vec(4, 0, 0, 1, 0.02), importance: 0.4 }), + entry({ id: "d1", text: "delta 1", vector: vec(4, 0.01, 0, 0, 1), importance: 0.3 }), + entry({ id: "d2", text: "delta 2", vector: vec(4, 0.02, 0, 0, 1), importance: 0.2 }), + ]; + const store = makeStore(entries); + let inFlight = 0; + let maxInFlight = 0; + const embedder = { + async embedPassage() { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + await delay(15); + inFlight--; + return vec(4, 1, 0, 0, 0); + }, + }; + + const result = await runCompaction(store, embedder, defaultConfig); + + assert.equal(result.clustersFound, 4); + assert.equal(result.memoriesCreated, 4); + assert.equal(result.memoriesDeleted, 8); + assert.ok(maxInFlight > 1, `expected cluster work to overlap, max=${maxInFlight}`); + assert.ok(maxInFlight < 4, `expected cluster work to be bounded below all clusters, max=${maxInFlight}`); + }); + + it("continues compacting other clusters when one merge fails", async () => { + const entries = [ + entry({ id: "good-1", text: "good cluster one", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }), + entry({ id: "good-2", text: "good cluster two", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }), + entry({ id: "bad-1", text: "bad cluster one", vector: vec(4, 0, 1, 0.01, 0), importance: 0.7 }), + entry({ id: "bad-2", text: "bad cluster two", vector: vec(4, 0, 1, 0.02, 0), importance: 0.6 }), + ]; + const store = makeStore(entries); + const warnings = []; + const logger = { info() {}, warn(msg) { warnings.push(msg); } }; + const embedder = { + async embedPassage(text) { + if (text.includes("bad cluster")) { + throw new Error("synthetic embed failure"); + } + return vec(4, 1, 0, 0, 0); + }, + }; + + const result = await runCompaction(store, embedder, defaultConfig, undefined, logger); + + assert.equal(result.clustersFound, 2); + assert.equal(result.memoriesCreated, 1); + assert.equal(result.memoriesDeleted, 2); + assert.equal(store.stored.length, 1); + assert.deepEqual(store.deleted.sort(), ["good-1", "good-2"]); + assert.ok(warnings.some((msg) => msg.includes("failed to merge cluster of 2"))); + }); + + it("continues deleting remaining source members when one delete fails", async () => { + const entries = [ + entry({ id: "delete-a", text: "same topic a", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }), + entry({ id: "delete-b", text: "same topic b", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }), + entry({ id: "delete-c", text: "same topic c", vector: vec(4, 1, 0.03, 0, 0), importance: 0.7 }), + ]; + const db = new Map(entries.map((e) => [e.id, { ...e }])); + const store = { + stored: [], + deleted: [], + async fetchForCompaction() { + return [...db.values()]; + }, + async store(e) { + const newEntry = { id: "merged-delete-test", ...e }; + this.stored.push(newEntry); + return newEntry; + }, + async delete(id) { + if (id === "delete-b") { + throw new Error("synthetic delete failure"); + } + if (db.has(id)) { + db.delete(id); + this.deleted.push(id); + return true; + } + return false; + }, + }; + const warnings = []; + const logger = { info() {}, warn(msg) { warnings.push(msg); } }; + + const result = await runCompaction(store, makeEmbedder(), defaultConfig, undefined, logger); + + assert.equal(result.clustersFound, 1); + assert.equal(result.memoriesCreated, 1); + assert.equal(result.memoriesDeleted, 2); + assert.deepEqual(store.deleted.sort(), ["delete-a", "delete-c"]); + assert.ok(warnings.some((msg) => msg.includes("failed to delete source memory delete-b"))); + }); }); diff --git a/test/self-improvement.test.mjs b/test/self-improvement.test.mjs index ded4b212..d5a93aed 100644 --- a/test/self-improvement.test.mjs +++ b/test/self-improvement.test.mjs @@ -1,6 +1,6 @@ import { describe, it, beforeEach, afterEach } from "node:test"; import assert from "node:assert/strict"; -import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import path from "node:path"; import { fileURLToPath } from "node:url"; @@ -19,7 +19,11 @@ const { registerSelfImprovementExtractSkillTool, registerSelfImprovementReviewTool, } = jiti("../src/tools.ts"); -const { appendSelfImprovementEntry, countSelfImprovementEntries } = jiti("../src/self-improvement-files.ts"); +const { + appendSelfImprovementEntry, + countSelfImprovementEntries, + ensureSelfImprovementLearningFiles, +} = jiti("../src/self-improvement-files.ts"); const { extractReflectionLearningGovernanceCandidates, extractInjectableReflectionMappedMemories, @@ -70,6 +74,25 @@ describe("self-improvement", () => { rmSync(workspaceDir, { recursive: true, force: true }); }); + it("ensures learning files idempotently under concurrent calls", async () => { + const learningsDir = path.join(workspaceDir, ".learnings"); + mkdirSync(learningsDir, { recursive: true }); + writeFileSync(path.join(learningsDir, "LEARNINGS.md"), "# Custom Learnings\n", "utf-8"); + writeFileSync(path.join(learningsDir, "ERRORS.md"), "\n", "utf-8"); + + await Promise.all([ + ensureSelfImprovementLearningFiles(workspaceDir), + ensureSelfImprovementLearningFiles(workspaceDir), + ensureSelfImprovementLearningFiles(workspaceDir), + ]); + + const learningsBody = readFileSync(path.join(learningsDir, "LEARNINGS.md"), "utf-8"); + const errorsBody = readFileSync(path.join(learningsDir, "ERRORS.md"), "utf-8"); + assert.equal(learningsBody, "# Custom Learnings\n"); + assert.match(errorsBody, /^# Errors/); + assert.match(errorsBody, /ERR-YYYYMMDD-XXX/); + }); + it("extracts mapped reflection sections into preference/fact/decision memories", async () => { const reflectionText = [ "## Context (session background)",