From 821e8512ba8a33af7f024ad69e99747226425506 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 24 Mar 2026 14:13:35 -0400 Subject: [PATCH 1/2] Add CosmosEndToEndOperationLatencyPolicyConfig support to azure-cosmos-benchmark Add per-request E2E timeout policy configuration to the benchmark module, plus an availabilityStrategyEnabled config flag that is parsed but not consumed by any benchmark in this patch. The policy is applied on Cosmos(Item|Query)RequestOptions, not at the client builder level, to avoid impacting metadata calls during startup. Changes: - TenantWorkloadConfig: Add endToEndTimeoutMs and availabilityStrategyEnabled JSON config fields with getters and applyField support - AsyncBenchmark: Build CosmosEndToEndOperationLatencyPolicyConfig once in the base class constructor, expose as e2ePolicyConfig for subclasses - AsyncReadBenchmark: Create CosmosItemRequestOptions with E2E policy, pass to readItem() (was using the 3-arg overload without options) - AsyncWriteBenchmark: Create CosmosItemRequestOptions with E2E policy, replace null options in createItem() - AsyncQueryBenchmark: Set E2E policy on CosmosQueryRequestOptions immediately after construction (applies to all query operation types) Usage in workload config JSON: "endToEndTimeoutMs": 3000, "availabilityStrategyEnabled": true (availabilityStrategyEnabled is accepted but currently has no effect on the built policy) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure/cosmos/benchmark/AsyncBenchmark.java | 15 +++++++++++++++ .../cosmos/benchmark/AsyncQueryBenchmark.java | 3 +++ .../cosmos/benchmark/AsyncReadBenchmark.java | 9 ++++++++- .../cosmos/benchmark/AsyncWriteBenchmark.java | 8 +++++++- .../cosmos/benchmark/TenantWorkloadConfig.java | 13 +++++++++++++ 5 files changed, 46 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index bbf9b10a14f3..d54d25316832 100644 --- 
a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -10,6 +10,8 @@ import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnosticsThresholds; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosContainerProactiveInitConfigBuilder; import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; @@ -60,12 +62,25 @@ abstract class AsyncBenchmark implements Benchmark { final String partitionKey; final TenantWorkloadConfig workloadConfig; final List docsToRead; + final CosmosEndToEndOperationLatencyPolicyConfig e2ePolicyConfig; AsyncBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { workloadConfig = cfg; this.benchmarkScheduler = scheduler; + // Build E2E timeout policy if configured (applied per-request, NOT at client level) + if (cfg.getEndToEndTimeoutMs() != null) { + logger.info("E2E timeout policy: {}ms (will be set on request options, not client builder)", + cfg.getEndToEndTimeoutMs()); + this.e2ePolicyConfig = new CosmosEndToEndOperationLatencyPolicyConfigBuilder( + Duration.ofMillis(cfg.getEndToEndTimeoutMs())) + .enable(true) + .build(); + } else { + this.e2ePolicyConfig = null; + } + final TokenCredential credential = cfg.isManagedIdentityRequired() ? 
cfg.buildTokenCredential() : null; diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java index 86fc4f777872..7af4545450de 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java @@ -41,6 +41,9 @@ protected Mono> performWorkload(long i) { Flux> obs; Random r = new Random(); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + if (e2ePolicyConfig != null) { + options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } if (workloadConfig.getOperationType() == Operation.QueryCross) { diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java index 8732ef24088b..74d5bdfde597 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java @@ -3,6 +3,7 @@ package com.azure.cosmos.benchmark; +import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.PartitionKey; @@ -11,8 +12,14 @@ class AsyncReadBenchmark extends AsyncBenchmark { + private final CosmosItemRequestOptions readOptions; + AsyncReadBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { super(cfg, scheduler); + this.readOptions = new CosmosItemRequestOptions(); + if (e2ePolicyConfig != null) { + readOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } } @Override @@ -20,7 +27,7 @@ protected Mono performWorkload(long i) { int index = (int) (i % 
docsToRead.size()); PojoizedJson doc = docsToRead.get(index); return cosmosAsyncContainer.readItem(doc.getId(), - new PartitionKey(doc.getId()), PojoizedJson.class) + new PartitionKey(doc.getId()), readOptions, PojoizedJson.class) .map(CosmosItemResponse::getItem); } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java index bbbaf7193722..3280ef694b69 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java @@ -3,6 +3,7 @@ package com.azure.cosmos.benchmark; +import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.PartitionKey; @@ -16,12 +17,17 @@ class AsyncWriteBenchmark extends AsyncBenchmark { private final String uuid; private final String dataFieldValue; + private final CosmosItemRequestOptions writeOptions; AsyncWriteBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { super(cfg, scheduler); uuid = UUID.randomUUID().toString(); dataFieldValue = RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize()); + this.writeOptions = new CosmosItemRequestOptions(); + if (e2ePolicyConfig != null) { + writeOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } } @Override @@ -40,7 +46,7 @@ protected Mono performWorkload(long i) { partitionKey, workloadConfig.getDocumentDataFieldCount()), new PartitionKey(id), - null); + writeOptions); } // Raw type cast is required because CosmosItemResponse uses wildcard generics // that cannot be expressed in the class type parameter without propagating diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java 
b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java index 20d3838d3375..fdfdc5a93a89 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java @@ -188,6 +188,12 @@ public enum Environment { @JsonProperty("aggressiveWarmupDuration") private String aggressiveWarmupDuration; + @JsonProperty("endToEndTimeoutMs") + private Integer endToEndTimeoutMs; + + @JsonProperty("availabilityStrategyEnabled") + private Boolean availabilityStrategyEnabled; + // ======== Connection params ======== @JsonProperty("connectionMode") @@ -314,6 +320,9 @@ public Duration getAggressiveWarmupDuration() { return Duration.parse(aggressiveWarmupDuration); } + public Integer getEndToEndTimeoutMs() { return endToEndTimeoutMs; } + public boolean isAvailabilityStrategyEnabled() { return availabilityStrategyEnabled != null && availabilityStrategyEnabled; } + public int getNumberOfCollectionForCtl() { return numberOfCollectionForCtl != null ? numberOfCollectionForCtl : 4; } public String getReadWriteQueryReadManyPct() { return readWriteQueryReadManyPct != null ? readWriteQueryReadManyPct : "90,8,1,1"; } public int getEncryptedStringFieldCount() { return encryptedStringFieldCount != null ? 
encryptedStringFieldCount : 1; } @@ -496,6 +505,10 @@ private void applyField(String key, String value, boolean overwrite) { if (overwrite || proactiveConnectionRegionsCount == null) proactiveConnectionRegionsCount = Integer.parseInt(value); break; case "aggressiveWarmupDuration": if (overwrite || aggressiveWarmupDuration == null) aggressiveWarmupDuration = value; break; + case "endToEndTimeoutMs": + if (overwrite || endToEndTimeoutMs == null) endToEndTimeoutMs = Integer.parseInt(value); break; + case "availabilityStrategyEnabled": + if (overwrite || availabilityStrategyEnabled == null) availabilityStrategyEnabled = Boolean.parseBoolean(value); break; case "connectionMode": if (overwrite || connectionMode == null) connectionMode = value; break; case "consistencyLevel": From ac7f06fc6d1d2ab2107ede048841140a4d5b54bf Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 25 Mar 2026 16:48:17 -0400 Subject: [PATCH 2/2] Fix CosmosMetricsReporter distribution summary field names to match ADX schema DistributionSummary metrics (payload sizes, request sizes) were writing fields named Mean/Max/P50/P90/P95/P99 but the ADX PerfTimeSeries table expects MeanMs/MaxMs/P50Ms/P90Ms/P95Ms/P99Ms. Also adds Value field using totalAmount() so Grafana dashboard can display bytes/s. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../cosmos/benchmark/CosmosMetricsReporter.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java index be162e5e3d32..4488227f8078 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java @@ -214,16 +214,17 @@ private void reportDistributionSummary(String timestamp, DistributionSummary sum ObjectNode doc = createBaseDoc(timestamp, summary, "distribution", cpuPercent); doc.put("Count", summary.count()); - doc.put("Mean", round(summary.mean())); - doc.put("Max", round(summary.max())); + doc.put("MeanMs", round(summary.mean())); + doc.put("MaxMs", round(summary.max())); + doc.put("Value", round(summary.totalAmount())); HistogramSnapshot snapshot = summary.takeSnapshot(); for (ValueAtPercentile vp : snapshot.percentileValues()) { double p = vp.percentile(); - if (p == 0.5) doc.put("P50", round(vp.value())); - else if (p == 0.9) doc.put("P90", round(vp.value())); - else if (p == 0.95) doc.put("P95", round(vp.value())); - else if (p == 0.99) doc.put("P99", round(vp.value())); + if (p == 0.5) doc.put("P50Ms", round(vp.value())); + else if (p == 0.9) doc.put("P90Ms", round(vp.value())); + else if (p == 0.95) doc.put("P95Ms", round(vp.value())); + else if (p == 0.99) doc.put("P99Ms", round(vp.value())); } uploadDoc(doc);