Skip to content

Commit f7bdb42

Browse files
committed
offline feedback
1 parent a3cf0a8 commit f7bdb42

3 files changed

Lines changed: 274 additions & 52 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientV2.java

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
3535
import com.google.cloud.bigtable.admin.v2.models.OptimizeRestoredTableOperationToken;
3636
import com.google.cloud.bigtable.admin.v2.models.RestoredTableResult;
37-
import com.google.cloud.bigtable.admin.v2.stub.AwaitConsistencyCallable;
38-
import com.google.cloud.bigtable.admin.v2.stub.BigtableTableAdminStub;
37+
38+
import com.google.cloud.bigtable.admin.v2.stub.AwaitConsistencyCallableV2;
39+
import com.google.cloud.bigtable.admin.v2.stub.GrpcBigtableTableAdminStub;
3940
import com.google.cloud.bigtable.admin.v2.stub.BigtableTableAdminStubSettings;
4041
import com.google.common.base.Strings;
4142
import com.google.longrunning.Operation;
@@ -44,6 +45,7 @@
4445
import java.io.IOException;
4546
import java.time.Duration;
4647
import java.util.concurrent.ExecutionException;
48+
import javax.annotation.Nullable;
4749

4850
/**
4951
* Modern Cloud Bigtable Table Admin Client.
@@ -54,7 +56,7 @@
5456
* built-in, automated polling for consistency tokens.
5557
*/
5658
public class BigtableTableAdminClientV2 extends BaseBigtableTableAdminClient {
57-
private final AwaitConsistencyCallable awaitConsistencyCallable;
59+
private final AwaitConsistencyCallableV2 awaitConsistencyCallable;
5860
private final OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
5961
optimizeRestoredTableOperationBaseCallable;
6062
private final java.util.concurrent.ScheduledExecutorService backgroundExecutor;
@@ -80,47 +82,52 @@ public class BigtableTableAdminClientV2 extends BaseBigtableTableAdminClient {
8082
.setTotalTimeoutDuration(Duration.ofMillis(600000L))
8183
.build();
8284

83-
protected BigtableTableAdminClientV2(BaseBigtableTableAdminSettings settings) throws IOException {
84-
super(settings);
85-
// Extract the executor directly without spinning up a full channel
86-
this.backgroundExecutor =
85+
/**
86+
* Constructs an instance of BigtableTableAdminClientV2 with the given settings.
87+
*/
88+
public static final BigtableTableAdminClientV2 create(BaseBigtableTableAdminSettings settings)
89+
throws IOException {
90+
GrpcBigtableTableAdminStub stub =
91+
(GrpcBigtableTableAdminStub)
92+
((BigtableTableAdminStubSettings) settings.getStubSettings()).createStub();
93+
java.util.concurrent.ScheduledExecutorService backgroundExecutor =
8794
settings.getStubSettings().getBackgroundExecutorProvider().getExecutor();
88-
com.google.api.gax.rpc.ClientContext lightweightContext =
89-
com.google.api.gax.rpc.ClientContext.newBuilder()
90-
.setClock(settings.getStubSettings().getClock())
91-
.setExecutor(this.backgroundExecutor)
92-
.setDefaultCallContext(com.google.api.gax.grpc.GrpcCallContext.createDefault())
93-
.build();
9495

95-
this.awaitConsistencyCallable =
96+
AwaitConsistencyCallableV2 awaitConsistencyCallable =
9697
createAwaitConsistencyCallable(
98+
stub,
9799
(BigtableTableAdminStubSettings) settings.getStubSettings(),
98100
settings.getStubSettings().getClock(),
99-
this.backgroundExecutor);
100-
this.optimizeRestoredTableOperationBaseCallable =
101-
createOptimizeRestoredTableOperationBaseCallable(lightweightContext);
101+
backgroundExecutor);
102+
103+
OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
104+
optimizeRestoredTableOperationBaseCallable =
105+
createOptimizeRestoredTableOperationBaseCallable(stub, settings, backgroundExecutor);
106+
107+
return new BigtableTableAdminClientV2(
108+
stub, backgroundExecutor, awaitConsistencyCallable, optimizeRestoredTableOperationBaseCallable);
102109
}
103110

104-
protected BigtableTableAdminClientV2(BigtableTableAdminStub stub) {
105-
super(stub);
106-
this.backgroundExecutor = null;
107-
this.awaitConsistencyCallable = null;
108-
this.optimizeRestoredTableOperationBaseCallable = null;
111+
/** Constructs an instance of BigtableTableAdminClientV2 with the given stub. */
112+
public static final BigtableTableAdminClientV2 create(GrpcBigtableTableAdminStub stub) {
113+
return new BigtableTableAdminClientV2(stub, null, null, null);
109114
}
110115

111-
@com.google.common.annotations.VisibleForTesting
112-
BigtableTableAdminClientV2(
113-
BigtableTableAdminStub stub,
114-
AwaitConsistencyCallable awaitConsistencyCallable,
115-
OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
116-
optimizeRestoredTableOperationBaseCallable) {
116+
protected BigtableTableAdminClientV2(
117+
GrpcBigtableTableAdminStub stub,
118+
@Nullable java.util.concurrent.ScheduledExecutorService backgroundExecutor,
119+
@Nullable AwaitConsistencyCallableV2 awaitConsistencyCallable,
120+
@Nullable
121+
OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
122+
optimizeRestoredTableOperationBaseCallable) {
117123
super(stub);
118-
this.backgroundExecutor = null; // No custom executor for tests
124+
this.backgroundExecutor = backgroundExecutor;
119125
this.awaitConsistencyCallable = awaitConsistencyCallable;
120126
this.optimizeRestoredTableOperationBaseCallable = optimizeRestoredTableOperationBaseCallable;
121127
}
122128

123-
private AwaitConsistencyCallable createAwaitConsistencyCallable(
129+
private static AwaitConsistencyCallableV2 createAwaitConsistencyCallable(
130+
GrpcBigtableTableAdminStub stub,
124131
BigtableTableAdminStubSettings settings,
125132
com.google.api.core.ApiClock clock,
126133
java.util.concurrent.ScheduledExecutorService executor) {
@@ -131,17 +138,19 @@ private AwaitConsistencyCallable createAwaitConsistencyCallable(
131138
settings.checkConsistencySettings().getRetrySettings().getTotalTimeout())
132139
.build();
133140

134-
return AwaitConsistencyCallable.create(
135-
getStub().generateConsistencyTokenCallable(),
136-
getStub().checkConsistencyCallable(),
141+
return AwaitConsistencyCallableV2.create(
142+
stub.generateConsistencyTokenCallable(),
143+
stub.checkConsistencyCallable(),
137144
clock,
138145
executor,
139146
pollingSettings);
140147
}
141148

142-
private OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
149+
private static OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
143150
createOptimizeRestoredTableOperationBaseCallable(
144-
com.google.api.gax.rpc.ClientContext clientContext) throws IOException {
151+
GrpcBigtableTableAdminStub stub,
152+
BaseBigtableTableAdminSettings settings,
153+
java.util.concurrent.ScheduledExecutorService backgroundExecutor) throws IOException {
145154

146155
@SuppressWarnings("unchecked")
147156
MethodDescriptor<Void, Operation> fakeDescriptor =
@@ -182,22 +191,18 @@ public Empty apply(OperationSnapshot input) {
182191
OperationTimedPollAlgorithm.create(OPTIMIZE_RESTORED_TABLE_POLLING_SETTINGS))
183192
.build();
184193

194+
com.google.api.gax.rpc.ClientContext clientContext =
195+
com.google.api.gax.rpc.ClientContext.newBuilder()
196+
.setClock(settings.getStubSettings().getClock())
197+
.setExecutor(backgroundExecutor)
198+
.setDefaultCallContext(com.google.api.gax.grpc.GrpcCallContext.createDefault())
199+
.build();
200+
185201
return GrpcCallableFactory.createOperationCallable(
186202
unusedInitialCallSettings,
187203
operationCallSettings,
188204
clientContext,
189-
getStub().getOperationsStub());
190-
}
191-
192-
/** Constructs an instance of BigtableTableAdminClientV2 with the given settings. */
193-
public static final BigtableTableAdminClientV2 create(BaseBigtableTableAdminSettings settings)
194-
throws IOException {
195-
return new BigtableTableAdminClientV2(settings);
196-
}
197-
198-
/** Constructs an instance of BigtableTableAdminClientV2 with the given stub. */
199-
public static final BigtableTableAdminClientV2 create(BigtableTableAdminStub stub) {
200-
return new BigtableTableAdminClientV2(stub);
205+
stub.getOperationsStub());
201206
}
202207

203208
/**
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.admin.v2.stub;
17+
18+
import com.google.api.core.ApiAsyncFunction;
19+
import com.google.api.core.ApiClock;
20+
import com.google.api.core.ApiFunction;
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.core.InternalApi;
24+
import com.google.api.gax.retrying.ExponentialPollAlgorithm;
25+
import com.google.api.gax.retrying.NonCancellableFuture;
26+
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
27+
import com.google.api.gax.retrying.RetryAlgorithm;
28+
import com.google.api.gax.retrying.RetrySettings;
29+
import com.google.api.gax.retrying.RetryingContext;
30+
import com.google.api.gax.retrying.RetryingExecutor;
31+
import com.google.api.gax.retrying.RetryingFuture;
32+
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
33+
import com.google.api.gax.retrying.TimedAttemptSettings;
34+
import com.google.api.gax.rpc.ApiCallContext;
35+
import com.google.api.gax.rpc.UnaryCallable;
36+
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
37+
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
38+
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
39+
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
40+
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
41+
import com.google.common.annotations.VisibleForTesting;
42+
import com.google.common.util.concurrent.MoreExecutors;
43+
import java.util.concurrent.Callable;
44+
import java.util.concurrent.CancellationException;
45+
import java.util.concurrent.ScheduledExecutorService;
46+
47+
/**
48+
* Decoupled modern consistency polling callable for V2 client.
49+
*
50+
* <p>This callable waits until either replication or Data Boost has caught up to the point it was
51+
* called. It wraps GenerateConsistencyToken and CheckConsistency RPCs and contains absolutely no
52+
* reference or dependency on the data module.
53+
*/
54+
@InternalApi
55+
public class AwaitConsistencyCallableV2 extends UnaryCallable<ConsistencyRequest, Void> {
56+
private final UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
57+
generateCallable;
58+
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
59+
private final RetryingExecutor<CheckConsistencyResponse> executor;
60+
61+
@InternalApi
62+
public static AwaitConsistencyCallableV2 create(
63+
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
64+
generateCallable,
65+
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
66+
ApiClock clock,
67+
ScheduledExecutorService executor,
68+
RetrySettings pollingSettings) {
69+
70+
RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
71+
new RetryAlgorithm<>(
72+
new PollResultAlgorithm(), new ExponentialPollAlgorithm(pollingSettings, clock));
73+
74+
RetryingExecutor<CheckConsistencyResponse> retryingExecutor =
75+
new ScheduledRetryingExecutor<>(retryAlgorithm, executor);
76+
77+
return new AwaitConsistencyCallableV2(generateCallable, checkCallable, retryingExecutor);
78+
}
79+
80+
@VisibleForTesting
81+
AwaitConsistencyCallableV2(
82+
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
83+
generateCallable,
84+
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
85+
RetryingExecutor<CheckConsistencyResponse> executor) {
86+
this.generateCallable = generateCallable;
87+
this.checkCallable = checkCallable;
88+
this.executor = executor;
89+
}
90+
91+
@Override
92+
public ApiFuture<Void> futureCall(
93+
final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) {
94+
95+
// If the token is already provided, skip generation and poll directly.
96+
if (consistencyRequest.getConsistencyToken() != null) {
97+
CheckConsistencyRequest request =
98+
consistencyRequest.toCheckConsistencyProto(consistencyRequest.getConsistencyToken());
99+
return pollToken(request, apiCallContext);
100+
}
101+
102+
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
103+
generateToken(consistencyRequest.toGenerateTokenProto(), apiCallContext);
104+
105+
return ApiFutures.transformAsync(
106+
tokenFuture,
107+
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
108+
@Override
109+
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
110+
CheckConsistencyRequest request =
111+
consistencyRequest.toCheckConsistencyProto(input.getConsistencyToken());
112+
return pollToken(request, apiCallContext);
113+
}
114+
},
115+
MoreExecutors.directExecutor());
116+
}
117+
118+
private ApiFuture<GenerateConsistencyTokenResponse> generateToken(
119+
GenerateConsistencyTokenRequest generateRequest, ApiCallContext context) {
120+
return generateCallable.futureCall(generateRequest, context);
121+
}
122+
123+
private ApiFuture<Void> pollToken(CheckConsistencyRequest request, ApiCallContext context) {
124+
AttemptCallable<CheckConsistencyRequest, CheckConsistencyResponse> attemptCallable =
125+
new AttemptCallable<>(checkCallable, request, context);
126+
RetryingFuture<CheckConsistencyResponse> retryingFuture =
127+
executor.createFuture(attemptCallable);
128+
attemptCallable.setExternalFuture(retryingFuture);
129+
attemptCallable.call();
130+
131+
return ApiFutures.transform(
132+
retryingFuture,
133+
new ApiFunction<CheckConsistencyResponse, Void>() {
134+
@Override
135+
public Void apply(CheckConsistencyResponse input) {
136+
return null;
137+
}
138+
},
139+
MoreExecutors.directExecutor());
140+
}
141+
142+
/** A callable representing an attempt to make an RPC call. */
143+
private static class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
144+
private final UnaryCallable<RequestT, ResponseT> callable;
145+
private final RequestT request;
146+
147+
private volatile RetryingFuture<ResponseT> externalFuture;
148+
private final ApiCallContext callContext;
149+
150+
AttemptCallable(
151+
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
152+
this.callable = callable;
153+
this.request = request;
154+
this.callContext = callContext;
155+
}
156+
157+
void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
158+
this.externalFuture = externalFuture;
159+
}
160+
161+
@Override
162+
public ResponseT call() {
163+
try {
164+
// NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts
165+
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
166+
if (externalFuture.isDone()) {
167+
return null;
168+
}
169+
ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
170+
externalFuture.setAttemptFuture(internalFuture);
171+
} catch (Throwable e) {
172+
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
173+
}
174+
175+
return null;
176+
}
177+
}
178+
179+
/**
180+
* A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note
181+
* that this class doesn't handle retryable errors and expects the underlying callable chain to
182+
* handle this.
183+
*/
184+
private static class PollResultAlgorithm
185+
implements ResultRetryAlgorithmWithContext<CheckConsistencyResponse> {
186+
187+
@Override
188+
public TimedAttemptSettings createNextAttempt(
189+
Throwable prevThrowable,
190+
CheckConsistencyResponse prevResponse,
191+
TimedAttemptSettings prevSettings) {
192+
return null;
193+
}
194+
195+
@Override
196+
public TimedAttemptSettings createNextAttempt(
197+
RetryingContext context,
198+
Throwable previousThrowable,
199+
CheckConsistencyResponse previousResponse,
200+
TimedAttemptSettings previousSettings) {
201+
return null;
202+
}
203+
204+
@Override
205+
public boolean shouldRetry(
206+
RetryingContext context, Throwable previousThrowable, CheckConsistencyResponse prevResponse)
207+
throws CancellationException {
208+
return prevResponse != null && !prevResponse.getConsistent();
209+
}
210+
211+
@Override
212+
public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse)
213+
throws CancellationException {
214+
return prevResponse != null && !prevResponse.getConsistent();
215+
}
216+
}
217+
}

0 commit comments

Comments
 (0)