diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index bbf9b10a14f3..d54d25316832 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -10,6 +10,8 @@ import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnosticsThresholds; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosContainerProactiveInitConfigBuilder; import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; @@ -60,12 +62,25 @@ abstract class AsyncBenchmark implements Benchmark { final String partitionKey; final TenantWorkloadConfig workloadConfig; final List docsToRead; + final CosmosEndToEndOperationLatencyPolicyConfig e2ePolicyConfig; AsyncBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { workloadConfig = cfg; this.benchmarkScheduler = scheduler; + // Build E2E timeout policy if configured (applied per-request, NOT at client level) + if (cfg.getEndToEndTimeoutMs() != null) { + logger.info("E2E timeout policy: {}ms (will be set on request options, not client builder)", + cfg.getEndToEndTimeoutMs()); + this.e2ePolicyConfig = new CosmosEndToEndOperationLatencyPolicyConfigBuilder( + Duration.ofMillis(cfg.getEndToEndTimeoutMs())) + .enable(true) + .build(); + } else { + this.e2ePolicyConfig = null; + } + final TokenCredential credential = cfg.isManagedIdentityRequired() ? cfg.buildTokenCredential() : null; diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java index 86fc4f777872..7af4545450de 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java @@ -41,6 +41,9 @@ protected Mono> performWorkload(long i) { Flux> obs; Random r = new Random(); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + if (e2ePolicyConfig != null) { + options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } if (workloadConfig.getOperationType() == Operation.QueryCross) { diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java index 8732ef24088b..74d5bdfde597 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java @@ -3,6 +3,7 @@ package com.azure.cosmos.benchmark; +import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.PartitionKey; @@ -11,8 +12,14 @@ class AsyncReadBenchmark extends AsyncBenchmark { + private final CosmosItemRequestOptions readOptions; + AsyncReadBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { super(cfg, scheduler); + this.readOptions = new CosmosItemRequestOptions(); + if (e2ePolicyConfig != null) { + readOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } } @Override @@ -20,7 +27,7 @@ protected Mono performWorkload(long i) { int index = (int) (i % docsToRead.size()); PojoizedJson doc = docsToRead.get(index); return cosmosAsyncContainer.readItem(doc.getId(), - new PartitionKey(doc.getId()), PojoizedJson.class) + new PartitionKey(doc.getId()), readOptions, PojoizedJson.class) .map(CosmosItemResponse::getItem); } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java index bbbaf7193722..3280ef694b69 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java @@ -3,6 +3,7 @@ package com.azure.cosmos.benchmark; +import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.PartitionKey; @@ -16,12 +17,17 @@ class AsyncWriteBenchmark extends AsyncBenchmark { private final String uuid; private final String dataFieldValue; + private final CosmosItemRequestOptions writeOptions; AsyncWriteBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) { super(cfg, scheduler); uuid = UUID.randomUUID().toString(); dataFieldValue = RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize()); + this.writeOptions = new CosmosItemRequestOptions(); + if (e2ePolicyConfig != null) { + writeOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig); + } } @Override @@ -40,7 +46,7 @@ protected Mono performWorkload(long i) { partitionKey, workloadConfig.getDocumentDataFieldCount()), new PartitionKey(id), - null); + writeOptions); } // Raw type cast is required because CosmosItemResponse uses wildcard generics // that cannot be expressed in the class type parameter without propagating diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java index be162e5e3d32..4488227f8078 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/CosmosMetricsReporter.java @@ -214,16 +214,17 @@ private void reportDistributionSummary(String timestamp, DistributionSummary sum ObjectNode doc = createBaseDoc(timestamp, summary, "distribution", cpuPercent); doc.put("Count", summary.count()); - doc.put("Mean", round(summary.mean())); - doc.put("Max", round(summary.max())); + doc.put("MeanMs", round(summary.mean())); + doc.put("MaxMs", round(summary.max())); + doc.put("Value", round(summary.totalAmount())); HistogramSnapshot snapshot = summary.takeSnapshot(); for (ValueAtPercentile vp : snapshot.percentileValues()) { double p = vp.percentile(); - if (p == 0.5) doc.put("P50", round(vp.value())); - else if (p == 0.9) doc.put("P90", round(vp.value())); - else if (p == 0.95) doc.put("P95", round(vp.value())); - else if (p == 0.99) doc.put("P99", round(vp.value())); + if (p == 0.5) doc.put("P50Ms", round(vp.value())); + else if (p == 0.9) doc.put("P90Ms", round(vp.value())); + else if (p == 0.95) doc.put("P95Ms", round(vp.value())); + else if (p == 0.99) doc.put("P99Ms", round(vp.value())); } uploadDoc(doc); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java index 20d3838d3375..fdfdc5a93a89 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java @@ -188,6 +188,12 @@ public enum Environment { @JsonProperty("aggressiveWarmupDuration") private String aggressiveWarmupDuration; + @JsonProperty("endToEndTimeoutMs") + private Integer endToEndTimeoutMs; + + @JsonProperty("availabilityStrategyEnabled") + private Boolean availabilityStrategyEnabled; + // ======== Connection params ======== @JsonProperty("connectionMode") @@ -314,6 +320,9 @@ public Duration getAggressiveWarmupDuration() { return Duration.parse(aggressiveWarmupDuration); } + public Integer getEndToEndTimeoutMs() { return endToEndTimeoutMs; } + public boolean isAvailabilityStrategyEnabled() { return availabilityStrategyEnabled != null && availabilityStrategyEnabled; } + public int getNumberOfCollectionForCtl() { return numberOfCollectionForCtl != null ? numberOfCollectionForCtl : 4; } public String getReadWriteQueryReadManyPct() { return readWriteQueryReadManyPct != null ? readWriteQueryReadManyPct : "90,8,1,1"; } public int getEncryptedStringFieldCount() { return encryptedStringFieldCount != null ? encryptedStringFieldCount : 1; } @@ -496,6 +505,10 @@ private void applyField(String key, String value, boolean overwrite) { if (overwrite || proactiveConnectionRegionsCount == null) proactiveConnectionRegionsCount = Integer.parseInt(value); break; case "aggressiveWarmupDuration": if (overwrite || aggressiveWarmupDuration == null) aggressiveWarmupDuration = value; break; + case "endToEndTimeoutMs": + if (overwrite || endToEndTimeoutMs == null) endToEndTimeoutMs = Integer.parseInt(value); break; + case "availabilityStrategyEnabled": + if (overwrite || availabilityStrategyEnabled == null) availabilityStrategyEnabled = Boolean.parseBoolean(value); break; case "connectionMode": if (overwrite || connectionMode == null) connectionMode = value; break; case "consistencyLevel":