Skip to content

Commit b7db931

Browse files
reakaleekclaude
andcommitted
refactor: replace per-client PIT with shared singleton PIT manager
Extract PIT lifecycle from ChangesGateway into SharedPointInTimeManager, a thread-safe singleton that holds a single PIT for all concurrent paginators. Reduces ES resource usage for the public changes API. The cursor no longer carries a PIT ID — it's just [epochMs, url]. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a4a0e8d commit b7db931

5 files changed

Lines changed: 138 additions & 65 deletions

File tree

src/api/Elastic.Documentation.Api.Core/Changes/ChangesUsecase.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,7 @@ public async Task<ChangesApiResponse> GetChangesAsync(ChangesApiRequest request,
6969
if (epochEl.ValueKind != JsonValueKind.Number || urlEl.ValueKind != JsonValueKind.String)
7070
return null;
7171

72-
var pitId = arrayLength > 2 && root[2].ValueKind == JsonValueKind.String
73-
? root[2].GetString()
74-
: null;
75-
76-
return new ChangesPageCursor(epochEl.GetInt64(), urlEl.GetString()!, pitId);
72+
return new ChangesPageCursor(epochEl.GetInt64(), urlEl.GetString()!);
7773
}
7874
catch (Exception ex) when (ex is FormatException or JsonException or InvalidOperationException)
7975
{
@@ -88,8 +84,6 @@ private static string EncodeCursor(ChangesPageCursor cursor)
8884
writer.WriteStartArray();
8985
writer.WriteNumberValue(cursor.LastUpdatedEpochMs);
9086
writer.WriteStringValue(cursor.Url);
91-
if (cursor.PitId is not null)
92-
writer.WriteStringValue(cursor.PitId);
9387
writer.WriteEndArray();
9488
writer.Flush();
9589

src/api/Elastic.Documentation.Api.Core/Changes/IChangesGateway.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public record ChangesResult
2626
}
2727

2828
/// <summary>Cursor for search_after pagination over changed pages.</summary>
29-
public record ChangesPageCursor(long LastUpdatedEpochMs, string Url, string? PitId = null);
29+
public record ChangesPageCursor(long LastUpdatedEpochMs, string Url);
3030

3131
/// <summary>Shared defaults for the changes feed.</summary>
3232
public static class ChangesDefaults

src/services/Elastic.Documentation.Search/ChangesGateway.cs

