Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerImplBuilder;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/** Builder for a server that services requests from an Android Service. */
Expand Down Expand Up @@ -141,6 +142,21 @@ public BinderServerBuilder useTransportSecurity(File certChain, File privateKey)
throw new UnsupportedOperationException("TLS not supported in BinderServer");
}

/**
* Provides a custom {@link Executor} for offloading blocking tasks.
*
* <p>Optional. If no custom Executor is provided, defaults to the main server executor
* (configured via {@link #executor(Executor)}). If that is also unset, a default
* shared channel executor will be used.
*
* @return this
*/
public BinderServerBuilder offloadExecutor(Executor executor) {
internalBuilder.setOffloadExecutorPool(
new FixedObjectPool<>(checkNotNull(executor, "executor")));
return this;
}

/**
* Builds a {@link Server} according to this builder's parameters and stores its listening {@link
* IBinder} in the {@link IBinderReceiver} passed to {@link #forAddress(AndroidComponentAddress,
Expand Down
17 changes: 8 additions & 9 deletions binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.Map;

import java.util.concurrent.Executor;

/**
* A security policy for a gRPC server.
*
Expand Down Expand Up @@ -61,27 +63,24 @@ public Status checkAuthorizationForService(int uid, String serviceName) {
/**
* Returns whether the given Android UID is authorized to access a particular service.
*
* <p>This method never throws an exception. If the execution of the security policy check fails,
* a failed future with such exception is returned.
* <p>This method does not block and never throws an exception. If the execution of the security
* policy check fails, a failed future with such exception is returned.
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
* @param offloadExecutor Where to submit synchronous security policy checks.
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
@CheckReturnValue
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName) {
ListenableFuture<Status> checkAuthorizationForServiceAsync(
int uid, String serviceName, Executor offloadExecutor) {
SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy);
if (securityPolicy instanceof AsyncSecurityPolicy) {
return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid);
}

try {
Status status = securityPolicy.checkAuthorization(uid);
return Futures.immediateFuture(status);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
return Futures.submit(() -> securityPolicy.checkAuthorization(uid), offloadExecutor);
}

public static Builder newBuilder() {
Expand Down
20 changes: 19 additions & 1 deletion binder/src/main/java/io/grpc/binder/internal/BinderServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.

private final ObjectPool<ScheduledExecutorService> executorServicePool;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> offloadExecutorPool;
private final ImmutableList<ServerStreamTracer.Factory> streamTracerFactories;
private final AndroidComponentAddress listenAddress;
private final LeakSafeOneWayBinder hostServiceBinder;
Expand All @@ -82,12 +83,19 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
@GuardedBy("this")
private Executor executor;

@Nullable // Before start() and after termination.
@GuardedBy("this")
private Executor offloadExecutor;

@GuardedBy("this")
private boolean shutdown;

private BinderServer(Builder builder) {
this.listenAddress = checkNotNull(builder.listenAddress);
this.executorPool = checkNotNull(builder.executorPool);
this.offloadExecutorPool = builder.offloadExecutorPool != null
? builder.offloadExecutorPool
: this.executorPool;
this.executorServicePool = builder.executorServicePool;
this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
Expand All @@ -107,6 +115,7 @@ public synchronized void start(ServerListener serverListener) throws IOException
listener = new ActiveTransportTracker(serverListener, this::onTerminated);
executorService = executorServicePool.getObject();
executor = executorPool.getObject();
offloadExecutor = offloadExecutorPool.getObject();
}

@Override
Expand Down Expand Up @@ -144,6 +153,7 @@ public synchronized void shutdown() {

private synchronized void onTerminated() {
executor = executorPool.returnObject(executor);
offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor);
}

@Override
Expand Down Expand Up @@ -178,7 +188,8 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) {
attrsBuilder,
callingUid,
serverPolicyChecker,
checkNotNull(executor, "Not started?"));
checkNotNull(executor, "Not started?"),
checkNotNull(offloadExecutor, "Not started?"));
// Create a new transport and let our listener know about it.
BinderServerTransport transport =
BinderServerTransport.create(
Expand Down Expand Up @@ -222,6 +233,7 @@ public static class Builder {
@Nullable AndroidComponentAddress listenAddress;
@Nullable List<? extends ServerStreamTracer.Factory> streamTracerFactories;
@Nullable ObjectPool<? extends Executor> executorPool;
@Nullable ObjectPool<? extends Executor> offloadExecutorPool;

ObjectPool<ScheduledExecutorService> executorServicePool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
Expand Down Expand Up @@ -267,6 +279,12 @@ public Builder setExecutorPool(ObjectPool<? extends Executor> executorPool) {
return this;
}

/** Sets the executor pool for offloading tasks. */
public Builder setOffloadExecutorPool(ObjectPool<? extends Executor> offloadExecutorPool) {
this.offloadExecutorPool = checkNotNull(offloadExecutorPool, "offloadExecutorPool");
return this;
}

