diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
index a9323fbdc25f..aabb9899269e 100644
--- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
+++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
@@ -16,8 +16,6 @@
package com.google.cloud.executor.spanner;
-import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;
-
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
@@ -55,6 +53,7 @@
import com.google.cloud.spanner.Mutation.WriteBuilder;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
+import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
@@ -156,6 +155,8 @@
import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction;
import com.google.spanner.executor.v1.UpdateCloudInstanceAction;
import com.google.spanner.v1.StructType;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
+import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
@@ -174,6 +175,7 @@
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -239,53 +241,79 @@ public static String unexpectedExceptionResponse(Exception e) {
*
*
Here's a typical workflow for how a read-write transaction works.
*
- *
When we call startRWTransaction, a transaction runner will be started in another thread with
- * a callable that stores the passed TransactionContext into the ReadWriteTransaction and blocks.
- * This TransactionContext is used to run the read/write actions. To execute the finish action, we
- * store the FinishMode in the ReadWriteTransaction object, which unblocks the thread in the
- * callable and causes the callable to either return (to commit) or throw an exception (to abort).
- * If the underlying Spanner transaction aborted, the transaction runner will invoke the callable
- * again.
+ *
When we call startRWTransaction, a transaction runner will be started in another thread
+ * with a callable that stores the passed TransactionContext into the ReadWriteTransaction and
+ * blocks. This TransactionContext is used to run the read/write actions. To execute the finish
+ * action, we store the FinishMode in the ReadWriteTransaction object, which unblocks the thread
+ * in the callable and causes the callable to either return (to commit) or throw an exception (to
+ * abort). If the underlying Spanner transaction aborted, the transaction runner will invoke the
+ * callable again.
*/
private static class ReadWriteTransaction {
+
private final DatabaseClient dbClient;
private TransactionRunner runner;
private TransactionContext txnContext;
private com.google.protobuf.Timestamp timestamp;
private Mode finishMode;
+ private SpannerException abortedException;
private SpannerException error;
private final String transactionSeed;
private final boolean optimistic;
+ private final boolean repeatableRead;
// Set to true when the transaction runner completed, one of these three could happen: runner
// committed, abandoned or threw an error.
private boolean runnerCompleted;
public ReadWriteTransaction(
- DatabaseClient dbClient, String transactionSeed, boolean optimistic) {
+ DatabaseClient dbClient,
+ String transactionSeed,
+ boolean optimistic,
+ boolean repeatableRead) {
this.dbClient = dbClient;
this.transactionSeed = transactionSeed;
this.optimistic = optimistic;
+ this.repeatableRead = repeatableRead;
this.runnerCompleted = false;
}
- /** Set context to be used for executing actions. */
+ /**
+ * Set context to be used for executing actions.
+ */
private synchronized void setContext(TransactionContext transaction) {
finishMode = null;
+ abortedException = null;
txnContext = transaction;
Preconditions.checkNotNull(txnContext);
LOGGER.log(Level.INFO, "Transaction callable created, setting context %s\n", transactionSeed);
notifyAll();
}
- /** Wait for finishAction to be executed and return the requested finish mode. */
- private synchronized Mode waitForFinishAction() throws Exception {
- while (finishMode == null) {
+ private synchronized void setAborted(SpannerException abortedException) {
+ LOGGER.log(Level.INFO, "Got aborted exception %s\n", abortedException.toString());
+ this.abortedException = abortedException;
+ notifyAll();
+ }
+
+ /**
+ * Wait for finishAction to be executed and return the requested finish mode.
+ */
+ private synchronized Mode waitForFinishActionOrAbort() throws Exception {
+ while (finishMode == null && abortedException == null) {
wait();
}
+ // If a read aborted, throw the exception to the TransactionRunner callable to
+ // restart the transaction.
+ if (abortedException != null) {
+ LOGGER.log(Level.INFO, "Throw aborted exception %s\n", abortedException.toString());
+ throw abortedException;
+ }
return finishMode;
}
- /** Wait for transactionContext to be set. */
+ /**
+ * Wait for transactionContext to be set.
+ */
private synchronized void waitForTransactionContext() throws Exception {
while (txnContext == null && error == null) {
wait();
@@ -295,14 +323,18 @@ private synchronized void waitForTransactionContext() throws Exception {
}
}
- /** Transaction successfully committed with a timestamp. */
+ /**
+ * Transaction successfully committed with a timestamp.
+ */
private synchronized void transactionSucceeded(com.google.protobuf.Timestamp timestamp) {
this.timestamp = timestamp;
this.runnerCompleted = true;
notifyAll();
}
- /** Transaction failed to commit, maybe abandoned or other errors occurred. */
+ /**
+ * Transaction failed to commit, maybe abandoned or other errors occurred.
+ */
private synchronized void transactionFailed(SpannerException e) {
// Handle abandon case
if (e.getErrorCode() == ErrorCode.UNKNOWN && e.getMessage().contains(TRANSACTION_ABANDONED)) {
@@ -315,14 +347,33 @@ private synchronized void transactionFailed(SpannerException e) {
notifyAll();
}
- /** Return the commit timestamp. */
+ /**
+ * Return the commit timestamp.
+ */
public synchronized com.google.protobuf.Timestamp getTimestamp() {
return timestamp;
}
- /** Return the transactionContext to run actions. Must be called after start action. */
+ /**
+ * Return the transactionContext to run actions, waiting until it is set.
+ */
public synchronized TransactionContext getContext() {
- Preconditions.checkState(txnContext != null);
+ while (txnContext == null || abortedException != null) {
+ // If the transaction was aborted by a read action, the abortedException will
+ // be thrown to the TransactionRunner callable to restart the transaction.
+ // The restarted callable will call setContext() to set the new transaction context
+ // and clear abortedException.
+ if (abortedException != null) {
+ LOGGER.log(Level.INFO, "Waiting for new RW transaction context after abort\n");
+ } else {
+ LOGGER.log(Level.INFO, "Waiting for RW transaction context.");
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.INFO, "Interrupted while waiting for RW transaction context.");
+ }
+ }
return txnContext;
}
@@ -339,7 +390,7 @@ public void startRWTransaction() throws Exception {
String.format(
"Transaction context set, executing and waiting for finish %s\n",
transactionSeed));
- Mode mode = waitForFinishAction();
+ Mode mode = waitForFinishActionOrAbort();
if (mode == Mode.ABANDON) {
throw new Exception(TRANSACTION_ABANDONED);
}
@@ -351,10 +402,21 @@ public void startRWTransaction() throws Exception {
context.wrap(
() -> {
try {
+ List transactionOptions = new ArrayList<>();
+ if (repeatableRead) {
+ transactionOptions.add(Options.isolationLevel(IsolationLevel.REPEATABLE_READ));
+ } else {
+ transactionOptions.add(Options.isolationLevel(IsolationLevel.SERIALIZABLE));
+ }
+ if (optimistic) {
+ transactionOptions.add(Options.readLockMode(ReadLockMode.OPTIMISTIC));
+ } else {
+ transactionOptions.add(Options.readLockMode(ReadLockMode.PESSIMISTIC));
+ }
runner =
- optimistic
- ? dbClient.readWriteTransaction(Options.optimisticLock())
- : dbClient.readWriteTransaction();
+ dbClient.readWriteTransaction(
+ transactionOptions.toArray(
+ new TransactionOption[transactionOptions.size()]));
LOGGER.log(
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
@@ -397,7 +459,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
"TxnContext cleared, sending finishMode to finish transaction %s\n",
transactionSeed));
notifyAll();
- // Wait for the transaction to finish or restart
+ // Wait for the transaction to finish or restart due to an abort on COMMIT.
while (txnContext == null && !runnerCompleted) {
wait();
}
@@ -434,6 +496,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
* initialized.
*/
class ExecutionFlowContext {
+
// Database path from previous action
private String prevDbPath;
// Current read-write transaction
@@ -448,9 +511,6 @@ class ExecutionFlowContext {
private Metadata metadata;
// Number of pending read/query actions.
private int numPendingReads;
- // Indicate whether there's a read/query action got aborted and the transaction need to be
- // reset.
- private boolean readAborted;
// Log the workid and op pair for tracing the thread.
private String transactionSeed;
// Outgoing stream.
@@ -460,17 +520,23 @@ public ExecutionFlowContext(StreamObserver responseO
this.responseObserver = responseObserver;
}
- /** Call the underlying stream to send response. */
+ /**
+ * Call the underlying stream to send response.
+ */
public synchronized void onNext(SpannerAsyncActionResponse response) {
responseObserver.onNext(response);
}
- /** Call the underlying stream to send error. */
+ /**
+ * Call the underlying stream to send error.
+ */
public synchronized void onError(Throwable t) {
responseObserver.onError(t);
}
- /** Return current transaction that can used for performing read/query actions. */
+ /**
+ * Return current transaction that can used for performing read/query actions.
+ */
public synchronized ReadContext getTransactionForRead() throws SpannerException {
if (roTxn != null) {
return roTxn;
@@ -486,7 +552,9 @@ public synchronized ReadContext getTransactionForRead() throws SpannerException
ErrorCode.INVALID_ARGUMENT, "No active transaction");
}
- /** Return current transaction that can used for performing mutation/update actions. */
+ /**
+ * Return current transaction that can used for performing mutation/update actions.
+ */
public synchronized TransactionContext getTransactionForWrite() throws SpannerException {
if (rwTxn == null) {
throw SpannerExceptionFactory.newSpannerException(
@@ -495,7 +563,9 @@ public synchronized TransactionContext getTransactionForWrite() throws SpannerEx
return rwTxn.getContext();
}
- /** Return current batch transaction if it exists. */
+ /**
+ * Return current batch transaction if it exists.
+ */
public synchronized BatchReadOnlyTransaction getBatchTxn() throws SpannerException {
if (batchTxn == null) {
throw SpannerExceptionFactory.newSpannerException(
@@ -504,31 +574,41 @@ public synchronized BatchReadOnlyTransaction getBatchTxn() throws SpannerExcepti
return batchTxn;
}
- /** Set the transactionSeed string retrieved from startTransactionAction. */
+ /**
+ * Set the transactionSeed string retrieved from startTransactionAction.
+ */
public synchronized void updateTransactionSeed(String transactionSeed) {
if (!transactionSeed.isEmpty()) {
this.transactionSeed = transactionSeed;
}
}
- /** Return current workid and op pair for logging. */
+ /**
+ * Return current workid and op pair for logging.
+ */
public synchronized String getTransactionSeed() {
return transactionSeed;
}
- /** Return current database client. */
+ /**
+ * Return current database client.
+ */
public DatabaseClient getDbClient() {
return dbClient;
}
- /** Clear the transaction related variables. */
+ /**
+ * Clear the transaction related variables.
+ */
public synchronized void clear() {
rwTxn = null;
roTxn = null;
metadata = null;
}
- /** Cleanup all the active transactions if the stubby call is closing. */
+ /**
+ * Cleanup all the active transactions if the stubby call is closing.
+ */
public synchronized void cleanup() {
if (roTxn != null) {
LOGGER.log(Level.INFO, "A read only transaction was active when stubby call closed");
@@ -545,7 +625,9 @@ public synchronized void cleanup() {
}
}
- /** Return previous databasePath if given dbPath is empty, then update. */
+ /**
+ * Return previous databasePath if given dbPath is empty, then update.
+ */
public synchronized String getDatabasePath(String dbPath) {
if (dbPath == null || dbPath.isEmpty()) {
return prevDbPath;
@@ -554,12 +636,16 @@ public synchronized String getDatabasePath(String dbPath) {
return dbPath;
}
- /** Set the metadata for future use. */
+ /**
+ * Set the metadata for future use.
+ */
public synchronized void setMetadata(Metadata metadata) {
this.metadata = metadata;
}
- /** Start a read-only transaction. */
+ /**
+ * Start a read-only transaction.
+ */
public synchronized void startReadOnlyTxn(
DatabaseClient dbClient, TimestampBound timestampBound, Metadata metadata) {
if ((rwTxn != null) || (roTxn != null) || (batchTxn != null)) {
@@ -575,7 +661,9 @@ public synchronized void startReadOnlyTxn(
}
}
- /** Start a read-write transaction. */
+ /**
+ * Start a read-write transaction.
+ */
public synchronized void startReadWriteTxn(
DatabaseClient dbClient, Metadata metadata, TransactionExecutionOptions options)
throws Exception {
@@ -588,7 +676,16 @@ public synchronized void startReadWriteTxn(
String.format(
"There's no active transaction, safe to create rwTxn: %s\n", getTransactionSeed()));
this.metadata = metadata;
- rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, options.getOptimistic());
+ boolean optimistic =
+ options.getSerializableOptimistic() || options.getSnapshotIsolationOptimistic();
+ boolean repeatableRead =
+ options.getSnapshotIsolationOptimistic() || options.getSnapshotIsolationPessimistic();
+ rwTxn =
+ new ReadWriteTransaction(
+ dbClient,
+ transactionSeed,
+ optimistic,
+ repeatableRead);
LOGGER.log(
Level.INFO,
String.format(
@@ -596,7 +693,9 @@ public synchronized void startReadWriteTxn(
rwTxn.startRWTransaction();
}
- /** Start a batch transaction. */
+ /**
+ * Start a batch transaction.
+ */
public synchronized Status startBatchTxn(
StartBatchTransactionAction action, BatchClient batchClient, OutcomeSender sender) {
try {
@@ -635,7 +734,9 @@ public synchronized Status startBatchTxn(
}
}
- /** Increase the read count when a read/query is issued. */
+ /**
+ * Increase the read count when a read/query is issued.
+ */
public synchronized void startRead() {
++numPendingReads;
}
@@ -644,48 +745,57 @@ public synchronized void startRead() {
* Decrease the read count when a read/query is finished, if status is aborted and there's no
* pending read/query, reset the transaction for retry.
*/
- public synchronized void finishRead(Status status) {
+ public synchronized void finishRead(Status status, SpannerException e) {
if (status.getCode() == Status.ABORTED.getCode()) {
- readAborted = true;
+ if (rwTxn != null) {
+ rwTxn.setAborted(e);
+ }
}
--numPendingReads;
- if (readAborted && numPendingReads <= 0) {
- LOGGER.log(Level.FINE, "Transaction reset due to read/query abort");
- readAborted = false;
- }
}
- /** Initialize the read count and aborted status when transaction started. */
+ /**
+ * Initialize the read count and aborted status when transaction started.
+ */
public synchronized void initReadState() {
- readAborted = false;
numPendingReads = 0;
}
- /** Store the reference to the database client for future action use. */
+ /**
+ * Store the reference to the database client for future action use.
+ */
public void setDatabaseClient(DatabaseClient client) {
dbClient = client;
}
- /** Return a list of key column types of the given table. */
+ /**
+ * Return a list of key column types of the given table.
+ */
public List getKeyColumnTypes(String tableName)
throws SpannerException {
Preconditions.checkNotNull(metadata);
return metadata.getKeyColumnTypes(tableName);
}
- /** Return column type of the given table and column. */
+ /**
+ * Return column type of the given table and column.
+ */
public com.google.spanner.v1.Type getColumnType(String tableName, String columnName)
throws SpannerException {
Preconditions.checkNotNull(metadata);
return metadata.getColumnType(tableName, columnName);
}
- /** Buffer a list of mutations in a read-write transaction. */
+ /**
+ * Buffer a list of mutations in a read-write transaction.
+ */
public synchronized void bufferMutations(List mutations) throws SpannerException {
getTransactionForWrite().buffer(mutations);
}
- /** Execute a batch of updates in a read-write transaction. */
+ /**
+ * Execute a batch of updates in a read-write transaction.
+ */
public synchronized long[] executeBatchDml(@Nonnull List stmts)
throws SpannerException {
for (int i = 0; i < stmts.size(); i++) {
@@ -696,7 +806,9 @@ public synchronized long[] executeBatchDml(@Nonnull List stmts)
.batchUpdate(stmts, Options.tag("batch-update-transaction-tag"));
}
- /** Finish active transaction in given finishMode, then send outcome back to client. */
+ /**
+ * Finish active transaction in given finishMode, then send outcome back to client.
+ */
public synchronized Status finish(Mode finishMode, OutcomeSender sender) {
if (numPendingReads > 0) {
return sender.finishWithError(
@@ -724,6 +836,12 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) {
if (rwTxn.getTimestamp() != null) {
outcomeBuilder.setCommitTime(rwTxn.getTimestamp());
}
+ if (finishMode == Mode.COMMIT
+ && rwTxn.runner.getCommitResponse().getSnapshotTimestamp() != null) {
+ outcomeBuilder.setSnapshotIsolationTxnReadTimestamp(
+ Timestamps.toMicros(
+ rwTxn.runner.getCommitResponse().getSnapshotTimestamp().toProto()));
+ }
clear();
}
}
@@ -750,7 +868,9 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) {
}
}
- /** Close active batch transaction. */
+ /**
+ * Close active batch transaction.
+ */
public synchronized void closeBatchTxn() throws SpannerException {
if (batchTxn == null) {
throw SpannerExceptionFactory.newSpannerException(
@@ -761,7 +881,7 @@ public synchronized void closeBatchTxn() throws SpannerException {
}
private Spanner client;
- private Spanner clientWithTimeout;
+ private Map clientWithTimeoutMap = new HashMap<>();
private static final String TRANSACTION_ABANDONED = "Fake error to abandon transaction";
@@ -782,23 +902,25 @@ public synchronized void closeBatchTxn() throws SpannerException {
private synchronized Spanner getClientWithTimeout(
long timeoutSeconds, boolean useMultiplexedSession) throws IOException {
- if (clientWithTimeout != null) {
- return clientWithTimeout;
+ if (clientWithTimeoutMap.containsKey(timeoutSeconds)) {
+ return clientWithTimeoutMap.get(timeoutSeconds);
}
- clientWithTimeout = getClient(timeoutSeconds, useMultiplexedSession);
- return clientWithTimeout;
+ clientWithTimeoutMap.put(timeoutSeconds,
+ initializeClient(timeoutSeconds, useMultiplexedSession));
+ return clientWithTimeoutMap.get(timeoutSeconds);
}
private synchronized Spanner getClient(boolean useMultiplexedSession) throws IOException {
if (client != null) {
return client;
}
- client = getClient(/* timeoutSeconds= */ 0, useMultiplexedSession);
+ client = initializeClient(/* timeoutSeconds= */ 0, useMultiplexedSession);
return client;
}
- // Return the spanner client, create one if not exists.
- private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplexedSession)
+ // Initializes a newly created spanner client. NEVER CALL THIS METHOD DIRECTLY.
+ // ALWAYS CALL getClientWithTimeout() or getClient() INSTEAD.
+ private synchronized Spanner initializeClient(long timeoutSeconds, boolean useMultiplexedSession)
throws IOException {
// Create a cloud spanner client
Credentials credentials;
@@ -957,7 +1079,9 @@ public boolean isExportedEndToEndTraceValid(String traceId) {
return true;
}
- /** Handle actions. */
+ /**
+ * Handle actions.
+ */
public Status startHandlingRequest(
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
OutcomeSender outcomeSender = new OutcomeSender(req.getActionId(), executionContext);
@@ -998,7 +1122,9 @@ public Status startHandlingRequest(
return Status.OK;
}
- /** Execute actions by action case, using OutcomeSender to send status and results back. */
+ /**
+ * Execute actions by action case, using OutcomeSender to send status and results back.
+ */
private Status executeAction(
OutcomeSender outcomeSender,
SpannerAction action,
@@ -1093,7 +1219,9 @@ private Status executeAction(
}
}
- /** Execute admin actions by action case, using OutcomeSender to send status and results back. */
+ /**
+ * Execute admin actions by action case, using OutcomeSender to send status and results back.
+ */
private Status executeAdminAction(
boolean useMultiplexedSession, AdminAction action, OutcomeSender outcomeSender) {
try {
@@ -1186,7 +1314,9 @@ private Status executeAdminAction(
}
}
- /** Execute action that creates a cloud instance. */
+ /**
+ * Execute action that creates a cloud instance.
+ */
private Status executeCreateCloudInstance(
boolean useMultiplexedSession, CreateCloudInstanceAction action, OutcomeSender sender) {
try {
@@ -1228,7 +1358,9 @@ private Status executeCreateCloudInstance(
return sender.finishWithOK();
}
- /** Execute action that updates a cloud instance. */
+ /**
+ * Execute action that updates a cloud instance.
+ */
private Status executeUpdateCloudInstance(
boolean useMultiplexedSession, UpdateCloudInstanceAction action, OutcomeSender sender) {
try {
@@ -1276,7 +1408,9 @@ private Status executeUpdateCloudInstance(
return sender.finishWithOK();
}
- /** Execute action that deletes a cloud instance. */
+ /**
+ * Execute action that deletes a cloud instance.
+ */
private Status executeDeleteCloudInstance(
boolean useMultiplexedSession, DeleteCloudInstanceAction action, OutcomeSender sender) {
try {
@@ -1298,7 +1432,9 @@ private Status executeDeleteCloudInstance(
return sender.finishWithOK();
}
- /** Execute action that lists cloud instances. */
+ /**
+ * Execute action that lists cloud instances.
+ */
private Status executeListCloudInstances(
boolean useMultiplexedSession, ListCloudInstancesAction action, OutcomeSender sender) {
try {
@@ -1345,7 +1481,9 @@ private Status executeListCloudInstances(
}
}
- /** Execute action that lists cloud instance configs. */
+ /**
+ * Execute action that lists cloud instance configs.
+ */
private Status executeListCloudInstanceConfigs(
boolean useMultiplexedSession, ListCloudInstanceConfigsAction action, OutcomeSender sender) {
LOGGER.log(Level.INFO, String.format("Listing instance configs:\n%s", action));
@@ -1389,7 +1527,9 @@ private Status executeListCloudInstanceConfigs(
}
}
- /** Execute action that gets a cloud instance config. */
+ /**
+ * Execute action that gets a cloud instance config.
+ */
private Status executeGetCloudInstanceConfig(
boolean useMultiplexedSession, GetCloudInstanceConfigAction action, OutcomeSender sender) {
LOGGER.log(Level.INFO, String.format("Getting instance config:\n%s", action));
@@ -1420,7 +1560,9 @@ private Status executeGetCloudInstanceConfig(
}
}
- /** Execute action that retrieves a cloud instance. */
+ /**
+ * Execute action that retrieves a cloud instance.
+ */
private Status executeGetCloudInstance(
boolean useMultiplexedSession, GetCloudInstanceAction action, OutcomeSender sender) {
try {
@@ -1451,7 +1593,9 @@ private Status executeGetCloudInstance(
}
}
- /** Execute action that creates a user instance config. */
+ /**
+ * Execute action that creates a user instance config.
+ */
private Status executeCreateUserInstanceConfig(
boolean useMultiplexedSession, CreateUserInstanceConfigAction action, OutcomeSender sender) {
try {
@@ -1482,7 +1626,9 @@ private Status executeCreateUserInstanceConfig(
return sender.finishWithOK();
}
- /** Execute action that deletes a user instance config. */
+ /**
+ * Execute action that deletes a user instance config.
+ */
private Status executeDeleteUserInstanceConfig(
boolean useMultiplexedSession, DeleteUserInstanceConfigAction action, OutcomeSender sender) {
try {
@@ -1502,7 +1648,9 @@ private Status executeDeleteUserInstanceConfig(
return sender.finishWithOK();
}
- /** Execute action that creates a cloud custom encrypted database. */
+ /**
+ * Execute action that creates a cloud custom encrypted database.
+ */
private Status executeCreateCloudCustomEncryptedDatabase(
boolean useMultiplexedSession, CreateCloudDatabaseAction action, OutcomeSender sender) {
try {
@@ -1531,7 +1679,9 @@ private Status executeCreateCloudCustomEncryptedDatabase(
return sender.finishWithOK();
}
- /** Execute action that creates a cloud database. */
+ /**
+ * Execute action that creates a cloud database.
+ */
private Status executeCreateCloudDatabase(
boolean useMultiplexedSession, CreateCloudDatabaseAction action, OutcomeSender sender) {
if (action.hasEncryptionConfig()) {
@@ -1566,7 +1716,9 @@ private Status executeCreateCloudDatabase(
return sender.finishWithOK();
}
- /** Execute action that updates a cloud database. */
+ /**
+ * Execute action that updates a cloud database.
+ */
private Status executeUpdateCloudDatabaseDdl(
boolean useMultiplexedSession, UpdateCloudDatabaseDdlAction action, OutcomeSender sender) {
try {
@@ -1600,7 +1752,9 @@ private Status executeUpdateCloudDatabaseDdl(
return sender.finishWithOK();
}
- /** Execute action that updates a cloud database. */
+ /**
+ * Execute action that updates a cloud database.
+ */
private Status executeDropCloudDatabase(
boolean useMultiplexedSession, DropCloudDatabaseAction action, OutcomeSender sender) {
try {
@@ -1621,7 +1775,9 @@ private Status executeDropCloudDatabase(
return sender.finishWithOK();
}
- /** Execute action that creates a cloud database backup. */
+ /**
+ * Execute action that creates a cloud database backup.
+ */
private Status executeCreateCloudBackup(
boolean useMultiplexedSession, CreateCloudBackupAction action, OutcomeSender sender) {
try {
@@ -1657,7 +1813,9 @@ private Status executeCreateCloudBackup(
}
}
- /** Execute action that copies a cloud database backup. */
+ /**
+ * Execute action that copies a cloud database backup.
+ */
private Status executeCopyCloudBackup(
boolean useMultiplexedSession, CopyCloudBackupAction action, OutcomeSender sender) {
try {
@@ -1693,7 +1851,9 @@ private Status executeCopyCloudBackup(
}
}
- /** Execute action that gets a cloud database backup. */
+ /**
+ * Execute action that gets a cloud database backup.
+ */
private Status executeGetCloudBackup(
boolean useMultiplexedSession, GetCloudBackupAction action, OutcomeSender sender) {
try {
@@ -1724,7 +1884,9 @@ private Status executeGetCloudBackup(
}
}
- /** Execute action that updates a cloud database backup. */
+ /**
+ * Execute action that updates a cloud database backup.
+ */
private Status executeUpdateCloudBackup(
boolean useMultiplexedSession, UpdateCloudBackupAction action, OutcomeSender sender) {
try {
@@ -1758,7 +1920,9 @@ private Status executeUpdateCloudBackup(
}
}
- /** Execute action that deletes a cloud database backup. */
+ /**
+ * Execute action that deletes a cloud database backup.
+ */
private Status executeDeleteCloudBackup(
boolean useMultiplexedSession, DeleteCloudBackupAction action, OutcomeSender sender) {
try {
@@ -1778,7 +1942,9 @@ private Status executeDeleteCloudBackup(
}
}
- /** Execute action that lists cloud database backups. */
+ /**
+ * Execute action that lists cloud database backups.
+ */
private Status executeListCloudBackups(
boolean useMultiplexedSession, ListCloudBackupsAction action, OutcomeSender sender) {
try {
@@ -1818,7 +1984,9 @@ private Status executeListCloudBackups(
}
}
- /** Execute action that lists cloud database backup operations. */
+ /**
+ * Execute action that lists cloud database backup operations.
+ */
private Status executeListCloudBackupOperations(
boolean useMultiplexedSession, ListCloudBackupOperationsAction action, OutcomeSender sender) {
try {
@@ -1855,7 +2023,9 @@ private Status executeListCloudBackupOperations(
}
}
- /** Execute action that list cloud databases. */
+ /**
+ * Execute action that list cloud databases.
+ */
private Status executeListCloudDatabases(
boolean useMultiplexedSession, ListCloudDatabasesAction action, OutcomeSender sender) {
try {
@@ -1894,7 +2064,9 @@ private Status executeListCloudDatabases(
}
}
- /** Execute action that lists cloud database operations. */
+ /**
+ * Execute action that lists cloud database operations.
+ */
private Status executeListCloudDatabaseOperations(
boolean useMultiplexedSession,
ListCloudDatabaseOperationsAction action,
@@ -1933,7 +2105,9 @@ private Status executeListCloudDatabaseOperations(
}
}
- /** Execute action that restores a cloud database. */
+ /**
+ * Execute action that restores a cloud database.
+ */
private Status executeRestoreCloudDatabase(
boolean useMultiplexedSession, RestoreCloudDatabaseAction action, OutcomeSender sender) {
try {
@@ -1967,7 +2141,9 @@ private Status executeRestoreCloudDatabase(
}
}
- /** Execute action that gets a cloud database. */
+ /**
+ * Execute action that gets a cloud database.
+ */
private Status executeGetCloudDatabase(
boolean useMultiplexedSession, GetCloudDatabaseAction action, OutcomeSender sender) {
try {
@@ -1998,7 +2174,9 @@ private Status executeGetCloudDatabase(
}
}
- /** Execute action that gets an operation. */
+ /**
+ * Execute action that gets an operation.
+ */
private Status executeGetOperation(
boolean useMultiplexedSession, GetOperationAction action, OutcomeSender sender) {
try {
@@ -2026,7 +2204,9 @@ private Status executeGetOperation(
}
}
- /** Execute action that cancels an operation. */
+ /**
+ * Execute action that cancels an operation.
+ */
private Status executeCancelOperation(
boolean useMultiplexedSession, CancelOperationAction action, OutcomeSender sender) {
try {
@@ -2045,7 +2225,9 @@ private Status executeCancelOperation(
}
}
- /** Execute action that starts a batch transaction. */
+ /**
+ * Execute action that starts a batch transaction.
+ */
private Status executeStartBatchTxn(
StartBatchTransactionAction action,
BatchClient batchClient,
@@ -2055,7 +2237,9 @@ private Status executeStartBatchTxn(
return executionContext.startBatchTxn(action, batchClient, sender);
}
- /** Execute action that finishes a batch transaction. */
+ /**
+ * Execute action that finishes a batch transaction.
+ */
private Status executeCloseBatchTxn(
CloseBatchTransactionAction action,
OutcomeSender sender,
@@ -2071,7 +2255,9 @@ private Status executeCloseBatchTxn(
}
}
- /** Execute action that generates database partitions for the given read. */
+ /**
+ * Execute action that generates database partitions for the given read.
+ */
private Status executeGenerateDbPartitionsRead(
GenerateDbPartitionsForReadAction action,
OutcomeSender sender,
@@ -2139,7 +2325,9 @@ private Status executeGenerateDbPartitionsRead(
}
}
- /** Execute action that generates database partitions for the given query. */
+ /**
+ * Execute action that generates database partitions for the given query.
+ */
private Status executeGenerateDbPartitionsQuery(
GenerateDbPartitionsForQueryAction action,
OutcomeSender sender,
@@ -2186,7 +2374,9 @@ private Status executeGenerateDbPartitionsQuery(
}
}
- /** Execute a read or query for the given partitions. */
+ /**
+ * Execute a read or query for the given partitions.
+ */
private Status executeExecutePartition(
boolean useMultiplexedSession,
ExecutePartitionAction action,
@@ -2219,7 +2409,9 @@ private Status executeExecutePartition(
}
}
- /** Execute a partitioned update which runs different partitions in parallel. */
+ /**
+ * Execute a partitioned update which runs different partitions in parallel.
+ */
private Status executePartitionedUpdate(
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
try {
@@ -2247,7 +2439,9 @@ private Status executePartitionedUpdate(
}
}
- /** Build a child partition record proto out of childPartitionRecord returned by client. */
+ /**
+ * Build a child partition record proto out of childPartitionRecord returned by client.
+ */
private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord)
throws Exception {
ChildPartitionsRecord.Builder childPartitionRecordBuilder = ChildPartitionsRecord.newBuilder();
@@ -2264,7 +2458,9 @@ private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRec
return childPartitionRecordBuilder.build();
}
- /** Build a data change record proto out of dataChangeRecord returned by client. */
+ /**
+ * Build a data change record proto out of dataChangeRecord returned by client.
+ */
private DataChangeRecord buildDataChangeRecord(Struct dataChangeRecord) throws Exception {
DataChangeRecord.Builder dataChangeRecordBuilder = DataChangeRecord.newBuilder();
dataChangeRecordBuilder.setCommitTime(
@@ -2302,7 +2498,9 @@ private DataChangeRecord buildDataChangeRecord(Struct dataChangeRecord) throws E
return dataChangeRecordBuilder.build();
}
- /** Returns the json or string value of a struct column with index=columnIndex. */
+ /**
+ * Returns the json or string value of a struct column with index=columnIndex.
+ */
private String getJsonStringForStructColumn(Struct struct, int columnIndex) {
Type columnType = struct.getColumnType(columnIndex);
switch (columnType.getCode()) {
@@ -2319,7 +2517,9 @@ private String getJsonStringForStructColumn(Struct struct, int columnIndex) {
}
}
- /** Build a heartbeat record proto out of heartbeatRecord returned by client. */
+ /**
+ * Build a heartbeat record proto out of heartbeatRecord returned by client.
+ */
private HeartbeatRecord buildHeartbeatRecord(Struct heartbeatRecord) throws Exception {
HeartbeatRecord.Builder heartbeatRecordBuilder = HeartbeatRecord.newBuilder();
heartbeatRecordBuilder.setHeartbeatTime(
@@ -2327,7 +2527,9 @@ private HeartbeatRecord buildHeartbeatRecord(Struct heartbeatRecord) throws Exce
return heartbeatRecordBuilder.build();
}
- /** Execute action that executes a change stream query. */
+ /**
+ * Execute action that executes a change stream query.
+ */
private Status executeExecuteChangeStreamQuery(
String dbPath,
boolean useMultiplexedSession,
@@ -2514,7 +2716,9 @@ private Status executeFinishTxn(
return executionContext.finish(action.getMode(), sender);
}
- /** Execute mutation action request and buffer the mutations. */
+ /**
+ * Execute mutation action request and buffer the mutations.
+ */
private Status executeMutation(
MutationAction action,
OutcomeSender sender,
@@ -2594,7 +2798,9 @@ private Status executeMutation(
}
}
- /** Build a Mutation by using the given WriteBuilder to set the columns for the action. */
+ /**
+ * Build a Mutation by using the given WriteBuilder to set the columns for the action.
+ */
private Mutation buildWrite(
List columnList, List valueList, WriteBuilder write) {
Preconditions.checkState(columnList.size() == valueList.size());
@@ -2604,7 +2810,9 @@ private Mutation buildWrite(
return write.build();
}
- /** Execute a read action request, store the results in the OutcomeSender. */
+ /**
+ * Execute a read action request, store the results in the OutcomeSender.
+ */
private Status executeRead(
boolean useMultiplexedSession,
ReadAction action,
@@ -2654,7 +2862,9 @@ private Status executeRead(
}
}
- /** Execute a query action request, store the results in the OutcomeSender. */
+ /**
+ * Execute a query action request, store the results in the OutcomeSender.
+ */
private Status executeQuery(
boolean useMultiplexedSession,
QueryAction action,
@@ -2691,7 +2901,9 @@ private Status executeQuery(
}
}
- /** Execute a dml update action request, store the results in the OutcomeSender. */
+ /**
+ * Execute a dml update action request, store the results in the OutcomeSender.
+ */
private Status executeCloudDmlUpdate(
boolean useMultiplexedSession,
DmlAction action,
@@ -2724,7 +2936,9 @@ private Status executeCloudDmlUpdate(
}
}
- /** Execute a BatchDml update action request, store the results in the OutcomeSender. */
+ /**
+ * Execute a BatchDml update action request, store the results in the OutcomeSender.
+ */
private Status executeCloudBatchDmlUpdates(
BatchDmlAction action, OutcomeSender sender, ExecutionFlowContext executionContext) {
try {
@@ -2762,7 +2976,9 @@ private Status executeCloudBatchDmlUpdates(
}
}
- /** Process a ResultSet from a read/query and store the results in the OutcomeSender. */
+ /**
+ * Process a ResultSet from a read/query and store the results in the OutcomeSender.
+ */
private Status processResults(
boolean useMultiplexedSession,
ResultSet results,
@@ -2772,7 +2988,9 @@ private Status processResults(
return processResults(useMultiplexedSession, results, limit, sender, executionContext, false);
}
- /** Process a ResultSet from a read/query/dml and store the results in the OutcomeSender. */
+ /**
+ * Process a ResultSet from a read/query/dml and store the results in the OutcomeSender.
+ */
private Status processResults(
boolean useMultiplexedSession,
ResultSet results,
@@ -2807,7 +3025,7 @@ private Status processResults(
Level.INFO,
String.format(
"Successfully processed result: %s\n", executionContext.getTransactionSeed()));
- executionContext.finishRead(Status.OK);
+ executionContext.finishRead(Status.OK, null);
return sender.finishWithOK();
} catch (SpannerException e) {
LOGGER.log(Level.WARNING, "Encountered exception: ", e);
@@ -2817,7 +3035,7 @@ private Status processResults(
String.format(
"Encountered exception: %s %s\n",
status.getDescription(), executionContext.getTransactionSeed()));
- executionContext.finishRead(status);
+ executionContext.finishRead(status, e);
if (status.getCode() == Status.ABORTED.getCode()) {
return sender.finishWithTransactionRestarted();
} else {
@@ -2842,14 +3060,18 @@ private Status processResults(
}
}
- /** Convert a result row to a row proto(value list) for sending back to the client. */
+ /**
+ * Convert a result row to a row proto(value list) for sending back to the client.
+ */
private com.google.spanner.executor.v1.ValueList buildRow(
StructReader result, OutcomeSender sender) throws SpannerException {
sender.setRowType(buildStructType(result));
return buildStruct(result);
}
- /** Construct a StructType for a given struct. This is used to set the row type. */
+ /**
+ * Construct a StructType for a given struct. This is used to set the row type.
+ */
private com.google.spanner.v1.StructType buildStructType(StructReader struct) {
com.google.spanner.v1.StructType.Builder rowTypeBuilder =
com.google.spanner.v1.StructType.newBuilder();
@@ -2864,7 +3086,9 @@ private com.google.spanner.v1.StructType buildStructType(StructReader struct) {
return rowTypeBuilder.build();
}
- /** Convert a struct to a proto(value list) for constructing result rows and struct values. */
+ /**
+ * Convert a struct to a proto(value list) for constructing result rows and struct values.
+ */
private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct) {
com.google.spanner.executor.v1.ValueList.Builder structBuilder =
com.google.spanner.executor.v1.ValueList.newBuilder();
@@ -2915,258 +3139,245 @@ private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct
break;
case ARRAY:
switch (struct.getColumnType(i).getArrayElementType().getCode()) {
- case BOOL:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getBooleanList(i);
- for (Boolean booleanValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (booleanValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setBoolValue(booleanValue).build());
- }
+ case BOOL: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getBooleanList(i);
+ for (Boolean booleanValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (booleanValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setBoolValue(booleanValue).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BOOL).build());
}
- break;
- case FLOAT32:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getFloatList(i);
- for (Float floatValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (floatValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setDoubleValue((double) floatValue).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BOOL).build());
+ }
+ break;
+ case FLOAT32: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getFloatList(i);
+ for (Float floatValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (floatValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setDoubleValue((double) floatValue).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT32).build());
}
- break;
- case FLOAT64:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getDoubleList(i);
- for (Double doubleValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (doubleValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setDoubleValue(doubleValue).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT32).build());
+ }
+ break;
+ case FLOAT64: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getDoubleList(i);
+ for (Double doubleValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (doubleValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setDoubleValue(doubleValue).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT64).build());
}
- break;
- case INT64:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getLongList(i);
- for (Long longValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (longValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setIntValue(longValue).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.FLOAT64).build());
+ }
+ break;
+ case INT64: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getLongList(i);
+ for (Long longValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (longValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setIntValue(longValue).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INT64).build());
}
- break;
- case STRING:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getStringList(i);
- for (String stringValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (stringValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStringValue(stringValue)).build();
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INT64).build());
+ }
+ break;
+ case STRING: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getStringList(i);
+ for (String stringValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (stringValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStringValue(stringValue)).build();
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRING).build());
}
- break;
- case BYTES:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getBytesList(i);
- for (ByteArray byteArrayValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (byteArrayValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(
- valueProto
- .setBytesValue(ByteString.copyFrom(byteArrayValue.toByteArray()))
- .build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRING).build());
+ }
+ break;
+ case BYTES: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getBytesList(i);
+ for (ByteArray byteArrayValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (byteArrayValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(
+ valueProto
+ .setBytesValue(ByteString.copyFrom(byteArrayValue.toByteArray()))
+ .build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BYTES).build());
}
- break;
- case DATE:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getDateList(i);
- for (Date dateValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (dateValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(
- valueProto.setDateDaysValue(daysFromDate(dateValue)).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.BYTES).build());
+ }
+ break;
+ case DATE: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getDateList(i);
+ for (Date dateValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (dateValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(
+ valueProto.setDateDaysValue(daysFromDate(dateValue)).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.DATE).build());
}
- break;
- case TIMESTAMP:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getTimestampList(i);
- for (Timestamp timestampValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (timestampValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(
- valueProto.setTimestampValue(timestampToProto(timestampValue)).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.DATE).build());
+ }
+ break;
+ case TIMESTAMP: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getTimestampList(i);
+ for (Timestamp timestampValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (timestampValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(
+ valueProto.setTimestampValue(timestampToProto(timestampValue)).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.TIMESTAMP).build());
}
- break;
- case INTERVAL:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getIntervalList(i);
- for (Interval interval : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (interval == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStringValue(interval.toISO8601()).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.TIMESTAMP).build());
+ }
+ break;
+ case INTERVAL: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getIntervalList(i);
+ for (Interval interval : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (interval == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStringValue(interval.toISO8601()).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INTERVAL).build());
}
- break;
- case UUID:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getUuidList(i);
- for (UUID uuidValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (uuidValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStringValue(uuidValue.toString()).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INTERVAL).build());
+ }
+ break;
+ case UUID: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getUuidList(i);
+ for (UUID uuidValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (uuidValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStringValue(uuidValue.toString()).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.UUID).build());
}
- break;
- case NUMERIC:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getBigDecimalList(i);
- for (BigDecimal bigDec : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (bigDec == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStringValue(bigDec.toPlainString()).build());
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.UUID).build());
+ }
+ break;
+ case NUMERIC: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getBigDecimalList(i);
+ for (BigDecimal bigDec : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (bigDec == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStringValue(bigDec.toPlainString()).build());
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.NUMERIC).build());
}
- break;
- case JSON:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getJsonList(i);
- for (String stringValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (stringValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStringValue(stringValue)).build();
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.NUMERIC).build());
+ }
+ break;
+ case JSON: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getJsonList(i);
+ for (String stringValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (stringValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStringValue(stringValue)).build();
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.JSON).build());
}
- break;
- case STRUCT:
- {
- com.google.spanner.executor.v1.ValueList.Builder builder =
- com.google.spanner.executor.v1.ValueList.newBuilder();
- List values = struct.getStructList(i);
- for (StructReader structValue : values) {
- com.google.spanner.executor.v1.Value.Builder valueProto =
- com.google.spanner.executor.v1.Value.newBuilder();
- if (structValue == null) {
- builder.addValue(valueProto.setIsNull(true).build());
- } else {
- builder.addValue(valueProto.setStructValue(buildStruct(structValue))).build();
- }
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.JSON).build());
+ }
+ break;
+ case STRUCT: {
+ com.google.spanner.executor.v1.ValueList.Builder builder =
+ com.google.spanner.executor.v1.ValueList.newBuilder();
+ List values = struct.getStructList(i);
+ for (StructReader structValue : values) {
+ com.google.spanner.executor.v1.Value.Builder valueProto =
+ com.google.spanner.executor.v1.Value.newBuilder();
+ if (structValue == null) {
+ builder.addValue(valueProto.setIsNull(true).build());
+ } else {
+ builder.addValue(valueProto.setStructValue(buildStruct(structValue))).build();
}
- value.setArrayValue(builder.build());
- value.setArrayType(
- com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRUCT).build());
}
- break;
+ value.setArrayValue(builder.build());
+ value.setArrayType(
+ com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.STRUCT).build());
+ }
+ break;
default:
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
@@ -3191,7 +3402,9 @@ private com.google.spanner.executor.v1.ValueList buildStruct(StructReader struct
return structBuilder.build();
}
- /** Convert a ListValue proto to a list of cloud Value. */
+ /**
+ * Convert a ListValue proto to a list of cloud Value.
+ */
private static List cloudValuesFromValueList(
com.google.spanner.executor.v1.ValueList valueList, List typeList)
throws SpannerException {
@@ -3206,7 +3419,9 @@ private static List cloudValuesFromValueList(
return cloudValues;
}
- /** Convert a proto KeySet to a cloud KeySet. */
+ /**
+ * Convert a proto KeySet to a cloud KeySet.
+ */
private static com.google.cloud.spanner.KeySet keySetProtoToCloudKeySet(
com.google.spanner.executor.v1.KeySet keySetProto, List typeList)
throws SpannerException {
@@ -3224,7 +3439,9 @@ private static com.google.cloud.spanner.KeySet keySetProtoToCloudKeySet(
return cloudKeySetBuilder.build();
}
- /** Convert a keyRange proto to a cloud KeyRange. */
+ /**
+ * Convert a keyRange proto to a cloud KeyRange.
+ */
private static com.google.cloud.spanner.KeyRange keyRangeProtoToCloudKeyRange(
com.google.spanner.executor.v1.KeyRange keyRangeProto,
List typeList)
@@ -3251,7 +3468,9 @@ private static com.google.cloud.spanner.KeyRange keyRangeProtoToCloudKeyRange(
}
}
- /** Convert a key proto(value list) to a cloud Key. */
+ /**
+ * Convert a key proto(value list) to a cloud Key.
+ */
private static com.google.cloud.spanner.Key keyProtoToCloudKey(
com.google.spanner.executor.v1.ValueList keyProto, List typeList)
throws SpannerException {
@@ -3324,7 +3543,9 @@ private static com.google.cloud.spanner.Key keyProtoToCloudKey(
return cloudKey.build();
}
- /** Convert a Value proto to a cloud Value. */
+ /**
+ * Convert a Value proto to a cloud Value.
+ */
@SuppressWarnings("NullTernary")
private static com.google.cloud.spanner.Value valueProtoToCloudValue(
com.google.spanner.v1.Type type, com.google.spanner.executor.v1.Value value) {
@@ -3369,14 +3590,13 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue(
case UUID:
return com.google.cloud.spanner.Value.uuid(
value.hasIsNull() ? null : UUID.fromString(value.getStringValue()));
- case NUMERIC:
- {
- if (value.hasIsNull()) {
- return com.google.cloud.spanner.Value.numeric(null);
- }
- String ascii = value.getStringValue();
- return com.google.cloud.spanner.Value.numeric(new BigDecimal(ascii));
+ case NUMERIC: {
+ if (value.hasIsNull()) {
+ return com.google.cloud.spanner.Value.numeric(null);
}
+ String ascii = value.getStringValue();
+ return com.google.cloud.spanner.Value.numeric(new BigDecimal(ascii));
+ }
case JSON:
return com.google.cloud.spanner.Value.json(
value.hasIsNull() ? null : value.getStringValue());
@@ -3521,31 +3741,30 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue(
.collect(Collectors.toList()),
UUID::fromString));
}
- case NUMERIC:
- {
- if (value.hasIsNull()) {
- return com.google.cloud.spanner.Value.numericArray(null);
- }
- List nullList =
- value.getArrayValue().getValueList().stream()
- .map(com.google.spanner.executor.v1.Value::getIsNull)
- .collect(Collectors.toList());
- List valueList =
- value.getArrayValue().getValueList().stream()
- .map(com.google.spanner.executor.v1.Value::getStringValue)
- .collect(Collectors.toList());
- List newValueList = new ArrayList<>(valueList.size());
-
- for (int i = 0; i < valueList.size(); ++i) {
- if (i < nullList.size() && nullList.get(i)) {
- newValueList.add(null);
- continue;
- }
- String ascii = valueList.get(i);
- newValueList.add(new BigDecimal(ascii));
+ case NUMERIC: {
+ if (value.hasIsNull()) {
+ return com.google.cloud.spanner.Value.numericArray(null);
+ }
+ List nullList =
+ value.getArrayValue().getValueList().stream()
+ .map(com.google.spanner.executor.v1.Value::getIsNull)
+ .collect(Collectors.toList());
+ List valueList =
+ value.getArrayValue().getValueList().stream()
+ .map(com.google.spanner.executor.v1.Value::getStringValue)
+ .collect(Collectors.toList());
+ List newValueList = new ArrayList<>(valueList.size());
+
+ for (int i = 0; i < valueList.size(); ++i) {
+ if (i < nullList.size() && nullList.get(i)) {
+ newValueList.add(null);
+ continue;
}
- return com.google.cloud.spanner.Value.numericArray(newValueList);
+ String ascii = valueList.get(i);
+ newValueList.add(new BigDecimal(ascii));
}
+ return com.google.cloud.spanner.Value.numericArray(newValueList);
+ }
case STRUCT:
com.google.cloud.spanner.Type elementType =
typeProtoToCloudType(type.getArrayElementType());
@@ -3588,7 +3807,9 @@ private static com.google.cloud.spanner.Value valueProtoToCloudValue(
ErrorCode.INVALID_ARGUMENT, "Unsupported type while converting from value proto: " + type);
}
- /** Convert a cloud Timestamp to a Timestamp proto. */
+ /**
+ * Convert a cloud Timestamp to a Timestamp proto.
+ */
private com.google.protobuf.Timestamp timestampToProto(Timestamp t) throws SpannerException {
try {
return Timestamps.parse(t.toString());
@@ -3598,12 +3819,16 @@ private com.google.protobuf.Timestamp timestampToProto(Timestamp t) throws Spann
}
}
- /** Convert a cloud Date to a Date proto. */
+ /**
+ * Convert a cloud Date to a Date proto.
+ */
private static int daysFromDate(Date date) {
return (int) LocalDate.of(date.getYear(), date.getMonth(), date.getDayOfMonth()).toEpochDay();
}
- /** Convert a Date proto to a cloud Date. */
+ /**
+ * Convert a Date proto to a cloud Date.
+ */
private static Date dateFromDays(int daysSinceEpoch) {
LocalDate localDate = LocalDate.ofEpochDay(daysSinceEpoch);
return Date.fromYearMonthDay(
@@ -3626,7 +3851,9 @@ private static ByteArray toByteArray(@Nullable ByteString byteString) {
return ByteArray.copyFrom(byteString.toByteArray());
}
- /** Convert a list of nullable value to another type. */
+ /**
+ * Convert a list of nullable value to another type.
+ */
private static List unmarshallValueList(
List isNullList, List valueList, Function converter) {
List newValueList = new ArrayList<>(valueList.size());
@@ -3642,12 +3869,16 @@ private static List unmarshallValueList(
return newValueList;
}
- /** Insert null into valueList according to isNullList. */
+ /**
+ * Insert null into valueList according to isNullList.
+ */
private static List unmarshallValueList(List isNullList, List valueList) {
return unmarshallValueList(isNullList, valueList, element -> element);
}
- /** Convert a Struct proto to a cloud Struct. */
+ /**
+ * Convert a Struct proto to a cloud Struct.
+ */
private static com.google.cloud.spanner.Struct structProtoToCloudStruct(
com.google.spanner.v1.Type type, com.google.spanner.executor.v1.ValueList structValue) {
List fieldValues = structValue.getValueList();
@@ -3668,7 +3899,9 @@ private static com.google.cloud.spanner.Struct structProtoToCloudStruct(
return builder.build();
}
- /** Convert a Type proto to a cloud Type. */
+ /**
+ * Convert a Type proto to a cloud Type.
+ */
private static com.google.cloud.spanner.Type typeProtoToCloudType(
com.google.spanner.v1.Type typeProto) {
switch (typeProto.getCode()) {
@@ -3727,7 +3960,9 @@ private static com.google.cloud.spanner.Type typeProtoToCloudType(
}
}
- /** Convert a cloud Type to a Type proto. */
+ /**
+ * Convert a cloud Type to a Type proto.
+ */
private static com.google.spanner.v1.Type cloudTypeToTypeProto(@Nonnull Type cloudTypeProto) {
switch (cloudTypeProto.getCode()) {
case BOOL:
@@ -3795,14 +4030,18 @@ private static com.google.spanner.v1.Type cloudTypeToTypeProto(@Nonnull Type clo
}
}
- /** Unmarshall ByteString to serializable object. */
+ /**
+ * Unmarshall ByteString to serializable object.
+ */
private T unmarshall(ByteString input)
throws IOException, ClassNotFoundException {
ObjectInputStream objectInputStream = new ObjectInputStream(input.newInput());
return (T) objectInputStream.readObject();
}
- /** Marshall a serializable object into ByteString. */
+ /**
+ * Marshall a serializable object into ByteString.
+ */
private ByteString marshall(T object) throws IOException {
ByteString.Output output = ByteString.newOutput();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(output);
@@ -3812,14 +4051,18 @@ private ByteString marshall(T object) throws IOExceptio
return output.toByteString();
}
- /** Build Timestamp from micros. */
+ /**
+ * Build Timestamp from micros.
+ */
private Timestamp timestampFromMicros(long micros) {
long seconds = TimeUnit.MICROSECONDS.toSeconds(micros);
int nanos = (int) (micros * 1000 - seconds * 1000000000);
return Timestamp.ofTimeSecondsAndNanos(seconds, nanos);
}
- /** Build TimestampBound from Concurrency. */
+ /**
+ * Build TimestampBound from Concurrency.
+ */
private TimestampBound timestampBoundsFromConcurrency(Concurrency concurrency) {
if (concurrency.hasStalenessSeconds()) {
return TimestampBound.ofExactStaleness(
@@ -3844,7 +4087,9 @@ private TimestampBound timestampBoundsFromConcurrency(Concurrency concurrency) {
ErrorCode.INVALID_ARGUMENT, "Unsupported concurrency mode: " + concurrency);
}
- /** Build instance proto from cloud spanner instance. */
+ /**
+ * Build instance proto from cloud spanner instance.
+ */
private com.google.spanner.admin.instance.v1.Instance instanceToProto(Instance instance) {
com.google.spanner.admin.instance.v1.Instance.Builder instanceBuilder =
com.google.spanner.admin.instance.v1.Instance.newBuilder();
@@ -3875,7 +4120,9 @@ private com.google.spanner.admin.instance.v1.Instance instanceToProto(Instance i
return instanceBuilder.build();
}
- /** Build instance proto from cloud spanner instance. */
+ /**
+ * Build instance proto from cloud spanner instance.
+ */
private com.google.spanner.admin.instance.v1.InstanceConfig instanceConfigToProto(
InstanceConfig instanceConfig) {
com.google.spanner.admin.instance.v1.InstanceConfig.Builder instanceConfigBuilder =
@@ -3930,4 +4177,4 @@ private com.google.spanner.admin.instance.v1.InstanceConfig instanceConfigToProt
}
return instanceConfigBuilder.build();
}
-}
+}
\ No newline at end of file
diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
index 0da30f82d23c..a1b532aca5ec 100644
--- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
+++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
@@ -70,7 +70,7 @@ public class WorkerProxy {
public static int proxyPort = 0;
public static String cert = "";
public static String serviceKeyFile = "";
- public static double multiplexedSessionOperationsRatio = 0.0;
+ public static double multiplexedSessionOperationsRatio = 1.0;
public static boolean usePlainTextChannel = false;
public static boolean enableGrpcFaultInjector = false;
public static OpenTelemetrySdk openTelemetrySdk;
@@ -231,4 +231,4 @@ private static CommandLine buildOptions(String[] args) {
throw new IllegalArgumentException(e.getMessage());
}
}
-}
+}
\ No newline at end of file