Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosDiagnosticsThresholds;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
import com.azure.cosmos.CosmosContainerProactiveInitConfigBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.DirectConnectionConfig;
Expand Down Expand Up @@ -60,12 +62,25 @@ abstract class AsyncBenchmark<T> implements Benchmark {
final String partitionKey;
final TenantWorkloadConfig workloadConfig;
final List<PojoizedJson> docsToRead;
final CosmosEndToEndOperationLatencyPolicyConfig e2ePolicyConfig;

AsyncBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {

workloadConfig = cfg;
this.benchmarkScheduler = scheduler;

// Build E2E timeout policy if configured (applied per-request, NOT at client level)
if (cfg.getEndToEndTimeoutMs() != null) {
logger.info("E2E timeout policy: {}ms (will be set on request options, not client builder)",
cfg.getEndToEndTimeoutMs());
this.e2ePolicyConfig = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(
Duration.ofMillis(cfg.getEndToEndTimeoutMs()))
.enable(true)
.build();
} else {
this.e2ePolicyConfig = null;
}

final TokenCredential credential = cfg.isManagedIdentityRequired()
? cfg.buildTokenCredential()
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ protected Mono<FeedResponse<PojoizedJson>> performWorkload(long i) {
Flux<FeedResponse<PojoizedJson>> obs;
Random r = new Random();
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
if (e2ePolicyConfig != null) {
options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
}

if (workloadConfig.getOperationType() == Operation.QueryCross) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.PartitionKey;

Expand All @@ -11,16 +12,22 @@

class AsyncReadBenchmark extends AsyncBenchmark<PojoizedJson> {

private final CosmosItemRequestOptions readOptions;

AsyncReadBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {
super(cfg, scheduler);
this.readOptions = new CosmosItemRequestOptions();
if (e2ePolicyConfig != null) {
readOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
}
}

@Override
protected Mono<PojoizedJson> performWorkload(long i) {
int index = (int) (i % docsToRead.size());
PojoizedJson doc = docsToRead.get(index);
return cosmosAsyncContainer.readItem(doc.getId(),
new PartitionKey(doc.getId()), PojoizedJson.class)
new PartitionKey(doc.getId()), readOptions, PojoizedJson.class)
.map(CosmosItemResponse::getItem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.PartitionKey;

Expand All @@ -16,12 +17,17 @@ class AsyncWriteBenchmark extends AsyncBenchmark<CosmosItemResponse> {

private final String uuid;
private final String dataFieldValue;
private final CosmosItemRequestOptions writeOptions;

AsyncWriteBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {
super(cfg, scheduler);

uuid = UUID.randomUUID().toString();
dataFieldValue = RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize());
this.writeOptions = new CosmosItemRequestOptions();
if (e2ePolicyConfig != null) {
writeOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
}
}

@Override
Expand All @@ -40,7 +46,7 @@ protected Mono<CosmosItemResponse> performWorkload(long i) {
partitionKey,
workloadConfig.getDocumentDataFieldCount()),
new PartitionKey(id),
null);
writeOptions);
}
// Raw type cast is required because CosmosItemResponse uses wildcard generics
// that cannot be expressed in the class type parameter without propagating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,17 @@ private void reportDistributionSummary(String timestamp, DistributionSummary sum

ObjectNode doc = createBaseDoc(timestamp, summary, "distribution", cpuPercent);
doc.put("Count", summary.count());
doc.put("Mean", round(summary.mean()));
doc.put("Max", round(summary.max()));
doc.put("MeanMs", round(summary.mean()));
doc.put("MaxMs", round(summary.max()));
doc.put("Value", round(summary.totalAmount()));

HistogramSnapshot snapshot = summary.takeSnapshot();
for (ValueAtPercentile vp : snapshot.percentileValues()) {
double p = vp.percentile();
if (p == 0.5) doc.put("P50", round(vp.value()));
else if (p == 0.9) doc.put("P90", round(vp.value()));
else if (p == 0.95) doc.put("P95", round(vp.value()));
else if (p == 0.99) doc.put("P99", round(vp.value()));
if (p == 0.5) doc.put("P50Ms", round(vp.value()));
else if (p == 0.9) doc.put("P90Ms", round(vp.value()));
else if (p == 0.95) doc.put("P95Ms", round(vp.value()));
else if (p == 0.99) doc.put("P99Ms", round(vp.value()));
}

uploadDoc(doc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ public enum Environment {
@JsonProperty("aggressiveWarmupDuration")
private String aggressiveWarmupDuration;

@JsonProperty("endToEndTimeoutMs")
private Integer endToEndTimeoutMs;

@JsonProperty("availabilityStrategyEnabled")
private Boolean availabilityStrategyEnabled;

// ======== Connection params ========

@JsonProperty("connectionMode")
Expand Down Expand Up @@ -314,6 +320,9 @@ public Duration getAggressiveWarmupDuration() {
return Duration.parse(aggressiveWarmupDuration);
}

public Integer getEndToEndTimeoutMs() { return endToEndTimeoutMs; }
public boolean isAvailabilityStrategyEnabled() { return availabilityStrategyEnabled != null && availabilityStrategyEnabled; }

public int getNumberOfCollectionForCtl() { return numberOfCollectionForCtl != null ? numberOfCollectionForCtl : 4; }
public String getReadWriteQueryReadManyPct() { return readWriteQueryReadManyPct != null ? readWriteQueryReadManyPct : "90,8,1,1"; }
public int getEncryptedStringFieldCount() { return encryptedStringFieldCount != null ? encryptedStringFieldCount : 1; }
Expand Down Expand Up @@ -496,6 +505,10 @@ private void applyField(String key, String value, boolean overwrite) {
if (overwrite || proactiveConnectionRegionsCount == null) proactiveConnectionRegionsCount = Integer.parseInt(value); break;
case "aggressiveWarmupDuration":
if (overwrite || aggressiveWarmupDuration == null) aggressiveWarmupDuration = value; break;
case "endToEndTimeoutMs":
if (overwrite || endToEndTimeoutMs == null) endToEndTimeoutMs = Integer.parseInt(value); break;
case "availabilityStrategyEnabled":
if (overwrite || availabilityStrategyEnabled == null) availabilityStrategyEnabled = Boolean.parseBoolean(value); break;
case "connectionMode":
if (overwrite || connectionMode == null) connectionMode = value; break;
case "consistencyLevel":
Expand Down