Skip to content

Commit b40c279

Browse files
committed
SK-2302 Update Concurrency values for insert and detokenize
- Add ability to capture system env variables
1 parent 2cc408f commit b40c279

File tree

3 files changed

+57
-37
lines changed

3 files changed

+57
-37
lines changed

v3/src/main/java/com/skyflow/utils/Constants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ public final class Constants extends BaseConstants {
77
public static final String SDK_PREFIX = SDK_NAME + SDK_VERSION;
88
public static final Integer INSERT_BATCH_SIZE = 50;
99
public static final Integer MAX_INSERT_BATCH_SIZE = 1000;
10-
public static final Integer INSERT_CONCURRENCY_LIMIT = 10;
10+
public static final Integer INSERT_CONCURRENCY_LIMIT = 1;
1111
public static final Integer MAX_INSERT_CONCURRENCY_LIMIT = 10;
1212
public static final Integer DETOKENIZE_BATCH_SIZE = 50;
13-
public static final Integer DETOKENIZE_CONCURRENCY_LIMIT = 10;
13+
public static final Integer DETOKENIZE_CONCURRENCY_LIMIT = 1;
1414
public static final Integer MAX_DETOKENIZE_BATCH_SIZE = 1000;
1515
public static final Integer MAX_DETOKENIZE_CONCURRENCY_LIMIT = 10;
1616

v3/src/main/java/com/skyflow/utils/Utils.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
package com.skyflow.utils;
22

3-
import java.util.ArrayList;
4-
import java.util.HashMap;
5-
import java.util.List;
6-
import java.util.Map;
7-
83
import com.google.gson.JsonObject;
94
import com.skyflow.enums.Env;
105
import com.skyflow.errors.ErrorCode;
@@ -22,10 +17,14 @@
2217
import com.skyflow.vault.data.ErrorRecord;
2318
import com.skyflow.vault.data.Success;
2419
import com.skyflow.vault.data.Token;
25-
2620
import io.github.cdimascio.dotenv.Dotenv;
2721
import io.github.cdimascio.dotenv.DotenvException;
2822

23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
2928
public final class Utils extends BaseUtils {
3029

3130
public static String getVaultURL(String clusterId, Env env) {
@@ -55,7 +54,8 @@ public static List<DetokenizeRequest> createDetokenizeBatches(DetokenizeRequest
5554
// Create a sublist for the current batch
5655
List<String> batchTokens = tokens.subList(i, Math.min(i + batchSize, tokens.size()));
5756
List<TokenGroupRedactions> tokenGroupRedactions = null;
58-
if (request.getTokenGroupRedactions().isPresent() && !request.getTokenGroupRedactions().get().isEmpty() && i < request.getTokenGroupRedactions().get().size()) {
57+
if (request.getTokenGroupRedactions().isPresent() && !request.getTokenGroupRedactions().get().isEmpty()
58+
) {
5959
tokenGroupRedactions = request.getTokenGroupRedactions().get();
6060
}
6161
// Build a new DetokenizeRequest for the current batch
@@ -80,7 +80,7 @@ public static ErrorRecord createErrorRecord(Map<String, Object> recordMap, int i
8080
} else if (recordMap.containsKey("httpCode")) {
8181
code = (Integer) recordMap.get("httpCode");
8282

83-
} else{
83+
} else {
8484
if (recordMap.containsKey("statusCode")) {
8585
code = (Integer) recordMap.get("statusCode");
8686
}
@@ -125,7 +125,7 @@ public static List<ErrorRecord> handleBatchException(
125125
}
126126
}
127127
} else {
128-
int indexNumber = batchNumber > 0 ? batchNumber * batchSize: 0;
128+
int indexNumber = batchNumber > 0 ? batchNumber * batchSize : 0;
129129
for (int j = 0; j < batch.size(); j++) {
130130
ErrorRecord err = new ErrorRecord(indexNumber, ex.getMessage(), 500);
131131
errorRecords.add(err);
@@ -226,7 +226,7 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons
226226
if (value instanceof List) {
227227
List<?> valueList = (List<?>) value;
228228
for (Object item : valueList) {
229-
if(item instanceof Map) {
229+
if (item instanceof Map) {
230230
Map<String, Object> tokenMap = (Map<String, Object>) item;
231231
Token token = new Token((String) tokenMap.get("token"), (String) tokenMap.get("tokenGroupName"));
232232
tokenList.add(token);
@@ -259,7 +259,7 @@ public static String getEnvVaultURL() throws SkyflowException {
259259
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyVaultUrl.getMessage());
260260
} else if (vaultURL != null && !vaultURL.startsWith(BaseConstants.SECURE_PROTOCOL)) {
261261
LogUtil.printErrorLog(ErrorLogs.INVALID_VAULT_URL_FORMAT.getLog());
262-
throw new SkyflowException( ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.InvalidVaultUrlFormat.getMessage());
262+
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.InvalidVaultUrlFormat.getMessage());
263263
}
264264
return vaultURL;
265265
} catch (DotenvException e) {

v3/src/main/java/com/skyflow/vault/controller/VaultController.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
11
package com.skyflow.vault.controller;
22

3-
import java.util.ArrayList;
4-
import java.util.Collections;
5-
import java.util.List;
6-
import java.util.concurrent.CompletableFuture;
7-
import java.util.concurrent.ExecutionException;
8-
import java.util.concurrent.ExecutorService;
9-
import java.util.concurrent.Executors;
10-
113
import com.google.gson.Gson;
124
import com.google.gson.GsonBuilder;
135
import com.skyflow.VaultClient;
@@ -25,15 +17,17 @@
2517
import com.skyflow.utils.Utils;
2618
import com.skyflow.utils.logger.LogUtil;
2719
import com.skyflow.utils.validations.Validations;
28-
import com.skyflow.vault.data.DetokenizeRequest;
29-
import com.skyflow.vault.data.DetokenizeResponse;
30-
import com.skyflow.vault.data.DetokenizeResponseObject;
31-
import com.skyflow.vault.data.ErrorRecord;
32-
import com.skyflow.vault.data.InsertRecord;
33-
import com.skyflow.vault.data.InsertRequest;
34-
import com.skyflow.vault.data.Success;
35-
20+
import com.skyflow.vault.data.*;
3621
import io.github.cdimascio.dotenv.Dotenv;
22+
import io.github.cdimascio.dotenv.DotenvException;
23+
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
3731

3832
public final class VaultController extends VaultClient {
3933
private static final Gson gson = new GsonBuilder().serializeNulls().create();
@@ -307,18 +301,31 @@ private InsertResponse insertBatch(List<InsertRecordData> batch, String tableNam
307301
.records(batch)
308302
.upsert(upsert);
309303
// .build();
310-
if(tableName != null && !tableName.isEmpty()){
304+
if (tableName != null && !tableName.isEmpty()) {
311305
req.tableName(tableName);
312306
}
313-
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = req.build();
307+
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = req.build();
314308
return this.getRecordsApi().insert(request);
315309
}
316310

317311
private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
318312
try {
319-
Dotenv dotenv = Dotenv.load();
320-
String userProvidedBatchSize = dotenv.get("INSERT_BATCH_SIZE");
321-
String userProvidedConcurrencyLimit = dotenv.get("INSERT_CONCURRENCY_LIMIT");
313+
String userProvidedBatchSize = System.getenv("INSERT_BATCH_SIZE");
314+
String userProvidedConcurrencyLimit = System.getenv("INSERT_CONCURRENCY_LIMIT");
315+
316+
Dotenv dotenv = null;
317+
try {
318+
dotenv = Dotenv.load();
319+
} catch (DotenvException ignored) {
320+
// ignore the case if .env file is not found
321+
}
322+
323+
if (userProvidedBatchSize == null && dotenv != null) {
324+
userProvidedBatchSize = dotenv.get("INSERT_BATCH_SIZE");
325+
}
326+
if (userProvidedConcurrencyLimit == null && dotenv != null) {
327+
userProvidedConcurrencyLimit = dotenv.get("INSERT_CONCURRENCY_LIMIT");
328+
}
322329

323330
if (userProvidedBatchSize != null) {
324331
try {
@@ -372,9 +379,22 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
372379

373380
private void configureDetokenizeConcurrencyAndBatchSize(int totalRequests) {
374381
try {
375-
Dotenv dotenv = Dotenv.load();
376-
String userProvidedBatchSize = dotenv.get("DETOKENIZE_BATCH_SIZE");
377-
String userProvidedConcurrencyLimit = dotenv.get("DETOKENIZE_CONCURRENCY_LIMIT");
382+
String userProvidedBatchSize = System.getenv("DETOKENIZE_BATCH_SIZE");
383+
String userProvidedConcurrencyLimit = System.getenv("DETOKENIZE_CONCURRENCY_LIMIT");
384+
385+
Dotenv dotenv = null;
386+
try {
387+
dotenv = Dotenv.load();
388+
} catch (DotenvException ignored) {
389+
// ignore the case if .env file is not found
390+
}
391+
392+
if (userProvidedBatchSize == null && dotenv != null) {
393+
userProvidedBatchSize = dotenv.get("DETOKENIZE_BATCH_SIZE");
394+
}
395+
if (userProvidedConcurrencyLimit == null && dotenv != null) {
396+
userProvidedConcurrencyLimit = dotenv.get("DETOKENIZE_CONCURRENCY_LIMIT");
397+
}
378398

379399
if (userProvidedBatchSize != null) {
380400
try {

0 commit comments

Comments
 (0)