Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon CloudWatch EMF Metric Publisher",
"contributor": "humanzz",
"description": "Add `propertiesSupplier` to `EmfMetricLoggingPublisher.Builder`, enabling users to enrich EMF records with custom key-value properties that are searchable in CloudWatch Logs Insights. See [#6595](https://github.com/aws/aws-sdk-java-v2/issues/6595)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.Immutable;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
Expand Down Expand Up @@ -82,6 +84,7 @@ private EmfMetricLoggingPublisher(Builder builder) {
.dimensions(builder.dimensions)
.metricLevel(builder.metricLevel)
.metricCategories(builder.metricCategories)
.propertiesSupplier(builder.propertiesSupplier)
.build();

this.metricConverter = new MetricEmfConverter(config);
Expand Down Expand Up @@ -123,6 +126,7 @@ public static final class Builder {
private Collection<SdkMetric<String>> dimensions;
private Collection<MetricCategory> metricCategories;
private MetricLevel metricLevel;
private Supplier<Map<String, String>> propertiesSupplier;

private Builder() {
}
Expand Down Expand Up @@ -217,6 +221,27 @@ public Builder metricLevel(MetricLevel metricLevel) {
}


/**
* Configure a supplier of custom properties to include in each EMF record.
* The supplier is invoked on each {@link #publish(MetricCollection)} call,
* and the returned map entries are written as top-level key-value pairs
* in the EMF JSON output. These appear as searchable fields in
* CloudWatch Logs Insights.
*
* <p>Keys that collide with reserved EMF fields ({@code _aws}), configured
* dimension names, or reported metric names are silently skipped.
*
* <p>If this is not specified, no custom properties are added.
*
* @param propertiesSupplier a supplier returning a map of property names to values,
* or {@code null} to disable custom properties
* @return this builder
*/
public Builder propertiesSupplier(Supplier<Map<String, String>> propertiesSupplier) {
this.propertiesSupplier = propertiesSupplier;
return this;
}

/**
* Build a {@link EmfMetricLoggingPublisher} using the configuration currently configured on this publisher.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -43,13 +45,17 @@ public final class EmfMetricConfiguration {
private final Set<SdkMetric<String>> dimensions;
private final Collection<MetricCategory> metricCategories;
private final MetricLevel metricLevel;
private final Supplier<Map<String, String>> propertiesSupplier;

private EmfMetricConfiguration(Builder builder) {
this.namespace = builder.namespace == null ? DEFAULT_NAMESPACE : builder.namespace;
this.logGroupName = Validate.paramNotNull(resolveLogGroupName(builder), "logGroupName");
this.dimensions = builder.dimensions == null ? DEFAULT_DIMENSIONS : new HashSet<>(builder.dimensions);
this.metricCategories = builder.metricCategories == null ? DEFAULT_CATEGORIES : new HashSet<>(builder.metricCategories);
this.metricLevel = builder.metricLevel == null ? DEFAULT_METRIC_LEVEL : builder.metricLevel;
this.propertiesSupplier = builder.propertiesSupplier == null
? Collections::emptyMap
: builder.propertiesSupplier;
}


Expand All @@ -59,6 +65,7 @@ public static class Builder {
private Collection<SdkMetric<String>> dimensions;
private Collection<MetricCategory> metricCategories;
private MetricLevel metricLevel;
private Supplier<Map<String, String>> propertiesSupplier;

public Builder namespace(String namespace) {
this.namespace = namespace;
Expand All @@ -85,6 +92,11 @@ public Builder metricLevel(MetricLevel metricLevel) {
return this;
}

public Builder propertiesSupplier(Supplier<Map<String, String>> propertiesSupplier) {
this.propertiesSupplier = propertiesSupplier;
return this;
}

public EmfMetricConfiguration build() {
return new EmfMetricConfiguration(this);
}
Expand All @@ -110,6 +122,10 @@ public MetricLevel metricLevel() {
return metricLevel;
}

public Supplier<Map<String, String>> propertiesSupplier() {
return propertiesSupplier;
}

private String resolveLogGroupName(Builder builder) {
return builder.logGroupName != null ? builder.logGroupName :
SystemSettingUtils.resolveEnvironmentVariable("AWS_LAMBDA_LOG_GROUP_NAME").orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.metrics.MetricCategory;
Expand Down Expand Up @@ -66,12 +68,14 @@ public class MetricEmfConverter {
private final EmfMetricConfiguration config;
private final boolean metricCategoriesContainsAll;
private final Clock clock;
private final Supplier<Map<String, String>> propertiesSupplier;

@SdkTestInternalApi
public MetricEmfConverter(EmfMetricConfiguration config, Clock clock) {
this.config = config;
this.clock = clock;
this.metricCategoriesContainsAll = config.metricCategories().contains(MetricCategory.ALL);
this.propertiesSupplier = config.propertiesSupplier();
}

public MetricEmfConverter(EmfMetricConfiguration config) {
Expand Down Expand Up @@ -136,7 +140,18 @@ public List<String> convertMetricCollectionToEmf(MetricCollection metricCollecti
}
}

return createEmfStrings(aggregatedMetrics);
Map<String, String> properties = resolveProperties();
return createEmfStrings(aggregatedMetrics, properties);
}

private Map<String, String> resolveProperties() {
try {
Map<String, String> result = propertiesSupplier.get();
return result == null ? Collections.emptyMap() : result;
} catch (Exception e) {
logger.warn(() -> "Properties supplier threw an exception, publishing without custom properties", e);
return Collections.emptyMap();
}
}

/**
Expand Down Expand Up @@ -188,7 +203,8 @@ private void processAndWriteValue(JsonWriter jsonWriter, MetricRecord<?> mRecord
}
}

private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> aggregatedMetrics) {
private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> aggregatedMetrics,
Map<String, String> properties) {
List<String> emfStrings = new ArrayList<>();
Map<SdkMetric<?>, List<MetricRecord<?>>> currentMetricBatch = new HashMap<>();

Expand All @@ -204,24 +220,26 @@ private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> a
}

if (currentMetricBatch.size() == MAX_METRIC_NUM) {
emfStrings.add(createEmfString(currentMetricBatch));
emfStrings.add(createEmfString(currentMetricBatch, properties));
currentMetricBatch = new HashMap<>();
}

currentMetricBatch.put(metric, records);
}

emfStrings.add(createEmfString(currentMetricBatch));
emfStrings.add(createEmfString(currentMetricBatch, properties));

return emfStrings;
}


private String createEmfString(Map<SdkMetric<?>, List<MetricRecord<?>>> metrics) {
private String createEmfString(Map<SdkMetric<?>, List<MetricRecord<?>>> metrics,
Map<String, String> properties) {

JsonWriter jsonWriter = JsonWriter.create();
jsonWriter.writeStartObject();

writeCustomProperties(jsonWriter, properties);
writeAwsObject(jsonWriter, metrics.keySet());
writeMetricValues(jsonWriter, metrics);

Expand All @@ -231,6 +249,16 @@ private String createEmfString(Map<SdkMetric<?>, List<MetricRecord<?>>> metrics)

}

private void writeCustomProperties(JsonWriter jsonWriter, Map<String, String> properties) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
jsonWriter.writeFieldName(entry.getKey());
jsonWriter.writeValue(entry.getValue());
}
}




private void writeAwsObject(JsonWriter jsonWriter, Set<SdkMetric<?>> metricNames) {
jsonWriter.writeFieldName("_aws");
jsonWriter.writeStartObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.http.HttpMetric;
Expand Down Expand Up @@ -97,4 +103,96 @@ void Publish_multipleMetrics() {
assertThat(loggedEvents()).hasSize(2);
}

@Test
void publish_propertiesSupplierThrowsException_publishesWithoutCustomProperties() {
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> { throw new RuntimeException("supplier failed"); })
.build();

MetricCollector metricCollector = MetricCollector.create("test");
metricCollector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(metricCollector.collect());

// Should have: 1 warning about supplier + 1 EMF info log
boolean hasWarning = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.WARN
&& e.getMessage().getFormattedMessage().contains("Properties supplier threw an exception"));
assertThat(hasWarning).isTrue();

boolean hasEmfOutput = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"));
assertThat(hasEmfOutput).isTrue();

// EMF output should not contain any custom properties
String emfLog = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.findFirst().get().getMessage().getFormattedMessage();
assertThat(emfLog).contains("\"AvailableConcurrency\":5");
}

@Test
void publish_propertiesSupplierReturnsNull_publishesWithoutCustomProperties() {
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> null)
.build();

MetricCollector metricCollector = MetricCollector.create("test");
metricCollector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(metricCollector.collect());

// Should have EMF output without custom properties
boolean hasEmfOutput = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"));
assertThat(hasEmfOutput).isTrue();

String emfLog = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.findFirst().get().getMessage().getFormattedMessage();
assertThat(emfLog).contains("\"AvailableConcurrency\":5");
// No warning should be logged for null return
boolean hasWarning = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.WARN);
assertThat(hasWarning).isFalse();
}

@Test
void publish_statefulSupplier_eachPublishUsesCurrentMap() {
AtomicInteger counter = new AtomicInteger(0);
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> {
int count = counter.incrementAndGet();
Map<String, String> map = new HashMap<String, String>();
map.put("InvocationCount", String.valueOf(count));
return map;
})
.build();

// First publish
MetricCollector mc1 = MetricCollector.create("test1");
mc1.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(mc1.collect());

// Second publish
MetricCollector mc2 = MetricCollector.create("test2");
mc2.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 10);
publisher.publish(mc2.collect());

// Collect all EMF info logs
List<String> emfLogs = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.map(e -> e.getMessage().getFormattedMessage())
.collect(java.util.stream.Collectors.toList());

assertThat(emfLogs).hasSize(2);
assertThat(emfLogs.get(0)).contains("\"InvocationCount\":\"1\"");
assertThat(emfLogs.get(1)).contains("\"InvocationCount\":\"2\"");
}
}
Loading
Loading