Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions dist/src/memory-compactor.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ export function buildMergedEntry(members) {
});
return { text, importance, category, scope, metadata };
}
const MAX_CLUSTER_COMPACTION_CONCURRENCY = 3;
const MAX_CLUSTER_DELETE_CONCURRENCY = 8;
async function mapWithConcurrency(items, concurrency, mapper) {
if (items.length === 0)
return [];
const results = new Array(items.length);
let nextIndex = 0;
const workerCount = Math.min(Math.max(1, concurrency), items.length);
await Promise.all(Array.from({ length: workerCount }, async () => {
while (nextIndex < items.length) {
const currentIndex = nextIndex++;
results[currentIndex] = await mapper(items[currentIndex], currentIndex);
}
}));
return results;
}
// ============================================================================
// Main runner
// ============================================================================
Expand Down Expand Up @@ -186,9 +202,7 @@ export async function runCompaction(store, embedder, config, scopes, logger) {
dryRun: true,
};
}
let memoriesDeleted = 0;
let memoriesCreated = 0;
for (const plan of plans) {
const outcomes = await mapWithConcurrency(plans, MAX_CLUSTER_COMPACTION_CONCURRENCY, async (plan) => {
const members = plan.memberIndices.map((i) => valid[i]);
try {
// Embed the merged text
Expand All @@ -202,18 +216,28 @@ export async function runCompaction(store, embedder, config, scopes, logger) {
scope: plan.merged.scope,
metadata: plan.merged.metadata,
});
memoriesCreated++;
// Delete source entries
for (const m of members) {
const deleted = await store.delete(m.id);
if (deleted)
memoriesDeleted++;
}
const deleteResults = await mapWithConcurrency(members, MAX_CLUSTER_DELETE_CONCURRENCY, async (m) => {
try {
return (await store.delete(m.id)) ? 1 : 0;
}
catch (err) {
logger?.warn(`memory-compactor: failed to delete source memory ${m.id}: ${String(err)}`);
return 0;
}
});
return {
memoriesDeleted: deleteResults.reduce((sum, deleted) => sum + deleted, 0),
memoriesCreated: 1,
};
}
catch (err) {
logger?.warn(`memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`);
return { memoriesDeleted: 0, memoriesCreated: 0 };
}
}
});
const memoriesDeleted = outcomes.reduce((sum, outcome) => sum + outcome.memoriesDeleted, 0);
const memoriesCreated = outcomes.reduce((sum, outcome) => sum + outcome.memoriesCreated, 0);
logger?.info(`memory-compactor: scanned=${valid.length} clusters=${plans.length} ` +
`deleted=${memoriesDeleted} created=${memoriesCreated}`);
return {
Expand Down
6 changes: 4 additions & 2 deletions dist/src/self-improvement-files.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ export async function ensureSelfImprovementLearningFiles(baseDir) {
}
await writeFile(filePath, `${content.trim()}\n`, "utf-8");
};
await ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE);
await ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE);
await Promise.all([
ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE),
ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE),
]);
}
export async function appendSelfImprovementEntry(params) {
const { baseDir, type, summary, details = "", suggestedAction = "", category = "best_practice", area = "config", priority = "medium", status = "pending", source = "memory-lancedb-pro/self_improvement_log", maxEntries, } = params;
Expand Down
113 changes: 82 additions & 31 deletions src/memory-compactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,32 @@ export interface CompactorLogger {
warn(msg: string): void;
}

const MAX_CLUSTER_COMPACTION_CONCURRENCY = 3;
const MAX_CLUSTER_DELETE_CONCURRENCY = 8;

async function mapWithConcurrency<T, R>(
items: T[],
concurrency: number,
mapper: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
if (items.length === 0) return [];

const results = new Array<R>(items.length);
let nextIndex = 0;
const workerCount = Math.min(Math.max(1, concurrency), items.length);

await Promise.all(
Array.from({ length: workerCount }, async () => {
while (nextIndex < items.length) {
const currentIndex = nextIndex++;
results[currentIndex] = await mapper(items[currentIndex], currentIndex);
}
}),
);

return results;
}

// ============================================================================
// Main runner
// ============================================================================
Expand Down Expand Up @@ -322,38 +348,63 @@ export async function runCompaction(
};
}

let memoriesDeleted = 0;
let memoriesCreated = 0;

