Skip to content

Commit 3105641

Browse files
committed
fix(test): chunk PopulateDBConfiguration bulk inserts and fail fast on errors
Bulk requests of 1500+ channels with rich property payloads exceeded the default ES circuit-breaker limit (~102 MiB), causing 429/413 rejections in CI. Split channel inserts into batches of 1000 and throw on bulk errors rather than logging and continuing with a silently empty index. Same fail-fast treatment applied to checkBulkResponse (properties/tags).
1 parent 5240fab commit 3105641

1 file changed

Lines changed: 44 additions & 34 deletions

File tree

src/main/java/org/phoebus/channelfinder/configuration/PopulateDBConfiguration.java

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -225,28 +225,27 @@ private void checkBulkResponse(BulkRequest.Builder br) {
225225
try {
226226
BulkResponse results = client.bulk(br.build());
227227
if (results.errors()) {
228-
logger.log(Level.SEVERE, "CreateDB Bulk had errors");
228+
StringBuilder errors = new StringBuilder("CreateDB bulk had errors:");
229229
for (BulkResponseItem item : results.items()) {
230230
if (item.error() != null) {
231-
logger.log(Level.SEVERE, () -> item.error().reason());
231+
errors.append("\n ").append(item.error().reason());
232232
}
233233
}
234+
throw new RuntimeException(errors.toString());
234235
}
235236
} catch (IOException e) {
236-
logger.log(Level.WARNING, "CreateDB Bulk operation failed.", e);
237+
throw new RuntimeException("CreateDB bulk operation failed", e);
237238
}
238239
}
239240

240241
private void bulkInsertAllChannels(Collection<Channel> channels) {
242+
logger.info("Bulk inserting channels");
241243
try {
242-
logger.info("Bulk inserting channels");
243-
244244
bulkInsertChannels(channels);
245-
channels.clear();
246-
247-
} catch (Exception e) {
248-
logger.log(Level.WARNING, e.getMessage(), e);
245+
} catch (IOException e) {
246+
throw new RuntimeException("Failed to bulk insert channels into test database", e);
249247
}
248+
channels.clear();
250249
}
251250

252251
private void createBOChannels(Collection<Channel> channels, int freq) {
@@ -400,32 +399,43 @@ private Collection<Channel> insertSRCell(String cell) {
400399
return result;
401400
}
402401

403-
private void bulkInsertChannels(Collection<Channel> result) throws IOException {
404-
long start = System.currentTimeMillis();
405-
BulkRequest.Builder br = new BulkRequest.Builder();
406-
for (Channel channel : result) {
407-
br.operations(
408-
op ->
409-
op.index(
410-
IndexOperation.of(
411-
i ->
412-
i.index(esService.getES_CHANNEL_INDEX())
413-
.id(channel.getName())
414-
.document(channel))));
415-
}
416-
String prepare = "|Prepare: " + (System.currentTimeMillis() - start) + "|";
417-
start = System.currentTimeMillis();
418-
br.refresh(Refresh.True);
419-
420-
BulkResponse srResult = client.bulk(br.build());
421-
String execute = "|Execute: " + (System.currentTimeMillis() - start) + "|";
422-
logger.log(Level.INFO, () -> "Inserted cell " + prepare + " " + execute);
423-
if (srResult.errors()) {
424-
logger.log(Level.SEVERE, "Bulk insert had errors");
425-
for (BulkResponseItem item : srResult.items()) {
426-
if (item.error() != null) {
427-
logger.log(Level.SEVERE, () -> item.error().reason());
402+
private static final int BULK_INSERT_BATCH_SIZE = 1000;
403+
404+
private void bulkInsertChannels(Collection<Channel> channels) throws IOException {
405+
List<Channel> list = new ArrayList<>(channels);
406+
for (int offset = 0; offset < list.size(); offset += BULK_INSERT_BATCH_SIZE) {
407+
List<Channel> batch =
408+
list.subList(offset, Math.min(offset + BULK_INSERT_BATCH_SIZE, list.size()));
409+
int batchCount = batch.size();
410+
long t0 = System.currentTimeMillis();
411+
BulkRequest.Builder br = new BulkRequest.Builder();
412+
for (Channel channel : batch) {
413+
br.operations(
414+
op ->
415+
op.index(
416+
IndexOperation.of(
417+
i ->
418+
i.index(esService.getES_CHANNEL_INDEX())
419+
.id(channel.getName())
420+
.document(channel))));
421+
}
422+
String prepare = "|Prepare: " + (System.currentTimeMillis() - t0) + "|";
423+
t0 = System.currentTimeMillis();
424+
br.refresh(Refresh.True);
425+
BulkResponse result = client.bulk(br.build());
426+
String execute = "|Execute: " + (System.currentTimeMillis() - t0) + "|";
427+
logger.log(
428+
Level.INFO,
429+
() -> "Inserted batch (" + batchCount + " channels) " + prepare + " " + execute);
430+
if (result.errors()) {
431+
StringBuilder errors = new StringBuilder("Bulk insert batch had errors:");
432+
for (BulkResponseItem item : result.items()) {
433+
if (item.error() != null) {
434+
errors.append("\n ").append(item.error().reason());
435+
}
428436
}
437+
logger.log(Level.SEVERE, errors.toString());
438+
throw new IOException(errors.toString());
429439
}
430440
}
431441
}

0 commit comments

Comments
 (0)