Skip to content

Commit 21a74d9

Browse files
committed
refacto: make async persisting logic more readable and maintainable
use a ScheduledExecutorService to handle the background periodic logic safely cancel, wait for completion and close the threads and executor using its api remove Thread::sleep from the code
1 parent 479bc8b commit 21a74d9

10 files changed

Lines changed: 794 additions & 458 deletions

src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java

Lines changed: 123 additions & 126 deletions
Large diffs are not rendered by default.
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright (c) 2026, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.sensitivityanalysis.server.util;
8+
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.UUID;
14+
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.function.BiConsumer;
17+
18+
/**
19+
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
20+
*/
21+
@Slf4j
22+
public class BatchAsyncPoller<T> {
23+
24+
protected static final int BUFFER_SIZE = 512;
25+
private static final int TASK_INITIAL_DELAY = 0;
26+
private static final int TASK_DELAY = 100;
27+
28+
private final UUID resultUuid;
29+
private final String taskName;
30+
private final AtomicBoolean isProducerFinished;
31+
private final BiConsumer<UUID, List<T>> batchHandlingFunction;
32+
33+
private final BlockingQueue<T> blockingQueue;
34+
private final ScheduledFuture<?> pollingFuture;
35+
36+
public BatchAsyncPoller(ScheduledExecutorService scheduledExecutorService, UUID resultUuid,
37+
String taskName, BiConsumer<UUID, List<T>> batchHandlingFunction) {
38+
this.resultUuid = resultUuid;
39+
this.taskName = taskName;
40+
this.batchHandlingFunction = batchHandlingFunction;
41+
this.isProducerFinished = new AtomicBoolean(false);
42+
43+
this.blockingQueue = new LinkedBlockingQueue<>();
44+
this.pollingFuture = scheduledExecutorService.scheduleWithFixedDelay(this::drainQueue, TASK_INITIAL_DELAY, TASK_DELAY, TimeUnit.MILLISECONDS);
45+
}
46+
47+
public void add(T data) {
48+
if (pollingFuture.isDone()) {
49+
throw new IllegalStateException("Cannot add data to a finished Poller");
50+
}
51+
52+
blockingQueue.add(data);
53+
}
54+
55+
public void notifyCompletion() {
56+
isProducerFinished.set(true);
57+
}
58+
59+
public void waitForCompletion() {
60+
try {
61+
pollingFuture.get();
62+
} catch (CancellationException e) {
63+
log.warn("{} - Task was canceled", taskName, e);
64+
} catch (InterruptedException e) {
65+
Thread.currentThread().interrupt();
66+
} catch (Exception e) {
67+
log.error("{} - Unexpected error occurred during completion wait", taskName, e);
68+
}
69+
}
70+
71+
private void drainQueue() {
72+
try {
73+
List<T> buffer = new ArrayList<>(BUFFER_SIZE);
74+
75+
while (!shouldStop() && hasDrainedData(buffer)) {
76+
log.debug("{} - Treating {} elements in the batch", taskName, buffer.size());
77+
log.debug("{} - Remaining {} elements in the queue", taskName, blockingQueue.size());
78+
batchHandlingFunction.accept(resultUuid, new ArrayList<>(buffer));
79+
buffer.clear();
80+
}
81+
} catch (Exception e) {
82+
log.error("{} - Unexpected error occurred during persisting results", taskName, e);
83+
}
84+
85+
if (shouldStop()) {
86+
pollingFuture.cancel(false);
87+
}
88+
}
89+
90+
private boolean shouldStop() {
91+
// Thread.currentThread().isInterrupted() check is mandatory for the loop since it doesn't have method calls that checks the flag
92+
// isProducerFinished.get() && blockingQueue.isEmpty() is also mandatory given the logic inside the calling method
93+
// it allows to consume all data before leaving the calling loop
94+
return Thread.currentThread().isInterrupted() || isProducerFinished.get() && blockingQueue.isEmpty();
95+
}
96+
97+
private boolean hasDrainedData(List<T> buffer) {
98+
return blockingQueue.drainTo(buffer, BUFFER_SIZE) > 0;
99+
}
100+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Copyright (c) 2026, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.sensitivityanalysis.server.util;
8+
9+
import java.util.List;
10+
import java.util.UUID;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.function.BiConsumer;
13+
14+
/**
15+
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
16+
*/
17+
public class BatchAsyncPollerFactory {
18+
19+
public <T> BatchAsyncPoller<T> create(ScheduledExecutorService scheduledExecutorService, UUID resultUuid,
20+
String taskName, BiConsumer<UUID, List<T>> batchHandlingFunction) {
21+
return new BatchAsyncPoller<>(scheduledExecutorService, resultUuid, taskName, batchHandlingFunction);
22+
}
23+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright (c) 2026, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.sensitivityanalysis.server.util;
8+
9+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
10+
import org.springframework.stereotype.Service;
11+
12+
import java.util.Objects;
13+
import java.util.UUID;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.ScheduledExecutorService;
16+
import java.util.concurrent.ThreadFactory;
17+
18+
/**
19+
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
20+
*/
21+
@Service
22+
public class ExecutorProviderService {
23+
public ScheduledExecutorService newScheduledThreadPool(int size, UUID threadPrefix) {
24+
if (size <= 0) {
25+
throw new IllegalArgumentException("Thread pool size must be strictly positive");
26+
}
27+
Objects.requireNonNull(threadPrefix);
28+
29+
ThreadFactory factory = new ThreadFactoryBuilder()
30+
.setNameFormat(threadPrefix + "-%d")
31+
.setDaemon(false)
32+
.build();
33+
return Executors.newScheduledThreadPool(size, factory);
34+
}
35+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright (c) 2024, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.sensitivityanalysis.server.util;
8+
9+
import com.powsybl.sensitivity.SensitivityAnalysisResult;
10+
import com.powsybl.sensitivity.SensitivityResultWriter;
11+
import com.powsybl.sensitivity.SensitivityValue;
12+
import org.gridsuite.sensitivityanalysis.server.service.SensitivityAnalysisResultService;
13+
14+
import java.util.UUID;
15+
import java.util.concurrent.ScheduledExecutorService;
16+
17+
/**
18+
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
19+
*/
20+
public class SensitivityResultPersistedWriter implements SensitivityResultWriter, AutoCloseable {
21+
22+
protected static final String SENSITIVITY_WRITER_THREAD = "sensitivityWriterThread";
23+
protected static final String CONTINGENCY_WRITER_THREAD = "contingencyWriterThread";
24+
25+
private final ScheduledExecutorService scheduledExecutorService;
26+
private final BatchAsyncPoller<SensitivityValue> sensitivityBatchAsyncPoller;
27+
private final BatchAsyncPoller<ContingencyResult> contingencyBatchAsyncPoller;
28+
29+
public SensitivityResultPersistedWriter(UUID resultUuid, SensitivityAnalysisResultService sensitivityAnalysisResultService,
30+
ExecutorProviderService executorProviderService, BatchAsyncPollerFactory batchAsyncPollerFactory) {
31+
this.scheduledExecutorService = executorProviderService.newScheduledThreadPool(2, resultUuid);
32+
this.sensitivityBatchAsyncPoller = batchAsyncPollerFactory.create(this.scheduledExecutorService, resultUuid, SENSITIVITY_WRITER_THREAD, sensitivityAnalysisResultService::writeSensitivityValues);
33+
this.contingencyBatchAsyncPoller = batchAsyncPollerFactory.create(this.scheduledExecutorService, resultUuid, CONTINGENCY_WRITER_THREAD, sensitivityAnalysisResultService::writeContingenciesStatus);
34+
}
35+
36+
@Override
37+
public void writeSensitivityValue(int factorIndex, int contingencyIndex, double value, double functionReference) {
38+
throwOnExecutorShutdown();
39+
40+
if (Double.isNaN(functionReference) || Double.isNaN(value)) {
41+
return;
42+
}
43+
sensitivityBatchAsyncPoller.add(new SensitivityValue(factorIndex, contingencyIndex, value, functionReference));
44+
}
45+
46+
@Override
47+
public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResult.Status status) {
48+
throwOnExecutorShutdown();
49+
50+
contingencyBatchAsyncPoller.add(new ContingencyResult(contingencyIndex, status));
51+
}
52+
53+
@Override
54+
public void close() {
55+
scheduledExecutorService.shutdownNow();
56+
}
57+
58+
public void notifyCompletion() {
59+
sensitivityBatchAsyncPoller.notifyCompletion();
60+
contingencyBatchAsyncPoller.notifyCompletion();
61+
}
62+
63+
public void waitForCompletion() {
64+
sensitivityBatchAsyncPoller.waitForCompletion();
65+
contingencyBatchAsyncPoller.waitForCompletion();
66+
}
67+
68+
private void throwOnExecutorShutdown() {
69+
if (scheduledExecutorService.isShutdown()) {
70+
throw new IllegalStateException("Cannot add data to a finished Writer");
71+
}
72+
}
73+
}

src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
 (0)