for (const plan of plans) {
const members = plan.memberIndices.map((i) => valid[i]);

try {
// Embed the merged text
const vector = await embedder.embedPassage(plan.merged.text);

// Store merged entry
await store.store({
text: plan.merged.text,
vector,
importance: plan.merged.importance,
category: plan.merged.category,
scope: plan.merged.scope,
metadata: plan.merged.metadata,
});
memoriesCreated++;

// Delete source entries
for (const m of members) {
const deleted = await store.delete(m.id);
if (deleted) memoriesDeleted++;
const outcomes = await mapWithConcurrency(
plans,
MAX_CLUSTER_COMPACTION_CONCURRENCY,
async (plan) => {
const members = plan.memberIndices.map((i) => valid[i]);

try {
// Embed the merged text
const vector = await embedder.embedPassage(plan.merged.text);

// Store merged entry
await store.store({
text: plan.merged.text,
vector,
importance: plan.merged.importance,
category: plan.merged.category,
scope: plan.merged.scope,
metadata: plan.merged.metadata,
});

// Delete source entries
const deleteResults = await mapWithConcurrency(
members,
MAX_CLUSTER_DELETE_CONCURRENCY,
async (m) => {
try {
return (await store.delete(m.id)) ? 1 : 0;
} catch (err) {
logger?.warn(
`memory-compactor: failed to delete source memory ${m.id}: ${String(err)}`,
);
return 0;
}
},
);

return {
memoriesDeleted: deleteResults.reduce((sum, deleted) => sum + deleted, 0),
memoriesCreated: 1,
};
} catch (err) {
logger?.warn(
`memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`,
);
return { memoriesDeleted: 0, memoriesCreated: 0 };
}
} catch (err) {
logger?.warn(
`memory-compactor: failed to merge cluster of ${members.length}: ${String(err)}`,
);
}
}
},
);

const memoriesDeleted = outcomes.reduce(
(sum, outcome) => sum + outcome.memoriesDeleted,
0,
);
const memoriesCreated = outcomes.reduce(
(sum, outcome) => sum + outcome.memoriesCreated,
0,
);

logger?.info(
`memory-compactor: scanned=${valid.length} clusters=${plans.length} ` +
Expand Down
6 changes: 4 additions & 2 deletions src/self-improvement-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ export async function ensureSelfImprovementLearningFiles(baseDir: string): Promi
await writeFile(filePath, `${content.trim()}\n`, "utf-8");
};

await ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE);
await ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE);
await Promise.all([
ensureFile(join(learningsDir, "LEARNINGS.md"), DEFAULT_LEARNINGS_TEMPLATE),
ensureFile(join(learningsDir, "ERRORS.md"), DEFAULT_ERRORS_TEMPLATE),
]);
}

export interface AppendSelfImprovementEntryParams {
Expand Down
108 changes: 108 additions & 0 deletions test/memory-compactor.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ function makeEmbedder(dim = 4) {
};
}

function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

