From 76d6d177dd2e3672adcd76439636199c26b9329d Mon Sep 17 00:00:00 2001
From: Benjamin Michaelis
Date: Sun, 26 Apr 2026 08:21:07 -0700
Subject: [PATCH 1/3] Update embedding logic

---
 .../Models/BookContentChunk.cs              |   7 ++
 .../Services/AISearchService.cs             |  24 +++-
 .../Services/ChunkingResultExtensions.cs    |  30 ++--
 .../Services/EmbeddingService.cs            | 111 ++++++++++++++----
 .../Services/FileChunkingResult.cs          |   9 +-
 .../Services/MarkdownChunkingService.cs     |  37 ++++--
 .../MarkdownChunkingServiceTests.cs         |   6 +-
 EssentialCSharp.Chat/Program.cs             |  17 +--
 8 files changed, 174 insertions(+), 67 deletions(-)

diff --git a/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs b/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
index e70ac015..f0e0babf 100644
--- a/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
+++ b/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
@@ -37,6 +37,13 @@ public sealed class BookContentChunk
     [VectorStoreData]
     public int? ChapterNumber { get; set; }
 
+    /// <summary>
+    /// Zero-based ordinal of this chunk within its source file.
+    /// Together with FileName, forms the basis for the deterministic Id.
+    /// </summary>
+    [VectorStoreData]
+    public int ChunkIndex { get; set; }
+
     /// <summary>
     /// SHA256 hash of the chunk content for change detection
     /// </summary>
diff --git a/EssentialCSharp.Chat.Shared/Services/AISearchService.cs b/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
index 935a08b2..505037df 100644
--- a/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
@@ -35,11 +35,29 @@ public async Task<List<VectorSearchResult<BookContentChunk>>> ExecuteVe
     {
         try
         {
-            var results = new List<VectorSearchResult<BookContentChunk>>();
-            await foreach (var result in collection.SearchAsync(searchVector, options: vectorSearchOptions, top: top, cancellationToken: cancellationToken))
+            // Fetch more candidates than needed so we can deduplicate by heading.
+            // Multiple chunks from the same section share the same Heading; without dedup
+            // all top-N results could come from one long section, reducing context diversity.
+            int candidates = top * 3;
+
+            var candidatesList = new List<VectorSearchResult<BookContentChunk>>();
+            await foreach (var result in collection.SearchAsync(searchVector, options: vectorSearchOptions, top: candidates, cancellationToken: cancellationToken))
             {
-                results.Add(result);
+                candidatesList.Add(result);
             }
+
+            // Keep only the highest-scoring chunk per unique heading, then take the globally
+            // top-N by score. GroupBy on a materialized list preserves insertion (score desc)
+            // order, but we make the ordering explicit via OrderByDescending so the result
+            // is correct regardless of provider sort guarantees.
+            // MaxBy on a non-empty IGrouping never returns null; ! asserts this invariant.
+            var results = candidatesList
+                .GroupBy(r => r.Record.Heading)
+                .Select(g => g.MaxBy(r => r.Score)!)
+                .OrderByDescending(r => r.Score)
+                .Take(top)
+                .ToList();
+
+            return results;
         }
         catch (PostgresException ex) when (ex.SqlState == "28000" && attempt == 0)
diff --git a/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs b/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
index f6be7d13..0f58d75e 100644
--- a/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
+++ b/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
@@ -1,6 +1,5 @@
 using System.Security.Cryptography;
 using System.Text;
-using System.Linq;
 using EssentialCSharp.Chat.Common.Models;
 
 namespace EssentialCSharp.Chat.Common.Services;
@@ -12,16 +11,17 @@ public static List<BookContentChunk> ToBookContentChunks(this FileChunkingResult
         int? chapterNumber = ExtractChapterNumber(result.FileName);
 
         var chunks = result.Chunks
-            .Select(chunkText =>
+            .Select((markdownChunk, index) =>
             {
-                var contentHash = ComputeSha256Hash(chunkText);
+                var contentHash = ComputeSha256Hash(markdownChunk.ChunkText);
                 return new BookContentChunk
                 {
-                    Id = Guid.NewGuid().ToString(),
+                    Id = $"{result.FileName}_{index}",
                     FileName = result.FileName,
-                    Heading = ExtractHeading(chunkText),
-                    ChunkText = chunkText,
+                    Heading = markdownChunk.Heading,
+                    ChunkText = markdownChunk.ChunkText,
                     ChapterNumber = chapterNumber,
+                    ChunkIndex = index,
                     ContentHash = contentHash
                 };
             })
@@ -30,25 +30,13 @@ public static List<BookContentChunk> ToBookContentChunks(this FileChunkingResult
         return chunks;
     }
 
-    private static string ExtractHeading(string chunkText)
+    private static int? ExtractChapterNumber(string fileName)
     {
-        // get characters until the first " - " or newline
-        var firstLine = chunkText.Split(["\r\n", "\r", "\n"], StringSplitOptions.None)[0];
-        var headingParts = firstLine.Split([" - "], StringSplitOptions.None);
-        return headingParts.Length > 0 ? headingParts[0].Trim() : string.Empty;
-    }
-
-    private static int ExtractChapterNumber(string fileName)
-    {
-        // Example: "Chapter01.md" -> 1
-        // Regex: Chapter(?<ChapterNumber>[0-9]{2})
+        // Example: "Chapter01.md" -> 1; non-chapter files return null.
         var match = ChapterNumberRegex().Match(fileName);
         if (match.Success && int.TryParse(match.Groups["ChapterNumber"].Value, out int chapterNumber))
-
-        {
             return chapterNumber;
-        }
-        throw new InvalidOperationException($"File name '{fileName}' does not contain a valid chapter number in the expected format.");
+        return null;
     }
 
     private static string ComputeSha256Hash(string text)
diff --git a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
index dd1fe89c..451ade42 100644
--- a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
@@ -1,22 +1,24 @@
 using EssentialCSharp.Chat.Common.Models;
 using Microsoft.Extensions.AI;
 using Microsoft.Extensions.VectorData;
+using Npgsql;
 
 namespace EssentialCSharp.Chat.Common.Services;
 
 /// <summary>
-/// Service for generating embeddings for markdown chunks using Azure OpenAI
+/// Service for generating embeddings for markdown chunks using Azure OpenAI and uploading
+/// them to a PostgreSQL vector store via a staging-then-swap pattern to avoid downtime.
 /// </summary>
-public class EmbeddingService(VectorStore vectorStore, IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator)
+public class EmbeddingService(
+    VectorStore vectorStore,
+    IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator,
+    NpgsqlDataSource? dataSource = null)
 {
     public static string CollectionName { get; } = "markdown_chunks";
 
     /// <summary>
     /// Generate an embedding for the given text.
     /// </summary>
-    /// <param name="text">The text to generate an embedding for.</param>
-    /// <param name="cancellationToken">The cancellation token.</param>
-    /// <returns>A search vector as ReadOnlyMemory&lt;float&gt;.</returns>
     public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default)
     {
         var embedding = await embeddingGenerator.GenerateAsync(text, cancellationToken: cancellationToken);
@@ -24,28 +26,97 @@ public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, Can
     }
 
     /// <summary>
-    /// Generate an embedding for each text paragraph and upload it to the specified collection.
+    /// Generate embeddings for all chunks in a single batch call and upload them to the vector
+    /// store using a staging-then-atomic-swap pattern so the live collection stays queryable
+    /// throughout the rebuild.
+    ///
+    /// Steps:
+    /// 1. Create a staging collection ({collectionName}_staging).
+    /// 2. Embed all chunks in one batch API call (Azure OpenAI supports up to 2048 inputs).
+    /// 3. Batch-upsert all chunks into staging.
+    /// 4. Atomically swap staging → live via three SQL RENAMEs in a single transaction.
+    ///    PostgreSQL ALTER TABLE acquires AccessExclusiveLock automatically; no explicit
+    ///    LOCK TABLE is needed. The transaction ensures no reader sees an intermediate state.
+    /// 5. Drop the old live backup table.
+    ///
+    /// If an error occurs before the swap, only the staging table is affected — the live
+    /// collection is untouched.
     /// </summary>
-    /// <param name="collectionName">The name of the collection to upload the text paragraphs to.</param>
-    /// <returns>An async task.</returns>
-    public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(IEnumerable<BookContentChunk> bookContents, CancellationToken cancellationToken, string? collectionName = null)
+    public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(
+        IEnumerable<BookContentChunk> bookContents,
+        CancellationToken cancellationToken,
+        string? collectionName = null)
     {
         collectionName ??= CollectionName;
+        string stagingName = $"{collectionName}_staging";
+        string oldName = $"{collectionName}_old";
 
-        var collection = vectorStore.GetCollection<string, BookContentChunk>(collectionName);
-        await collection.EnsureCollectionDeletedAsync(cancellationToken);
-        await collection.EnsureCollectionExistsAsync(cancellationToken);
+        if (dataSource is null)
+            throw new InvalidOperationException("NpgsqlDataSource is required for the staging swap. Ensure it is registered in DI.");
 
-        int uploadedCount = 0;
+        // ── Step 1: Prepare staging collection ────────────────────────────────────────
+        var staging = vectorStore.GetCollection<string, BookContentChunk>(stagingName);
+        await staging.EnsureCollectionDeletedAsync(cancellationToken);
+        await staging.EnsureCollectionExistsAsync(cancellationToken);
 
-        foreach (var chunk in bookContents)
+        // ── Step 2: Batch-embed all chunks in a single API call ───────────────────────
+        // IEmbeddingGenerator.GenerateAsync natively accepts IEnumerable<string>.
+        // The single-string overload used previously is a convenience extension method
+        // that wraps one item and calls this same method.
+        var chunkList = bookContents.ToList();
+        var texts = chunkList.Select(c => c.ChunkText).ToList();
+
+        GeneratedEmbeddings<Embedding<float>> embeddings =
+            await embeddingGenerator.GenerateAsync(texts, cancellationToken: cancellationToken);
+
+        if (embeddings.Count != chunkList.Count)
+            throw new InvalidOperationException(
+                $"Embedding count mismatch: expected {chunkList.Count}, got {embeddings.Count}.");
+
+        for (int i = 0; i < chunkList.Count; i++)
+        {
+            chunkList[i].TextEmbedding = embeddings[i].Vector;
+        }
+
+        // ── Step 3: Batch-upsert all chunks into staging ──────────────────────────────
+        await staging.UpsertAsync(chunkList, cancellationToken);
+        Console.WriteLine($"Uploaded {chunkList.Count} chunks to staging collection '{stagingName}'.");
+
+        // ── Step 4: Atomic swap — staging → live ──────────────────────────────────────
+        // Three ALTER TABLE RENAME statements in one transaction.
+        // Each RENAME auto-acquires AccessExclusiveLock on its table; the transaction
+        // guarantees all three renames are visible atomically to other sessions.
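+        // Illustrative sketch of the effective SQL, assuming the default
+        // collection name "markdown_chunks":
+        //   DROP TABLE IF EXISTS "markdown_chunks_old";
+        //   ALTER TABLE IF EXISTS "markdown_chunks" RENAME TO "markdown_chunks_old";
+        //   ALTER TABLE "markdown_chunks_staging" RENAME TO "markdown_chunks";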
+        await using var conn = await dataSource.OpenConnectionAsync(cancellationToken);
+        await using var tx = await conn.BeginTransactionAsync(cancellationToken);
+
+        await using (var cmd = conn.CreateCommand())
         {
-            cancellationToken.ThrowIfCancellationRequested();
-            chunk.TextEmbedding = await GenerateEmbeddingAsync(chunk.ChunkText, cancellationToken);
-            await collection.UpsertAsync(chunk, cancellationToken);
-            Console.WriteLine($"Uploaded chunk '{chunk.Id}' to collection '{collectionName}' for file '{chunk.FileName}' with heading '{chunk.Heading}'.");
-            uploadedCount++;
+            cmd.Transaction = tx;
+
+            // Drop any leftover backup from a previous run
+            cmd.CommandText = $"DROP TABLE IF EXISTS \"{oldName}\"";
+            await cmd.ExecuteNonQueryAsync(cancellationToken);
+
+            // Rename live → old. IF EXISTS is a no-op on first run when no live table exists.
+            // Using ALTER TABLE IF EXISTS avoids PL/pgSQL string interpolation entirely.
+            cmd.CommandText = $"ALTER TABLE IF EXISTS \"{collectionName}\" RENAME TO \"{oldName}\"";
+            await cmd.ExecuteNonQueryAsync(cancellationToken);
+
+            // Rename staging → live
+            cmd.CommandText = $"ALTER TABLE \"{stagingName}\" RENAME TO \"{collectionName}\"";
+            await cmd.ExecuteNonQueryAsync(cancellationToken);
+        }
+
+        await tx.CommitAsync(cancellationToken);
+        Console.WriteLine($"Swapped '{stagingName}' → '{collectionName}' atomically.");
+
+        // ── Step 5: Drop the old backup ───────────────────────────────────────────────
+        await using (var cmd = conn.CreateCommand())
+        {
+            cmd.CommandText = $"DROP TABLE IF EXISTS \"{oldName}\"";
+            await cmd.ExecuteNonQueryAsync(cancellationToken);
         }
-        Console.WriteLine($"Successfully generated embeddings and uploaded {uploadedCount} chunks to collection '{collectionName}'.");
+
+        Console.WriteLine($"Successfully generated embeddings and uploaded {chunkList.Count} chunks to collection '{collectionName}'.");
     }
 }
diff --git a/EssentialCSharp.Chat.Shared/Services/FileChunkingResult.cs b/EssentialCSharp.Chat.Shared/Services/FileChunkingResult.cs
index e2d0f40e..db35dae9 100644
--- a/EssentialCSharp.Chat.Shared/Services/FileChunkingResult.cs
+++ b/EssentialCSharp.Chat.Shared/Services/FileChunkingResult.cs
@@ -1,5 +1,12 @@
 namespace EssentialCSharp.Chat.Common.Services;
 
+/// <summary>
+/// A single chunk from a markdown file, paired with the section heading it belongs to.
+/// </summary>
+/// <param name="Heading">Full breadcrumb heading for the section (e.g. "Chapter 1: Intro: Summary").</param>
+/// <param name="ChunkText">The raw chunk text, including the "Heading - " prefix prepended by TextChunker.</param>
+public record MarkdownChunk(string Heading, string ChunkText);
+
 /// <summary>
 /// Data structure to hold chunking results for a single file
 /// </summary>
@@ -9,6 +16,6 @@ public class FileChunkingResult
     public string FilePath { get; set; } = string.Empty;
     public int OriginalCharCount { get; set; }
     public int ChunkCount { get; set; }
-    public List<string> Chunks { get; set; } = [];
+    public List<MarkdownChunk> Chunks { get; set; } = [];
     public int TotalChunkCharacters { get; set; }
 }
diff --git a/EssentialCSharp.Chat.Shared/Services/MarkdownChunkingService.cs b/EssentialCSharp.Chat.Shared/Services/MarkdownChunkingService.cs
index bb79f6f7..1f5d29df 100644
--- a/EssentialCSharp.Chat.Shared/Services/MarkdownChunkingService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/MarkdownChunkingService.cs
@@ -61,15 +61,24 @@ public async Task<List<FileChunkingResult>> ProcessMarkdownFilesAsync(
     public FileChunkingResult ProcessSingleMarkdownFile(
         string[] fileContent, string fileName, string filePath)
     {
-        // Remove all multiple empty lines so there is no more than one empty line between paragraphs
-        string[] lines = [.. fileContent
-            .Select(line => line.Trim())
-            .Where(line => !string.IsNullOrWhiteSpace(line))];
-
+        // Collapse consecutive blank lines to at most one blank line. Single blank lines must
+        // be preserved because TextChunker.SplitMarkdownParagraphs uses them as paragraph
+        // separators — stripping all blanks defeats paragraph-aware chunking.
+        var normalizedLines = new List<string>(fileContent.Length);
+        bool lastWasBlank = false;
+        foreach (var raw in fileContent)
+        {
+            var line = raw.Trim();
+            var isBlank = string.IsNullOrWhiteSpace(line);
+            if (!isBlank || !lastWasBlank)
+                normalizedLines.Add(line);
+            lastWasBlank = isBlank;
+        }
+        string[] lines = [.. normalizedLines];
         string content = string.Join(Environment.NewLine, lines);
 
         var sections = MarkdownContentToHeadersAndSection(content);
-        var allChunks = new List<string>();
+        var allChunks = new List<MarkdownChunk>();
         int totalChunkCharacters = 0;
         int chunkCount = 0;
@@ -83,7 +92,7 @@ public FileChunkingResult ProcessSingleMarkdownFile(
                 chunkHeader: Header + " - "
             );
 #pragma warning restore SKEXP0050
-            allChunks.AddRange(chunks);
+            allChunks.AddRange(chunks.Select(c => new MarkdownChunk(Header, c)));
             chunkCount += chunks.Count;
             totalChunkCharacters += chunks.Sum(c => c.Length);
         }
@@ -155,18 +164,24 @@ public FileChunkingResult ProcessSingleMarkdownFile(
             }
             i++;
 
-            // Collect content until next header
+            // Collect content until next header, preserving blank lines as paragraph separators
+            // for TextChunker.SplitMarkdownParagraphs.
             var contentLines = new List<string>();
             while (i < lines.Length && !headerRegex.IsMatch(lines[i]))
            {
-                if (!string.IsNullOrWhiteSpace(lines[i]))
-                    contentLines.Add(lines[i]);
+                contentLines.Add(lines[i]);
                 i++;
             }
 
+            // Strip leading and trailing blank lines; keep internal blanks for paragraph detection.
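+            // Illustrative example (hypothetical input):
+            //   ["", "First para.", "", "Second para.", ""]  becomes
+            //   ["First para.", "", "Second para."]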
+            while (contentLines.Count > 0 && string.IsNullOrWhiteSpace(contentLines[0]))
+                contentLines.RemoveAt(0);
+            while (contentLines.Count > 0 && string.IsNullOrWhiteSpace(contentLines[^1]))
+                contentLines.RemoveAt(contentLines.Count - 1);
+
             // Compose full header context
             var fullHeader = string.Join(": ", headerStack.Select(h => h.Text));
-            if (contentLines.Count > 0)
+            if (contentLines.Any(l => !string.IsNullOrWhiteSpace(l)))
                 sections.Add((fullHeader, contentLines));
         }
         return sections;
     }
diff --git a/EssentialCSharp.Chat.Tests/MarkdownChunkingServiceTests.cs b/EssentialCSharp.Chat.Tests/MarkdownChunkingServiceTests.cs
index 978417d8..5cdbb0c7 100644
--- a/EssentialCSharp.Chat.Tests/MarkdownChunkingServiceTests.cs
+++ b/EssentialCSharp.Chat.Tests/MarkdownChunkingServiceTests.cs
@@ -183,9 +183,9 @@ public async Task ProcessSingleMarkdownFile_ProducesExpectedChunksAndHeaders()
         await Assert.That(result).IsNotNull();
         await Assert.That(result.FileName).IsEqualTo(fileName);
         await Assert.That(result.FilePath).IsEqualTo(filePath);
-        await Assert.That(string.Join("\n", result.Chunks)).Contains("This is the first section.");
-        await Assert.That(string.Join("\n", result.Chunks)).Contains("Console.WriteLine(\"Hello World\");");
-        await Assert.That(result.Chunks).Contains(c => c.Contains("This is the second section."));
+        await Assert.That(string.Join("\n", result.Chunks.Select(c => c.ChunkText))).Contains("This is the first section.");
+        await Assert.That(string.Join("\n", result.Chunks.Select(c => c.ChunkText))).Contains("Console.WriteLine(\"Hello World\");");
+        await Assert.That(result.Chunks).Contains(c => c.ChunkText.Contains("This is the second section."));
     }
     #endregion ProcessSingleMarkdownFile
 }
diff --git a/EssentialCSharp.Chat/Program.cs b/EssentialCSharp.Chat/Program.cs
index ac92ca77..f7c62a9a 100644
--- a/EssentialCSharp.Chat/Program.cs
+++ b/EssentialCSharp.Chat/Program.cs
@@ -293,13 +293,13 @@ static int Main(string[] args)
 void WriteChunkingResult(FileChunkingResult result, TextWriter writer)
 {
     // lets build up some stats over the chunking
-    var chunkAverage = result.Chunks.Average(chunk => chunk.Length);
-    var chunkMedian = result.Chunks.OrderBy(chunk => chunk.Length).ElementAt(result.Chunks.Count / 2).Length;
-    var chunkMax = result.Chunks.Max(chunk => chunk.Length);
-    var chunkMin = result.Chunks.Min(chunk => chunk.Length);
-    var chunkTotal = result.Chunks.Sum(chunk => chunk.Length);
-    var chunkStandardDeviation = Math.Sqrt(result.Chunks.Average(chunk => Math.Pow(chunk.Length - chunkAverage, 2)));
-    var numberOfOutliers = result.Chunks.Count(chunk => chunk.Length > chunkAverage + chunkStandardDeviation);
+    var chunkAverage = result.Chunks.Average(chunk => chunk.ChunkText.Length);
+    var chunkMedian = result.Chunks.OrderBy(chunk => chunk.ChunkText.Length).ElementAt(result.Chunks.Count / 2).ChunkText.Length;
+    var chunkMax = result.Chunks.Max(chunk => chunk.ChunkText.Length);
+    var chunkMin = result.Chunks.Min(chunk => chunk.ChunkText.Length);
+    var chunkTotal = result.Chunks.Sum(chunk => chunk.ChunkText.Length);
+    var chunkStandardDeviation = Math.Sqrt(result.Chunks.Average(chunk => Math.Pow(chunk.ChunkText.Length - chunkAverage, 2)));
+    var numberOfOutliers = result.Chunks.Count(chunk => chunk.ChunkText.Length > chunkAverage + chunkStandardDeviation);
 
     if (chunkMax > maxChunkLength) maxChunkLength = chunkMax;
     if (chunkMin < minChunkLength || minChunkLength == 0) minChunkLength = chunkMin;
@@ -318,7 +318,8 @@ void WriteChunkingResult(FileChunkingResult result, TextWriter writer)
     foreach (var chunk in result.Chunks)
     {
         writer.WriteLine();
-        writer.WriteLine(chunk);
+        writer.WriteLine($"[{chunk.Heading}]");
+        writer.WriteLine(chunk.ChunkText);
     }
 }

From 65071fc694c6c72eba824f281ec2517ea39777fa Mon Sep 17 00:00:00 2001
From: Benjamin Michaelis
Date: Wed, 6 May 2026 17:05:05 -0700
Subject: [PATCH 2/3] fix: address code review issues in embedding bulk update
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Rename candidates_list → candidatesList (C# camelCase convention)
- Make NpgsqlDataSource required in EmbeddingService constructor (always
  registered in DI; optional+throw was a misleading anti-pattern)
- Add EmbeddingBatchSize = 2048 constant and batch the GenerateAsync call
  to respect Azure OpenAI input limit
- Validate collectionName against safe identifier regex before SQL use
- Add best-effort staging cleanup on UpsertAsync failure (nested try so
  cleanup exception cannot mask the original)
- Document ChapterNumber nullability on BookContentChunk property and
  ToBookContentChunks public method

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 .../Models/BookContentChunk.cs           |  3 +-
 .../Services/AISearchService.cs          |  2 +-
 .../Services/ChunkingResultExtensions.cs |  8 +++
 .../Services/EmbeddingService.cs         | 61 +++++++++++++------
 4 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs b/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
index f0e0babf..05db5621 100644
--- a/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
+++ b/EssentialCSharp.Chat.Shared/Models/BookContentChunk.cs
@@ -32,7 +32,8 @@ public sealed class BookContentChunk
     public string ChunkText { get; set; } = string.Empty;
 
     /// <summary>
-    /// Chapter number extracted from filename (e.g., "Chapter01.md" -> 1)
+    /// Chapter number extracted from filename (e.g., "Chapter01.md" -> 1).
+    /// Null for files that do not follow the ChapterNN naming pattern.
     /// </summary>
     [VectorStoreData]
     public int? ChapterNumber { get; set; }
diff --git a/EssentialCSharp.Chat.Shared/Services/AISearchService.cs b/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
index 505037df..ddbcf38d 100644
--- a/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/AISearchService.cs
@@ -1,4 +1,4 @@
-using System.Diagnostics;
+using System.Diagnostics;
 using EssentialCSharp.Chat.Common.Models;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.VectorData;
diff --git a/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs b/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
index 0f58d75e..72db92fc 100644
--- a/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
+++ b/EssentialCSharp.Chat.Shared/Services/ChunkingResultExtensions.cs
@@ -6,6 +6,14 @@ namespace EssentialCSharp.Chat.Common.Services;
 
 public static partial class ChunkingResultExtensions
 {
+    /// <summary>
+    /// Converts a <see cref="FileChunkingResult"/> into a list of <see cref="BookContentChunk"/> records
+    /// ready for embedding and vector store upload.
+    /// </summary>
+    /// <remarks>
+    /// <see cref="BookContentChunk.ChapterNumber"/> is set to null for files that do not match
+    /// the ChapterNN naming pattern (e.g. appendix or non-chapter markdown files).
+    /// </remarks>
     public static List<BookContentChunk> ToBookContentChunks(this FileChunkingResult result)
     {
         int? chapterNumber = ExtractChapterNumber(result.FileName);
diff --git a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
index 451ade42..761836e0 100644
--- a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
@@ -1,3 +1,4 @@
+using System.Text.RegularExpressions;
 using EssentialCSharp.Chat.Common.Models;
 using Microsoft.Extensions.AI;
 using Microsoft.Extensions.VectorData;
@@ -12,10 +13,18 @@ namespace EssentialCSharp.Chat.Common.Services;
 public class EmbeddingService(
     VectorStore vectorStore,
     IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator,
-    NpgsqlDataSource? dataSource = null)
+    NpgsqlDataSource dataSource)
 {
     public static string CollectionName { get; } = "markdown_chunks";
 
+    /// <summary>
+    /// Maximum number of inputs per Azure OpenAI embedding batch call.
+    /// </summary>
+    private const int EmbeddingBatchSize = 2048;
+
+    // Only allow simple identifiers: letters, digits, and underscores, starting with a letter or underscore.
+    private static readonly Regex _safeIdentifierRegex = new(@"^[a-zA-Z_][a-zA-Z0-9_]*$", RegexOptions.Compiled);
+
     /// <summary>
     /// Generate an embedding for the given text.
     /// </summary>
@@ -26,13 +35,13 @@ public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, Can
     }
 
     /// <summary>
-    /// Generate embeddings for all chunks in a single batch call and upload them to the vector
-    /// store using a staging-then-atomic-swap pattern so the live collection stays queryable
+    /// Generate embeddings for all chunks in batches and upload them to the vector store
+    /// using a staging-then-atomic-swap pattern so the live collection stays queryable
     /// throughout the rebuild.
     ///
     /// Steps:
     /// 1. Create a staging collection ({collectionName}_staging).
-    /// 2. Embed all chunks in one batch API call (Azure OpenAI supports up to 2048 inputs).
+    /// 2. Embed all chunks in batches of <see cref="EmbeddingBatchSize"/> (Azure OpenAI limit).
     /// 3. Batch-upsert all chunks into staging.
     /// 4. Atomically swap staging → live via three SQL RENAMEs in a single transaction.
@@ -48,39 +57,54 @@ public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(
         string? collectionName = null)
     {
         collectionName ??= CollectionName;
+
+        if (!_safeIdentifierRegex.IsMatch(collectionName))
+            throw new ArgumentException(
+                $"Collection name '{collectionName}' contains unsafe characters. Use only letters, digits, and underscores.",
+                nameof(collectionName));
+
         string stagingName = $"{collectionName}_staging";
         string oldName = $"{collectionName}_old";
 
-        if (dataSource is null)
-            throw new InvalidOperationException("NpgsqlDataSource is required for the staging swap. Ensure it is registered in DI.");
-
         // ── Step 1: Prepare staging collection ────────────────────────────────────────
         var staging = vectorStore.GetCollection<string, BookContentChunk>(stagingName);
         await staging.EnsureCollectionDeletedAsync(cancellationToken);
         await staging.EnsureCollectionExistsAsync(cancellationToken);
 
-        // ── Step 2: Batch-embed all chunks in a single API call ───────────────────────
-        // IEmbeddingGenerator.GenerateAsync natively accepts IEnumerable<string>.
-        // The single-string overload used previously is a convenience extension method
-        // that wraps one item and calls this same method.
+        // ── Step 2: Batch-embed all chunks ─────────────────────────────────────────────
+        // Azure OpenAI supports at most EmbeddingBatchSize inputs per GenerateAsync call.
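+        // Illustrative example: 5,000 chunks would be embedded in three sequential
+        // calls of 2,048, 2,048, and 904 inputs. Enumerable.Chunk preserves source
+        // order, so allEmbeddings[i] below always corresponds to chunkList[i].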
         var chunkList = bookContents.ToList();
         var texts = chunkList.Select(c => c.ChunkText).ToList();
 
-        GeneratedEmbeddings<Embedding<float>> embeddings =
-            await embeddingGenerator.GenerateAsync(texts, cancellationToken: cancellationToken);
+        var allEmbeddings = new List<Embedding<float>>(chunkList.Count);
+        foreach (var batch in texts.Chunk(EmbeddingBatchSize))
+        {
+            var batchEmbeddings = await embeddingGenerator.GenerateAsync(batch, cancellationToken: cancellationToken);
+            allEmbeddings.AddRange(batchEmbeddings);
+        }
 
-        if (embeddings.Count != chunkList.Count)
+        if (allEmbeddings.Count != chunkList.Count)
             throw new InvalidOperationException(
-                $"Embedding count mismatch: expected {chunkList.Count}, got {embeddings.Count}.");
+                $"Embedding count mismatch: expected {chunkList.Count}, got {allEmbeddings.Count}.");
 
         for (int i = 0; i < chunkList.Count; i++)
         {
-            chunkList[i].TextEmbedding = embeddings[i].Vector;
+            chunkList[i].TextEmbedding = allEmbeddings[i].Vector;
         }
 
         // ── Step 3: Batch-upsert all chunks into staging ──────────────────────────────
-        await staging.UpsertAsync(chunkList, cancellationToken);
-        Console.WriteLine($"Uploaded {chunkList.Count} chunks to staging collection '{stagingName}'.");
+        try
+        {
+            await staging.UpsertAsync(chunkList, cancellationToken);
+            Console.WriteLine($"Uploaded {chunkList.Count} chunks to staging collection '{stagingName}'.");
+        }
+        catch
+        {
+            // Best-effort cleanup: drop the partially-populated staging table so the
+            // next run starts clean. Do not let this secondary failure mask the original.
+            try { await staging.EnsureCollectionDeletedAsync(cancellationToken); } catch { }
+            throw;
+        }
 
         // ── Step 4: Atomic swap — staging → live ──────────────────────────────────────
         // Three ALTER TABLE RENAME statements in one transaction.
@@ -98,7 +122,6 @@ public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(
             await cmd.ExecuteNonQueryAsync(cancellationToken);
 
             // Rename live → old. IF EXISTS is a no-op on first run when no live table exists.
-            // Using ALTER TABLE IF EXISTS avoids PL/pgSQL string interpolation entirely.
             cmd.CommandText = $"ALTER TABLE IF EXISTS \"{collectionName}\" RENAME TO \"{oldName}\"";
             await cmd.ExecuteNonQueryAsync(cancellationToken);
 

From d25e8b8c2299c8bd11cb390afa987b61a8729e95 Mon Sep 17 00:00:00 2001
From: Benjamin Michaelis
Date: Wed, 6 May 2026 20:43:11 -0700
Subject: [PATCH 3/3] fix: replace empty catch with logged best-effort cleanup;
 fix doc comment
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Replace `catch { }` with `catch (Exception cleanupEx) when (cleanupEx is
  not OperationCanceledException)` + Console.Error.WriteLine so cleanup
  failures are visible without masking the original exception
- Correct method summary: swap uses two SQL RENAMEs (live→old,
  staging→live) plus DROP TABLE statements, not "three SQL RENAMEs"

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 .../Services/EmbeddingService.cs | 22 +++++++++++++------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
index 761836e0..19cbff24 100644
--- a/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
+++ b/EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs
@@ -43,10 +43,11 @@ public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, Can
     /// 1. Create a staging collection ({collectionName}_staging).
     /// 2. Embed all chunks in batches of <see cref="EmbeddingBatchSize"/> (Azure OpenAI limit).
     /// 3. Batch-upsert all chunks into staging.
-    /// 4. Atomically swap staging → live via three SQL RENAMEs in a single transaction.
-    ///    PostgreSQL ALTER TABLE acquires AccessExclusiveLock automatically; no explicit
-    ///    LOCK TABLE is needed. The transaction ensures no reader sees an intermediate state.
-    /// 5. Drop the old live backup table.
+    /// 4. Atomically swap tables in a single transaction using two SQL RENAME operations
+    ///    (live → old, staging → live). PostgreSQL ALTER TABLE acquires
+    ///    AccessExclusiveLock automatically; no explicit LOCK TABLE is needed. The
+    ///    transaction ensures no reader sees an intermediate state.
+    /// 5. Drop the old live backup table with DROP TABLE.
     ///
     /// If an error occurs before the swap, only the staging table is affected — the live
     /// collection is untouched.
@@ -102,14 +103,21 @@ public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(
         {
             // Best-effort cleanup: drop the partially-populated staging table so the
             // next run starts clean. Do not let this secondary failure mask the original.
-            try { await staging.EnsureCollectionDeletedAsync(cancellationToken); } catch { }
+            try
+            {
+                await staging.EnsureCollectionDeletedAsync(cancellationToken);
+            }
+            catch (Exception cleanupEx) when (cleanupEx is not OperationCanceledException)
+            {
+                Console.Error.WriteLine($"Warning: failed to clean up staging collection '{stagingName}' after upsert failure: {cleanupEx.Message}");
+            }
             throw;
         }
 
         // ── Step 4: Atomic swap — staging → live ──────────────────────────────────────
-        // Three ALTER TABLE RENAME statements in one transaction.
+        // Two ALTER TABLE RENAME operations in one transaction (live → old, staging → live).
         // Each RENAME auto-acquires AccessExclusiveLock on its table; the transaction
-        // guarantees all three renames are visible atomically to other sessions.
+        // guarantees both renames are visible atomically to other sessions.
         await using var conn = await dataSource.OpenConnectionAsync(cancellationToken);
         await using var tx = await conn.BeginTransactionAsync(cancellationToken);