diff --git a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
index 5f0885883a5..d80316622ed 100644
--- a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
+++ b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
@@ -31,6 +31,7 @@
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerImplBuilder;
import java.io.File;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/** Builder for a server that services requests from an Android Service. */
@@ -141,6 +142,21 @@ public BinderServerBuilder useTransportSecurity(File certChain, File privateKey)
throw new UnsupportedOperationException("TLS not supported in BinderServer");
}
+ /**
+ * Provides a custom {@link Executor} for offloading blocking tasks.
+ *
+ *
Optional. If no custom Executor is provided, defaults to the main server executor
+ * (configured via {@link #executor(Executor)}). If that is also unset, a default
+ * shared channel executor will be used.
+ *
+ * @return this
+ */
+ public BinderServerBuilder offloadExecutor(Executor executor) {
+ internalBuilder.setOffloadExecutorPool(
+ new FixedObjectPool<>(checkNotNull(executor, "executor")));
+ return this;
+ }
+
/**
* Builds a {@link Server} according to this builder's parameters and stores its listening {@link
* IBinder} in the {@link IBinderReceiver} passed to {@link #forAddress(AndroidComponentAddress,
diff --git a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
index 4786a5e6cc4..82898299baa 100644
--- a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
+++ b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
+
/**
* A security policy for a gRPC server.
*
@@ -61,27 +63,24 @@ public Status checkAuthorizationForService(int uid, String serviceName) {
/**
* Returns whether the given Android UID is authorized to access a particular service.
*
- *
This method never throws an exception. If the execution of the security policy check fails,
- * a failed future with such exception is returned.
+ *
This method does not block and never throws an exception. If the execution of the security
+ * policy check fails, a failed future with such exception is returned.
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
+ * @param offloadExecutor Where to submit synchronous security policy checks.
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
@CheckReturnValue
- ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName) {
+ ListenableFuture checkAuthorizationForServiceAsync(
+ int uid, String serviceName, Executor offloadExecutor) {
SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy);
if (securityPolicy instanceof AsyncSecurityPolicy) {
return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid);
}
- try {
- Status status = securityPolicy.checkAuthorization(uid);
- return Futures.immediateFuture(status);
- } catch (Exception e) {
- return Futures.immediateFailedFuture(e);
- }
+ return Futures.submit(() -> securityPolicy.checkAuthorization(uid), offloadExecutor);
}
public static Builder newBuilder() {
diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java
index f913775fcbe..40b0ad2865f 100644
--- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java
+++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java
@@ -65,6 +65,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
private final ObjectPool executorServicePool;
private final ObjectPool extends Executor> executorPool;
+ private final ObjectPool extends Executor> offloadExecutorPool;
private final ImmutableList streamTracerFactories;
private final AndroidComponentAddress listenAddress;
private final LeakSafeOneWayBinder hostServiceBinder;
@@ -82,12 +83,19 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
@GuardedBy("this")
private Executor executor;
+ @Nullable // Before start() and after termination.
+ @GuardedBy("this")
+ private Executor offloadExecutor;
+
@GuardedBy("this")
private boolean shutdown;
private BinderServer(Builder builder) {
this.listenAddress = checkNotNull(builder.listenAddress);
this.executorPool = checkNotNull(builder.executorPool);
+ this.offloadExecutorPool = builder.offloadExecutorPool != null
+ ? builder.offloadExecutorPool
+ : this.executorPool;
this.executorServicePool = builder.executorServicePool;
this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
@@ -107,6 +115,7 @@ public synchronized void start(ServerListener serverListener) throws IOException
listener = new ActiveTransportTracker(serverListener, this::onTerminated);
executorService = executorServicePool.getObject();
executor = executorPool.getObject();
+ offloadExecutor = offloadExecutorPool.getObject();
}
@Override
@@ -144,6 +153,7 @@ public synchronized void shutdown() {
private synchronized void onTerminated() {
executor = executorPool.returnObject(executor);
+ offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor);
}
@Override
@@ -178,7 +188,8 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) {
attrsBuilder,
callingUid,
serverPolicyChecker,
- checkNotNull(executor, "Not started?"));
+ checkNotNull(executor, "Not started?"),
+ checkNotNull(offloadExecutor, "Not started?"));
// Create a new transport and let our listener know about it.
BinderServerTransport transport =
BinderServerTransport.create(
@@ -222,6 +233,7 @@ public static class Builder {
@Nullable AndroidComponentAddress listenAddress;
@Nullable List extends ServerStreamTracer.Factory> streamTracerFactories;
@Nullable ObjectPool extends Executor> executorPool;
+ @Nullable ObjectPool extends Executor> offloadExecutorPool;
ObjectPool executorServicePool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
@@ -267,6 +279,12 @@ public Builder setExecutorPool(ObjectPool extends Executor> executorPool) {
return this;
}
+ /** Sets the executor pool for offloading tasks. */
+ public Builder setOffloadExecutorPool(ObjectPool extends Executor> offloadExecutorPool) {
+ this.offloadExecutorPool = checkNotNull(offloadExecutorPool, "offloadExecutorPool");
+ return this;
+ }
+
/**
* Sets the executor to be used for scheduling channel timers.
*
diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java
index 6f95ef8a83c..0812a9dce00 100644
--- a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java
+++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java
@@ -69,17 +69,20 @@ public static void installAuthInterceptor(ServerBuilder> serverBuilder) {
* @param remoteUid The remote UID of the transport.
* @param serverPolicyChecker The policy checker for this transport.
* @param executor used for calling into the application. Must outlive the transport.
+ * @param offloadExecutor used for potentially blocking work. Must outlive the transport.
*/
@Internal
public static void attachAuthAttrs(
Attributes.Builder builder,
int remoteUid,
ServerPolicyChecker serverPolicyChecker,
- Executor executor) {
+ Executor executor,
+ Executor offloadExecutor) {
builder
.set(
TRANSPORT_AUTHORIZATION_STATE,
- new TransportAuthorizationState(remoteUid, serverPolicyChecker, executor))
+ new TransportAuthorizationState(
+ remoteUid, serverPolicyChecker, executor, offloadExecutor))
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY);
}
@@ -97,9 +100,8 @@ public ServerCall.Listener interceptCall(
ListenableFuture authStatusFuture =
transportAuthState.checkAuthorization(call.getMethodDescriptor());
- // Most SecurityPolicy will have synchronous implementations that provide an
- // immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
- // and asynchronous code if the authorization result is already present.
+ // Short-circuit in the common case where the checkAuthorization() result is cached and
+ // completed. This avoids unnecessary allocations and asynchronous code.
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(
authStatusFuture, transportAuthState.executor, call, headers, next);
@@ -166,15 +168,21 @@ private static final class TransportAuthorizationState {
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap> serviceAuthorization;
private final Executor executor;
+ private final Executor offloadExecutor;
/**
* @param executor used for calling into the application. Must outlive the transport.
+ * @param offloadExecutor used for offloading synchronous security policy checks.
*/
TransportAuthorizationState(
- int uid, ServerPolicyChecker serverPolicyChecker, Executor executor) {
+ int uid,
+ ServerPolicyChecker serverPolicyChecker,
+ Executor executor,
+ Executor offloadExecutor) {
this.uid = uid;
this.serverPolicyChecker = serverPolicyChecker;
this.executor = executor;
+ this.offloadExecutor = offloadExecutor;
serviceAuthorization = new ConcurrentHashMap<>(8);
}
@@ -202,7 +210,7 @@ ListenableFuture checkAuthorization(MethodDescriptor, ?> method) {
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture authorization =
- serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
+ serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName, offloadExecutor);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
Futures.addCallback(
@@ -240,9 +248,12 @@ public interface ServerPolicyChecker {
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
+ * @param offloadExecutor Where to run potentially blocking auth checks in case the
+ * SecurityPolicy in question is not async
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
- ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName);
+ ListenableFuture checkAuthorizationForServiceAsync(
+ int uid, String serviceName, Executor offloadExecutor);
}
}
diff --git a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
index eedc3f590cd..a9ccee16099 100644
--- a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
+++ b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
@@ -33,7 +33,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
@@ -50,6 +53,16 @@ public final class ServerSecurityPolicyTest {
ServerSecurityPolicy policy;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdownNow();
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ throw new AssertionError("executor failed to terminate promptly");
+ }
+ }
+
@Test
public void testDefaultInternalOnly() throws Exception {
policy = new ServerSecurityPolicy();
@@ -181,7 +194,7 @@ public void testPerService_failedSecurityPolicyFuture_returnsAFailedFuture() {
.build();
ListenableFuture statusFuture =
- policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
+ policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
assertThrows(ExecutionException.class, statusFuture::get);
}
@@ -194,7 +207,7 @@ public void testPerServiceAsync_cancelledFuture_propagatesStatus() {
.build();
ListenableFuture statusFuture =
- policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
+ policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
assertThrows(CancellationException.class, statusFuture::get);
}
@@ -231,7 +244,7 @@ public void testPerServiceAsync_interrupted_cancelledFuture() {
}))
.build();
ListenableFuture statusFuture =
- policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
+ policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
assertThrows(InterruptedException.class, statusFuture::get);
listeningExecutorService.shutdownNow();
@@ -337,10 +350,10 @@ public void testPerServiceNoDefaultAsync() throws Exception {
* Shortcut for invoking {@link ServerSecurityPolicy#checkAuthorizationForServiceAsync} without
* dealing with concurrency details. Returns a {link @Status.Code} for convenience.
*/
- private static Status.Code checkAuthorizationForServiceAsync(
+ private Status.Code checkAuthorizationForServiceAsync(
ServerSecurityPolicy policy, int callerUid, String service) throws ExecutionException {
ListenableFuture statusFuture =
- policy.checkAuthorizationForServiceAsync(callerUid, service);
+ policy.checkAuthorizationForServiceAsync(callerUid, service, executor);
return Uninterruptibles.getUninterruptibly(statusFuture).getCode();
}