1616 using ServiceControl . Infrastructure ;
1717 using Transports ;
1818
19- class AuditIngestion : IHostedService
19+ class AuditIngestion : BackgroundService
2020 {
2121 public AuditIngestion (
2222 Settings settings ,
@@ -51,23 +51,15 @@ public AuditIngestion(
5151
5252 errorHandlingPolicy = new AuditIngestionFaultPolicy ( failedImportsStorage , settings . LoggingSettings , OnCriticalError ) ;
5353
54- watchdog = new Watchdog ( "audit message ingestion" , EnsureStarted , EnsureStopped , ingestionState . ReportError , ingestionState . Clear , settings . TimeToRestartAuditIngestionAfterFailure , logger ) ;
55-
56- ingestionWorker = Task . Run ( ( ) => Loop ( ) , CancellationToken . None ) ;
57- }
58-
59- public Task StartAsync ( CancellationToken _ ) => watchdog . Start ( ( ) => applicationLifetime . StopApplication ( ) ) ;
60-
61- public async Task StopAsync ( CancellationToken cancellationToken )
62- {
63- await watchdog . Stop ( ) ;
64- channel . Writer . Complete ( ) ;
65- await ingestionWorker ;
66-
67- if ( transportInfrastructure != null )
68- {
69- await transportInfrastructure . Shutdown ( cancellationToken ) ;
70- }
54+ watchdog = new Watchdog (
55+ "audit message ingestion" ,
56+ EnsureStarted ,
57+ EnsureStopped ,
58+ ingestionState . ReportError ,
59+ ingestionState . Clear ,
60+ settings . TimeToRestartAuditIngestionAfterFailure ,
61+ logger
62+ ) ;
7163 }
7264
7365 Task OnCriticalError ( string failure , Exception exception )
@@ -132,7 +124,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
132124 }
133125 catch ( OperationCanceledException e ) when ( e . CancellationToken == cancellationToken )
134126 {
135- // ignored
127+ logger . Info ( "StopReceive cancelled" ) ;
136128 }
137129 }
138130
@@ -170,7 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
170162 }
171163 catch ( OperationCanceledException e ) when ( e . CancellationToken == cancellationToken )
172164 {
173- // ignored
165+ logger . Info ( "StopReceive cancelled" ) ;
174166 }
175167 finally
176168 {
@@ -200,57 +192,92 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
200192 }
201193 }
202194
203- async Task Loop ( )
195+ public override async Task StartAsync ( CancellationToken cancellationToken )
204196 {
205- var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
197+ await watchdog . Start ( ( ) => applicationLifetime . StopApplication ( ) ) ;
198+ await base . StartAsync ( cancellationToken ) ;
199+ }
206200
207- while ( await channel . Reader . WaitToReadAsync ( ) )
201+ protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
202+ {
203+ try
208204 {
209- // will only enter here if there is something to read.
210- try
205+ var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
206+
207+ while ( await channel . Reader . WaitToReadAsync ( stoppingToken ) )
211208 {
212- // as long as there is something to read this will fetch up to MaximumConcurrency items
213- while ( channel . Reader . TryRead ( out var context ) )
209+ // will only enter here if there is something to read.
210+ try
214211 {
215- contexts . Add ( context ) ;
212+ // as long as there is something to read this will fetch up to MaximumConcurrency items
213+ while ( channel . Reader . TryRead ( out var context ) )
214+ {
215+ contexts . Add ( context ) ;
216+ }
217+
218+ auditBatchSize . Record ( contexts . Count ) ;
219+
220+ using ( new DurationRecorder ( auditBatchDuration ) )
221+ {
222+ await auditIngestor . Ingest ( contexts ) ;
223+ }
224+
225+ consecutiveBatchFailuresCounter . Record ( 0 ) ;
216226 }
227+ catch ( Exception e )
228+ {
229+ // signal all message handling tasks to terminate
230+ foreach ( var context in contexts )
231+ {
232+ _ = context . GetTaskCompletionSource ( ) . TrySetException ( e ) ;
233+ }
234+
235+ if ( e is OperationCanceledException && stoppingToken . IsCancellationRequested )
236+ {
237+ logger . Info ( "Batch cancelled" , e ) ;
238+ break ;
239+ }
217240
218- auditBatchSize . Record ( contexts . Count ) ;
241+ logger . Info ( "Ingesting messages failed" , e ) ;
219242
220- using ( new DurationRecorder ( auditBatchDuration ) )
243+ // no need to do interlocked increment since this is running sequential
244+ consecutiveBatchFailuresCounter . Record ( consecutiveBatchFailures ++ ) ;
245+ }
246+ finally
221247 {
222- await auditIngestor . Ingest ( contexts ) ;
248+ contexts . Clear ( ) ;
223249 }
224-
225- consecutiveBatchFailuresCounter . Record ( 0 ) ;
226250 }
227- catch ( OperationCanceledException )
228- {
229- //Do nothing as we are shutting down
230- continue ;
231- }
232- catch ( Exception e ) // show must go on
251+ // will fall out here when writer is completed
252+ }
253+ catch ( OperationCanceledException ) when ( stoppingToken . IsCancellationRequested )
254+ {
255+ // ExecuteAsync cancelled
256+ }
257+ }
258+
259+ public override async Task StopAsync ( CancellationToken cancellationToken )
260+ {
261+ try
262+ {
263+ await watchdog . Stop ( ) ;
264+ channel . Writer . Complete ( ) ;
265+ await base . StopAsync ( cancellationToken ) ;
266+ }
267+ finally
268+ {
269+ if ( transportInfrastructure != null )
233270 {
234- if ( logger . IsInfoEnabled )
271+ try
235272 {
236- logger . Info ( "Ingesting messages failed" , e ) ;
273+ await transportInfrastructure . Shutdown ( cancellationToken ) ;
237274 }
238-
239- // signal all message handling tasks to terminate
240- foreach ( var context in contexts )
275+ catch ( OperationCanceledException e ) when ( cancellationToken . IsCancellationRequested )
241276 {
242- context . GetTaskCompletionSource ( ) . TrySetException ( e ) ;
277+ logger . Info ( "Shutdown cancelled" , e ) ;
243278 }
244-
245- // no need to do interlocked increment since this is running sequential
246- consecutiveBatchFailuresCounter . Record ( consecutiveBatchFailures ++ ) ;
247- }
248- finally
249- {
250- contexts . Clear ( ) ;
251279 }
252280 }
253- // will fall out here when writer is completed
254281 }
255282
256283 TransportInfrastructure transportInfrastructure ;
@@ -273,7 +300,6 @@ async Task Loop()
273300 readonly Histogram < long > consecutiveBatchFailuresCounter = Telemetry . Meter . CreateHistogram < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "consecutive_batch_failures" ) , unit : "count" , description : "Consecutive audit ingestion batch failure" ) ;
274301 readonly Histogram < double > ingestionDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion" , "duration" ) , unit : "ms" , description : "Average incoming audit message processing duration" ) ;
275302 readonly Watchdog watchdog ;
276- readonly Task ingestionWorker ;
277303 readonly IHostApplicationLifetime applicationLifetime ;
278304
279305 static readonly ILog logger = LogManager . GetLogger < AuditIngestion > ( ) ;
0 commit comments