diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java index 692783c4cc..086e7333cd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java @@ -182,7 +182,7 @@ void sessionCloseBeforeInit() throws Exception { void sessionGoAwayTest() throws Exception { SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); - Duration goAwayDelay = Duration.ofMillis(100); + Duration goAwayDelay = Duration.ofMillis(500); FakeSessionListener sessionListener = new FakeSessionListener(); session.start( OpenSessionRequest.newBuilder() @@ -215,9 +215,14 @@ void sessionGoAwayTest() throws Exception { try { f.get(); numOk++; - } catch (VRpcException e) { - if (e.getResult().getState() == State.UNCOMMITED) { - numUncommittedErrors++; + } catch (ExecutionException e) { + if (e.getCause() instanceof VRpcException) { + VRpcException vrpcException = (VRpcException) e.getCause(); + if (vrpcException.getResult().getState() == State.UNCOMMITED) { + numUncommittedErrors++; + } + } else { + throw e; } } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java index c0e6e01d61..ea142d8a74 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java @@ -77,9 +77,9 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -448,23 +448,44 @@ public void refreshConfigTest() throws Exception { sessionPool.start(request, new Metadata()); - Thread.sleep(500); + long deadline = System.currentTimeMillis() + 10_000; + boolean conditionMet = false; + List requests = null; + boolean containsHeader = false; + + while (System.currentTimeMillis() < deadline) { + requests = fakeService.getSessionRequests(); + List headers = headerInterceptor.getHeadersList(); + + boolean matchesRefreshRequest = false; + for (SessionRequest r : requests) { + if (OPEN_SESSION_REQUEST_CORRESPONDENCE.compare(r, refreshRequest)) { + matchesRefreshRequest = true; + break; + } + } + + containsHeader = false; + for (Metadata header : headers) { + if (header.containsKey(metadataKey) && "refresh_value".equals(header.get(metadataKey))) { + containsHeader = true; + break; + } + } - List requests = fakeService.getSessionRequests(); + if (requests.size() > 1 && matchesRefreshRequest && containsHeader) { + conditionMet = true; + break; + } + + Thread.sleep(50); + } + + assertThat(conditionMet).isTrue(); assertThat(requests.size()).isGreaterThan(1); assertThat(requests) .comparingElementsUsing(OPEN_SESSION_REQUEST_CORRESPONDENCE) .contains(refreshRequest); - - // Verify headers - List headers = headerInterceptor.getHeadersList(); - boolean containsHeader = false; - for (Metadata header : headers) { - if (header.containsKey(metadataKey)) { - containsHeader = true; - assertThat(header.get(metadataKey)).isEqualTo("refresh_value"); - } - } assertThat(containsHeader).isTrue(); } @@ -519,7 +540,7 @@ public Deadline getObservedDeadline() { } private static class HeaderInterceptor implements ServerInterceptor { - private final List headersList = new ArrayList<>(); + private final List headersList = new CopyOnWriteArrayList<>(); @Override public io.grpc.ServerCall.Listener interceptCall( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/fake/FakeSessionService.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/fake/FakeSessionService.java index 1b575fd2d2..4fcdb65424 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/fake/FakeSessionService.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/fake/FakeSessionService.java @@ -20,8 +20,8 @@ import com.google.bigtable.v2.SessionRequest; import com.google.bigtable.v2.SessionResponse; import io.grpc.stub.StreamObserver; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -30,7 +30,7 @@ public class FakeSessionService extends FakeSessionGrpc.FakeSessionImplBase { private final ScheduledExecutorService executor; private final AtomicInteger openRequestCount = new AtomicInteger(0); - private final List sessionRequests = new ArrayList<>(); + private final List sessionRequests = new CopyOnWriteArrayList<>(); public FakeSessionService(ScheduledExecutorService executor) { this.executor = executor;