-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathScatterGatherApi.cs
More file actions
167 lines (143 loc) · 6.48 KB
/
ScatterGatherApi.cs
File metadata and controls
167 lines (143 loc) · 6.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
namespace ServiceControl.CompositeViews.Messages
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Infrastructure.WebApi;
using NServiceBus.Logging;
using Persistence.Infrastructure;
using ServiceBus.Management.Infrastructure.Settings;
using JsonSerializer = System.Text.Json.JsonSerializer;
/// <summary>
/// Marker interface with no members. In this file it is implemented by
/// <c>ScatterGatherApi&lt;TDataStore, TIn, TOut&gt;</c>.
/// </summary>
// NOTE(review): presumably used to discover/register API types elsewhere — confirm against callers.
interface IApi
{
}
/// <summary>
/// Non-generic base type shared by every closed <c>ScatterGatherApi&lt;TDataStore, TIn, TOut&gt;</c>.
/// </summary>
// Originally introduced to hoist a static JSON serializer field out of the generic class, so that it
// would not be duplicated per generic instantiation. The class currently declares no members.
public abstract class ScatterGatherApiBase
{
}
/// <summary>
/// Base input type for scatter-gather queries; every <c>TIn</c> used with
/// <c>ScatterGatherApi&lt;TDataStore, TIn, TOut&gt;</c> must derive from this record.
/// </summary>
/// <param name="PagingInfo">Paging information carried along with the query.</param>
public record ScatterGatherContext(PagingInfo PagingInfo);
public abstract class ScatterGatherApi<TDataStore, TIn, TOut> : ScatterGatherApiBase, IApi
where TIn : ScatterGatherContext
where TOut : class
{
/// <summary>
/// Captures the collaborators needed to run a query locally and against remote instances,
/// and creates a logger named after the concrete runtime type.
/// </summary>
/// <param name="store">Data store handed to derived classes through <see cref="DataStore"/>.</param>
/// <param name="settings">Settings supplying the local instance id and the remote instances.</param>
/// <param name="httpClientFactory">Factory used to create the HTTP client for each remote instance.</param>
protected ScatterGatherApi(TDataStore store, Settings settings, IHttpClientFactory httpClientFactory)
{
    // GetType() yields the derived type, so log entries are attributed to the concrete API class.
    logger = LogManager.GetLogger(GetType());

    HttpClientFactory = httpClientFactory;
    Settings = settings;
    DataStore = store;
}
/// <summary>The data store passed to the constructor; available to derived classes.</summary>
protected TDataStore DataStore { get; }
/// <summary>Settings passed to the constructor; read for <c>InstanceId</c> and <c>RemoteInstances</c>.</summary>
Settings Settings { get; }
/// <summary>Factory used to create one <see cref="HttpClient"/> per remote call, named by the remote's instance id.</summary>
IHttpClientFactory HttpClientFactory { get; }
/// <summary>
/// Runs the query against the local instance and every remote instance that is not flagged as
/// temporarily unavailable, then merges all results into a single response.
/// </summary>
/// <param name="input">Query input; passed to the local query and to the aggregation step.</param>
/// <param name="pathAndQuery">Request URI sent (as a GET) to each remote instance.</param>
/// <returns>The aggregated result produced by <see cref="AggregateResults"/>.</returns>
public async Task<QueryResult<TOut>> Execute(TIn input, string pathAndQuery)
{
    var remoteInstances = Settings.RemoteInstances;
    var localInstanceId = Settings.InstanceId;

    // One slot for the local query plus, at most, one per remote instance.
    var pendingQueries = new List<Task<QueryResult<TOut>>>(remoteInstances.Length + 1);

    // The local query is started first; the remote calls are started right after it.
    pendingQueries.Add(LocalCall(input, localInstanceId));

    foreach (var remoteInstance in remoteInstances)
    {
        if (!remoteInstance.TemporarilyUnavailable)
        {
            var remoteClient = HttpClientFactory.CreateClient(remoteInstance.InstanceId);
            pendingQueries.Add(RemoteCall(remoteClient, pathAndQuery, remoteInstance));
        }
    }

    var allResults = await Task.WhenAll(pendingQueries);
    return AggregateResults(input, allResults);
}
/// <summary>
/// Executes <see cref="LocalQuery"/> and stamps the result with the id of the local instance.
/// </summary>
/// <param name="input">Query input forwarded to <see cref="LocalQuery"/>.</param>
/// <param name="instanceId">Id assigned to the result's <c>InstanceId</c>.</param>
async Task<QueryResult<TOut>> LocalCall(TIn input, string instanceId)
{
    var localResult = await LocalQuery(input);

    localResult.InstanceId = instanceId;

    return localResult;
}
/// <summary>
/// Runs the query against the local instance. The caller awaits the returned task and then
/// assigns the local instance id to the result's <c>InstanceId</c>.
/// </summary>
/// <param name="input">The query input passed to <c>Execute</c>.</param>
/// <returns>The local query result.</returns>
protected abstract Task<QueryResult<TOut>> LocalQuery(TIn input);
/// <summary>
/// Combines the per-instance results into one <see cref="QueryResult{TOut}"/>: the payload comes from
/// <see cref="ProcessResults"/> and the statistics from <see cref="AggregateStats"/>.
/// </summary>
/// <param name="input">The query input that produced <paramref name="results"/>.</param>
/// <param name="results">One result per queried instance.</param>
internal QueryResult<TOut> AggregateResults(TIn input, QueryResult<TOut>[] results)
{
    // The payload is merged first because the stats step receives it as an argument.
    var mergedPayload = ProcessResults(input, results);
    var mergedStats = AggregateStats(input, results, mergedPayload);

    return new QueryResult<TOut>(mergedPayload, mergedStats);
}
/// <summary>
/// Merges the per-instance results into the single payload returned to the caller.
/// Invoked by <c>AggregateResults</c>; the return value becomes the payload of the aggregated result
/// and is also passed to <c>AggregateStats</c>.
/// </summary>
/// <param name="input">The query input that produced <paramref name="results"/>.</param>
/// <param name="results">One result per queried instance.</param>
/// <returns>The combined payload.</returns>
protected abstract TOut ProcessResults(TIn input, QueryResult<TOut>[] results);
/// <summary>
/// Builds the statistics of the aggregated response from the statistics of each instance result.
/// </summary>
/// <param name="input">The query input (not used by this default implementation).</param>
/// <param name="results">One result per queried instance.</param>
/// <param name="processedResults">The merged payload (not used by this default implementation).</param>
/// <returns>
/// Stats whose ETag is the concatenation of the individual ETags in sorted order, whose total count is
/// the sum of the individual total counts, which is stale if any individual result is stale, and whose
/// highest total count is the maximum reported by any individual result.
/// </returns>
protected virtual QueryStatsInfo AggregateStats(TIn input, IEnumerable<QueryResult<TOut>> results, TOut processedResults)
{
    var stats = results.Select(result => result.QueryStats).ToArray();

    // Sorting makes the combined ETag independent of the order in which instances were queried.
    var combinedETag = string.Concat(stats.OrderBy(stat => stat.ETag).Select(stat => stat.ETag));
    var summedTotalCount = stats.Sum(stat => stat.TotalCount);
    var anyStale = stats.Any(stat => stat.IsStale);
    var highestTotalCount = stats.Max(stat => stat.HighestTotalCountOfAllTheInstances);

    return new QueryStatsInfo(combinedETag, summedTotalCount, isStale: anyStale, highestTotalCount);
}
/// <summary>
/// Queries a single remote instance via <see cref="FetchAndParse"/> and stamps the result with that
/// instance's id.
/// </summary>
/// <param name="client">HTTP client used to reach the remote instance.</param>
/// <param name="pathAndQuery">Request URI sent to the remote instance.</param>
/// <param name="remoteInstanceSetting">Settings of the remote instance being queried.</param>
async Task<QueryResult<TOut>> RemoteCall(HttpClient client, string pathAndQuery, RemoteInstanceSetting remoteInstanceSetting)
{
    var remoteResult = await FetchAndParse(client, pathAndQuery, remoteInstanceSetting);

    remoteResult.InstanceId = remoteInstanceSetting.InstanceId;

    return remoteResult;
}
/// <summary>
/// Sends a GET for <paramref name="pathAndQuery"/> to a remote instance and converts the response into
/// a <see cref="QueryResult{TOut}"/>. Exceptions raised while sending or parsing are caught, logged as
/// warnings, and reported to the caller as an empty result.
/// </summary>
/// <param name="httpClient">HTTP client used to send the request.</param>
/// <param name="pathAndQuery">Request URI of the GET request.</param>
/// <param name="remoteInstanceSetting">
/// Settings of the remote instance; its <c>TemporarilyUnavailable</c> flag is set to <c>true</c> when an
/// <see cref="HttpRequestException"/> occurs.
/// </param>
/// <returns>The parsed remote result, or an empty result on 404 or on any failure.</returns>
async Task<QueryResult<TOut>> FetchAndParse(HttpClient httpClient, string pathAndQuery, RemoteInstanceSetting remoteInstanceSetting)
{
    try
    {
        // Request and response are IDisposable; dispose them deterministically on every path,
        // including the 404 early return, instead of leaving them to the finalizer.
        using var request = new HttpRequestMessage(HttpMethod.Get, pathAndQuery);

        // Assuming SendAsync returns uncompressed response and the AutomaticDecompression is enabled on the http client.
        using var rawResponse = await httpClient.SendAsync(request);

        // special case - queried by conversation ID and nothing was found
        if (rawResponse.StatusCode == HttpStatusCode.NotFound)
        {
            return QueryResult<TOut>.Empty();
        }

        // Awaited inside the using scope so the response is fully consumed before it is disposed.
        return await ParseResult(rawResponse);
    }
    catch (HttpRequestException httpRequestException)
    {
        // Flag the instance so that Execute skips it on subsequent queries.
        remoteInstanceSetting.TemporarilyUnavailable = true;
        logger.Warn(
            $"An HttpRequestException occurred when querying remote instance at {remoteInstanceSetting.BaseAddress}. The instance at uri: {remoteInstanceSetting.BaseAddress} will be temporarily disabled.",
            httpRequestException);
        return QueryResult<TOut>.Empty();
    }
    catch (OperationCanceledException) // Intentional, used to gracefully handle timeout
    {
        logger.Warn($"Failed to query remote instance at {remoteInstanceSetting.BaseAddress} due to a timeout");
        return QueryResult<TOut>.Empty();
    }
    catch (Exception exception)
    {
        // Best effort: a failing remote must not fail the whole scatter-gather query.
        logger.Warn($"Failed to query remote instance at {remoteInstanceSetting.BaseAddress}.", exception);
        return QueryResult<TOut>.Empty();
    }
}
/// <summary>
/// Deserializes the body of a remote response into <typeparamref name="TOut"/> and reads the
/// <c>Total-Count</c> and <c>ETag</c> response headers into the result's statistics.
/// </summary>
/// <param name="responseMessage">The response received from the remote instance.</param>
/// <returns>
/// A result holding the deserialized payload. The total count is 0 when the <c>Total-Count</c> header
/// is absent, the ETag is <c>null</c> when no ETag header is present, and the result is never marked stale.
/// </returns>
/// <exception cref="FormatException">The <c>Total-Count</c> header is present but is not an integer.</exception>
static async Task<QueryResult<TOut>> ParseResult(HttpResponseMessage responseMessage)
{
    await using var responseStream = await responseMessage.Content.ReadAsStreamAsync();
    var remoteResults = await JsonSerializer.DeserializeAsync<TOut>(responseStream, SerializerOptions.Default);

    var totalCount = 0;
    if (responseMessage.Headers.TryGetValues("Total-Count", out var totalCounts))
    {
        // Header values are machine-readable, so parse them independently of the server's current culture.
        totalCount = int.Parse(totalCounts.ElementAt(0), System.Globalization.CultureInfo.InvariantCulture);
    }

    string etag = responseMessage.Headers.ETag?.Tag;

    // Strip the surrounding quotes from the ETag. The tag is expected to always be quoted, but only strip
    // when both quotes are really there so that an unexpected value can neither throw nor lose a character.
    // Later the value is joined into a new ETag when the results are aggregated and returned.
    if (etag is { Length: >= 2 } && etag[0] == '"' && etag[^1] == '"')
    {
        etag = etag[1..^1];
    }

    return new QueryResult<TOut>(remoteResults, new QueryStatsInfo(etag, totalCount, isStale: false));
}
// Logger created in the constructor from the concrete runtime type (GetType()); used to report remote query failures.
readonly ILog logger;
}
}