/**
* Sets the executor to be used for scheduling channel timers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,20 @@ public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
* @param remoteUid The remote UID of the transport.
* @param serverPolicyChecker The policy checker for this transport.
* @param executor used for calling into the application. Must outlive the transport.
* @param offloadExecutor used for potentially blocking work. Must outlive the transport.
*/
@Internal
public static void attachAuthAttrs(
Attributes.Builder builder,
int remoteUid,
ServerPolicyChecker serverPolicyChecker,
Executor executor) {
Executor executor,
Executor offloadExecutor) {
builder
.set(
TRANSPORT_AUTHORIZATION_STATE,
new TransportAuthorizationState(remoteUid, serverPolicyChecker, executor))
new TransportAuthorizationState(
remoteUid, serverPolicyChecker, executor, offloadExecutor))
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY);
}

Expand All @@ -97,9 +100,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ListenableFuture<Status> authStatusFuture =
transportAuthState.checkAuthorization(call.getMethodDescriptor());

// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code if the authorization result is already present.
// Short-circuit in the common case where the checkAuthorization() result is cached and
// completed. This avoids unnecessary allocations and asynchronous code.
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(
authStatusFuture, transportAuthState.executor, call, headers, next);
Expand Down Expand Up @@ -166,15 +168,21 @@ private static final class TransportAuthorizationState {
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;
private final Executor executor;
private final Executor offloadExecutor;

/**
* @param executor used for calling into the application. Must outlive the transport.
* @param offloadExecutor used for offloading synchronous security policy checks.
*/
TransportAuthorizationState(
int uid, ServerPolicyChecker serverPolicyChecker, Executor executor) {
int uid,
ServerPolicyChecker serverPolicyChecker,
Executor executor,
Executor offloadExecutor) {
this.uid = uid;
this.serverPolicyChecker = serverPolicyChecker;
this.executor = executor;
this.offloadExecutor = offloadExecutor;
serviceAuthorization = new ConcurrentHashMap<>(8);
}

Expand Down Expand Up @@ -202,7 +210,7 @@ ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName, offloadExecutor);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
Futures.addCallback(
Expand Down Expand Up @@ -240,9 +248,12 @@ public interface ServerPolicyChecker {
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
* @param offloadExecutor Where to run potentially blocking auth checks in case the
* SecurityPolicy in question is not async
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName);
ListenableFuture<Status> checkAuthorizationForServiceAsync(
int uid, String serviceName, Executor offloadExecutor);
}
}
23 changes: 18 additions & 5 deletions binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
Expand All @@ -50,6 +53,16 @@ public final class ServerSecurityPolicyTest {

ServerSecurityPolicy policy;

private final ExecutorService executor = Executors.newSingleThreadExecutor();

@After
public void tearDown() throws Exception {
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
throw new AssertionError("executor failed to terminate promptly");
}
}

@Test
public void testDefaultInternalOnly() throws Exception {
policy = new ServerSecurityPolicy();
Expand Down Expand Up @@ -181,7 +194,7 @@ public void testPerService_failedSecurityPolicyFuture_returnsAFailedFuture() {
.build();

ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);

assertThrows(ExecutionException.class, statusFuture::get);
}
Expand All @@ -194,7 +207,7 @@ public void testPerServiceAsync_cancelledFuture_propagatesStatus() {
.build();

ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);

assertThrows(CancellationException.class, statusFuture::get);
}
Expand Down Expand Up @@ -231,7 +244,7 @@ public void testPerServiceAsync_interrupted_cancelledFuture() {
}))
.build();
ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);

assertThrows(InterruptedException.class, statusFuture::get);
listeningExecutorService.shutdownNow();
Expand Down Expand Up @@ -337,10 +350,10 @@ public void testPerServiceNoDefaultAsync() throws Exception {
* Shortcut for invoking {@link ServerSecurityPolicy#checkAuthorizationForServiceAsync} without
* dealing with concurrency details. Returns a {link @Status.Code} for convenience.
*/
private static Status.Code checkAuthorizationForServiceAsync(
private Status.Code checkAuthorizationForServiceAsync(
ServerSecurityPolicy policy, int callerUid, String service) throws ExecutionException {
ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(callerUid, service);
policy.checkAuthorizationForServiceAsync(callerUid, service, executor);
return Uninterruptibles.getUninterruptibly(statusFuture).getCode();
}

Expand Down