44import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
55import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelBuilder ;
66import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelConnector ;
7- import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
87import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayload ;
98import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayloadType ;
109import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueueSource ;
2423import java .util .concurrent .LinkedBlockingQueue ;
2524import java .util .concurrent .TimeUnit ;
2625import java .util .concurrent .atomic .AtomicBoolean ;
27- import java .util .function .Consumer ;
2826import lombok .extern .slf4j .Slf4j ;
2927
3028/**
31- * Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract.
29+ * Implements the {@link QueueSource} contract and emit flags obtained from
30+ * flagd sync gRPC contract.
3231 */
3332@ Slf4j
3433@ SuppressFBWarnings (
3534 value = {"EI_EXPOSE_REP" },
36- justification = "Random is used to generate a variation & flag configurations require exposing " )
35+ justification = "We need to expose the BlockingQueue to allow consumers to read from it " )
3736public class SyncStreamQueueSource implements QueueSource {
3837 private static final int QUEUE_SIZE = 5 ;
3938
@@ -45,26 +44,42 @@ public class SyncStreamQueueSource implements QueueSource {
4544 private final String selector ;
4645 private final String providerId ;
4746 private final boolean syncMetadataDisabled ;
48- private final ChannelConnector channelConnector ;
47+ private final boolean reinitializeOnError ;
48+ private final FlagdOptions options ;
4949 private final BlockingQueue <QueuePayload > outgoingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
50- private final FlagSyncServiceStub flagSyncStub ;
51- private final FlagSyncServiceBlockingStub metadataStub ;
50+ private volatile GrpcComponents grpcComponents ;
5251
5352 /**
54- * Creates a new SyncStreamQueueSource responsible for observing the event stream.
53+ * Container for gRPC components to ensure atomicity during reinitialization.
54+ * All three components are updated together to prevent consumers from seeing
55+ * an inconsistent state where components are from different channel instances.
5556 */
56- public SyncStreamQueueSource (final FlagdOptions options , Consumer <FlagdProviderEvent > onConnectionEvent ) {
57+ private static class GrpcComponents {
58+ final ChannelConnector channelConnector ;
59+ final FlagSyncServiceStub flagSyncStub ;
60+ final FlagSyncServiceBlockingStub metadataStub ;
61+
62+ GrpcComponents (ChannelConnector connector , FlagSyncServiceStub stub , FlagSyncServiceBlockingStub blockingStub ) {
63+ this .channelConnector = connector ;
64+ this .flagSyncStub = stub ;
65+ this .metadataStub = blockingStub ;
66+ }
67+ }
68+
69+ /**
70+ * Creates a new SyncStreamQueueSource responsible for observing the event
71+ * stream.
72+ */
73+ public SyncStreamQueueSource (final FlagdOptions options ) {
5774 streamDeadline = options .getStreamDeadlineMs ();
5875 deadline = options .getDeadline ();
5976 selector = options .getSelector ();
6077 providerId = options .getProviderId ();
6178 maxBackoffMs = options .getRetryBackoffMaxMs ();
6279 syncMetadataDisabled = options .isSyncMetadataDisabled ();
63- channelConnector = new ChannelConnector (options , onConnectionEvent , ChannelBuilder .nettyChannel (options ));
64- flagSyncStub =
65- FlagSyncServiceGrpc .newStub (channelConnector .getChannel ()).withWaitForReady ();
66- metadataStub = FlagSyncServiceGrpc .newBlockingStub (channelConnector .getChannel ())
67- .withWaitForReady ();
80+ reinitializeOnError = options .isReinitializeOnError ();
81+ this .options = options ;
82+ initializeChannelComponents ();
6883 }
6984
7085 // internal use only
@@ -77,16 +92,54 @@ protected SyncStreamQueueSource(
7792 deadline = options .getDeadline ();
7893 selector = options .getSelector ();
7994 providerId = options .getProviderId ();
80- channelConnector = connectorMock ;
8195 maxBackoffMs = options .getRetryBackoffMaxMs ();
82- flagSyncStub = stubMock ;
8396 syncMetadataDisabled = options .isSyncMetadataDisabled ();
84- metadataStub = blockingStubMock ;
97+ reinitializeOnError = options .isReinitializeOnError ();
98+ this .options = options ;
99+ this .grpcComponents = new GrpcComponents (connectorMock , stubMock , blockingStubMock );
100+ }
101+
102+ /** Initialize channel connector and stubs. */
103+ private synchronized void initializeChannelComponents () {
104+ ChannelConnector newConnector = new ChannelConnector (options , ChannelBuilder .nettyChannel (options ));
105+ FlagSyncServiceStub newFlagSyncStub =
106+ FlagSyncServiceGrpc .newStub (newConnector .getChannel ()).withWaitForReady ();
107+ FlagSyncServiceBlockingStub newMetadataStub =
108+ FlagSyncServiceGrpc .newBlockingStub (newConnector .getChannel ()).withWaitForReady ();
109+
110+ // atomic assignment of all components as a single unit
111+ grpcComponents = new GrpcComponents (newConnector , newFlagSyncStub , newMetadataStub );
112+ }
113+
114+ /** Reinitialize channel connector and stubs on error. */
115+ public synchronized void reinitializeChannelComponents () {
116+ if (!reinitializeOnError || shutdown .get ()) {
117+ return ;
118+ }
119+
120+ log .info ("Reinitializing channel gRPC components in attempt to restore stream." );
121+ GrpcComponents oldComponents = grpcComponents ;
122+
123+ try {
124+ // create new channel components first
125+ initializeChannelComponents ();
126+ } catch (Exception e ) {
127+ log .error ("Failed to reinitialize channel components" , e );
128+ return ;
129+ }
130+
131+ // shutdown old connector after successful reinitialization
132+ if (oldComponents != null && oldComponents .channelConnector != null ) {
133+ try {
134+ oldComponents .channelConnector .shutdown ();
135+ } catch (Exception e ) {
136+ log .debug ("Error shutting down old channel connector during reinitialization" , e );
137+ }
138+ }
85139 }
86140
87141 /** Initialize sync stream connector. */
88142 public void init () throws Exception {
89- channelConnector .initialize ();
90143 Thread listener = new Thread (this ::observeSyncStream );
91144 listener .setDaemon (true );
92145 listener .start ();
@@ -109,7 +162,7 @@ public void shutdown() throws InterruptedException {
109162 log .debug ("Shutdown already in progress or completed" );
110163 return ;
111164 }
112- this .channelConnector .shutdown ();
165+ grpcComponents .channelConnector .shutdown ();
113166 }
114167
115168 /** Contains blocking calls, to be used concurrently. */
@@ -159,13 +212,14 @@ private void observeSyncStream() {
159212 log .info ("Shutdown invoked, exiting event stream listener" );
160213 }
161214
162- // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
215+ // TODO: remove the metadata call entirely after
216+ // https://github.com/open-feature/flagd/issues/1584
163217 private Struct getMetadata () {
164218 if (syncMetadataDisabled ) {
165219 return null ;
166220 }
167221
168- FlagSyncServiceBlockingStub localStub = metadataStub ;
222+ FlagSyncServiceBlockingStub localStub = grpcComponents . metadataStub ;
169223
170224 if (deadline > 0 ) {
171225 localStub = localStub .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS );
@@ -180,7 +234,8 @@ private Struct getMetadata() {
180234
181235 return null ;
182236 } catch (StatusRuntimeException e ) {
183- // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we
237+ // In newer versions of flagd, metadata is part of the sync stream. If the
238+ // method is unimplemented, we
184239 // can ignore the error
185240 if (e .getStatus () != null
186241 && Status .Code .UNIMPLEMENTED .equals (e .getStatus ().getCode ())) {
@@ -192,7 +247,7 @@ private Struct getMetadata() {
192247 }
193248
194249 private void syncFlags (SyncStreamObserver streamObserver ) {
195- FlagSyncServiceStub localStub = flagSyncStub ; // don't mutate the stub
250+ FlagSyncServiceStub localStub = grpcComponents . flagSyncStub ; // don't mutate the stub
196251 if (streamDeadline > 0 ) {
197252 localStub = localStub .withDeadlineAfter (streamDeadline , TimeUnit .MILLISECONDS );
198253 }
0 commit comments