Skip to content

Commit 8821005

Browse files
authored
ENH: support custom index template for ES6 in opensearch sink (opensearch-project#3061)
Signed-off-by: George Chen <qchea@amazon.com>
1 parent c753ba6 commit 8821005

21 files changed

Lines changed: 1136 additions & 514 deletions

data-prepper-plugins/opensearch/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Default is null.
9393

9494
- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.
9595

96-
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration) and `distribution_version` is `null`, otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
96+
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
9797

9898
- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.
9999

@@ -133,7 +133,7 @@ Default is null.
133133
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
134134
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.
135135
- Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
136-
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint.
136+
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. Note: when `distribution_version` is `es6`, `template_type` is enforced into `v1`.
137137

138138
- <a name="template_file"></a>`template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
139139
`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/),

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder;
5353
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager;
5454
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
55+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
56+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
5557
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
5658
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
5759
import org.slf4j.Logger;
@@ -190,7 +192,10 @@ private void doInitializeInternal() throws IOException {
190192
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient(awsCredentialsSupplier);
191193
openSearchClient = openSearchSinkConfig.getConnectionConfiguration().createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
192194
configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias();
193-
final TemplateStrategy templateStrategy = openSearchSinkConfig.getIndexConfiguration().getTemplateType().createTemplateStrategy(openSearchClient);
195+
final IndexTemplateAPIWrapper indexTemplateAPIWrapper = IndexTemplateAPIWrapperFactory.getWrapper(
196+
openSearchSinkConfig.getIndexConfiguration(), openSearchClient);
197+
final TemplateStrategy templateStrategy = openSearchSinkConfig.getIndexConfiguration().getTemplateType()
198+
.createTemplateStrategy(indexTemplateAPIWrapper);
194199
indexManager = indexManagerFactory.getIndexManager(indexType, openSearchClient, restHighLevelClient,
195200
openSearchSinkConfig, templateStrategy, configuredIndexAlias);
196201
final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Optional;
8+
9+
public class ComposableIndexTemplate implements IndexTemplate {
10+
11+
private final Map<String, Object> indexTemplateMap;
12+
private String name;
13+
14+
public ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
15+
this.indexTemplateMap = new HashMap<>(indexTemplateMap);
16+
}
17+
18+
@Override
19+
public void setTemplateName(final String name) {
20+
this.name = name;
21+
22+
}
23+
24+
@Override
25+
public void setIndexPatterns(final List<String> indexPatterns) {
26+
indexTemplateMap.put("index_patterns", indexPatterns);
27+
}
28+
29+
@Override
30+
public void putCustomSetting(final String name, final Object value) {
31+
32+
}
33+
34+
@Override
35+
public Optional<Long> getVersion() {
36+
if(!indexTemplateMap.containsKey("version"))
37+
return Optional.empty();
38+
final Number version = (Number) indexTemplateMap.get("version");
39+
return Optional.of(version.longValue());
40+
}
41+
42+
public Map<String, Object> getIndexTemplateMap() {
43+
return Collections.unmodifiableMap(indexTemplateMap);
44+
}
45+
46+
public String getName() {
47+
return name;
48+
}
49+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategy.java

Lines changed: 14 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,10 @@
55

66
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
77

8-
import com.fasterxml.jackson.databind.ObjectMapper;
9-
import jakarta.json.stream.JsonParser;
10-
import org.opensearch.client.json.JsonpDeserializer;
11-
import org.opensearch.client.json.JsonpMapper;
12-
import org.opensearch.client.json.ObjectBuilderDeserializer;
13-
import org.opensearch.client.json.ObjectDeserializer;
14-
import org.opensearch.client.opensearch.OpenSearchClient;
15-
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
16-
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
178
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
18-
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
199
import org.opensearch.client.opensearch.indices.get_index_template.IndexTemplateItem;
20-
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
21-
import org.opensearch.client.transport.endpoints.BooleanResponse;
2210

23-
import java.io.ByteArrayInputStream;
2411
import java.io.IOException;
25-
import java.nio.charset.StandardCharsets;
26-
import java.util.HashMap;
2712
import java.util.List;
2813
import java.util.Map;
2914
import java.util.Optional;
@@ -32,18 +17,24 @@
3217
* A {@link TemplateStrategy} for the OpenSearch <a href="https://opensearch.org/docs/latest/im-plugin/index-templates/">index template</a>.
3318
*/
3419
class ComposableIndexTemplateStrategy implements TemplateStrategy {
35-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
36-
private final OpenSearchClient openSearchClient;
20+
private final IndexTemplateAPIWrapper<GetIndexTemplateResponse> indexTemplateAPIWrapper;
3721

38-
public ComposableIndexTemplateStrategy(final OpenSearchClient openSearchClient) {
39-
this.openSearchClient = openSearchClient;
22+
public ComposableIndexTemplateStrategy(final IndexTemplateAPIWrapper<GetIndexTemplateResponse> indexTemplateAPIWrapper) {
23+
this.indexTemplateAPIWrapper = indexTemplateAPIWrapper;
4024
}
4125

4226
@Override
4327
public Optional<Long> getExistingTemplateVersion(final String templateName) throws IOException {
44-
return getIndexTemplate(templateName)
45-
.map(IndexTemplateItem::indexTemplate)
46-
.map(indexTemplate -> indexTemplate.version());
28+
return indexTemplateAPIWrapper.getTemplate(templateName)
29+
.map(getIndexTemplateResponse -> {
30+
final List<IndexTemplateItem> indexTemplateItems = getIndexTemplateResponse.indexTemplates();
31+
if (indexTemplateItems.size() == 1) {
32+
return indexTemplateItems.stream().findFirst().get().indexTemplate().version();
33+
} else {
34+
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
35+
templateName));
36+
}
37+
});
4738
}
4839

4940
@Override
@@ -53,104 +44,6 @@ public IndexTemplate createIndexTemplate(final Map<String, Object> templateMap)
5344

5445
@Override
5546
public void createTemplate(final IndexTemplate indexTemplate) throws IOException {
56-
if(!(indexTemplate instanceof ComposableIndexTemplate)) {
57-
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
58-
}
59-
60-
final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;
61-
62-
final Map<String, Object> templateMapping = composableIndexTemplate.indexTemplateMap;
63-
64-
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(templateMapping);
65-
66-
final ByteArrayInputStream byteIn = new ByteArrayInputStream(
67-
indexTemplateString.getBytes(StandardCharsets.UTF_8));
68-
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
69-
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);
70-
71-
final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer.getJsonpDeserializer(composableIndexTemplate.name)
72-
.deserialize(parser, mapper);
73-
74-
openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);
75-
76-
}
77-
78-
private Optional<IndexTemplateItem> getIndexTemplate(final String indexTemplateName) throws IOException {
79-
final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder()
80-
.name(indexTemplateName)
81-
.build();
82-
final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest);
83-
84-
if (!existsResponse.value()) {
85-
return Optional.empty();
86-
}
87-
88-
final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder()
89-
.name(indexTemplateName)
90-
.build();
91-
final GetIndexTemplateResponse indexTemplateResponse = openSearchClient.indices().getIndexTemplate(getRequest);
92-
93-
final List<IndexTemplateItem> indexTemplateItems = indexTemplateResponse.indexTemplates();
94-
if (indexTemplateItems.size() == 1) {
95-
return indexTemplateItems.stream().findFirst();
96-
} else {
97-
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
98-
indexTemplateName));
99-
}
100-
}
101-
102-
static class ComposableIndexTemplate implements IndexTemplate {
103-
104-
private final Map<String, Object> indexTemplateMap;
105-
private String name;
106-
107-
private ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
108-
this.indexTemplateMap = new HashMap<>(indexTemplateMap);
109-
}
110-
111-
@Override
112-
public void setTemplateName(final String name) {
113-
this.name = name;
114-
115-
}
116-
117-
@Override
118-
public void setIndexPatterns(final List<String> indexPatterns) {
119-
indexTemplateMap.put("index_patterns", indexPatterns);
120-
}
121-
122-
@Override
123-
public void putCustomSetting(final String name, final Object value) {
124-
125-
}
126-
127-
@Override
128-
public Optional<Long> getVersion() {
129-
if(!indexTemplateMap.containsKey("version"))
130-
return Optional.empty();
131-
final Number version = (Number) indexTemplateMap.get("version");
132-
return Optional.of(version.longValue());
133-
}
134-
}
135-
136-
private static class PutIndexTemplateRequestDeserializer {
137-
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {
138-
139-
objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
140-
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
141-
"index_patterns");
142-
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
143-
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
144-
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
145-
"composed_of");
146-
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
147-
}
148-
149-
static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
150-
return ObjectBuilderDeserializer
151-
.lazy(
152-
() -> new PutIndexTemplateRequest.Builder().name(name),
153-
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
154-
}
47+
indexTemplateAPIWrapper.putTemplate(indexTemplate);
15548
}
15649
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import jakarta.json.stream.JsonParser;
5+
import org.opensearch.client.json.JsonpDeserializer;
6+
import org.opensearch.client.json.JsonpMapper;
7+
import org.opensearch.client.json.ObjectBuilderDeserializer;
8+
import org.opensearch.client.json.ObjectDeserializer;
9+
import org.opensearch.client.opensearch.OpenSearchClient;
10+
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
11+
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
12+
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
13+
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
14+
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
15+
import org.opensearch.client.transport.endpoints.BooleanResponse;
16+
17+
import java.io.ByteArrayInputStream;
18+
import java.io.IOException;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Optional;
21+
22+
public class ComposableTemplateAPIWrapper implements IndexTemplateAPIWrapper<GetIndexTemplateResponse> {
23+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
24+
private final OpenSearchClient openSearchClient;
25+
26+
public ComposableTemplateAPIWrapper(final OpenSearchClient openSearchClient) {
27+
this.openSearchClient = openSearchClient;
28+
}
29+
30+
@Override
31+
public void putTemplate(final IndexTemplate indexTemplate) throws IOException {
32+
if(!(indexTemplate instanceof ComposableIndexTemplate)) {
33+
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
34+
}
35+
36+
final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;
37+
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(
38+
composableIndexTemplate.getIndexTemplateMap());
39+
40+
final ByteArrayInputStream byteIn = new ByteArrayInputStream(
41+
indexTemplateString.getBytes(StandardCharsets.UTF_8));
42+
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
43+
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);
44+
45+
final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer
46+
.getJsonpDeserializer(composableIndexTemplate.getName())
47+
.deserialize(parser, mapper);
48+
49+
openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);
50+
}
51+
52+
@Override
53+
public Optional<GetIndexTemplateResponse> getTemplate(final String indexTemplateName) throws IOException {
54+
final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder()
55+
.name(indexTemplateName)
56+
.build();
57+
final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest);
58+
59+
if (!existsResponse.value()) {
60+
return Optional.empty();
61+
}
62+
63+
final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder()
64+
.name(indexTemplateName)
65+
.build();
66+
return Optional.of(openSearchClient.indices().getIndexTemplate(getRequest));
67+
}
68+
69+
private static class PutIndexTemplateRequestDeserializer {
70+
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {
71+
72+
objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
73+
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
74+
"index_patterns");
75+
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
76+
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
77+
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
78+
"composed_of");
79+
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
80+
}
81+
82+
static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
83+
return ObjectBuilderDeserializer
84+
.lazy(
85+
() -> new PutIndexTemplateRequest.Builder().name(name),
86+
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)