Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public <T> SearchResponse<T> search(List<FilterType> filters, Integer top, Strin
try {
Assert.hasText(indexPattern, "Parameter indexPattern must not be null or empty");
SearchRequest query = buildQuery(indexPattern, filters, top, pageable);
return client.getClient().search(query, type);
return client.execute(c -> c.search(query, type));
} catch (Exception e) {
throw new RuntimeException(ctx + ": " + e.getMessage());
}
Expand Down Expand Up @@ -400,12 +400,86 @@ public Map<String, Object> getLatestDocument(List<FilterType> filters, String in
/**
 * Runs a pre-built search request and deserializes each hit's source into {@code type}.
 *
 * The request is dispatched through {@code client.execute(...)} rather than by calling the
 * raw client directly.
 *
 * @param request fully built search request
 * @param type    class the hit sources are deserialized into
 * @return the search response returned by the client
 * @throws RuntimeException wrapping any failure; the message is prefixed with the
 *                          class/method context and the original exception is kept as cause
 */
public <T> SearchResponse<T> search(SearchRequest request, Class<T> type) {
    final String ctx = CLASSNAME + ".search";
    try {
        return client.execute(c -> c.search(request, type));
    } catch (Exception e) {
        // Chain the cause so the underlying stack trace survives the wrapping.
        throw new RuntimeException(ctx + ": " + e.getMessage(), e);
    }
}

/**
 * Callback that receives one page of results at a time.
 *
 * @param <T> element type of the batches handed to the callback
 */
@FunctionalInterface
public interface SearchBatchConsumer<T> {
/**
 * Handles one batch of results.
 *
 * @param batch the results of the current page
 * @return {@code true} to keep iterating, {@code false} to stop iteration early
 * @throws Exception if the batch cannot be processed
 */
boolean accept(List<T> batch) throws Exception;
}

/**
 * Walks a result set page by page using {@code search_after}, handing each page to
 * {@code consumer} so that at most one page of documents is held by this method at a time.
 * Intended for very large exports where materializing every hit at once is not an option.
 *
 * Every request is sorted by {@code @timestamp desc}, then {@code _id desc} as tiebreaker,
 * and the sort values of the last hit of a page are used as the cursor for the next one.
 *
 * Iteration ends when any of the following happens: the {@code max} budget is used up, a
 * page comes back empty or shorter than requested, the consumer returns {@code false}, or
 * the last hit carries no sort values.
 *
 * @param filters      filters to apply
 * @param max          upper bound on the number of documents to emit; {@code null} or
 *                     {@code <= 0} means unbounded
 * @param indexPattern target index pattern; must contain text
 * @param pageSize     documents requested per page; values {@code <= 0} fall back to 500.
 *                     This method does not clamp large values (OpenSearch itself limits
 *                     the per-request size, 10000 by default)
 * @param type         class the hit sources are deserialized into
 * @param consumer     receives each batch; return {@code false} to stop early
 * @return number of documents handed to the consumer, including the batch on which the
 *         consumer asked to stop
 * @throws RuntimeException wrapping any failure, with the original exception as cause
 */
public <T> long searchStream(List<FilterType> filters, Integer max, String indexPattern,
                             int pageSize, Class<T> type, SearchBatchConsumer<T> consumer) {
    final String ctx = CLASSNAME + ".searchStream";
    try {
        Assert.hasText(indexPattern, "Parameter indexPattern must not be null or empty");
        Assert.notNull(consumer, "consumer must not be null");

        final int effectivePageSize = pageSize > 0 ? pageSize : 500;
        final boolean bounded = max != null && max > 0;

        long total = 0;
        List<String> cursor = null;
        for (;;) {
            // How many documents may still be emitted; unbounded runs always ask for a full page.
            int budget = bounded ? (int) (max - total) : effectivePageSize;
            if (budget <= 0) {
                break;
            }

            // Lambdas can only capture effectively-final locals, hence the per-iteration copies.
            final int pageLimit = Math.min(effectivePageSize, budget);
            final List<String> pageCursor = cursor;

            SearchResponse<T> page = client.execute(c -> {
                SearchRequest.Builder builder = new SearchRequest.Builder()
                    .index(indexPattern)
                    .query(SearchUtil.toQuery(filters))
                    .size(pageLimit)
                    .sort(s -> s.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
                    .sort(s -> s.field(f -> f.field("_id").order(SortOrder.Desc)));
                boolean hasCursor = pageCursor != null && !pageCursor.isEmpty();
                if (hasCursor) {
                    builder.searchAfter(pageCursor);
                }
                return c.search(builder.build(), type);
            });

            if (page == null || page.hits() == null) {
                break;
            }
            List<org.opensearch.client.opensearch.core.search.Hit<T>> pageHits = page.hits().hits();
            if (pageHits == null || pageHits.isEmpty()) {
                break;
            }

            List<T> sources = new ArrayList<>(pageHits.size());
            for (org.opensearch.client.opensearch.core.search.Hit<T> hit : pageHits) {
                sources.add(hit.source());
            }

            boolean proceed = consumer.accept(sources);
            total += pageHits.size();

            // Stop on consumer request, or when a short page signals the end of the result set.
            if (!proceed || pageHits.size() < pageLimit) {
                break;
            }

            cursor = pageHits.get(pageHits.size() - 1).sort();
            if (cursor == null || cursor.isEmpty()) {
                break;
            }
        }
        return total;
    } catch (Exception e) {
        throw new RuntimeException(ctx + ": " + e.getMessage(), e);
    }
}

public void updateByQuery(Query query, String index, String script) {
final String ctx = CLASSNAME + ".updateByQuery";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,71 @@
public class OpensearchClientBuilder {
private static final String CLASSNAME = "OpensearchClientBuilder";
private final Logger log = LoggerFactory.getLogger(OpensearchClientBuilder.class);
private OpenSearch client;
private volatile OpenSearch client;

/**
 * An operation to run against an {@link OpenSearch} client that may throw a checked exception.
 *
 * @param <T> result type of the operation
 */
@FunctionalInterface
public interface OsAction<T> {
/**
 * Performs the operation.
 *
 * @param client client instance to run the operation against
 * @return the operation's result
 * @throws Exception if the operation fails
 */
T apply(OpenSearch client) throws Exception;
}

/**
 * Builds the OpenSearch client when the application signals {@link ApplicationReadyEvent}.
 * Registered with the highest listener precedence.
 *
 * @throws Exception declared for signature compatibility; failures come from
 *                   {@code buildClient()}
 */
@Order(Ordered.HIGHEST_PRECEDENCE)
@EventListener(ApplicationReadyEvent.class)
public void init() throws Exception {
    buildClient();
}

/**
 * Gives direct access to the current client instance, without any recovery handling.
 *
 * @return the client currently held by this builder; {@code null} if no client has been
 *         built yet
 */
public OpenSearch getClient() {
    return this.client;
}

/**
 * Runs an action against the OpenSearch client with one-shot recovery: if the failure
 * carries the Apache HttpAsyncClient "I/O reactor ... STOPPED" message anywhere in its cause
 * chain, the singleton client is rebuilt and the action is retried once. All other failures
 * propagate unchanged.
 * Callers that don't need recovery should keep using {@link #getClient()} directly.
 *
 * When several threads fail on the same stopped client at once, only the first one rebuilds
 * it; the others reuse the replacement instead of rebuilding again and closing a client that
 * another thread may already be retrying on.
 *
 * @param action operation to run against the client
 * @param <T>    result type of the operation
 * @return the result of the action (from the first attempt, or from the retry)
 * @throws Exception the action's own failure if it is not a stopped-reactor condition, or
 *                   whatever the single retry throws
 */
public <T> T execute(OsAction<T> action) throws Exception {
    // Remember which instance this attempt used so recovery can tell whether it was already replaced.
    final OpenSearch attempted = client;
    try {
        return action.apply(attempted);
    } catch (Exception e) {
        if (!isReactorStopped(e)) {
            throw e;
        }
        log.warn("OpenSearch I/O reactor is STOPPED; rebuilding client and retrying once", e);
        rebuildIfCurrent(attempted);
        return action.apply(client);
    }
}

/**
 * Rebuilds the client only if {@code failed} is still the instance in use; a different
 * instance means another thread has already replaced it.
 */
private synchronized void rebuildIfCurrent(OpenSearch failed) {
    if (this.client == failed) {
        rebuild();
    }
}

/**
 * Replaces the current client with a freshly built one and then closes the previous
 * instance on a best-effort basis. The previous client is only closed after the new one
 * has been built, so a failed build leaves the existing client in place.
 *
 * @throws RuntimeException if building the replacement fails; the message is prefixed with
 *                          the class/method context and the original exception is kept as
 *                          cause
 */
public synchronized void rebuild() {
    final String ctx = CLASSNAME + ".rebuild";
    try {
        OpenSearch old = this.client;
        buildClient();
        tryClose(old);
    } catch (Exception e) {
        String msg = ctx + ": " + e.getMessage();
        // Pass the exception along so neither the log nor the rethrow loses the stack trace.
        log.error(msg, e);
        throw new RuntimeException(msg, e);
    }
}

private synchronized void buildClient() {
final String ctx = CLASSNAME + ".buildClient";
try {
String host = System.getenv(Constants.ENV_ELASTICSEARCH_HOST);
Assert.hasText(host, "Environment variable ELASTICSEARCH_HOST is missing or his value is null or empty");
Assert.hasText(host, "Environment variable ELASTICSEARCH_HOST is missing or its value is null or empty");

String port = System.getenv(Constants.ENV_ELASTICSEARCH_PORT);
Assert.hasText(port, "Environment variable ELASTICSEARCH_PORT is missing or his value is null or empty");
Assert.hasText(port, "Environment variable ELASTICSEARCH_PORT is missing or its value is null or empty");

String user = System.getenv(Constants.ENV_ELASTICSEARCH_USER);
Assert.hasText(user, "Environment variable ELASTICSEARCH_USER is missing or his value is null or empty");
Assert.hasText(user, "Environment variable ELASTICSEARCH_USER is missing or its value is null or empty");

String password = System.getenv(Constants.ENV_ELASTICSEARCH_PASSWORD);
Assert.hasText(password, "Environment variable ELASTICSEARCH_PASSWORD is missing or his value is null or empty");
Assert.hasText(password, "Environment variable ELASTICSEARCH_PASSWORD is missing or its value is null or empty");

client = OpenSearch.builder()
this.client = OpenSearch.builder()
.withHost(host, Integer.parseInt(port), HttpScheme.https)
.withCredentials(user, password)
.build();
Expand All @@ -46,7 +91,28 @@ public void init() throws Exception {
}
}

public OpenSearch getClient() {
return client;
/**
 * Closes a replaced client if it supports closing; any failure while closing is ignored.
 *
 * @param old the client being discarded; may be {@code null}
 */
private void tryClose(OpenSearch old) {
    // instanceof is false for null, so this single guard also covers a missing client.
    if (!(old instanceof AutoCloseable)) {
        return;
    }
    try {
        ((AutoCloseable) old).close();
    } catch (Exception ignored) {
        // best-effort: the old client is unusable anyway
    }
}

/**
 * Tells whether a failure is the Apache HttpAsyncClient
 * "Request cannot be executed; I/O reactor status: STOPPED" condition.
 *
 * The throwable and each of its causes are inspected in turn; a match is any message that
 * contains both {@code "I/O reactor"} and {@code "STOPPED"}.
 *
 * @param t failure to inspect; may be {@code null}
 * @return {@code true} if any throwable in the cause chain matches, {@code false} otherwise
 */
public static boolean isReactorStopped(Throwable t) {
    for (Throwable link = t; link != null; link = link.getCause()) {
        String text = link.getMessage();
        if (text == null) {
            continue;
        }
        if (text.contains("I/O reactor") && text.contains("STOPPED")) {
            return true;
        }
    }
    return false;
}
}
93 changes: 92 additions & 1 deletion backend/src/main/java/com/park/utmstack/util/UtilCsv.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.park.utmstack.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.park.utmstack.domain.shared_types.DataColumn;
Expand All @@ -13,6 +14,7 @@
import org.springframework.util.StringUtils;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,6 +53,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
List<String[]> rows = new ArrayList<>();

data.forEach(d -> {
DocumentContext docctx = JsonPath.parse(d);
String[] cells = new String[columns.length];
for (int i = 0; i < columns.length; i++) {
String fieldName = columns[i].getField();
Expand All @@ -59,7 +62,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]

Object value;
try {
value = JsonPath.parse(d).read("$." + fieldName);
value = docctx.read("$." + fieldName);
} catch (PathNotFoundException e) {
continue;
}
Expand All @@ -81,6 +84,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
cells[i] = value.toString();
}
}
cells[i] = sanitizeCsvCell(cells[i]);
}
rows.add(cells);
});
Expand All @@ -104,4 +108,91 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
throw new UtmCsvException(msg);
}
}

/**
 * Prepares an HTTP response for a CSV download and returns a printer bound to its writer.
 * The response gets the {@code text/csv} content type and an attachment disposition with
 * the file name {@code data.csv}; the printer is configured with the header row and quotes
 * every value.
 *
 * The caller owns the returned printer and is responsible for closing it
 * (try-with-resources is fine).
 *
 * Side effect: the {@code field} of every column is rewritten in place with each
 * occurrence of {@code .keyword} removed. A column's header is its label when the label
 * has text, otherwise its (normalized) field name.
 *
 * @param response servlet response to write the CSV to
 * @param columns  columns to export; must not be empty
 * @return a printer writing to the response
 * @throws IOException if the response writer or the printer cannot be created
 */
public static CSVPrinter openCsvStream(HttpServletResponse response, DataColumn[] columns) throws IOException {
    Assert.notEmpty(columns);

    // Pass 1: normalize the field names of all columns in place.
    for (DataColumn column : columns) {
        String normalized = column.getField().replace(".keyword", "");
        column.setField(normalized);
    }

    // Pass 2: derive the header row, preferring the label over the field name.
    String[] headers = new String[columns.length];
    for (int i = 0; i < columns.length; i++) {
        DataColumn column = columns[i];
        if (StringUtils.hasText(column.getLabel())) {
            headers[i] = column.getLabel();
        } else {
            headers[i] = column.getField().replace(".keyword", "");
        }
    }

    response.setContentType("text/csv");
    response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=data.csv");

    CSVFormat format = CSVFormat.DEFAULT.withHeader(headers).withQuoteMode(QuoteMode.ALL);
    return new CSVPrinter(response.getWriter(), format);
}

/**
 * Appends one batch of documents to an open CSV printer, one record per document, and
 * flushes the printer afterwards. Meant to be called once per page while paginating
 * through a large result set; pair with {@link #openCsvStream}.
 *
 * For each column the value at JSON path {@code $.<field>} is read from the document and
 * rendered as follows:
 * <ul>
 *   <li>missing path or {@code null} value: empty cell;</li>
 *   <li>string in a column of type {@code "date"}: parsed as an instant and formatted as
 *       {@code yyyy-MM-dd HH:mm:ss z} in the application time zone;</li>
 *   <li>any other string: as is, with newlines and tabs replaced by spaces;</li>
 *   <li>list: elements joined with {@code ","};</li>
 *   <li>number: its string form;</li>
 *   <li>map: serialized as JSON, falling back to {@code toString()} if that fails;</li>
 *   <li>any other type: empty cell.</li>
 * </ul>
 * Every cell is finally passed through {@code sanitizeCsvCell}.
 *
 * @param printer open printer to append to
 * @param columns columns to export, in output order
 * @param data    documents of the current batch; {@code null} or empty is a no-op
 * @throws IOException if writing to or flushing the printer fails
 */
public static void writeCsvBatch(CSVPrinter printer, DataColumn[] columns, List<?> data) throws IOException {
    if (data == null || data.isEmpty()) {
        return;
    }

    final DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")
        .withLocale(Locale.getDefault())
        .withZone(TimezoneUtil.getAppTimezone());

    for (Object document : data) {
        // Parse each document once and reuse the context for all of its columns.
        DocumentContext parsed = JsonPath.parse(document);
        String[] row = new String[columns.length];

        for (int col = 0; col < columns.length; col++) {
            String path = columns[col].getField();
            String declaredType = columns[col].getType();

            Object raw;
            try {
                raw = parsed.read("$." + path);
            } catch (PathNotFoundException missing) {
                // Field absent in this document: leave the cell empty.
                continue;
            }
            if (raw == null) {
                continue;
            }

            String text = null;
            if (raw instanceof String) {
                String asString = String.valueOf(raw);
                if ("date".equals(declaredType)) {
                    text = dateFormat.format(Instant.parse(asString));
                } else {
                    text = asString.replace("\n", " ").replace("\t", " ");
                }
            } else if (raw instanceof List) {
                text = ((List<?>) raw).stream().map(String::valueOf).collect(Collectors.joining(","));
            } else if (raw instanceof Number) {
                text = String.valueOf(raw);
            } else if (raw instanceof Map) {
                try {
                    text = OBJECT_MAPPER.writeValueAsString(raw);
                } catch (Exception ex) {
                    text = raw.toString();
                }
            }
            row[col] = sanitizeCsvCell(text);
        }
        printer.printRecord((Object[]) row);
    }
    printer.flush();
}

/**
 * Guards against CSV/formula injection: a cell that starts with {@code =}, {@code +},
 * {@code -}, {@code @}, a tab or a carriage return gets a single quote prepended so that
 * spreadsheet applications (Excel/LibreOffice/Sheets) treat it as text rather than a formula.
 *
 * @param value cell content; may be {@code null}
 * @return {@code value} unchanged when it is {@code null}, empty, or starts with any other
 *         character; otherwise {@code value} prefixed with {@code '}
 */
private static String sanitizeCsvCell(String value) {
    if (value == null || value.isEmpty()) {
        return value;
    }
    switch (value.charAt(0)) {
        case '=':
        case '+':
        case '-':
        case '@':
        case '\t':
        case '\r':
            return "'" + value;
        default:
            return value;
    }
}
}
Loading
Loading