Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions apps/memos-local-openclaw/src/embedding/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import type { Logger } from "../types";
import { DEFAULTS } from "../types";

/** Cached promise for the embedding extractor; `null` until one is created and again after `resetExtractor` clears it. */
let extractorPromise: Promise<any> | null = null;
/** Number of texts embedded since the counter was last zeroed by `resetExtractor`. */
let callCount = 0;

/**
 * Parses the raw value of `MEMOS_EMBED_RESET_AFTER_CALLS` into a reset threshold.
 *
 * @param raw - Raw environment value; may be `undefined` or empty when unset.
 * @param fallback - Threshold used when `raw` is missing or not an integer.
 * @returns The parsed integer, or `fallback` when parsing yields `NaN`.
 *          Zero and negative values are returned as-is; callers treat any
 *          value that is not greater than zero as "periodic reset disabled".
 */
function parseResetThreshold(raw: string | undefined, fallback = 50): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  // A NaN threshold would make every `> 0` comparison false and silently
  // disable the periodic reset, so an unparseable value falls back instead.
  return Number.isNaN(parsed) ? fallback : parsed;
}

/**
 * Number of embedding calls after which the pipeline is reset.
 * Read once, at module load, from `MEMOS_EMBED_RESET_AFTER_CALLS` (default 50);
 * changing the environment variable afterwards has no effect on this module instance.
 */
const RESET_AFTER_CALLS = parseResetThreshold(process.env.MEMOS_EMBED_RESET_AFTER_CALLS);

