Skip to content

Commit 0218957

Browse files
committed
IGNITE-22530 Add atomic write to caches file
1 parent 7d30f0e commit 0218957

3 files changed

Lines changed: 84 additions & 69 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23-
import java.nio.file.StandardOpenOption;
24-
import java.util.Collections;
2523
import java.util.HashSet;
2624
import java.util.Iterator;
2725
import java.util.List;
@@ -46,6 +44,8 @@
4644
import org.apache.ignite.metric.MetricRegistry;
4745
import org.apache.ignite.resources.LoggerResource;
4846

47+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
48+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
4949
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
5050

5151
/**
@@ -81,6 +81,9 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
8181
/** File with saved names of caches added by cache masks. */
8282
private static final String SAVED_CACHES_FILE = "caches";
8383

84+
/** Temporary file with saved names of caches added by cache masks. */
85+
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
86+
8487
/** CDC directory path. */
8588
private Path cdcDir;
8689

@@ -193,7 +196,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
193196

194197
/**
195198
* Finds match between cache name and user's regex templates.
196-
* If match found, adds this cache's id to id's list and saves cache name to file.
199+
* If match is found, adds this cache's id to id's list and saves cache name to file.
197200
*
198201
* @param cacheName Cache name.
199202
*/
@@ -204,7 +207,11 @@ private void matchWithRegexTemplates(String cacheName) {
204207
cachesIds.add(cacheId);
205208

206209
try {
207-
saveCache(cacheName);
210+
List<String> caches = loadCaches();
211+
212+
caches.add(cacheName);
213+
214+
save(caches);
208215
}
209216
catch (IOException e) {
210217
throw new IgniteException(e);
@@ -216,18 +223,28 @@ private void matchWithRegexTemplates(String cacheName) {
216223
}
217224

218225
/**
219-
* Writes cache name to file
226+
* Writes caches list to file
220227
*
221-
* @param cacheName Cache name.
228+
* @param caches Caches list.
222229
*/
223-
private void saveCache(String cacheName) throws IOException {
224-
if (cdcDir != null) {
225-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
230+
private void save(List<String> caches) throws IOException {
231+
if (cdcDir == null) {
232+
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
233+
}
234+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
235+
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
226236

227-
String cn = cacheName + '\n';
237+
StringBuilder cacheList = new StringBuilder();
228238

229-
Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
239+
for (String cache : caches) {
240+
cacheList.append(cache);
241+
242+
cacheList.append('\n');
230243
}
244+
245+
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
246+
247+
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
231248
}
232249

233250
/**
@@ -236,19 +253,19 @@ private void saveCache(String cacheName) throws IOException {
236253
* @return List of saved caches names.
237254
*/
238255
private List<String> loadCaches() throws IOException {
239-
if (cdcDir != null) {
240-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
241-
242-
if (Files.notExists(savedCachesPath)) {
243-
Files.createFile(savedCachesPath);
256+
if (cdcDir == null) {
257+
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
258+
}
259+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
244260

245-
if (log.isInfoEnabled())
246-
log.info("Cache list created: " + savedCachesPath);
247-
}
261+
if (Files.notExists(savedCachesPath)) {
262+
Files.createFile(savedCachesPath);
248263

249-
return Files.readAllLines(savedCachesPath);
264+
if (log.isInfoEnabled())
265+
log.info("Cache list created: " + savedCachesPath);
250266
}
251-
return Collections.emptyList();
267+
268+
return Files.readAllLines(savedCachesPath);
252269
}
253270

254271
/**
@@ -305,17 +322,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {
305322

306323
caches.remove(name);
307324

308-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
309-
310-
StringBuilder cacheList = new StringBuilder();
311-
312-
for (String cache : caches) {
313-
cacheList.append(cache);
314-
315-
cacheList.append('\n');
316-
}
317-
318-
Files.write(savedCachesPath, cacheList.toString().getBytes());
325+
save(caches);
319326

320327
if (log.isInfoEnabled())
321328
log.info("Cache has been removed from replication [cacheName=" + name + ']');

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23-
import java.nio.file.StandardOpenOption;
2423
import java.util.ArrayList;
2524
import java.util.Collection;
26-
import java.util.Collections;
2725
import java.util.HashSet;
2826
import java.util.Iterator;
2927
import java.util.List;
@@ -66,6 +64,8 @@
6664
import org.apache.kafka.common.serialization.ByteArraySerializer;
6765
import org.apache.kafka.common.serialization.IntegerSerializer;
6866

67+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
68+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
6969
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
7070
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
7171
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -163,6 +163,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
163163
/** File with saved names of caches added by cache masks. */
164164
private static final String SAVED_CACHES_FILE = "caches";
165165

166+
/** Temporary file with saved names of caches added by cache masks. */
167+
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
168+
166169
/** CDC directory path. */
167170
private Path cdcDir;
168171

@@ -304,17 +307,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {
304307

305308
caches.remove(name);
306309

307-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
308-
309-
StringBuilder cacheList = new StringBuilder();
310-
311-
for (String cache : caches) {
312-
cacheList.append(cache);
313-
314-
cacheList.append('\n');
315-
}
316-
317-
Files.write(savedCachesPath, cacheList.toString().getBytes());
310+
save(caches);
318311

319312
if (log.isInfoEnabled())
320313
log.info("Cache has been removed from replication [cacheName=" + name + ']');
@@ -481,19 +474,19 @@ private void prepareRegexFilters() {
481474
* @return List of saved caches names.
482475
*/
483476
private List<String> loadCaches() throws IOException {
484-
if (cdcDir != null) {
485-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
486-
487-
if (Files.notExists(savedCachesPath)) {
488-
Files.createFile(savedCachesPath);
477+
if (cdcDir == null) {
478+
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
479+
}
480+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
489481

490-
if (log.isInfoEnabled())
491-
log.info("Cache list created: " + savedCachesPath);
492-
}
482+
if (Files.notExists(savedCachesPath)) {
483+
Files.createFile(savedCachesPath);
493484

494-
return Files.readAllLines(savedCachesPath);
485+
if (log.isInfoEnabled())
486+
log.info("Cache list created: " + savedCachesPath);
495487
}
496-
return Collections.emptyList();
488+
489+
return Files.readAllLines(savedCachesPath);
497490
}
498491

499492
/**
@@ -514,7 +507,7 @@ private boolean matchesFilters(String cacheName) {
514507

515508
/**
516509
* Finds match between cache name and user's regex templates.
517-
* If match found, adds this cache's id to id's list and saves cache name to file.
510+
* If match is found, adds this cache's id to id's list and saves cache name to file.
518511
*
519512
* @param cacheName Cache name.
520513
*/
@@ -525,7 +518,11 @@ private void matchWithRegexTemplates(String cacheName) {
525518
cachesIds.add(cacheId);
526519

527520
try {
528-
saveCache(cacheName);
521+
List<String> caches = loadCaches();
522+
523+
caches.add(cacheName);
524+
525+
save(caches);
529526
}
530527
catch (IOException e) {
531528
throw new IgniteException(e);
@@ -537,18 +534,28 @@ private void matchWithRegexTemplates(String cacheName) {
537534
}
538535

539536
/**
540-
* Writes cache name to file.
537+
* Writes caches list to file
541538
*
542-
* @param cacheName Cache name.
539+
* @param caches Caches list.
543540
*/
544-
private void saveCache(String cacheName) throws IOException {
545-
if (cdcDir != null) {
546-
Path savedCaches = cdcDir.resolve(SAVED_CACHES_FILE);
541+
private void save(List<String> caches) throws IOException {
542+
if (cdcDir == null) {
543+
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
544+
}
545+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
546+
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
547+
548+
StringBuilder cacheList = new StringBuilder();
547549

548-
String cn = cacheName + '\n';
550+
for (String cache : caches) {
551+
cacheList.append(cache);
549552

550-
Files.write(savedCaches, cn.getBytes(), StandardOpenOption.APPEND);
553+
cacheList.append('\n');
551554
}
555+
556+
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
557+
558+
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
552559
}
553560

554561
/**

modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
115115
for (IgniteEx ex : srcCluster) {
116116
int idx = getTestIgniteInstanceIndex(ex.name());
117117

118-
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
118+
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates,
119+
excludeTemplates, "ignite-src-to-kafka-" + idx));
119120
}
120121

121122
for (int i = 0; i < destCluster.length; i++) {
@@ -149,15 +150,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
149150
for (IgniteEx ex : srcCluster) {
150151
int idx = getTestIgniteInstanceIndex(ex.name());
151152

152-
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
153-
excludeTemplates, "ignite-src-to-kafka-" + idx));
153+
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE,
154+
includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
154155
}
155156

156157
for (IgniteEx ex : destCluster) {
157158
int idx = getTestIgniteInstanceIndex(ex.name());
158159

159-
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
160-
excludeTemplates, "ignite-dest-to-kafka-" + idx));
160+
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE,
161+
includeTemplates, excludeTemplates, "ignite-dest-to-kafka-" + idx));
161162
}
162163

163164
futs.add(kafkaToIgnite(

0 commit comments

Comments
 (0)