Lines changed: 24 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -12,46 +12,43 @@ namespace Elastic.Documentation.Search;
1212
/// <summary>
1313
/// Elasticsearch gateway for the documentation changes feed.
1414
/// Queries last_updated > since with search_after cursor pagination.
15-
/// Uses Point In Time (PIT) for consistent pagination across requests.
15+
/// Uses a shared Point In Time (PIT) for consistent pagination across requests.
1616
/// </summary>
17-
public partial class ChangesGateway(ElasticsearchClientAccessor clientAccessor, ILogger<ChangesGateway> logger)
18-
: IChangesGateway
17+
public partial class ChangesGateway(
18+
ElasticsearchClientAccessor clientAccessor,
19+
SharedPointInTimeManager pitManager,
20+
ILogger<ChangesGateway> logger
21+
) : IChangesGateway
1922
{
20-
private const string PitKeepAlive = "5m";
21-
2223
public async Task<ChangesResult> GetChangesAsync(ChangesRequest request, Cancel ctx = default)
2324
{
2425
var fetchSize = request.PageSize + 1;
2526

2627
try
2728
{
28-
var pitId = await ResolvePitId(request.Cursor?.PitId, ctx);
29+
var pitId = await pitManager.GetPitIdAsync(ctx);
2930

3031
var response = await Search(request, pitId, fetchSize, ctx);
3132

32-
if (!response.IsValidResponse)
33+
if (!response.IsValidResponse && IsExpiredPit(response))
3334
{
34-
if (IsExpiredPit(response) && request.Cursor?.PitId is not null)
35-
{
36-
LogPitExpired(logger);
37-
pitId = await OpenPit(ctx);
38-
var updatedCursor = request.Cursor with { PitId = pitId };
39-
response = await Search(request with { Cursor = updatedCursor }, pitId, fetchSize, ctx);
40-
}
35+
LogPitExpired(logger);
36+
await pitManager.HandleExpiredPitAsync(pitId, ctx);
37+
pitId = await pitManager.GetPitIdAsync(ctx);
38+
response = await Search(request, pitId, fetchSize, ctx);
39+
}
4140

42-
if (!response.IsValidResponse)
43-
{
44-
var reason = response.ElasticsearchServerError?.Error.Reason ?? "Unknown";
45-
throw new InvalidOperationException(
46-
$"Elasticsearch changes query failed (HTTP {response.ApiCallDetails?.HttpStatusCode}): {reason}"
47-
);
48-
}
41+
if (!response.IsValidResponse)
42+
{
43+
var reason = response.ElasticsearchServerError?.Error.Reason ?? "Unknown";
44+
throw new InvalidOperationException(
45+
$"Elasticsearch changes query failed (HTTP {response.ApiCallDetails?.HttpStatusCode}): {reason}"
46+
);
4947
}
5048

51-
// Use the PIT ID from the response if available, as ES may return a new one
52-
var responsePitId = response.PitId ?? pitId;
49+
pitManager.RefreshKeepAlive();
5350

54-
return BuildResult(response, request.PageSize, responsePitId);
51+
return BuildResult(response, request.PageSize);
5552
}
5653
catch (Exception ex)
5754
{
@@ -60,33 +57,6 @@ public async Task<ChangesResult> GetChangesAsync(ChangesRequest request, Cancel
6057
}
6158
}
6259

63-
private async Task<string> ResolvePitId(string? existingPitId, Cancel ctx)
64-
{
65-
if (!string.IsNullOrEmpty(existingPitId))
66-
return existingPitId;
67-
68-
return await OpenPit(ctx);
69-
}
70-
71-
private async Task<string> OpenPit(Cancel ctx)
72-
{
73-
var response = await clientAccessor.Client.OpenPointInTimeAsync(
74-
clientAccessor.SearchIndex,
75-
r => r.KeepAlive(PitKeepAlive),
76-
ctx
77-
);
78-
79-
if (!response.IsValidResponse)
80-
{
81-
throw new InvalidOperationException(
82-
$"Failed to open PIT: {response.ElasticsearchServerError?.Error.Reason ?? "Unknown"}"
83-
);
84-
}
85-
86-
LogPitOpened(logger, response.Id);
87-
return response.Id;
88-
}
89-
9060
private async Task<SearchResponse<DocumentationDocument>> Search(
9161
ChangesRequest request, string pitId, int fetchSize, Cancel ctx
9262
) =>
@@ -95,7 +65,7 @@ await clientAccessor.Client.SearchAsync<DocumentationDocument>(s =>
9565
_ = s
9666
.Size(fetchSize)
9767
.TrackTotalHits(t => t.Enabled(false))
98-
.Pit(p => p.Id(pitId).KeepAlive(PitKeepAlive))
68+
.Pit(p => p.Id(pitId).KeepAlive(SharedPointInTimeManager.PitKeepAlive))
9969
.Query(q => q.Range(r => r
10070
.Date(dr => dr
10171
.Field(f => f.LastUpdated)
@@ -132,7 +102,7 @@ private static bool IsExpiredPit(SearchResponse<DocumentationDocument> response)
132102
|| response.ElasticsearchServerError?.Error.Reason?.Contains("point in time", StringComparison.OrdinalIgnoreCase) == true
133103
|| response.ElasticsearchServerError?.Error.Reason?.Contains("No search context found", StringComparison.OrdinalIgnoreCase) == true;
134104

135-
private static ChangesResult BuildResult(SearchResponse<DocumentationDocument> response, int pageSize, string pitId)
105+
private static ChangesResult BuildResult(SearchResponse<DocumentationDocument> response, int pageSize)
136106
{
137107
var hits = response.Hits.ToList();
138108
var hasMore = hits.Count > pageSize;
@@ -167,7 +137,7 @@ private static ChangesResult BuildResult(SearchResponse<DocumentationDocument> r
167137
: default(long?);
168138

169139
if (epochMs is not null && sortUrl.TryGetString(out var url))
170-
nextCursor = new ChangesPageCursor(epochMs.Value, url!, pitId);
140+
nextCursor = new ChangesPageCursor(epochMs.Value, url!);
171141
}
172142
}
173143

@@ -178,9 +148,6 @@ private static ChangesResult BuildResult(SearchResponse<DocumentationDocument> r
178148
};
179149
}
180150

181-
[LoggerMessage(Level = LogLevel.Debug, Message = "Opened new PIT: {PitId}")]
182-
private static partial void LogPitOpened(ILogger logger, string pitId);
183-
184151
[LoggerMessage(Level = LogLevel.Warning, Message = "PIT expired or not found, opening a new one and retrying with existing search_after position")]
185152
private static partial void LogPitExpired(ILogger logger);
186153
}

src/services/Elastic.Documentation.Search/ServicesExtension.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public static IServiceCollection AddSearchServices(this IServiceCollection servi
3838
logger?.LogInformation("Full search use case registered with hybrid RRF support");
3939

4040
// Changes feed (cursor-paginated changes since a given date)
41+
_ = services.AddSingleton<SharedPointInTimeManager>();
4142
_ = services.AddScoped<IChangesGateway, ChangesGateway>();
4243
_ = services.AddScoped<ChangesUsecase>();
4344

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Elastic.Documentation.Search.Common;
6+
using Microsoft.Extensions.Logging;
7+
8+
namespace Elastic.Documentation.Search;
9+
10+
/// <summary>Singleton manager for a shared Elasticsearch Point In Time (PIT).</summary>
11+
public sealed partial class SharedPointInTimeManager(
12+
ElasticsearchClientAccessor clientAccessor,
13+
ILogger<SharedPointInTimeManager> logger
14+
) : IAsyncDisposable
15+
{
16+
private static readonly TimeSpan KeepAliveDuration = TimeSpan.FromMinutes(5);
17+
public const string PitKeepAlive = "5m";
18+
19+
private readonly SemaphoreSlim _semaphore = new(1, 1);
20+
private string? _pitId;
21+
private DateTimeOffset _expiresAt;
22+
23+
/// <summary>Returns a valid PIT ID, opening a new one if needed.</summary>
24+
public async Task<string> GetPitIdAsync(Cancel ctx)
25+
{
26+
await _semaphore.WaitAsync(ctx);
27+
try
28+
{
29+
if (_pitId is not null && DateTimeOffset.UtcNow < _expiresAt)
30+
return _pitId;
31+
32+
_pitId = await OpenPit(ctx);
33+
_expiresAt = DateTimeOffset.UtcNow.Add(KeepAliveDuration);
34+
return _pitId;
35+
}
36+
finally
37+
{
38+
_ = _semaphore.Release();
39+
}
40+
}
41+
42+
/// <summary>Nullifies the current PIT only if it matches the expired one, so a concurrent refresh is not discarded.</summary>
43+
public async Task HandleExpiredPitAsync(string expiredPitId, Cancel ctx)
44+
{
45+
await _semaphore.WaitAsync(ctx);
46+
try
47+
{
48+
if (_pitId == expiredPitId)
49+
{
50+
LogPitExpired(logger);
51+
_pitId = null;
52+
}
53+
}
54+
finally
55+
{
56+
_ = _semaphore.Release();
57+
}
58+
}
59+
60+
/// <summary>Bumps the local expiry after a successful search (ES extends the PIT server-side via KeepAlive).</summary>
61+
public void RefreshKeepAlive() => _expiresAt = DateTimeOffset.UtcNow.Add(KeepAliveDuration);
62+
63+
private async Task<string> OpenPit(Cancel ctx)
64+
{
65+
var response = await clientAccessor.Client.OpenPointInTimeAsync(
66+
clientAccessor.SearchIndex,
67+
r => r.KeepAlive(PitKeepAlive),
68+
ctx
69+
);
70+
71+
if (!response.IsValidResponse)
72+
{
73+
throw new InvalidOperationException(
74+
$"Failed to open PIT: {response.ElasticsearchServerError?.Error.Reason ?? "Unknown"}"
75+
);
76+
}
77+
78+
LogPitOpened(logger, response.Id);
79+
return response.Id;
80+
}
81+
82+
/// <inheritdoc />
83+
public async ValueTask DisposeAsync()
84+
{
85+
if (_pitId is null)
86+
return;
87+
88+
try
89+
{
90+
_ = await clientAccessor.Client.ClosePointInTimeAsync(r => r.Id(_pitId));
91+
LogPitClosed(logger, _pitId);
92+
}
93+
catch (Exception ex)
94+
{
95+
logger.LogWarning(ex, "Failed to close PIT {PitId} during shutdown", _pitId);
96+
}
97+
finally
98+
{
99+
_semaphore.Dispose();
100+
}
101+
}
102+
103+
[LoggerMessage(Level = LogLevel.Debug, Message = "Opened new shared PIT: {PitId}")]
104+
private static partial void LogPitOpened(ILogger logger, string pitId);
105+
106+
[LoggerMessage(Level = LogLevel.Warning, Message = "Shared PIT expired or not found, will open a new one")]
107+
private static partial void LogPitExpired(ILogger logger);
108+
109+
[LoggerMessage(Level = LogLevel.Debug, Message = "Closed shared PIT: {PitId}")]
110+
private static partial void LogPitClosed(ILogger logger, string pitId);
111+
}

0 commit comments

Comments
 (0)