Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public override async IAsyncEnumerable<IngestionChunk> ProcessAsync(IngestionDoc

int stringBuilderTokenCount = 0;
StringBuilder stringBuilder = new();
Dictionary<string, object>? accumulatedMetadata = null;
foreach (IngestionDocumentElement element in document.EnumerateContent())
{
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -58,6 +59,7 @@ public override async IAsyncEnumerable<IngestionChunk> ProcessAsync(IngestionDoc

int contentToProcessTokenCount = _tokenizer.CountTokens(elementContent!, considerNormalization: false);
ReadOnlyMemory<char> contentToProcess = elementContent.AsMemory();
bool elementMetadataAccumulated = false;
while (stringBuilderTokenCount + contentToProcessTokenCount >= _maxTokensPerChunk)
{
int index = _tokenizer.GetIndexByTokenCount(
Expand All @@ -67,6 +69,13 @@ public override async IAsyncEnumerable<IngestionChunk> ProcessAsync(IngestionDoc
out int addedTokenCount,
considerNormalization: false);

// Accumulate metadata the first time this element contributes content.
if (!elementMetadataAccumulated && index > 0)
{
AccumulateMetadata(element, ref accumulatedMetadata);
elementMetadataAccumulated = true;
}

unsafe
{
fixed (char* ptr = &MemoryMarshal.GetReference(contentToProcess.Span))
Expand All @@ -75,30 +84,47 @@ public override async IAsyncEnumerable<IngestionChunk> ProcessAsync(IngestionDoc
}
}
stringBuilderTokenCount += addedTokenCount;
yield return FinalizeChunk();
yield return FinalizeChunk(ref accumulatedMetadata);

contentToProcess = contentToProcess.Slice(index);
contentToProcessTokenCount = _tokenizer.CountTokens(contentToProcess.Span, considerNormalization: false);
}

// Accumulate metadata if the element only contributed content after the loop.
if (!elementMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
}

_ = stringBuilder.Append(contentToProcess);
stringBuilderTokenCount += contentToProcessTokenCount;
}

if (stringBuilder.Length > 0)
{
yield return FinalizeChunk();
yield return FinalizeChunk(ref accumulatedMetadata);
}
yield break;

IngestionChunk FinalizeChunk()
IngestionChunk FinalizeChunk(ref Dictionary<string, object>? metadata)
{
TextContent chunkContent = new(stringBuilder.ToString());
IngestionChunk chunk = new IngestionChunk(
content: chunkContent,
document: document,
tokenCount: stringBuilderTokenCount,
context: string.Empty);

if (metadata is { Count: > 0 })
{
foreach (var kvp in metadata)
{
chunk.Metadata[kvp.Key] = kvp.Value;
}

metadata = null;
}

_ = stringBuilder.Clear();
stringBuilderTokenCount = 0;

Expand Down Expand Up @@ -126,5 +152,29 @@ IngestionChunk FinalizeChunk()
}
}

/// <summary>
/// Copies the non-null metadata entries of <paramref name="element"/> into <paramref name="accumulated"/>.
/// </summary>
/// <param name="element">The document element whose metadata is read.</param>
/// <param name="accumulated">
/// The dictionary collecting metadata for the chunk being built. It is left untouched when the element
/// reports no metadata, and is created on demand otherwise. Keys already present keep their existing value.
/// </param>
private static void AccumulateMetadata(IngestionDocumentElement element, ref Dictionary<string, object>? accumulated)
{
    if (element.HasMetadata)
    {
        // Create the target lazily so elements without metadata never allocate a dictionary.
        accumulated ??= [];
        Dictionary<string, object> target = accumulated;

        foreach (var entry in element.Metadata)
        {
            // Null values carry no information and are not propagated.
            if (entry.Value is null)
            {
                continue;
            }

            // First writer wins: a key contributed by an earlier element is never overwritten.
#if NET
            _ = target.TryAdd(entry.Key, entry.Value);
#else
            if (!target.ContainsKey(entry.Key))
            {
                target.Add(entry.Key, entry.Value);
            }
#endif
        }
    }
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
{
// Not using yield return here as we use ref structs.
List<IngestionChunk> chunks = [];
Dictionary<string, object>? accumulatedMetadata = null;

int contextTokenCount = CountTokens(context.AsSpan());
int totalTokenCount = contextTokenCount;
Expand Down Expand Up @@ -71,12 +72,15 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
int elementTokenCount = CountTokens(semanticContent.AsSpan());
if (elementTokenCount + totalTokenCount <= _maxTokensPerChunk)
{
// Element fits in the current chunk — accumulate its metadata here.
AccumulateMetadata(element, ref accumulatedMetadata);
totalTokenCount += elementTokenCount;
AppendNewLineAndSpan(_currentChunk, semanticContent.AsSpan());
}
else if (element is IngestionDocumentTable table)
{
ValueStringBuilder tableBuilder = new(initialCapacity: 8000);
bool tableMetadataAccumulated = false;

try
{
Expand Down Expand Up @@ -114,6 +118,13 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
// We append the table as long as it's not just the header.
if (rowIndex != 1)
{
// Accumulate metadata before first table content append.
if (!tableMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
tableMetadataAccumulated = true;
}

AppendNewLineAndSpan(_currentChunk, tableBuilder.AsSpan(0, tableLength - Environment.NewLine.Length));
}

Expand All @@ -138,6 +149,13 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
totalTokenCount += lastRowTokens;
}

// Accumulate metadata before appending remaining table content.
if (!tableMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
tableMetadataAccumulated = true;
}

AppendNewLineAndSpan(_currentChunk, tableBuilder.AsSpan(0, tableLength - Environment.NewLine.Length));
}
finally
Expand All @@ -148,6 +166,7 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
else
{
ReadOnlySpan<char> remainingContent = semanticContent.AsSpan();
bool elementMetadataAccumulated = false;

while (!remainingContent.IsEmpty)
{
Expand All @@ -171,6 +190,13 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string
tokenCount = CountTokens(remainingContent.Slice(0, index));
}

// Accumulate metadata the first time this (large) element contributes content.
if (!elementMetadataAccumulated)
{
AccumulateMetadata(element, ref accumulatedMetadata);
elementMetadataAccumulated = true;
}

totalTokenCount += tokenCount;
ReadOnlySpan<char> spanToAppend = remainingContent.Slice(0, index);
AppendNewLineAndSpan(_currentChunk, spanToAppend);
Expand All @@ -197,20 +223,36 @@ internal IEnumerable<IngestionChunk> Process(IngestionDocument document, string

if (totalTokenCount > contextTokenCount)
{
string chunkContent = _currentChunk.ToString();
int chunkTokenCount = CountTokens(chunkContent.AsSpan());
chunks.Add(new(new TextContent(chunkContent), document, chunkTokenCount, context));
chunks.Add(BuildChunk());
}

_currentChunk.Clear();

return chunks;

void Commit()
IngestionChunk BuildChunk()
{
string chunkContent = _currentChunk.ToString();
int chunkTokenCount = CountTokens(chunkContent.AsSpan());
chunks.Add(new(new TextContent(chunkContent), document, chunkTokenCount, context));
IngestionChunk chunk = new(new TextContent(chunkContent), document, chunkTokenCount, context);

// Propagate accumulated element metadata onto the chunk, then reset for the next chunk.
if (accumulatedMetadata is { Count: > 0 })
{
foreach (var kvp in accumulatedMetadata)
{
chunk.Metadata[kvp.Key] = kvp.Value;
}

accumulatedMetadata = null;
}

return chunk;
}

void Commit()
{
chunks.Add(BuildChunk());

// We keep the context in the current chunk as it's the same for all elements.
_currentChunk.Remove(
Expand Down Expand Up @@ -275,4 +317,28 @@ private static void AddMarkdownTableSeparatorRow(int columnCount, ref ValueStrin

private int CountTokens(ReadOnlySpan<char> input)
=> _tokenizer.CountTokens(input, considerNormalization: false);

/// <summary>
/// Merges the non-null metadata entries of <paramref name="element"/> into <paramref name="accumulated"/>.
/// </summary>
/// <param name="element">The document element whose metadata is read.</param>
/// <param name="accumulated">
/// The dictionary collecting metadata for the chunk being built. It is left untouched when the element
/// reports no metadata, and is created on demand otherwise. Keys already present keep their existing value.
/// </param>
private static void AccumulateMetadata(IngestionDocumentElement element, ref Dictionary<string, object>? accumulated)
{
    if (element.HasMetadata)
    {
        // Allocate only once an element actually reports metadata.
        accumulated ??= [];
        Dictionary<string, object> merged = accumulated;

        foreach (var pair in element.Metadata)
        {
            // Entries with a null value are ignored.
            if (pair.Value is null)
            {
                continue;
            }

            // Existing keys win, so the earliest contributing element determines the value.
#if NET
            _ = merged.TryAdd(pair.Key, pair.Value);
#else
            if (!merged.ContainsKey(pair.Key))
            {
                merged.Add(pair.Key, pair.Value);
            }
#endif
        }
    }
}
}
Loading
Loading