1313
1414package io .dapr .durabletask ;
1515
16+ import com .google .protobuf .Empty ;
1617import io .dapr .durabletask .implementation .protobuf .OrchestratorService ;
1718import io .dapr .durabletask .implementation .protobuf .TaskHubSidecarServiceGrpc ;
1819import io .dapr .durabletask .orchestration .TaskOrchestrationFactories ;
3536import java .util .Map ;
3637import java .util .concurrent .ExecutorService ;
3738import java .util .concurrent .Executors ;
39+ import java .util .concurrent .ScheduledExecutorService ;
3840import java .util .concurrent .TimeUnit ;
3941import java .util .logging .Level ;
4042import java .util .logging .Logger ;
@@ -48,6 +50,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4850 private static final int DEFAULT_PORT = 4001 ;
5951 private static final Logger logger = Logger .getLogger (DurableTaskGrpcWorker .class .getPackage ().getName ());
5052 private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration .ofDays (3 );
 // Seconds between application-level keepalive calls to the sidecar; used as both the
 // initial delay and the period of the scheduled keepalive task.
53+ private static final long KEEPALIVE_INTERVAL_SECONDS = 30 ;
5154
5255 private final TaskOrchestrationFactories orchestrationFactories ;
5356
@@ -63,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6366 private final TaskHubSidecarServiceGrpc .TaskHubSidecarServiceBlockingStub sidecarClient ;
6467 private final boolean isExecutorServiceManaged ;
6568 private volatile boolean isNormalShutdown = false ;
69+ private ScheduledExecutorService keepaliveScheduler ;
6670 private Thread workerThread ;
6771
6872 DurableTaskGrpcWorker (DurableTaskGrpcWorkerBuilder builder ) {
@@ -136,6 +140,7 @@ public void close() {
136140 this .workerThread .interrupt ();
137141 }
138142 this .isNormalShutdown = true ;
143+ this .stopKeepaliveLoop ();
139144 this .shutDownWorkerPool ();
140145 this .closeSideCarChannel ();
141146 }
@@ -175,6 +180,7 @@ public void startAndBlock() {
175180 OrchestratorService .GetWorkItemsRequest getWorkItemsRequest = OrchestratorService .GetWorkItemsRequest
176181 .newBuilder ().build ();
177182 Iterator <OrchestratorService .WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
183+ startKeepaliveLoop ();
178184 while (workItemStream .hasNext ()) {
179185 OrchestratorService .WorkItem workItem = workItemStream .next ();
180186 OrchestratorService .WorkItem .RequestCase requestType = workItem .getRequestCase ();
@@ -214,6 +220,8 @@ public void startAndBlock() {
214220 String .format ("Unexpected failure connecting to %s" , this .getSidecarAddress ()), e );
215221 }
216222
223+ stopKeepaliveLoop ();
224+
217225 // Retry after 5 seconds
218226 try {
219227 Thread .sleep (5000 );
@@ -262,6 +270,37 @@ private void shutDownWorkerPool() {
262270 }
263271 }
264272
273+ /**
274+ * Starts a background keepalive loop to keep the gRPC connection alive.
275+ * This is an application-level keepalive to prevent AWS ALBs from
276+ * killing idle HTTP/2 connections.
277+ */
278+ private void startKeepaliveLoop () {
279+ stopKeepaliveLoop ();
280+ this .keepaliveScheduler = Executors .newSingleThreadScheduledExecutor (r -> {
281+ Thread t = new Thread (r , "durabletask-keepalive" );
282+ t .setDaemon (true );
283+ return t ;
284+ });
285+ this .keepaliveScheduler .scheduleAtFixedRate (() -> {
286+ try {
287+ this .sidecarClient .hello (Empty .getDefaultInstance ());
288+ } catch (StatusRuntimeException e ) {
289+ logger .log (Level .FINE , "keepalive failed" , e );
290+ }
291+ }, KEEPALIVE_INTERVAL_SECONDS , KEEPALIVE_INTERVAL_SECONDS , TimeUnit .SECONDS );
292+ }
293+
294+ /**
295+ * Stops the background keepalive loop if one is running.
296+ */
297+ private void stopKeepaliveLoop () {
298+ if (this .keepaliveScheduler != null ) {
299+ this .keepaliveScheduler .shutdownNow ();
300+ this .keepaliveScheduler = null ;
301+ }
302+ }
303+
265304 private String getSidecarAddress () {
266305 return this .sidecarClient .getChannel ().authority ();
267306 }
0 commit comments