Skip to content

Commit 821e851

Browse files
jeet1995Copilot
andcommitted
Add CosmosEndToEndOperationLatencyPolicyConfig support to azure-cosmos-benchmark
Add per-request E2E timeout policy and availability strategy configuration to the benchmark module. The policy is applied on Cosmos(Item|Query)RequestOptions, not at the client builder level, to avoid impacting metadata calls during startup. Changes: - TenantWorkloadConfig: Add endToEndTimeoutMs and availabilityStrategyEnabled JSON config fields with getters and applyField support - AsyncBenchmark: Build CosmosEndToEndOperationLatencyPolicyConfig once in the base class constructor, expose as e2ePolicyConfig for subclasses - AsyncReadBenchmark: Create CosmosItemRequestOptions with E2E policy, pass to readItem() (was using the 3-arg overload without options) - AsyncWriteBenchmark: Create CosmosItemRequestOptions with E2E policy, replace null options in createItem() - AsyncQueryBenchmark: Set E2E policy on CosmosQueryRequestOptions immediately after construction (applies to all query operation types) Usage in workload config JSON: "endToEndTimeoutMs": 3000, "availabilityStrategyEnabled": true Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 1b5c5c7 commit 821e851

5 files changed

Lines changed: 46 additions & 2 deletions

File tree

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import com.azure.cosmos.CosmosAsyncDatabase;
1111
import com.azure.cosmos.CosmosClientBuilder;
1212
import com.azure.cosmos.CosmosDiagnosticsThresholds;
13+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
14+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
1315
import com.azure.cosmos.CosmosContainerProactiveInitConfigBuilder;
1416
import com.azure.cosmos.CosmosException;
1517
import com.azure.cosmos.DirectConnectionConfig;
@@ -60,12 +62,25 @@ abstract class AsyncBenchmark<T> implements Benchmark {
6062
final String partitionKey;
6163
final TenantWorkloadConfig workloadConfig;
6264
final List<PojoizedJson> docsToRead;
65+
final CosmosEndToEndOperationLatencyPolicyConfig e2ePolicyConfig;
6366

6467
AsyncBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {
6568

6669
workloadConfig = cfg;
6770
this.benchmarkScheduler = scheduler;
6871

72+
// Build E2E timeout policy if configured (applied per-request, NOT at client level)
73+
if (cfg.getEndToEndTimeoutMs() != null) {
74+
logger.info("E2E timeout policy: {}ms (will be set on request options, not client builder)",
75+
cfg.getEndToEndTimeoutMs());
76+
this.e2ePolicyConfig = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(
77+
Duration.ofMillis(cfg.getEndToEndTimeoutMs()))
78+
.enable(true)
79+
.build();
80+
} else {
81+
this.e2ePolicyConfig = null;
82+
}
83+
6984
final TokenCredential credential = cfg.isManagedIdentityRequired()
7085
? cfg.buildTokenCredential()
7186
: null;

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ protected Mono<FeedResponse<PojoizedJson>> performWorkload(long i) {
4141
Flux<FeedResponse<PojoizedJson>> obs;
4242
Random r = new Random();
4343
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
44+
if (e2ePolicyConfig != null) {
45+
options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
46+
}
4447

4548
if (workloadConfig.getOperationType() == Operation.QueryCross) {
4649

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.cosmos.benchmark;
55

6+
import com.azure.cosmos.models.CosmosItemRequestOptions;
67
import com.azure.cosmos.models.CosmosItemResponse;
78
import com.azure.cosmos.models.PartitionKey;
89

@@ -11,16 +12,22 @@
1112

1213
class AsyncReadBenchmark extends AsyncBenchmark<PojoizedJson> {
1314

15+
private final CosmosItemRequestOptions readOptions;
16+
1417
AsyncReadBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {
1518
super(cfg, scheduler);
19+
this.readOptions = new CosmosItemRequestOptions();
20+
if (e2ePolicyConfig != null) {
21+
readOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
22+
}
1623
}
1724

1825
@Override
1926
protected Mono<PojoizedJson> performWorkload(long i) {
2027
int index = (int) (i % docsToRead.size());
2128
PojoizedJson doc = docsToRead.get(index);
2229
return cosmosAsyncContainer.readItem(doc.getId(),
23-
new PartitionKey(doc.getId()), PojoizedJson.class)
30+
new PartitionKey(doc.getId()), readOptions, PojoizedJson.class)
2431
.map(CosmosItemResponse::getItem);
2532
}
2633
}

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncWriteBenchmark.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.cosmos.benchmark;
55

6+
import com.azure.cosmos.models.CosmosItemRequestOptions;
67
import com.azure.cosmos.models.CosmosItemResponse;
78
import com.azure.cosmos.models.PartitionKey;
89

@@ -16,12 +17,17 @@ class AsyncWriteBenchmark extends AsyncBenchmark<CosmosItemResponse> {
1617

1718
private final String uuid;
1819
private final String dataFieldValue;
20+
private final CosmosItemRequestOptions writeOptions;
1921

2022
AsyncWriteBenchmark(TenantWorkloadConfig cfg, Scheduler scheduler) {
2123
super(cfg, scheduler);
2224

2325
uuid = UUID.randomUUID().toString();
2426
dataFieldValue = RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize());
27+
this.writeOptions = new CosmosItemRequestOptions();
28+
if (e2ePolicyConfig != null) {
29+
writeOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicyConfig);
30+
}
2531
}
2632

2733
@Override
@@ -40,7 +46,7 @@ protected Mono<CosmosItemResponse> performWorkload(long i) {
4046
partitionKey,
4147
workloadConfig.getDocumentDataFieldCount()),
4248
new PartitionKey(id),
43-
null);
49+
writeOptions);
4450
}
4551
// Raw type cast is required because CosmosItemResponse uses wildcard generics
4652
// that cannot be expressed in the class type parameter without propagating

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ public enum Environment {
188188
@JsonProperty("aggressiveWarmupDuration")
189189
private String aggressiveWarmupDuration;
190190

191+
@JsonProperty("endToEndTimeoutMs")
192+
private Integer endToEndTimeoutMs;
193+
194+
@JsonProperty("availabilityStrategyEnabled")
195+
private Boolean availabilityStrategyEnabled;
196+
191197
// ======== Connection params ========
192198

193199
@JsonProperty("connectionMode")
@@ -314,6 +320,9 @@ public Duration getAggressiveWarmupDuration() {
314320
return Duration.parse(aggressiveWarmupDuration);
315321
}
316322

323+
public Integer getEndToEndTimeoutMs() { return endToEndTimeoutMs; }
324+
public boolean isAvailabilityStrategyEnabled() { return availabilityStrategyEnabled != null && availabilityStrategyEnabled; }
325+
317326
public int getNumberOfCollectionForCtl() { return numberOfCollectionForCtl != null ? numberOfCollectionForCtl : 4; }
318327
public String getReadWriteQueryReadManyPct() { return readWriteQueryReadManyPct != null ? readWriteQueryReadManyPct : "90,8,1,1"; }
319328
public int getEncryptedStringFieldCount() { return encryptedStringFieldCount != null ? encryptedStringFieldCount : 1; }
@@ -496,6 +505,10 @@ private void applyField(String key, String value, boolean overwrite) {
496505
if (overwrite || proactiveConnectionRegionsCount == null) proactiveConnectionRegionsCount = Integer.parseInt(value); break;
497506
case "aggressiveWarmupDuration":
498507
if (overwrite || aggressiveWarmupDuration == null) aggressiveWarmupDuration = value; break;
508+
case "endToEndTimeoutMs":
509+
if (overwrite || endToEndTimeoutMs == null) endToEndTimeoutMs = Integer.parseInt(value); break;
510+
case "availabilityStrategyEnabled":
511+
if (overwrite || availabilityStrategyEnabled == null) availabilityStrategyEnabled = Boolean.parseBoolean(value); break;
499512
case "connectionMode":
500513
if (overwrite || connectionMode == null) connectionMode = value; break;
501514
case "consistencyLevel":

0 commit comments

Comments
 (0)