diff --git a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java index 5f0885883a5..d80316622ed 100644 --- a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java +++ b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java @@ -31,6 +31,7 @@ import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ServerImplBuilder; import java.io.File; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; /** Builder for a server that services requests from an Android Service. */ @@ -141,6 +142,21 @@ public BinderServerBuilder useTransportSecurity(File certChain, File privateKey) throw new UnsupportedOperationException("TLS not supported in BinderServer"); } + /** + * Provides a custom {@link Executor} for offloading blocking tasks. + * + *

Optional. If no custom Executor is provided, defaults to the main server executor + * (configured via {@link #executor(Executor)}). If that is also unset, a default + * shared channel executor will be used. + * + * @return this + */ + public BinderServerBuilder offloadExecutor(Executor executor) { + internalBuilder.setOffloadExecutorPool( + new FixedObjectPool<>(checkNotNull(executor, "executor"))); + return this; + } + /** * Builds a {@link Server} according to this builder's parameters and stores its listening {@link * IBinder} in the {@link IBinderReceiver} passed to {@link #forAddress(AndroidComponentAddress, diff --git a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java index 4786a5e6cc4..82898299baa 100644 --- a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java +++ b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; + /** * A security policy for a gRPC server. * @@ -61,27 +63,24 @@ public Status checkAuthorizationForService(int uid, String serviceName) { /** * Returns whether the given Android UID is authorized to access a particular service. * - *

This method never throws an exception. If the execution of the security policy check fails, - * a failed future with such exception is returned. + *

This method does not block and never throws an exception. If the execution of the security + * policy check fails, a failed future with such exception is returned. * * @param uid The Android UID to authenticate. * @param serviceName The name of the gRPC service being called. + * @param offloadExecutor Where to submit synchronous security policy checks. * @return a future with the result of the authorization check. A failed future represents a * failure to perform the authorization check, not that the access is denied. */ @CheckReturnValue - ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName) { + ListenableFuture checkAuthorizationForServiceAsync( + int uid, String serviceName, Executor offloadExecutor) { SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy); if (securityPolicy instanceof AsyncSecurityPolicy) { return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid); } - try { - Status status = securityPolicy.checkAuthorization(uid); - return Futures.immediateFuture(status); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } + return Futures.submit(() -> securityPolicy.checkAuthorization(uid), offloadExecutor); } public static Builder newBuilder() { diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java index f913775fcbe..40b0ad2865f 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java @@ -65,6 +65,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder. private final ObjectPool executorServicePool; private final ObjectPool executorPool; + private final ObjectPool offloadExecutorPool; private final ImmutableList streamTracerFactories; private final AndroidComponentAddress listenAddress; private final LeakSafeOneWayBinder hostServiceBinder; @@ -82,12 +83,19 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder. @GuardedBy("this") private Executor executor; + @Nullable // Before start() and after termination. + @GuardedBy("this") + private Executor offloadExecutor; + @GuardedBy("this") private boolean shutdown; private BinderServer(Builder builder) { this.listenAddress = checkNotNull(builder.listenAddress); this.executorPool = checkNotNull(builder.executorPool); + this.offloadExecutorPool = builder.offloadExecutorPool != null + ? builder.offloadExecutorPool + : this.executorPool; this.executorServicePool = builder.executorServicePool; this.streamTracerFactories = ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories")); @@ -107,6 +115,7 @@ public synchronized void start(ServerListener serverListener) throws IOException listener = new ActiveTransportTracker(serverListener, this::onTerminated); executorService = executorServicePool.getObject(); executor = executorPool.getObject(); + offloadExecutor = offloadExecutorPool.getObject(); } @Override @@ -144,6 +153,7 @@ public synchronized void shutdown() { private synchronized void onTerminated() { executor = executorPool.returnObject(executor); + offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor); } @Override @@ -178,7 +188,8 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) { attrsBuilder, callingUid, serverPolicyChecker, - checkNotNull(executor, "Not started?")); + checkNotNull(executor, "Not started?"), + checkNotNull(offloadExecutor, "Not started?")); // Create a new transport and let our listener know about it. BinderServerTransport transport = BinderServerTransport.create( @@ -222,6 +233,7 @@ public static class Builder { @Nullable AndroidComponentAddress listenAddress; @Nullable List streamTracerFactories; @Nullable ObjectPool executorPool; + @Nullable ObjectPool offloadExecutorPool; ObjectPool executorServicePool = SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); @@ -267,6 +279,12 @@ public Builder setExecutorPool(ObjectPool executorPool) { return this; } + /** Sets the executor pool for offloading tasks. */ + public Builder setOffloadExecutorPool(ObjectPool offloadExecutorPool) { + this.offloadExecutorPool = checkNotNull(offloadExecutorPool, "offloadExecutorPool"); + return this; + } + /** * Sets the executor to be used for scheduling channel timers. * diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java index 6f95ef8a83c..0812a9dce00 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java @@ -69,17 +69,20 @@ public static void installAuthInterceptor(ServerBuilder serverBuilder) { * @param remoteUid The remote UID of the transport. * @param serverPolicyChecker The policy checker for this transport. * @param executor used for calling into the application. Must outlive the transport. + * @param offloadExecutor used for potentially blocking work. Must outlive the transport. */ @Internal public static void attachAuthAttrs( Attributes.Builder builder, int remoteUid, ServerPolicyChecker serverPolicyChecker, - Executor executor) { + Executor executor, + Executor offloadExecutor) { builder .set( TRANSPORT_AUTHORIZATION_STATE, - new TransportAuthorizationState(remoteUid, serverPolicyChecker, executor)) + new TransportAuthorizationState( + remoteUid, serverPolicyChecker, executor, offloadExecutor)) .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY); } @@ -97,9 +100,8 @@ public ServerCall.Listener interceptCall( ListenableFuture authStatusFuture = transportAuthState.checkAuthorization(call.getMethodDescriptor()); - // Most SecurityPolicy will have synchronous implementations that provide an - // immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations - // and asynchronous code if the authorization result is already present. + // Short-circuit in the common case where the checkAuthorization() result is cached and + // completed. This avoids unnecessary allocations and asynchronous code. if (!authStatusFuture.isDone()) { return newServerCallListenerForPendingAuthResult( authStatusFuture, transportAuthState.executor, call, headers, next); @@ -166,15 +168,21 @@ private static final class TransportAuthorizationState { private final ServerPolicyChecker serverPolicyChecker; private final ConcurrentHashMap> serviceAuthorization; private final Executor executor; + private final Executor offloadExecutor; /** * @param executor used for calling into the application. Must outlive the transport. + * @param offloadExecutor used for offloading synchronous security policy checks. */ TransportAuthorizationState( - int uid, ServerPolicyChecker serverPolicyChecker, Executor executor) { + int uid, + ServerPolicyChecker serverPolicyChecker, + Executor executor, + Executor offloadExecutor) { this.uid = uid; this.serverPolicyChecker = serverPolicyChecker; this.executor = executor; + this.offloadExecutor = offloadExecutor; serviceAuthorization = new ConcurrentHashMap<>(8); } @@ -202,7 +210,7 @@ ListenableFuture checkAuthorization(MethodDescriptor method) { // TODO(10669): evaluate if there should be at most a single pending authorization check per // (uid, serviceName) pair at any given time. ListenableFuture authorization = - serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName); + serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName, offloadExecutor); if (useCache) { serviceAuthorization.putIfAbsent(serviceName, authorization); Futures.addCallback( @@ -240,9 +248,12 @@ public interface ServerPolicyChecker { * * @param uid The Android UID to authenticate. * @param serviceName The name of the gRPC service being called. + * @param offloadExecutor Where to run potentially blocking auth checks in case the + * SecurityPolicy in question is not async * @return a future with the result of the authorization check. A failed future represents a * failure to perform the authorization check, not that the access is denied. */ - ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName); + ListenableFuture checkAuthorizationForServiceAsync( + int uid, String serviceName, Executor offloadExecutor); } } diff --git a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java index eedc3f590cd..a9ccee16099 100644 --- a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java +++ b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java @@ -33,7 +33,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.robolectric.RobolectricTestRunner; @@ -50,6 +53,16 @@ public final class ServerSecurityPolicyTest { ServerSecurityPolicy policy; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @After + public void tearDown() throws Exception { + executor.shutdownNow(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + throw new AssertionError("executor failed to terminate promptly"); + } + } + @Test public void testDefaultInternalOnly() throws Exception { policy = new ServerSecurityPolicy(); @@ -181,7 +194,7 @@ public void testPerService_failedSecurityPolicyFuture_returnsAFailedFuture() { .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); assertThrows(ExecutionException.class, statusFuture::get); } @@ -194,7 +207,7 @@ public void testPerServiceAsync_cancelledFuture_propagatesStatus() { .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); assertThrows(CancellationException.class, statusFuture::get); } @@ -231,7 +244,7 @@ public void testPerServiceAsync_interrupted_cancelledFuture() { })) .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); assertThrows(InterruptedException.class, statusFuture::get); listeningExecutorService.shutdownNow(); @@ -337,10 +350,10 @@ public void testPerServiceNoDefaultAsync() throws Exception { * Shortcut for invoking {@link ServerSecurityPolicy#checkAuthorizationForServiceAsync} without * dealing with concurrency details. Returns a {link @Status.Code} for convenience. */ - private static Status.Code checkAuthorizationForServiceAsync( + private Status.Code checkAuthorizationForServiceAsync( ServerSecurityPolicy policy, int callerUid, String service) throws ExecutionException { ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(callerUid, service); + policy.checkAuthorizationForServiceAsync(callerUid, service, executor); return Uninterruptibles.getUninterruptibly(statusFuture).getCode(); }