Skip to content

Commit e62f581

Browse files
authored
Merge pull request #210 from buildersoftio/v3/bug/209
v3/bug/209 : Improve error handling and diagnostics in Streams/Kafka
2 parents 08efb09 + 40037de commit e62f581

6 files changed

Lines changed: 1094 additions & 9 deletions

File tree

src/Cortex.Streams/Operators/SinkOperatorAdapter.cs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
1414
{
1515
private readonly ISinkOperator<TInput> _sinkOperator;
1616

17+
// Cached operator name to avoid string allocation on hot path
18+
private static readonly string OperatorName = $"SinkOperatorAdapter<{typeof(TInput).Name}>";
19+
1720
// Telemetry fields
1821
private ITelemetryProvider _telemetryProvider;
1922
private ICounter _processedCounter;
@@ -22,6 +25,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
2225
private Action _incrementProcessedCounter;
2326
private Action<double> _recordProcessingTime;
2427

28+
// Error handling fields
29+
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;
30+
2531
public SinkOperatorAdapter(ISinkOperator<TInput> sinkOperator)
2632
{
2733
_sinkOperator = sinkOperator ?? throw new ArgumentNullException(nameof(sinkOperator));
@@ -51,9 +57,12 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
5157

5258
/// <summary>
5359
/// Forwards error handling configuration to the wrapped sink operator if it implements IErrorHandlingEnabled.
60+
/// Also stores the options for use in this adapter's error handling.
5461
/// </summary>
5562
public void SetErrorHandling(StreamExecutionOptions options)
5663
{
64+
_executionOptions = options ?? StreamExecutionOptions.Default;
65+
5766
// Forward error handling to the wrapped sink operator if it supports it
5867
if (_sinkOperator is IErrorHandlingEnabled errorHandlingEnabled)
5968
{
@@ -71,8 +80,19 @@ public void Process(object input)
7180
{
7281
try
7382
{
74-
_sinkOperator.Process((TInput)input);
75-
span.SetAttribute("status", "success");
83+
var executed = ErrorHandlingHelper.TryExecute<TInput>(
84+
_executionOptions,
85+
OperatorName,
86+
input,
87+
item => _sinkOperator.Process(item));
88+
89+
span.SetAttribute("status", executed ? "success" : "skipped");
90+
}
91+
catch (StreamStoppedException ex)
92+
{
93+
span.SetAttribute("status", "stopped");
94+
span.SetAttribute("exception", ex.ToString());
95+
throw;
7696
}
7797
catch (Exception ex)
7898
{
@@ -83,14 +103,18 @@ public void Process(object input)
83103
finally
84104
{
85105
stopwatch.Stop();
86-
_recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds);
87-
_incrementProcessedCounter();
106+
_recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds);
107+
_incrementProcessedCounter?.Invoke();
88108
}
89109
}
90110
}
91111
else
92112
{
93-
_sinkOperator.Process((TInput)input);
113+
ErrorHandlingHelper.TryExecute<TInput>(
114+
_executionOptions,
115+
OperatorName,
116+
input,
117+
item => _sinkOperator.Process(item));
94118
}
95119
}
96120

src/Cortex.Streams/Operators/SourceOperatorAdapter.cs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@
66

77
namespace Cortex.Streams.Operators
88
{
9-
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled
9+
/// <summary>
10+
/// Adapter that wraps an ISourceOperator to work within the operator chain.
11+
/// Handles telemetry, error handling, and lifecycle management for source operators.
12+
/// </summary>
13+
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled
1014
{
1115
private readonly ISourceOperator<TOutput> _sourceOperator;
1216
private IOperator _nextOperator;
1317

18+
// Cached operator name to avoid string allocation on hot path
19+
private static readonly string OperatorName = $"SourceOperatorAdapter<{typeof(TOutput).Name}>";
20+
1421
// Telemetry fields
1522
private ITelemetryProvider _telemetryProvider;
1623
private ICounter _emittedCounter;
@@ -19,11 +26,28 @@ public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITel
1926
private Action _incrementEmittedCounter;
2027
private Action<double> _recordEmissionTime;
2128

29+
// Error handling fields
30+
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;
31+
2232
public SourceOperatorAdapter(ISourceOperator<TOutput> sourceOperator)
2333
{
2434
_sourceOperator = sourceOperator;
2535
}
2636

37+
/// <summary>
38+
/// Sets the error handling options for this operator and propagates to next operators.
39+
/// </summary>
40+
public void SetErrorHandling(StreamExecutionOptions options)
41+
{
42+
_executionOptions = options ?? StreamExecutionOptions.Default;
43+
44+
// Propagate to the next operator if it supports error handling
45+
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling)
46+
{
47+
nextWithErrorHandling.SetErrorHandling(_executionOptions);
48+
}
49+
}
50+
2751
public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
2852
{
2953
_telemetryProvider = telemetryProvider;
@@ -67,6 +91,12 @@ public void SetNext(IOperator nextOperator)
6791
nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider);
6892
}
6993

94+
// Propagate error handling to the next operator
95+
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling && _executionOptions != null)
96+
{
97+
nextWithErrorHandling.SetErrorHandling(_executionOptions);
98+
}
99+
70100
// Start the source operator
71101
Start();
72102
}
@@ -84,8 +114,15 @@ private void Start()
84114
try
85115
{
86116
_incrementEmittedCounter?.Invoke();
87-
_nextOperator?.Process(output);
88-
span.SetAttribute("status", "success");
117+
118+
// Use error handling helper to properly handle errors according to stream configuration
119+
var executed = ErrorHandlingHelper.TryExecute<TOutput>(
120+
_executionOptions,
121+
OperatorName,
122+
output,
123+
item => _nextOperator?.Process(item));
124+
125+
span.SetAttribute("status", executed ? "success" : "skipped");
89126
}
90127
catch (StreamStoppedException ex)
91128
{
@@ -99,6 +136,7 @@ private void Start()
99136
{
100137
span.SetAttribute("status", "error");
101138
span.SetAttribute("exception", ex.ToString());
139+
// Re-throw to let the source operator handle it (e.g., logging, stopping)
102140
throw;
103141
}
104142
finally
@@ -112,7 +150,12 @@ private void Start()
112150
{
113151
try
114152
{
115-
_nextOperator?.Process(output);
153+
// Use error handling helper to properly handle errors according to stream configuration
154+
ErrorHandlingHelper.TryExecute<TOutput>(
155+
_executionOptions,
156+
OperatorName,
157+
output,
158+
item => _nextOperator?.Process(item));
116159
}
117160
catch (StreamStoppedException)
118161
{

src/Cortex.Tests/Cortex.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<ProjectReference Include="..\Cortex.Mediator.Behaviors.Transactional\Cortex.Mediator.Behaviors.Transactional.csproj" />
3131
<ProjectReference Include="..\Cortex.Mediator\Cortex.Mediator.csproj" />
3232
<ProjectReference Include="..\Cortex.Serialization.Yaml\Cortex.Serialization.Yaml.csproj" />
33+
<ProjectReference Include="..\Cortex.Streams.Kafka\Cortex.Streams.Kafka.csproj" />
3334
<ProjectReference Include="..\Cortex.Streams.Mediator\Cortex.Streams.Mediator.csproj" />
3435
<ProjectReference Include="..\Cortex.Streams\Cortex.Streams.csproj" />
3536
<ProjectReference Include="..\Cortex.Telemetry\Cortex.Telemetry.csproj" />

0 commit comments

Comments
 (0)