function getExtractor(log: Logger): Promise<any> {
if (extractorPromise) return extractorPromise;
Expand All @@ -23,13 +27,55 @@ function getExtractor(log: Logger): Promise<any> {
return extractorPromise;
}

/**
 * Tears down the cached local embedding pipeline so that the next
 * `getExtractor` call no longer sees a cached promise.
 *
 * The module-level cache and the call counter are cleared synchronously,
 * before the first `await`. Callers that arrive while disposal is still in
 * progress therefore never receive the pipeline that is being disposed, and
 * overlapping resets cannot dispose the same pipeline twice or discard a
 * pipeline created in the meantime.
 *
 * Disposal is best-effort: if awaiting the pipeline or disposing it throws,
 * the error is logged as a warning and not rethrown.
 *
 * @param log - Logger that receives the warning on failure and the debug
 *              message once the reset has been attempted.
 * @returns A promise that resolves after disposal has been attempted.
 */
async function resetExtractor(log: Logger): Promise<void> {
  const pending = extractorPromise;
  if (!pending) return;

  // Detach the shared state first; everything after this point only touches
  // the locally captured promise.
  extractorPromise = null;
  callCount = 0;

  try {
    const ext = await pending;
    // Attempt to dispose the pipeline to free ONNX session resources.
    // dispose() is called only when the resolved object actually provides it.
    if (typeof ext?.dispose === "function") {
      await ext.dispose();
    }
  } catch (err) {
    log.warn(`Failed to dispose extractor: ${err}`);
  }

  log.debug("Local embedding pipeline reset to free native memory");
}

export async function embedLocal(texts: string[], log: Logger): Promise<number[][]> {
const ext = await getExtractor(log);
const results: number[][] = [];

for (const text of texts) {
const output = await ext(text, { pooling: "mean", normalize: true });

// Extract the embedding vector
results.push(Array.from(output.data as Float32Array).slice(0, DEFAULTS.localEmbeddingDimensions));

// Explicitly release the output tensor to prevent ONNX memory leak
try {
// Null out the data reference
(output as any).data = null;
} catch {}

try {
// Call dispose if available
if (typeof (output as any).dispose === "function") {
(output as any).dispose();
}
} catch {}

callCount++;
}

// Periodically reset the pipeline to prevent long-term memory accumulation
// Set MEMOS_EMBED_RESET_AFTER_CALLS=0 to disable periodic reset
if (RESET_AFTER_CALLS > 0 && callCount >= RESET_AFTER_CALLS) {
log.debug(`Reached ${callCount} embedding calls, resetting pipeline to free native memory`);
await resetExtractor(log);
}

return results;
Expand Down
96 changes: 96 additions & 0 deletions apps/memos-local-openclaw/tests/embedding-memory-leak.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { describe, it, expect, vi } from "vitest";
import { embedLocal } from "../src/embedding/local";
import type { Logger } from "../src/types";

// Exercises embedLocal end to end: output shape and validity, repeated calls,
// and the periodic pipeline reset controlled by MEMOS_EMBED_RESET_AFTER_CALLS.
describe("embedLocal memory leak fix", () => {
  const mockLogger: Logger = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  };

  const RESET_ENV_KEY = "MEMOS_EMBED_RESET_AFTER_CALLS";

  /**
   * Runs `body` against a freshly evaluated copy of the embedding module with
   * the reset threshold environment variable set to `threshold`.
   *
   * The module reads the variable once, when it is first evaluated, so setting
   * it after the static import at the top of this file has no effect. The
   * module registry is therefore reset and the module re-imported after the
   * variable is set. A fresh copy also starts with its own call counter, so
   * calls made by other tests do not count towards the threshold.
   *
   * The previous value of the variable is restored even when `body` throws.
   */
  async function withResetThreshold(
    threshold: string,
    body: (embed: typeof embedLocal) => Promise<void>,
  ): Promise<void> {
    const previous = process.env[RESET_ENV_KEY];
    process.env[RESET_ENV_KEY] = threshold;
    vi.resetModules();
    // Start from a clean call history so assertions only see this test's logs.
    vi.clearAllMocks();

    try {
      const fresh = await import("../src/embedding/local");
      await body(fresh.embedLocal);
    } finally {
      if (previous === undefined) {
        delete process.env[RESET_ENV_KEY];
      } else {
        process.env[RESET_ENV_KEY] = previous;
      }
      // Drop the module copy that was configured with the temporary threshold.
      vi.resetModules();
    }
  }

  it("should dispose tensor output after each embedding call", async () => {
    const texts = ["test embedding 1", "test embedding 2"];

    const result = await embedLocal(texts, mockLogger);

    // Verify we got valid embeddings
    expect(result).toHaveLength(2);
    expect(result[0]).toHaveLength(384); // all-MiniLM-L6-v2 dimension
    expect(result[1]).toHaveLength(384);

    // Verify embeddings are valid numbers
    for (const embedding of result) {
      for (const value of embedding) {
        expect(typeof value).toBe("number");
        expect(Number.isFinite(value)).toBe(true);
      }
    }
  });

  it("should handle multiple consecutive calls without crashing", async () => {
    // Simulate multiple embedding calls that would trigger the leak;
    // the per-call shape assertions are the actual check.
    const calls = 10;

    for (let i = 0; i < calls; i++) {
      const result = await embedLocal([`test text ${i}`], mockLogger);
      expect(result).toHaveLength(1);
      expect(result[0]).toHaveLength(384);
    }
  });

  it("should reset pipeline after RESET_AFTER_CALLS threshold", async () => {
    // A low threshold keeps the test short: the fifth call triggers the reset.
    await withResetThreshold("5", async (embed) => {
      const calls = 6;

      for (let i = 0; i < calls; i++) {
        const result = await embed([`test ${i}`], mockLogger);
        expect(result[0]).toHaveLength(384);
      }

      // Verify reset was logged
      expect(mockLogger.debug).toHaveBeenCalledWith(
        expect.stringContaining("Reached 5 embedding calls"),
      );
    });
  });

  it("should allow disabling periodic reset via env variable", async () => {
    await withResetThreshold("0", async (embed) => {
      // Run more calls than the default threshold - should not trigger reset
      for (let i = 0; i < 100; i++) {
        await embed([`test ${i}`], mockLogger);
      }

      // Verify no reset was logged
      expect(mockLogger.debug).not.toHaveBeenCalledWith(
        expect.stringContaining("Reached"),
      );
    });
  });
});
Loading
Loading