const defaultConfig = {
enabled: true,
minAgeDays: 7,
Expand Down Expand Up @@ -289,4 +293,108 @@ describe("runCompaction", () => {

assert.ok(result.scanned <= 3);
});

it("processes independent clusters with bounded parallelism", async () => {
const entries = [
entry({ id: "a1", text: "alpha 1", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }),
entry({ id: "a2", text: "alpha 2", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }),
entry({ id: "b1", text: "bravo 1", vector: vec(4, 0, 1, 0.01, 0), importance: 0.7 }),
entry({ id: "b2", text: "bravo 2", vector: vec(4, 0, 1, 0.02, 0), importance: 0.6 }),
entry({ id: "c1", text: "charlie 1", vector: vec(4, 0, 0, 1, 0.01), importance: 0.5 }),
entry({ id: "c2", text: "charlie 2", vector: vec(4, 0, 0, 1, 0.02), importance: 0.4 }),
entry({ id: "d1", text: "delta 1", vector: vec(4, 0.01, 0, 0, 1), importance: 0.3 }),
entry({ id: "d2", text: "delta 2", vector: vec(4, 0.02, 0, 0, 1), importance: 0.2 }),
];
const store = makeStore(entries);
let inFlight = 0;
let maxInFlight = 0;
const embedder = {
async embedPassage() {
inFlight++;
maxInFlight = Math.max(maxInFlight, inFlight);
await delay(15);
inFlight--;
return vec(4, 1, 0, 0, 0);
},
};

const result = await runCompaction(store, embedder, defaultConfig);

assert.equal(result.clustersFound, 4);
assert.equal(result.memoriesCreated, 4);
assert.equal(result.memoriesDeleted, 8);
assert.ok(maxInFlight > 1, `expected cluster work to overlap, max=${maxInFlight}`);
assert.ok(maxInFlight < 4, `expected cluster work to be bounded below all clusters, max=${maxInFlight}`);
});

it("continues compacting other clusters when one merge fails", async () => {
const entries = [
entry({ id: "good-1", text: "good cluster one", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }),
entry({ id: "good-2", text: "good cluster two", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }),
entry({ id: "bad-1", text: "bad cluster one", vector: vec(4, 0, 1, 0.01, 0), importance: 0.7 }),
entry({ id: "bad-2", text: "bad cluster two", vector: vec(4, 0, 1, 0.02, 0), importance: 0.6 }),
];
const store = makeStore(entries);
const warnings = [];
const logger = { info() {}, warn(msg) { warnings.push(msg); } };
const embedder = {
async embedPassage(text) {
if (text.includes("bad cluster")) {
throw new Error("synthetic embed failure");
}
return vec(4, 1, 0, 0, 0);
},
};

const result = await runCompaction(store, embedder, defaultConfig, undefined, logger);

assert.equal(result.clustersFound, 2);
assert.equal(result.memoriesCreated, 1);
assert.equal(result.memoriesDeleted, 2);
assert.equal(store.stored.length, 1);
assert.deepEqual(store.deleted.sort(), ["good-1", "good-2"]);
assert.ok(warnings.some((msg) => msg.includes("failed to merge cluster of 2")));
});

it("continues deleting remaining source members when one delete fails", async () => {
const entries = [
entry({ id: "delete-a", text: "same topic a", vector: vec(4, 1, 0.01, 0, 0), importance: 0.9 }),
entry({ id: "delete-b", text: "same topic b", vector: vec(4, 1, 0.02, 0, 0), importance: 0.8 }),
entry({ id: "delete-c", text: "same topic c", vector: vec(4, 1, 0.03, 0, 0), importance: 0.7 }),
];
const db = new Map(entries.map((e) => [e.id, { ...e }]));
const store = {
stored: [],
deleted: [],
async fetchForCompaction() {
return [...db.values()];
},
async store(e) {
const newEntry = { id: "merged-delete-test", ...e };
this.stored.push(newEntry);
return newEntry;
},
async delete(id) {
if (id === "delete-b") {
throw new Error("synthetic delete failure");
}
if (db.has(id)) {
db.delete(id);
this.deleted.push(id);
return true;
}
return false;
},
};
const warnings = [];
const logger = { info() {}, warn(msg) { warnings.push(msg); } };

const result = await runCompaction(store, makeEmbedder(), defaultConfig, undefined, logger);

assert.equal(result.clustersFound, 1);
assert.equal(result.memoriesCreated, 1);
assert.equal(result.memoriesDeleted, 2);
assert.deepEqual(store.deleted.sort(), ["delete-a", "delete-c"]);
assert.ok(warnings.some((msg) => msg.includes("failed to delete source memory delete-b")));
});
});
27 changes: 25 additions & 2 deletions test/self-improvement.test.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, beforeEach, afterEach } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, readFileSync, rmSync } from "node:fs";
import { mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
Expand All @@ -19,7 +19,11 @@ const {
registerSelfImprovementExtractSkillTool,
registerSelfImprovementReviewTool,
} = jiti("../src/tools.ts");
const { appendSelfImprovementEntry, countSelfImprovementEntries } = jiti("../src/self-improvement-files.ts");
const {
appendSelfImprovementEntry,
countSelfImprovementEntries,
ensureSelfImprovementLearningFiles,
} = jiti("../src/self-improvement-files.ts");
const {
extractReflectionLearningGovernanceCandidates,
extractInjectableReflectionMappedMemories,
Expand Down Expand Up @@ -70,6 +74,25 @@ describe("self-improvement", () => {
rmSync(workspaceDir, { recursive: true, force: true });
});

it("ensures learning files idempotently under concurrent calls", async () => {
const learningsDir = path.join(workspaceDir, ".learnings");
mkdirSync(learningsDir, { recursive: true });
writeFileSync(path.join(learningsDir, "LEARNINGS.md"), "# Custom Learnings\n", "utf-8");
writeFileSync(path.join(learningsDir, "ERRORS.md"), "\n", "utf-8");

await Promise.all([
ensureSelfImprovementLearningFiles(workspaceDir),
ensureSelfImprovementLearningFiles(workspaceDir),
ensureSelfImprovementLearningFiles(workspaceDir),
]);

const learningsBody = readFileSync(path.join(learningsDir, "LEARNINGS.md"), "utf-8");
const errorsBody = readFileSync(path.join(learningsDir, "ERRORS.md"), "utf-8");
assert.equal(learningsBody, "# Custom Learnings\n");
assert.match(errorsBody, /^# Errors/);
assert.match(errorsBody, /ERR-YYYYMMDD-XXX/);
});

it("extracts mapped reflection sections into preference/fact/decision memories", async () => {
const reflectionText = [
"## Context (session background)",
Expand Down
Loading