diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index a9323fbdc25f..aabb9899269e 100644 --- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -16,8 +16,6 @@ package com.google.cloud.executor.spanner; -import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; - import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; @@ -55,6 +53,7 @@ import com.google.cloud.spanner.Mutation.WriteBuilder; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Partition; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; @@ -156,6 +155,8 @@ import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction; import com.google.spanner.executor.v1.UpdateCloudInstanceAction; import com.google.spanner.v1.StructType; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; +import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; import com.google.spanner.v1.TypeAnnotationCode; import com.google.spanner.v1.TypeCode; import io.grpc.Status; @@ -174,6 +175,7 @@ import java.time.Duration; import java.time.LocalDate; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -239,53 +241,79 @@ public static String unexpectedExceptionResponse(Exception e) { * *

Here's a typical workflow for how a read-write transaction works. * - *

When we call startRWTransaction, a transaction runner will be started in another thread with - * a callable that stores the passed TransactionContext into the ReadWriteTransaction and blocks. - * This TransactionContext is used to run the read/write actions. To execute the finish action, we - * store the FinishMode in the ReadWriteTransaction object, which unblocks the thread in the - * callable and causes the callable to either return (to commit) or throw an exception (to abort). - * If the underlying Spanner transaction aborted, the transaction runner will invoke the callable - * again. + *

When we call startRWTransaction, a transaction runner will be started in another thread + * with a callable that stores the passed TransactionContext into the ReadWriteTransaction and + * blocks. This TransactionContext is used to run the read/write actions. To execute the finish + * action, we store the FinishMode in the ReadWriteTransaction object, which unblocks the thread + * in the callable and causes the callable to either return (to commit) or throw an exception (to + * abort). If the underlying Spanner transaction aborted, the transaction runner will invoke the + * callable again. */ private static class ReadWriteTransaction { + private final DatabaseClient dbClient; private TransactionRunner runner; private TransactionContext txnContext; private com.google.protobuf.Timestamp timestamp; private Mode finishMode; + private SpannerException abortedException; private SpannerException error; private final String transactionSeed; private final boolean optimistic; + private final boolean repeatableRead; // Set to true when the transaction runner completed, one of these three could happen: runner // committed, abandoned or threw an error. private boolean runnerCompleted; public ReadWriteTransaction( - DatabaseClient dbClient, String transactionSeed, boolean optimistic) { + DatabaseClient dbClient, + String transactionSeed, + boolean optimistic, + boolean repeatableRead) { this.dbClient = dbClient; this.transactionSeed = transactionSeed; this.optimistic = optimistic; + this.repeatableRead = repeatableRead; this.runnerCompleted = false; } - /** Set context to be used for executing actions. */ + /** + * Set context to be used for executing actions. + */ private synchronized void setContext(TransactionContext transaction) { finishMode = null; + abortedException = null; txnContext = transaction; Preconditions.checkNotNull(txnContext); LOGGER.log(Level.INFO, "Transaction callable created, setting context %s\n", transactionSeed); notifyAll(); } - /** Wait for finishAction to be executed and return the requested finish mode. */ - private synchronized Mode waitForFinishAction() throws Exception { - while (finishMode == null) { + private synchronized void setAborted(SpannerException abortedException) { + LOGGER.log(Level.INFO, "Got aborted exception %s\n", abortedException.toString()); + this.abortedException = abortedException; + notifyAll(); + } + + /** + * Wait for finishAction to be executed and return the requested finish mode. + */ + private synchronized Mode waitForFinishActionOrAbort() throws Exception { + while (finishMode == null && abortedException == null) { wait(); } + // If a read aborted, throw the exception to the TransactionRunner callable to + // restart the transaction. + if (abortedException != null) { + LOGGER.log(Level.INFO, "Throw aborted exception %s\n", abortedException.toString()); + throw abortedException; + } return finishMode; } - /** Wait for transactionContext to be set. */ + /** + * Wait for transactionContext to be set. + */ private synchronized void waitForTransactionContext() throws Exception { while (txnContext == null && error == null) { wait(); @@ -295,14 +323,18 @@ private synchronized void waitForTransactionContext() throws Exception { } } - /** Transaction successfully committed with a timestamp. */ + /** + * Transaction successfully committed with a timestamp. + */ private synchronized void transactionSucceeded(com.google.protobuf.Timestamp timestamp) { this.timestamp = timestamp; this.runnerCompleted = true; notifyAll(); } - /** Transaction failed to commit, maybe abandoned or other errors occurred. */ + /** + * Transaction failed to commit, maybe abandoned or other errors occurred. + */ private synchronized void transactionFailed(SpannerException e) { // Handle abandon case if (e.getErrorCode() == ErrorCode.UNKNOWN && e.getMessage().contains(TRANSACTION_ABANDONED)) { @@ -315,14 +347,33 @@ private synchronized void transactionFailed(SpannerException e) { notifyAll(); } - /** Return the commit timestamp. */ + /** + * Return the commit timestamp. + */ public synchronized com.google.protobuf.Timestamp getTimestamp() { return timestamp; } - /** Return the transactionContext to run actions. Must be called after start action. */ + /** + * Return the transactionContext to run actions, waiting until it is set. + */ public synchronized TransactionContext getContext() { - Preconditions.checkState(txnContext != null); + while (txnContext == null || abortedException != null) { + // If the transaction was aborted by a read action, the abortedException will + // be thrown to the TransactionRunner callable to restart the transaction. + // The restarted callable will call setContext() to set the new transaction context + // and clear abortedException. + if (abortedException != null) { + LOGGER.log(Level.INFO, "Waiting for new RW transaction context after abort\n"); + } else { + LOGGER.log(Level.INFO, "Waiting for RW transaction context."); + } + try { + wait(); + } catch (InterruptedException e) { + LOGGER.log(Level.INFO, "Interrupted while waiting for RW transaction context."); + } + } return txnContext; } @@ -339,7 +390,7 @@ public void startRWTransaction() throws Exception { String.format( "Transaction context set, executing and waiting for finish %s\n", transactionSeed)); - Mode mode = waitForFinishAction(); + Mode mode = waitForFinishActionOrAbort(); if (mode == Mode.ABANDON) { throw new Exception(TRANSACTION_ABANDONED); } @@ -351,10 +402,21 @@ public void startRWTransaction() throws Exception { context.wrap( () -> { try { + List transactionOptions = new ArrayList<>(); + if (repeatableRead) { + transactionOptions.add(Options.isolationLevel(IsolationLevel.REPEATABLE_READ)); + } else { + transactionOptions.add(Options.isolationLevel(IsolationLevel.SERIALIZABLE)); + } + if (optimistic) { + transactionOptions.add(Options.readLockMode(ReadLockMode.OPTIMISTIC)); + } else { + transactionOptions.add(Options.readLockMode(ReadLockMode.PESSIMISTIC)); + } runner = - optimistic - ? dbClient.readWriteTransaction(Options.optimisticLock()) - : dbClient.readWriteTransaction(); + dbClient.readWriteTransaction( + transactionOptions.toArray( + new TransactionOption[transactionOptions.size()])); LOGGER.log( Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); runner.run(callable); @@ -397,7 +459,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception { "TxnContext cleared, sending finishMode to finish transaction %s\n", transactionSeed)); notifyAll(); - // Wait for the transaction to finish or restart + // Wait for the transaction to finish or restart due to an abort on COMMIT. while (txnContext == null && !runnerCompleted) { wait(); } @@ -434,6 +496,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception { * initialized. */ class ExecutionFlowContext { + // Database path from previous action private String prevDbPath; // Current read-write transaction @@ -448,9 +511,6 @@ class ExecutionFlowContext { private Metadata metadata; // Number of pending read/query actions. private int numPendingReads; - // Indicate whether there's a read/query action got aborted and the transaction need to be - // reset. - private boolean readAborted; // Log the workid and op pair for tracing the thread. private String transactionSeed; // Outgoing stream. @@ -460,17 +520,23 @@ public ExecutionFlowContext(StreamObserver responseO this.responseObserver = responseObserver; } - /** Call the underlying stream to send response. */ + /** + * Call the underlying stream to send response. + */ public synchronized void onNext(SpannerAsyncActionResponse response) { responseObserver.onNext(response); } - /** Call the underlying stream to send error. */ + /** + * Call the underlying stream to send error. + */ public synchronized void onError(Throwable t) { responseObserver.onError(t); } - /** Return current transaction that can used for performing read/query actions. */ + /** + * Return current transaction that can used for performing read/query actions. + */ public synchronized ReadContext getTransactionForRead() throws SpannerException { if (roTxn != null) { return roTxn; @@ -486,7 +552,9 @@ public synchronized ReadContext getTransactionForRead() throws SpannerException ErrorCode.INVALID_ARGUMENT, "No active transaction"); } - /** Return current transaction that can used for performing mutation/update actions. */ + /** + * Return current transaction that can used for performing mutation/update actions. + */ public synchronized TransactionContext getTransactionForWrite() throws SpannerException { if (rwTxn == null) { throw SpannerExceptionFactory.newSpannerException( @@ -495,7 +563,9 @@ public synchronized TransactionContext getTransactionForWrite() throws SpannerEx return rwTxn.getContext(); } - /** Return current batch transaction if it exists. */ + /** + * Return current batch transaction if it exists. + */ public synchronized BatchReadOnlyTransaction getBatchTxn() throws SpannerException { if (batchTxn == null) { throw SpannerExceptionFactory.newSpannerException( @@ -504,31 +574,41 @@ public synchronized BatchReadOnlyTransaction getBatchTxn() throws SpannerExcepti return batchTxn; } - /** Set the transactionSeed string retrieved from startTransactionAction. */ + /** + * Set the transactionSeed string retrieved from startTransactionAction. + */ public synchronized void updateTransactionSeed(String transactionSeed) { if (!transactionSeed.isEmpty()) { this.transactionSeed = transactionSeed; } } - /** Return current workid and op pair for logging. */ + /** + * Return current workid and op pair for logging. + */ public synchronized String getTransactionSeed() { return transactionSeed; } - /** Return current database client. */ + /** + * Return current database client. + */ public DatabaseClient getDbClient() { return dbClient; } - /** Clear the transaction related variables. */ + /** + * Clear the transaction related variables. + */ public synchronized void clear() { rwTxn = null; roTxn = null; metadata = null; } - /** Cleanup all the active transactions if the stubby call is closing. */ + /** + * Cleanup all the active transactions if the stubby call is closing. + */ public synchronized void cleanup() { if (roTxn != null) { LOGGER.log(Level.INFO, "A read only transaction was active when stubby call closed"); @@ -545,7 +625,9 @@ public synchronized void cleanup() { } } - /** Return previous databasePath if given dbPath is empty, then update. */ + /** + * Return previous databasePath if given dbPath is empty, then update. + */ public synchronized String getDatabasePath(String dbPath) { if (dbPath == null || dbPath.isEmpty()) { return prevDbPath; @@ -554,12 +636,16 @@ public synchronized String getDatabasePath(String dbPath) { return dbPath; } - /** Set the metadata for future use. */ + /** + * Set the metadata for future use. + */ public synchronized void setMetadata(Metadata metadata) { this.metadata = metadata; } - /** Start a read-only transaction. */ + /** + * Start a read-only transaction. + */ public synchronized void startReadOnlyTxn( DatabaseClient dbClient, TimestampBound timestampBound, Metadata metadata) { if ((rwTxn != null) || (roTxn != null) || (batchTxn != null)) { @@ -575,7 +661,9 @@ public synchronized void startReadOnlyTxn( } } - /** Start a read-write transaction. */ + /** + * Start a read-write transaction. + */ public synchronized void startReadWriteTxn( DatabaseClient dbClient, Metadata metadata, TransactionExecutionOptions options) throws Exception { @@ -588,7 +676,16 @@ public synchronized void startReadWriteTxn( String.format( "There's no active transaction, safe to create rwTxn: %s\n", getTransactionSeed())); this.metadata = metadata; - rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, options.getOptimistic()); + boolean optimistic = + options.getSerializableOptimistic() || options.getSnapshotIsolationOptimistic(); + boolean repeatableRead = + options.getSnapshotIsolationOptimistic() || options.getSnapshotIsolationPessimistic(); + rwTxn = + new ReadWriteTransaction( + dbClient, + transactionSeed, + optimistic, + repeatableRead); LOGGER.log( Level.INFO, String.format( @@ -596,7 +693,9 @@ public synchronized void startReadWriteTxn( rwTxn.startRWTransaction(); } - /** Start a batch transaction. */ + /** + * Start a batch transaction. + */ public synchronized Status startBatchTxn( StartBatchTransactionAction action, BatchClient batchClient, OutcomeSender sender) { try { @@ -635,7 +734,9 @@ public synchronized Status startBatchTxn( } } - /** Increase the read count when a read/query is issued. */ + /** + * Increase the read count when a read/query is issued. + */ public synchronized void startRead() { ++numPendingReads; } @@ -644,48 +745,57 @@ public synchronized void startRead() { * Decrease the read count when a read/query is finished, if status is aborted and there's no * pending read/query, reset the transaction for retry. */ - public synchronized void finishRead(Status status) { + public synchronized void finishRead(Status status, SpannerException e) { if (status.getCode() == Status.ABORTED.getCode()) { - readAborted = true; + if (rwTxn != null) { + rwTxn.setAborted(e); + } } --numPendingReads; - if (readAborted && numPendingReads <= 0) { - LOGGER.log(Level.FINE, "Transaction reset due to read/query abort"); - readAborted = false; - } } - /** Initialize the read count and aborted status when transaction started. */ + /** + * Initialize the read count and aborted status when transaction started. + */ public synchronized void initReadState() { - readAborted = false; numPendingReads = 0; } - /** Store the reference to the database client for future action use. */ + /** + * Store the reference to the database client for future action use. + */ public void setDatabaseClient(DatabaseClient client) { dbClient = client; } - /** Return a list of key column types of the given table. */ + /** + * Return a list of key column types of the given table. + */ public List getKeyColumnTypes(String tableName) throws SpannerException { Preconditions.checkNotNull(metadata); return metadata.getKeyColumnTypes(tableName); } - /** Return column type of the given table and column. */ + /** + * Return column type of the given table and column. + */ public com.google.spanner.v1.Type getColumnType(String tableName, String columnName) throws SpannerException { Preconditions.checkNotNull(metadata); return metadata.getColumnType(tableName, columnName); } - /** Buffer a list of mutations in a read-write transaction. */ + /** + * Buffer a list of mutations in a read-write transaction. + */ public synchronized void bufferMutations(List mutations) throws SpannerException { getTransactionForWrite().buffer(mutations); } - /** Execute a batch of updates in a read-write transaction. */ + /** + * Execute a batch of updates in a read-write transaction. + */ public synchronized long[] executeBatchDml(@Nonnull List stmts) throws SpannerException { for (int i = 0; i < stmts.size(); i++) { @@ -696,7 +806,9 @@ public synchronized long[] executeBatchDml(@Nonnull List stmts) .batchUpdate(stmts, Options.tag("batch-update-transaction-tag")); } - /** Finish active transaction in given finishMode, then send outcome back to client. */ + /** + * Finish active transaction in given finishMode, then send outcome back to client. + */ public synchronized Status finish(Mode finishMode, OutcomeSender sender) { if (numPendingReads > 0) { return sender.finishWithError( @@ -724,6 +836,12 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) { if (rwTxn.getTimestamp() != null) { outcomeBuilder.setCommitTime(rwTxn.getTimestamp()); } + if (finishMode == Mode.COMMIT + && rwTxn.runner.getCommitResponse().getSnapshotTimestamp() != null) { + outcomeBuilder.setSnapshotIsolationTxnReadTimestamp( + Timestamps.toMicros( + rwTxn.runner.getCommitResponse().getSnapshotTimestamp().toProto())); + } clear(); } } @@ -750,7 +868,9 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) { } } - /** Close active batch transaction. */ + /** + * Close active batch transaction. + */ public synchronized void closeBatchTxn() throws SpannerException { if (batchTxn == null) { throw SpannerExceptionFactory.newSpannerException( @@ -761,7 +881,7 @@ public synchronized void closeBatchTxn() throws SpannerException { } private Spanner client; - private Spanner clientWithTimeout; + private Map clientWithTimeoutMap = new HashMap<>(); private static final String TRANSACTION_ABANDONED = "Fake error to abandon transaction"; @@ -782,23 +902,25 @@ public synchronized void closeBatchTxn() throws SpannerException { private synchronized Spanner getClientWithTimeout( long timeoutSeconds, boolean useMultiplexedSession) throws IOException { - if (clientWithTimeout != null) { - return clientWithTimeout; + if (clientWithTimeoutMap.containsKey(timeoutSeconds)) { + return clientWithTimeoutMap.get(timeoutSeconds); } - clientWithTimeout = getClient(timeoutSeconds, useMultiplexedSession); - return clientWithTimeout; + clientWithTimeoutMap.put(timeoutSeconds, + initializeClient(timeoutSeconds, useMultiplexedSession)); + return clientWithTimeoutMap.get(timeoutSeconds); } private synchronized Spanner getClient(boolean useMultiplexedSession) throws IOException { if (client != null) { return client; } - client = getClient(/* timeoutSeconds= */ 0, useMultiplexedSession); + client = initializeClient(/* timeoutSeconds= */ 0, useMultiplexedSession); return client; } - // Return the spanner client, create one if not exists. - private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplexedSession) + // Initializes a newly created spanner client. NEVER CALL THIS METHOD DIRECTLY. + // ALWAYS CALL getClientWithTimeout() or getClient() INSTEAD. + private synchronized Spanner initializeClient(long timeoutSeconds, boolean useMultiplexedSession) throws IOException { // Create a cloud spanner client Credentials credentials; @@ -957,7 +1079,9 @@ public boolean isExportedEndToEndTraceValid(String traceId) { return true; } - /** Handle actions. */ + /** + * Handle actions. + */ public Status startHandlingRequest( SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) { OutcomeSender outcomeSender = new OutcomeSender(req.getActionId(), executionContext); @@ -998,7 +1122,9 @@ public Status startHandlingRequest( return Status.OK; } - /** Execute actions by action case, using OutcomeSender to send status and results back. */ + /** + * Execute actions by action case, using OutcomeSender to send status and results back. + */ private Status executeAction( OutcomeSender outcomeSender, SpannerAction action, @@ -1093,7 +1219,9 @@ private Status executeAction( } } - /** Execute admin actions by action case, using OutcomeSender to send status and results back. */ + /** + * Execute admin actions by action case, using OutcomeSender to send status and results back. + */ private Status executeAdminAction( boolean useMultiplexedSession, AdminAction action, OutcomeSender outcomeSender) { try { @@ -1186,7 +1314,9 @@ private Status executeAdminAction( } } - /** Execute action that creates a cloud instance. */ + /** + * Execute action that creates a cloud instance. + */ private Status executeCreateCloudInstance( boolean useMultiplexedSession, CreateCloudInstanceAction action, OutcomeSender sender) { try { @@ -1228,7 +1358,9 @@ private Status executeCreateCloudInstance( return sender.finishWithOK(); } - /** Execute action that updates a cloud instance. */ + /** + * Execute action that updates a cloud instance. + */ private Status executeUpdateCloudInstance( boolean useMultiplexedSession, UpdateCloudInstanceAction action, OutcomeSender sender) { try { @@ -1276,7 +1408,9 @@ private Status executeUpdateCloudInstance( return sender.finishWithOK(); } - /** Execute action that deletes a cloud instance. */ + /** + * Execute action that deletes a cloud instance. + */ private Status executeDeleteCloudInstance( boolean useMultiplexedSession, DeleteCloudInstanceAction action, OutcomeSender sender) { try { @@ -1298,7 +1432,9 @@ private Status executeDeleteCloudInstance( return sender.finishWithOK(); } - /** Execute action that lists cloud instances. */ + /** + * Execute action that lists cloud instances. + */ private Status executeListCloudInstances( boolean useMultiplexedSession, ListCloudInstancesAction action, OutcomeSender sender) { try { @@ -1345,7 +1481,9 @@ private Status executeListCloudInstances( } } - /** Execute action that lists cloud instance configs. */ + /** + * Execute action that lists cloud instance configs. + */ private Status executeListCloudInstanceConfigs( boolean useMultiplexedSession, ListCloudInstanceConfigsAction action, OutcomeSender sender) { LOGGER.log(Level.INFO, String.format("Listing instance configs:\n%s", action)); @@ -1389,7 +1527,9 @@ private Status executeListCloudInstanceConfigs( } } - /** Execute action that gets a cloud instance config. */ + /** + * Execute action that gets a cloud instance config. + */ private Status executeGetCloudInstanceConfig( boolean useMultiplexedSession, GetCloudInstanceConfigAction action, OutcomeSender sender) { LOGGER.log(Level.INFO, String.format("Getting instance config:\n%s", action)); @@ -1420,7 +1560,9 @@ private Status executeGetCloudInstanceConfig( } } - /** Execute action that retrieves a cloud instance. */ + /** + * Execute action that retrieves a cloud instance. + */ private Status executeGetCloudInstance( boolean useMultiplexedSession, GetCloudInstanceAction action, OutcomeSender sender) { try { @@ -1451,7 +1593,9 @@ private Status executeGetCloudInstance( } } - /** Execute action that creates a user instance config. */ + /** + * Execute action that creates a user instance config. + */ private Status executeCreateUserInstanceConfig( boolean useMultiplexedSession, CreateUserInstanceConfigAction action, OutcomeSender sender) { try { @@ -1482,7 +1626,9 @@ private Status executeCreateUserInstanceConfig( return sender.finishWithOK(); } - /** Execute action that deletes a user instance config. */ + /** + * Execute action that deletes a user instance config. + */ private Status executeDeleteUserInstanceConfig( boolean useMultiplexedSession, DeleteUserInstanceConfigAction action, OutcomeSender sender) { try { @@ -1502,7 +1648,9 @@ private Status executeDeleteUserInstanceConfig( return sender.finishWithOK(); } - /** Execute action that creates a cloud custom encrypted database. */ + /** + * Execute action that creates a cloud custom encrypted database. + */ private Status executeCreateCloudCustomEncryptedDatabase( boolean useMultiplexedSession, CreateCloudDatabaseAction action, OutcomeSender sender) { try { @@ -1531,7 +1679,9 @@ private Status executeCreateCloudCustomEncryptedDatabase( return sender.finishWithOK(); } - /** Execute action that creates a cloud database. */ + /** + * Execute action that creates a cloud database. + */ private Status executeCreateCloudDatabase( boolean useMultiplexedSession, CreateCloudDatabaseAction action, OutcomeSender sender) { if (action.hasEncryptionConfig()) { @@ -1566,7 +1716,9 @@ private Status executeCreateCloudDatabase( return sender.finishWithOK(); } - /** Execute action that updates a cloud database. */ + /** + * Execute action that updates a cloud database. + */ private Status executeUpdateCloudDatabaseDdl( boolean useMultiplexedSession, UpdateCloudDatabaseDdlAction action, OutcomeSender sender) { try { @@ -1600,7 +1752,9 @@ private Status executeUpdateCloudDatabaseDdl( return sender.finishWithOK(); } - /** Execute action that updates a cloud database. */ + /** + * Execute action that updates a cloud database. + */ private Status executeDropCloudDatabase( boolean useMultiplexedSession, DropCloudDatabaseAction action, OutcomeSender sender) { try { @@ -1621,7 +1775,9 @@ private Status executeDropCloudDatabase( return sender.finishWithOK(); } - /** Execute action that creates a cloud database backup. */ + /** + * Execute action that creates a cloud database backup. + */ private Status executeCreateCloudBackup( boolean useMultiplexedSession, CreateCloudBackupAction action, OutcomeSender sender) { try { @@ -1657,7 +1813,9 @@ private Status executeCreateCloudBackup( } } - /** Execute action that copies a cloud database backup. */ + /** + * Execute action that copies a cloud database backup. + */ private Status executeCopyCloudBackup( boolean useMultiplexedSession, CopyCloudBackupAction action, OutcomeSender sender) { try { @@ -1693,7 +1851,9 @@ private Status executeCopyCloudBackup( } } - /** Execute action that gets a cloud database backup. */ + /** + * Execute action that gets a cloud database backup. + */ private Status executeGetCloudBackup( boolean useMultiplexedSession, GetCloudBackupAction action, OutcomeSender sender) { try { @@ -1724,7 +1884,9 @@ private Status executeGetCloudBackup( } } - /** Execute action that updates a cloud database backup. */ + /** + * Execute action that updates a cloud database backup. + */ private Status executeUpdateCloudBackup( boolean useMultiplexedSession, UpdateCloudBackupAction action, OutcomeSender sender) { try { @@ -1758,7 +1920,9 @@ private Status executeUpdateCloudBackup( } } - /** Execute action that deletes a cloud database backup. */ + /** + * Execute action that deletes a cloud database backup. + */ private Status executeDeleteCloudBackup( boolean useMultiplexedSession, DeleteCloudBackupAction action, OutcomeSender sender) { try { @@ -1778,7 +1942,9 @@ private Status executeDeleteCloudBackup( } } - /** Execute action that lists cloud database backups. */ + /** + * Execute action that lists cloud database backups. + */ private Status executeListCloudBackups( boolean useMultiplexedSession, ListCloudBackupsAction action, OutcomeSender sender) { try { @@ -1818,7 +1984,9 @@ private Status executeListCloudBackups( } } - /** Execute action that lists cloud database backup operations. */ + /** + * Execute action that lists cloud database backup operations. + */ private Status executeListCloudBackupOperations( boolean useMultiplexedSession, ListCloudBackupOperationsAction action, OutcomeSender sender) { try { @@ -1855,7 +2023,9 @@ private Status executeListCloudBackupOperations( } } - /** Execute action that list cloud databases. */ + /** + * Execute action that list cloud databases. + */ private Status executeListCloudDatabases( boolean useMultiplexedSession, ListCloudDatabasesAction action, OutcomeSender sender) { try { @@ -1894,7 +2064,9 @@ private Status executeListCloudDatabases( } } - /** Execute action that lists cloud database operations. */ + /** + * Execute action that lists cloud database operations. + */ private Status executeListCloudDatabaseOperations( boolean useMultiplexedSession, ListCloudDatabaseOperationsAction action, @@ -1933,7 +2105,9 @@ private Status executeListCloudDatabaseOperations( } } - /** Execute action that restores a cloud database. */ + /** + * Execute action that restores a cloud database. + */ private Status executeRestoreCloudDatabase( boolean useMultiplexedSession, RestoreCloudDatabaseAction action, OutcomeSender sender) { try { @@ -1967,7 +2141,9 @@ private Status executeRestoreCloudDatabase( } } - /** Execute action that gets a cloud database. */ + /** + * Execute action that gets a cloud database. + */ private Status executeGetCloudDatabase( boolean useMultiplexedSession, GetCloudDatabaseAction action, OutcomeSender sender) { try { @@ -1998,7 +2174,9 @@ private Status executeGetCloudDatabase( } } - /** Execute action that gets an operation. */ + /** + * Execute action that gets an operation. + */ private Status executeGetOperation( boolean useMultiplexedSession, GetOperationAction action, OutcomeSender sender) { try { @@ -2026,7 +2204,9 @@ private Status executeGetOperation( } } - /** Execute action that cancels an operation. */ + /** + * Execute action that cancels an operation. + */ private Status executeCancelOperation( boolean useMultiplexedSession, CancelOperationAction action, OutcomeSender sender) { try { @@ -2045,7 +2225,9 @@ private Status executeCancelOperation( } } - /** Execute action that starts a batch transaction. */ + /** + * Execute action that starts a batch transaction. + */ private Status executeStartBatchTxn( StartBatchTransactionAction action, BatchClient batchClient, @@ -2055,7 +2237,9 @@ private Status executeStartBatchTxn( return executionContext.startBatchTxn(action, batchClient, sender); } - /** Execute action that finishes a batch transaction. */ + /** + * Execute action that finishes a batch transaction. + */ private Status executeCloseBatchTxn( CloseBatchTransactionAction action, OutcomeSender sender, @@ -2071,7 +2255,9 @@ private Status executeCloseBatchTxn( } } - /** Execute action that generates database partitions for the given read. */ + /** + * Execute action that generates database partitions for the given read. + */ private Status executeGenerateDbPartitionsRead( GenerateDbPartitionsForReadAction action, OutcomeSender sender, @@ -2139,7 +2325,9 @@ private Status executeGenerateDbPartitionsRead( } } - /** Execute action that generates database partitions for the given query. */ + /** + * Execute action that generates database partitions for the given query. + */ private Status executeGenerateDbPartitionsQuery( GenerateDbPartitionsForQueryAction action, OutcomeSender sender, @@ -2186,7 +2374,9 @@ private Status executeGenerateDbPartitionsQuery( } } - /** Execute a read or query for the given partitions. */ + /** + * Execute a read or query for the given partitions. + */ private Status executeExecutePartition( boolean useMultiplexedSession, ExecutePartitionAction action, @@ -2219,7 +2409,9 @@ private Status executeExecutePartition( } } - /** Execute a partitioned update which runs different partitions in parallel. */ + /** + * Execute a partitioned update which runs different partitions in parallel. + */ private Status executePartitionedUpdate( PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) { try { @@ -2247,7 +2439,9 @@ private Status executePartitionedUpdate( } } - /** Build a child partition record proto out of childPartitionRecord returned by client. */ + /** + * Build a child partition record proto out of childPartitionRecord returned by client. + */ private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord) throws Exception { ChildPartitionsRecord.Builder childPartitionRecordBuilder = ChildPartitionsRecord.newBuilder(); @@ -2264,7 +2458,9 @@ private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRec return childPartitionRecordBuilder.build(); } - /** Build a data change record proto out of dataChangeRecord returned by client. */ + /** + * Build a data change record proto out of dataChangeRecord returned by client. + */ private DataChangeRecord buildDataChangeRecord(Struct dataChangeRecord) throws Exception { DataChangeRecord.Builder dataChangeRecordBuilder = DataChangeRecord.newBuilder(); dataChangeRecordBuilder.setCommitTime( @@ -2302,7 +2498,9 @@ private DataChangeRecord buildDataChangeRecord(Struct dataChangeRecord) throws E return dataChangeRecordBuilder.build(); } - /** Returns the json or string value of a struct column with index=columnIndex. */ + /** + * Returns the json or string value of a struct column with index=columnIndex. + */ private String getJsonStringForStructColumn(Struct struct, int columnIndex) { Type columnType = struct.getColumnType(columnIndex); switch (columnType.getCode()) { @@ -2319,7 +2517,9 @@ private String getJsonStringForStructColumn(Struct struct, int columnIndex) { } } - /** Build a heartbeat record proto out of heartbeatRecord returned by client. */ + /** + * Build a heartbeat record proto out of heartbeatRecord returned by client. + */ private HeartbeatRecord buildHeartbeatRecord(Struct heartbeatRecord) throws Exception { HeartbeatRecord.Builder heartbeatRecordBuilder = HeartbeatRecord.newBuilder(); heartbeatRecordBuilder.setHeartbeatTime( @@ -2327,7 +2527,9 @@ private HeartbeatRecord buildHeartbeatRecord(Struct heartbeatRecord) throws Exce return heartbeatRecordBuilder.build(); } - /** Execute action that executes a change stream query. */ + /** + * Execute action that executes a change stream query. + */ private Status executeExecuteChangeStreamQuery( String dbPath, boolean useMultiplexedSession, @@ -2514,7 +2716,9 @@ private Status executeFinishTxn( return executionContext.finish(action.getMode(), sender); } - /** Execute mutation action request and buffer the mutations. */ + /** + * Execute mutation action request and buffer the mutations. + */ private Status executeMutation( MutationAction action, OutcomeSender sender, @@ -2594,7 +2798,9 @@ private Status executeMutation( } } - /** Build a Mutation by using the given WriteBuilder to set the columns for the action. */ + /** + * Build a Mutation by using the given WriteBuilder to set the columns for the action. + */ private Mutation buildWrite( List columnList, List valueList, WriteBuilder write) { Preconditions.checkState(columnList.size() == valueList.size()); @@ -2604,7 +2810,9 @@ private Mutation buildWrite( return write.build(); } - /** Execute a read action request, store the results in the OutcomeSender. */ + /** + * Execute a read action request, store the results in the OutcomeSender. + */ private Status executeRead( boolean useMultiplexedSession, ReadAction action, @@ -2654,7 +2862,9 @@ private Status executeRead( } } - /** Execute a query action request, store the results in the OutcomeSender. */ + /** + * Execute a query action request, store the results in the OutcomeSender. + */ private Status executeQuery( boolean useMultiplexedSession, QueryAction action, @@ -2691,7 +2901,9 @@ private Status executeQuery( } } - /** Execute a dml update action request, store the results in the OutcomeSender. */ + /** + * Execute a dml update action request, store the results in the OutcomeSender. + */ private Status executeCloudDmlUpdate( boolean useMultiplexedSession, DmlAction action, @@ -2724,7 +2936,9 @@ private Status executeCloudDmlUpdate( } } - /** Execute a BatchDml update action request, store the results in the OutcomeSender. */ + /** + * Execute a BatchDml update action request, store the results in the OutcomeSender. + */ private Status executeCloudBatchDmlUpdates( BatchDmlAction action, OutcomeSender sender, ExecutionFlowContext executionContext) { try { @@ -2762,7 +2976,9 @@ private Status executeCloudBatchDmlUpdates( } } - /** Process a ResultSet from a read/query and store the results in the OutcomeSender. */ + /** + * Process a ResultSet from a read/query and store the results in the OutcomeSender. + */ private Status processResults( boolean useMultiplexedSession, ResultSet results, @@ -2772,7 +2988,9 @@ private Status processResults( return processResults(useMultiplexedSession, results, limit, sender, executionContext, false); } - /** Process a ResultSet from a read/query/dml and store the results in the OutcomeSender. */ + /** + * Process a ResultSet from a read/query/dml and store the results in the OutcomeSender. + */ private Status processResults( boolean useMultiplexedSession, ResultSet results, @@ -2807,7 +3025,7 @@ private Status processResults( Level.INFO, String.format( "Successfully processed result: %s\n", executionContext.getTransactionSeed())); - executionContext.finishRead(Status.OK); + executionContext.finishRead(Status.OK, null); return sender.finishWithOK(); } catch (SpannerException e) { LOGGER.log(Level.WARNING, "Encountered exception: ", e); @@ -2817,7 +3035,7 @@ private Status processResults( String.format( "Encountered exception: %s %s\n", status.getDescription(), executionContext.getTransactionSeed())); - executionContext.finishRead(status); + executionContext.finishRead(status, e); if (status.getCode() == Status.ABORTED.getCode()) { return sender.finishWithTransactionRestarted(); } else { @@ -2842,14 +3060,18 @@ private Status processResults( } } - /** Convert a result row to a row proto(value list) for sending back to the client. */ + /** + * Convert a result row to a row proto(value list) for sending back to the client. + */ private com.google.spanner.executor.v1.ValueList buildRow( StructReader result, OutcomeSender sender) throws SpannerException { sender.setRowType(buildStructType(result)); return buildStruct(result); } - /** Construct a StructType for a given struct. This is used to set the row type. */ + /** + * Construct a StructType for a given struct. This is used to set the row type. + */ private com.google.spanner.v1.StructType buildStructType(StructReader struct) { com.google.spanner.v1.StructType.Builder rowTypeBuilder = com.google.spanner.v1.StructType.newBuilder(); @@ -2864,7 +3086,9 @@ private com.google.spanner.v1.StructType buildStructType(StructReader struct) { return rowTypeBuilder.build(); } - /** Convert a struct to a proto(value list) for constructing result rows and struct values. */ + /** + * Convert a struct to a proto(value list) for constructing result rows and struct values. + */ private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct) { com.google.spanner.executor.v1.ValueList.Builder structBuilder = com.google.spanner.executor.v1.ValueList.newBuilder(); @@ -2915,258 +3139,245 @@ private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct break; case ARRAY: switch (struct.getColumnType(i).getArrayElementType().getCode()) { - case BOOL: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getBooleanList(i); - for (Boolean booleanValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (booleanValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setBoolValue(booleanValue).build()); - } + case BOOL: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getBooleanList(i); + for (Boolean booleanValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (booleanValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setBoolValue(booleanValue).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BOOL).build()); } - break; - case FLOAT32: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getFloatList(i); - for (Float floatValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (floatValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setDoubleValue((double) floatValue).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BOOL).build()); + } + break; + case FLOAT32: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getFloatList(i); + for (Float floatValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (floatValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setDoubleValue((double) floatValue).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT32).build()); } - break; - case FLOAT64: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getDoubleList(i); - for (Double doubleValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (doubleValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setDoubleValue(doubleValue).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT32).build()); + } + break; + case FLOAT64: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getDoubleList(i); + for (Double doubleValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (doubleValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setDoubleValue(doubleValue).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT64).build()); } - break; - case INT64: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getLongList(i); - for (Long longValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (longValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setIntValue(longValue).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT64).build()); + } + break; + case INT64: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getLongList(i); + for (Long longValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (longValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setIntValue(longValue).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INT64).build()); } - break; - case STRING: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getStringList(i); - for (String stringValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (stringValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStringValue(stringValue)).build(); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INT64).build()); + } + break; + case STRING: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getStringList(i); + for (String stringValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (stringValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStringValue(stringValue)).build(); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRING).build()); } - break; - case BYTES: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getBytesList(i); - for (ByteArray byteArrayValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (byteArrayValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue( - valueProto - .setBytesValue(ByteString.copyFrom(byteArrayValue.toByteArray())) - .build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRING).build()); + } + break; + case BYTES: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getBytesList(i); + for (ByteArray byteArrayValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (byteArrayValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue( + valueProto + .setBytesValue(ByteString.copyFrom(byteArrayValue.toByteArray())) + .build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BYTES).build()); } - break; - case DATE: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getDateList(i); - for (Date dateValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (dateValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue( - valueProto.setDateDaysValue(daysFromDate(dateValue)).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BYTES).build()); + } + break; + case DATE: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getDateList(i); + for (Date dateValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (dateValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue( + valueProto.setDateDaysValue(daysFromDate(dateValue)).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.DATE).build()); } - break; - case TIMESTAMP: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getTimestampList(i); - for (Timestamp timestampValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (timestampValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue( - valueProto.setTimestampValue(timestampToProto(timestampValue)).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.DATE).build()); + } + break; + case TIMESTAMP: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getTimestampList(i); + for (Timestamp timestampValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (timestampValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue( + valueProto.setTimestampValue(timestampToProto(timestampValue)).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.TIMESTAMP).build()); } - break; - case INTERVAL: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getIntervalList(i); - for (Interval interval : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (interval == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStringValue(interval.toISO8601()).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.TIMESTAMP).build()); + } + break; + case INTERVAL: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getIntervalList(i); + for (Interval interval : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (interval == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStringValue(interval.toISO8601()).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INTERVAL).build()); } - break; - case UUID: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getUuidList(i); - for (UUID uuidValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (uuidValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStringValue(uuidValue.toString()).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INTERVAL).build()); + } + break; + case UUID: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getUuidList(i); + for (UUID uuidValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (uuidValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStringValue(uuidValue.toString()).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.UUID).build()); } - break; - case NUMERIC: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getBigDecimalList(i); - for (BigDecimal bigDec : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (bigDec == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStringValue(bigDec.toPlainString()).build()); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.UUID).build()); + } + break; + case NUMERIC: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getBigDecimalList(i); + for (BigDecimal bigDec : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (bigDec == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStringValue(bigDec.toPlainString()).build()); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.NUMERIC).build()); } - break; - case JSON: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getJsonList(i); - for (String stringValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (stringValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStringValue(stringValue)).build(); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.NUMERIC).build()); + } + break; + case JSON: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getJsonList(i); + for (String stringValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (stringValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStringValue(stringValue)).build(); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.JSON).build()); } - break; - case STRUCT: - { - com.google.spanner.executor.v1.ValueList.Builder builder = - com.google.spanner.executor.v1.ValueList.newBuilder(); - List values = struct.getStructList(i); - for (StructReader structValue : values) { - com.google.spanner.executor.v1.Value.Builder valueProto = - com.google.spanner.executor.v1.Value.newBuilder(); - if (structValue == null) { - builder.addValue(valueProto.setIsNull(true).build()); - } else { - builder.addValue(valueProto.setStructValue(buildStruct(structValue))).build(); - } + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.JSON).build()); + } + break; + case STRUCT: { + com.google.spanner.executor.v1.ValueList.Builder builder = + com.google.spanner.executor.v1.ValueList.newBuilder(); + List values = struct.getStructList(i); + for (StructReader structValue : values) { + com.google.spanner.executor.v1.Value.Builder valueProto = + com.google.spanner.executor.v1.Value.newBuilder(); + if (structValue == null) { + builder.addValue(valueProto.setIsNull(true).build()); + } else { + builder.addValue(valueProto.setStructValue(buildStruct(structValue))).build(); } - value.setArrayValue(builder.build()); - value.setArrayType( - com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRUCT).build()); } - break; + value.setArrayValue(builder.build()); + value.setArrayType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRUCT).build()); + } + break; default: throw SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, @@ -3191,7 +3402,9 @@ private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct return structBuilder.build(); } - /** Convert a ListValue proto to a list of cloud Value. */ + /** + * Convert a ListValue proto to a list of cloud Value. + */ private static List cloudValuesFromValueList( com.google.spanner.executor.v1.ValueList valueList, List typeList) throws SpannerException { @@ -3206,7 +3419,9 @@ private static List cloudValuesFromValueList( return cloudValues; } - /** Convert a proto KeySet to a cloud KeySet. */ + /** + * Convert a proto KeySet to a cloud KeySet. + */ private static com.google.cloud.spanner.KeySet keySetProtoToCloudKeySet( com.google.spanner.executor.v1.KeySet keySetProto, List typeList) throws SpannerException { @@ -3224,7 +3439,9 @@ private static com.google.cloud.spanner.KeySet keySetProtoToCloudKeySet( return cloudKeySetBuilder.build(); } - /** Convert a keyRange proto to a cloud KeyRange. */ + /** + * Convert a keyRange proto to a cloud KeyRange. + */ private static com.google.cloud.spanner.KeyRange keyRangeProtoToCloudKeyRange( com.google.spanner.executor.v1.KeyRange keyRangeProto, List typeList) @@ -3251,7 +3468,9 @@ private static com.google.cloud.spanner.KeyRange keyRangeProtoToCloudKeyRange( } } - /** Convert a key proto(value list) to a cloud Key. */ + /** + * Convert a key proto(value list) to a cloud Key. + */ private static com.google.cloud.spanner.Key keyProtoToCloudKey( com.google.spanner.executor.v1.ValueList keyProto, List typeList) throws SpannerException { @@ -3324,7 +3543,9 @@ private static com.google.cloud.spanner.Key keyProtoToCloudKey( return cloudKey.build(); } - /** Convert a Value proto to a cloud Value. */ + /** + * Convert a Value proto to a cloud Value. + */ @SuppressWarnings("NullTernary") private static com.google.cloud.spanner.Value valueProtoToCloudValue( com.google.spanner.v1.Type type, com.google.spanner.executor.v1.Value value) { @@ -3369,14 +3590,13 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue( case UUID: return com.google.cloud.spanner.Value.uuid( value.hasIsNull() ? null : UUID.fromString(value.getStringValue())); - case NUMERIC: - { - if (value.hasIsNull()) { - return com.google.cloud.spanner.Value.numeric(null); - } - String ascii = value.getStringValue(); - return com.google.cloud.spanner.Value.numeric(new BigDecimal(ascii)); + case NUMERIC: { + if (value.hasIsNull()) { + return com.google.cloud.spanner.Value.numeric(null); } + String ascii = value.getStringValue(); + return com.google.cloud.spanner.Value.numeric(new BigDecimal(ascii)); + } case JSON: return com.google.cloud.spanner.Value.json( value.hasIsNull() ? null : value.getStringValue()); @@ -3521,31 +3741,30 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue( .collect(Collectors.toList()), UUID::fromString)); } - case NUMERIC: - { - if (value.hasIsNull()) { - return com.google.cloud.spanner.Value.numericArray(null); - } - List nullList = - value.getArrayValue().getValueList().stream() - .map(com.google.spanner.executor.v1.Value::getIsNull) - .collect(Collectors.toList()); - List valueList = - value.getArrayValue().getValueList().stream() - .map(com.google.spanner.executor.v1.Value::getStringValue) - .collect(Collectors.toList()); - List newValueList = new ArrayList<>(valueList.size()); - - for (int i = 0; i < valueList.size(); ++i) { - if (i < nullList.size() && nullList.get(i)) { - newValueList.add(null); - continue; - } - String ascii = valueList.get(i); - newValueList.add(new BigDecimal(ascii)); + case NUMERIC: { + if (value.hasIsNull()) { + return com.google.cloud.spanner.Value.numericArray(null); + } + List nullList = + value.getArrayValue().getValueList().stream() + .map(com.google.spanner.executor.v1.Value::getIsNull) + .collect(Collectors.toList()); + List valueList = + value.getArrayValue().getValueList().stream() + .map(com.google.spanner.executor.v1.Value::getStringValue) + .collect(Collectors.toList()); + List newValueList = new ArrayList<>(valueList.size()); + + for (int i = 0; i < valueList.size(); ++i) { + if (i < nullList.size() && nullList.get(i)) { + newValueList.add(null); + continue; } - return com.google.cloud.spanner.Value.numericArray(newValueList); + String ascii = valueList.get(i); + newValueList.add(new BigDecimal(ascii)); } + return com.google.cloud.spanner.Value.numericArray(newValueList); + } case STRUCT: com.google.cloud.spanner.Type elementType = typeProtoToCloudType(type.getArrayElementType()); @@ -3588,7 +3807,9 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue( ErrorCode.INVALID_ARGUMENT, "Unsupported type while converting from value proto: " + type); } - /** Convert a cloud Timestamp to a Timestamp proto. */ + /** + * Convert a cloud Timestamp to a Timestamp proto. + */ private com.google.protobuf.Timestamp timestampToProto(Timestamp t) throws SpannerException { try { return Timestamps.parse(t.toString()); @@ -3598,12 +3819,16 @@ private com.google.protobuf.Timestamp timestampToProto(Timestamp t) throws Spann } } - /** Convert a cloud Date to a Date proto. */ + /** + * Convert a cloud Date to a Date proto. + */ private static int daysFromDate(Date date) { return (int) LocalDate.of(date.getYear(), date.getMonth(), date.getDayOfMonth()).toEpochDay(); } - /** Convert a Date proto to a cloud Date. */ + /** + * Convert a Date proto to a cloud Date. + */ private static Date dateFromDays(int daysSinceEpoch) { LocalDate localDate = LocalDate.ofEpochDay(daysSinceEpoch); return Date.fromYearMonthDay( @@ -3626,7 +3851,9 @@ private static ByteArray toByteArray(@Nullable ByteString byteString) { return ByteArray.copyFrom(byteString.toByteArray()); } - /** Convert a list of nullable value to another type. */ + /** + * Convert a list of nullable value to another type. + */ private static List unmarshallValueList( List isNullList, List valueList, Function converter) { List newValueList = new ArrayList<>(valueList.size()); @@ -3642,12 +3869,16 @@ private static List unmarshallValueList( return newValueList; } - /** Insert null into valueList according to isNullList. */ + /** + * Insert null into valueList according to isNullList. + */ private static List unmarshallValueList(List isNullList, List valueList) { return unmarshallValueList(isNullList, valueList, element -> element); } - /** Convert a Struct proto to a cloud Struct. */ + /** + * Convert a Struct proto to a cloud Struct. + */ private static com.google.cloud.spanner.Struct structProtoToCloudStruct( com.google.spanner.v1.Type type, com.google.spanner.executor.v1.ValueList structValue) { List fieldValues = structValue.getValueList(); @@ -3668,7 +3899,9 @@ private static com.google.cloud.spanner.Struct structProtoToCloudStruct( return builder.build(); } - /** Convert a Type proto to a cloud Type. */ + /** + * Convert a Type proto to a cloud Type. + */ private static com.google.cloud.spanner.Type typeProtoToCloudType( com.google.spanner.v1.Type typeProto) { switch (typeProto.getCode()) { @@ -3727,7 +3960,9 @@ private static com.google.cloud.spanner.Type typeProtoToCloudType( } } - /** Convert a cloud Type to a Type proto. */ + /** + * Convert a cloud Type to a Type proto. + */ private static com.google.spanner.v1.Type cloudTypeToTypeProto(@Nonnull Type cloudTypeProto) { switch (cloudTypeProto.getCode()) { case BOOL: @@ -3795,14 +4030,18 @@ private static com.google.spanner.v1.Type cloudTypeToTypeProto(@Nonnull Type clo } } - /** Unmarshall ByteString to serializable object. */ + /** + * Unmarshall ByteString to serializable object. + */ private T unmarshall(ByteString input) throws IOException, ClassNotFoundException { ObjectInputStream objectInputStream = new ObjectInputStream(input.newInput()); return (T) objectInputStream.readObject(); } - /** Marshall a serializable object into ByteString. */ + /** + * Marshall a serializable object into ByteString. + */ private ByteString marshall(T object) throws IOException { ByteString.Output output = ByteString.newOutput(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(output); @@ -3812,14 +4051,18 @@ private ByteString marshall(T object) throws IOExceptio return output.toByteString(); } - /** Build Timestamp from micros. */ + /** + * Build Timestamp from micros. + */ private Timestamp timestampFromMicros(long micros) { long seconds = TimeUnit.MICROSECONDS.toSeconds(micros); int nanos = (int) (micros * 1000 - seconds * 1000000000); return Timestamp.ofTimeSecondsAndNanos(seconds, nanos); } - /** Build TimestampBound from Concurrency. */ + /** + * Build TimestampBound from Concurrency. + */ private TimestampBound timestampBoundsFromConcurrency(Concurrency concurrency) { if (concurrency.hasStalenessSeconds()) { return TimestampBound.ofExactStaleness( @@ -3844,7 +4087,9 @@ private TimestampBound timestampBoundsFromConcurrency(Concurrency concurrency) { ErrorCode.INVALID_ARGUMENT, "Unsupported concurrency mode: " + concurrency); } - /** Build instance proto from cloud spanner instance. */ + /** + * Build instance proto from cloud spanner instance. + */ private com.google.spanner.admin.instance.v1.Instance instanceToProto(Instance instance) { com.google.spanner.admin.instance.v1.Instance.Builder instanceBuilder = com.google.spanner.admin.instance.v1.Instance.newBuilder(); @@ -3875,7 +4120,9 @@ private com.google.spanner.admin.instance.v1.Instance instanceToProto(Instance i return instanceBuilder.build(); } - /** Build instance proto from cloud spanner instance. */ + /** + * Build instance proto from cloud spanner instance. + */ private com.google.spanner.admin.instance.v1.InstanceConfig instanceConfigToProto( InstanceConfig instanceConfig) { com.google.spanner.admin.instance.v1.InstanceConfig.Builder instanceConfigBuilder = @@ -3930,4 +4177,4 @@ private com.google.spanner.admin.instance.v1.InstanceConfig instanceConfigToProt } return instanceConfigBuilder.build(); } -} +} \ No newline at end of file diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java index 0da30f82d23c..a1b532aca5ec 100644 --- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java +++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java @@ -70,7 +70,7 @@ public class WorkerProxy { public static int proxyPort = 0; public static String cert = ""; public static String serviceKeyFile = ""; - public static double multiplexedSessionOperationsRatio = 0.0; + public static double multiplexedSessionOperationsRatio = 1.0; public static boolean usePlainTextChannel = false; public static boolean enableGrpcFaultInjector = false; public static OpenTelemetrySdk openTelemetrySdk; @@ -231,4 +231,4 @@ private static CommandLine buildOptions(String[] args) { throw new IllegalArgumentException(e.getMessage()); } } -} +} \ No newline at end of file