diff --git a/api/src/main/java/io/minio/BaseS3Client.java b/api/src/main/java/io/minio/BaseS3Client.java index aaf7c18a8..372a07e2c 100644 --- a/api/src/main/java/io/minio/BaseS3Client.java +++ b/api/src/main/java/io/minio/BaseS3Client.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; -import com.google.common.collect.ImmutableSet; import io.minio.credentials.Credentials; import io.minio.credentials.Provider; import io.minio.errors.ErrorResponseException; @@ -53,9 +52,8 @@ import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.Locale; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -64,9 +62,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import java.util.logging.Logger; +import java.util.stream.IntStream; import javax.annotation.Nonnull; import okhttp3.Call; import okhttp3.Callback; +import okhttp3.Interceptor; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.RequestBody; @@ -100,17 +100,13 @@ public abstract class BaseS3Client implements AutoCloseable { .build(); private static final String RETRY_HEAD = "RetryHead"; - private static final String END_HTTP = "----------END-HTTP----------"; - private static final String UPLOAD_ID = "uploadId"; - private static final Set TRACE_QUERY_PARAMS = - ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes"); - private PrintWriter traceStream; + private volatile PrintWriter traceStream; protected final Map regionCache = new ConcurrentHashMap<>(); protected String userAgent = Utils.getDefaultUserAgent(); protected Http.BaseUrl baseUrl; protected Provider provider; - protected OkHttpClient httpClient; + protected volatile OkHttpClient httpClient; protected boolean closeHttpClient; protected BaseS3Client( @@ -137,6 +133,45 @@ public void close() { } } + private static int getStatusRetryInterceptorIndex(List interceptors) { + return IntStream.range(0, interceptors.size()) + .filter(i -> interceptors.get(i) instanceof Http.StatusRetryInterceptor) + .findFirst() + .orElse(-1); + } + + /** + * Sets request retry parameters. Any null/invalid values disable retry. + * + *
Example:{@code
+   * minioClient.setRetry(ImmutableSet.of(408, 504), 250, 3);
+   * }
+ * + * @param retryStatusCodes HTTP status codes to be retried. + * @param delayMs Delay between retries. + * @param maxRetries Maximum number of retry attempts. + */ + public synchronized void setRetry( + Set retryStatusCodes, Long delayMs, Integer maxRetries) { + Interceptor interceptor = + new Http.StatusRetryInterceptor( + retryStatusCodes, delayMs == null ? 0 : delayMs, maxRetries == null ? 0 : maxRetries); + + List interceptors = this.httpClient.interceptors(); + int i = getStatusRetryInterceptorIndex(interceptors); + OkHttpClient.Builder builder = this.httpClient.newBuilder(); + if (i >= 0) { + builder.interceptors().clear(); + for (int j = 0; j < interceptors.size(); j++) { + builder.addInterceptor(i == j ? interceptor : interceptors.get(j)); + } + } else { + builder.addInterceptor(interceptor); + } + + this.httpClient = builder.build(); + } + /** * Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values * must be between 1 and Integer.MAX_VALUE when converted to milliseconds. @@ -268,25 +303,48 @@ private String[] handleRedirectResponse( return new String[] {code, message}; } + private OkHttpClient getHttpClient(PrintWriter traceStream, Http.S3Request s3request) { + if (traceStream == null) return this.httpClient; + + OkHttpClient httpClient = this.httpClient; + List interceptors = httpClient.interceptors(); + int i = getStatusRetryInterceptorIndex(interceptors); + Http.StatusRetryInterceptor interceptor = + i < 0 ? null : (Http.StatusRetryInterceptor) interceptors.get(i); + + OkHttpClient.Builder builder = httpClient.newBuilder(); + if (interceptor == null) { + builder.addInterceptor( + new Http.StatusRetryInterceptor(interceptor, traceStream, s3request.object() == null)); + } else { + builder.interceptors().clear(); + for (int j = 0; j < interceptors.size(); j++) { + if (i == j) { + builder.addInterceptor( + new Http.StatusRetryInterceptor( + interceptor, traceStream, s3request.object() == null)); + } else { + builder.addInterceptor(interceptors.get(j)); + } + } + } + + return builder.build(); + } + /** Execute HTTP request asynchronously for given parameters. */ protected CompletableFuture executeAsync(Http.S3Request s3request, String region) { Credentials credentials = (provider == null) ? null : provider.fetch(); Http.Request request = null; + PrintWriter traceStream = this.traceStream; try { request = s3request.toRequest(baseUrl, region, credentials); } catch (MinioException e) { return Utils.failedFuture(e); } + OkHttpClient httpClient = getHttpClient(traceStream, s3request); StringBuilder traceBuilder = new StringBuilder(request.httpTraces()); - PrintWriter traceStream = this.traceStream; - if (traceStream != null) traceStream.print(request.httpTraces()); - - OkHttpClient httpClient = this.httpClient; - // FIXME: enable retry for all request. - // if (!s3request.retryFailure()) { - // httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build(); - // } okhttp3.Request httpRequest = request.httpRequest(); CompletableFuture completableFuture = newCompleteableFuture(); @@ -309,57 +367,23 @@ public void onResponse(Call call, final Response response) throws IOException { } private void onResponse(final Response response) throws IOException { - String trace = - String.format( - "%s %d %s%n%s", - response.protocol().toString().toUpperCase(Locale.US), - response.code(), - response.message(), - response.headers().toString()); - if (!trace.endsWith("\n\n")) { - trace += trace.endsWith("\n") ? "\n" : "\n\n"; - } - traceBuilder.append(trace); - if (traceStream != null) traceStream.print(trace); - + String traces = + Http.getResponseTraces( + response, + s3request.method(), + s3request.queryParams(), + s3request.object() == null); if (response.isSuccessful()) { - if (traceStream != null) { - // Trace response body only if the request is not - // GetObject/ListenBucketNotification - // S3 API. - Set keys = s3request.queryParams().keySet(); - if ((s3request.method() != Http.Method.GET - || s3request.object() == null - || !Collections.disjoint(keys, TRACE_QUERY_PARAMS)) - && !(keys.contains("events") - && (keys.contains("prefix") || keys.contains("suffix")))) { - String responseBody = response.peekBody(1024 * 1024).string(); - traceStream.print(responseBody); - if (!responseBody.endsWith("\n")) traceStream.println(); - } - traceStream.println(END_HTTP); - } - completableFuture.complete(response); return; } + traceBuilder.append(traces); String errorXml = null; try (ResponseBody responseBody = response.body()) { errorXml = responseBody.string(); } - if (!("".equals(errorXml) && s3request.method().equals(Http.Method.HEAD))) { - traceBuilder.append(errorXml); - if (traceStream != null) traceStream.print(errorXml); - if (!errorXml.endsWith("\n")) { - traceBuilder.append("\n"); - if (traceStream != null) traceStream.println(); - } - } - traceBuilder.append(END_HTTP).append("\n"); - if (traceStream != null) traceStream.println(END_HTTP); - // Error out for Non-XML response from server for non-HEAD requests. String contentType = response.headers().get(Http.Headers.CONTENT_TYPE); if (!s3request.method().equals(Http.Method.HEAD) @@ -635,7 +659,7 @@ protected void checkArgs(BaseArgs args) { public CompletableFuture abortMultipartUpload( AbortMultipartUploadArgs args) { checkArgs(args); - return executeDeleteAsync(args, null, new Http.QueryParameters(UPLOAD_ID, args.uploadId())) + return executeDeleteAsync(args, null, new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId())) .thenApply( response -> { try { @@ -672,7 +696,7 @@ public CompletableFuture completeMultipartUpload( return executePostAsync( args, args.ssec() == null ? null : args.ssec().headers(), - new Http.QueryParameters(UPLOAD_ID, args.uploadId()), + new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()), body) .thenApply( response -> { @@ -1191,7 +1215,7 @@ public CompletableFuture listObjectVersions( public CompletableFuture listParts(ListPartsArgs args) { Http.QueryParameters queryParams = new Http.QueryParameters( - UPLOAD_ID, + Http.UPLOAD_ID, args.uploadId(), "max-parts", (args.maxParts() != null) ? args.maxParts().toString() : "1000"); diff --git a/api/src/main/java/io/minio/Http.java b/api/src/main/java/io/minio/Http.java index 846bfc3b9..b87e7f92c 100644 --- a/api/src/main/java/io/minio/Http.java +++ b/api/src/main/java/io/minio/Http.java @@ -22,6 +22,7 @@ import io.minio.errors.MinioException; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; import java.io.RandomAccessFile; import java.net.URL; import java.nio.channels.Channels; @@ -52,8 +53,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -65,9 +68,12 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import okhttp3.Interceptor; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; import okio.BufferedSink; import okio.Okio; @@ -85,6 +91,23 @@ public class Http { DEFAULT_MEDIA_TYPE, Checksum.ZERO_SHA256_HASH, Checksum.ZERO_MD5_HASH); + public static final Set RETRIABLE_STATUS_CODES = + ImmutableSet.of( + 408, // Request Timeout + 429, // Too Many Requests + 499, // Client Closed Request (nginx) + 500, // Internal Server Error + 502, // Bad Gateway + 503, // Service Unavailable + 504, // Gateway Timeout + 520); // Cloudflare unknown error + + public static final String END_HTTP = "----------END-HTTP----------"; + public static final String UPLOAD_ID = "uploadId"; + public static final Set TRACE_QUERY_PARAMS = + ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes"); + private static final Pattern SIGNATURE_PATTERN = Pattern.compile("Signature=([0-9a-f]+)"); + private static final Pattern CREDENTIAL_PATTERN = Pattern.compile("Credential=([^/]+)"); /** Base URL of S3 endpoint. */ public static class BaseUrl { @@ -608,6 +631,174 @@ public static OkHttpClient enableExternalCertificatesFromEnv(OkHttpClient client client, System.getenv("SSL_CERT_FILE"), System.getenv("SSL_CERT_DIR")); } + public static String getRequestTraces(okhttp3.Request request, String bodyString) { + Method method = Method.fromString(request.method()); + + StringBuilder traceBuilder = new StringBuilder(); + traceBuilder.append("---------START-HTTP---------\n"); + String encodedPath = request.url().encodedPath(); + String encodedQuery = request.url().encodedQuery(); + if (encodedQuery != null) encodedPath += "?" + encodedQuery; + traceBuilder.append(method.toString()).append(" ").append(encodedPath).append(" HTTP/1.1\n"); + traceBuilder.append( + SIGNATURE_PATTERN + .matcher( + CREDENTIAL_PATTERN + .matcher(request.headers().toString()) + .replaceAll("Credential=*REDACTED*")) + .replaceAll("Signature=*REDACTED*")); + + String lastTwoChars = traceBuilder.substring(traceBuilder.length() - 2); + if (lastTwoChars.charAt(1) != '\n') { + traceBuilder.append("\n\n"); + } else if (lastTwoChars.charAt(0) != '\n') { + traceBuilder.append("\n"); + } + if (method == Method.PUT || method == Method.POST) { + if (bodyString != null) { + traceBuilder.append(bodyString); + if (!bodyString.endsWith("\n")) traceBuilder.append("\n"); + } + } + return traceBuilder.toString(); + } + + public static String getResponseTraces( + okhttp3.Response response, + Method method, + QueryParameters queryParams, + boolean isBucketRequest) + throws IOException { + StringBuilder traceBuilder = new StringBuilder(); + String trace = + String.format( + "%s %d %s%n%s", + response.protocol().toString().toUpperCase(Locale.US), + response.code(), + response.message(), + response.headers().toString()); + if (!trace.endsWith("\n\n")) { + trace += trace.endsWith("\n") ? "\n" : "\n\n"; + } + traceBuilder.append(trace); + + if (response.isSuccessful()) { + // Trace response body only if the request is not + // GetObject/ListenBucketNotification + // S3 API. + Set keys = queryParams.keySet(); + if ((method != Method.GET + || isBucketRequest + || !Collections.disjoint(keys, TRACE_QUERY_PARAMS)) + && !(keys.contains("events") && (keys.contains("prefix") || keys.contains("suffix")))) { + String responseBody = response.peekBody(1024 * 1024).string(); + traceBuilder.append(responseBody); + if (!responseBody.endsWith("\n")) traceBuilder.append("\n"); + } else { + traceBuilder.append("<<>>\n"); + } + traceBuilder.append(END_HTTP).append("\n"); + } else { + String responseBody = response.peekBody(1024 * 1024).string(); + traceBuilder.append(responseBody); + if (!responseBody.endsWith("\n") && !(responseBody.isEmpty() && method == Method.HEAD)) { + traceBuilder.append("\n"); + } + traceBuilder.append(END_HTTP).append("\n"); + } + + return traceBuilder.toString(); + } + + public static class StatusRetryInterceptor implements Interceptor { + private final Set retryStatusCodes; + private final long delayMs; + private final int maxRetries; + private final PrintWriter traceWriter; + private final boolean isBucketRequest; + + private StatusRetryInterceptor( + Set retryStatusCodes, + long delayMs, + int maxRetries, + PrintWriter traceWriter, + boolean isBucketRequest) { + this.retryStatusCodes = retryStatusCodes; + this.delayMs = delayMs; + this.maxRetries = Math.max(1, maxRetries); + this.traceWriter = traceWriter; + this.isBucketRequest = isBucketRequest; + } + + public StatusRetryInterceptor() { + this(RETRIABLE_STATUS_CODES, 100, 5, null, false); + } + + public StatusRetryInterceptor(Set retryStatusCodes, long delayMs, int maxRetries) { + this(retryStatusCodes, delayMs, maxRetries, null, false); + } + + public StatusRetryInterceptor( + StatusRetryInterceptor interceptor, PrintWriter traceWriter, boolean isBucketRequest) { + this( + interceptor != null ? interceptor.retryStatusCodes : RETRIABLE_STATUS_CODES, + interceptor != null ? interceptor.delayMs : 100, + interceptor != null ? interceptor.maxRetries : 5, + traceWriter, + isBucketRequest); + } + + @Override + public Response intercept(Chain chain) throws IOException { + okhttp3.Request request = chain.request(); + Method method = Method.fromString(request.method()); + QueryParameters queryParams = new QueryParameters(); + for (String key : request.url().queryParameterNames()) { + for (String value : request.url().queryParameterValues(key)) { + queryParams.add(key, value); + } + } + + String bodyString = null; + if (request.body() instanceof RequestBody) { + RequestBody body = (RequestBody) request.body(); + bodyString = body.bodyString(); + } + + for (int i = 0; i < maxRetries; i++) { + if (traceWriter != null) { + traceWriter.print(getRequestTraces(request, bodyString)); + traceWriter.flush(); + } + + okhttp3.Response response = chain.proceed(request); + if (traceWriter != null) { + traceWriter.print(getResponseTraces(response, method, queryParams, isBucketRequest)); + traceWriter.flush(); + } + + if (response.isSuccessful() + || i == maxRetries - 1 + || retryStatusCodes == null + || !retryStatusCodes.contains(response.code())) return response; + + response.close(); + + if (delayMs <= 0) continue; + long maxBackoffLimit = delayMs * (1L << (i + 1)); + long jitteredDelay = ThreadLocalRandom.current().nextLong(0, maxBackoffLimit); + try { + Thread.sleep(jitteredDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Retry interrupted", e); + } + } + + return null; // This never happens. + } + } + /** * Creates new HTTP client with default timeout with additional TLS certificates from * SSL_CERT_FILE and SSL_CERT_DIR environment variables if present. @@ -620,6 +811,7 @@ public static OkHttpClient newDefaultClient() { .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) .readTimeout(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) .protocols(Arrays.asList(Protocol.HTTP_1_1)) + .addInterceptor(new StatusRetryInterceptor()) .build(); try { return enableExternalCertificatesFromEnv(client); @@ -693,7 +885,7 @@ public static class Body { private MediaType contentType; private String sha256Hash; private String md5Hash; - private boolean bodyString; + private String bodyString = "<<>>"; /** Creates Body for okhttp3 RequestBody. */ public Body(okhttp3.RequestBody requestBody) { @@ -740,9 +932,9 @@ public Body(Object body, MediaType contentType, String sha256Hash, String md5Has sha256Hash = Checksum.hexString(Checksum.SHA256.sum(data)); md5Hash = Checksum.base64String(Checksum.MD5.sum(data)); - this.bodyString = true; this.data = data; set((long) data.length, contentType, sha256Hash, md5Hash); + this.bodyString = new String(data, StandardCharsets.UTF_8); } private void set(Long length, MediaType contentType, String sha256Hash, String md5Hash) { @@ -783,14 +975,18 @@ public Headers headers() { /** Creates HTTP RequestBody for this body. */ public RequestBody toRequestBody() throws MinioException { if (requestBody != null) return new RequestBody(requestBody); - if (file != null) return new RequestBody(file, length, contentType); - if (buffer != null) return new RequestBody(buffer, contentType); - return new RequestBody(data, length.intValue(), contentType); + if (file != null) { + return new RequestBody(file, length, contentType, bodyString); + } + if (buffer != null) { + return new RequestBody(buffer, contentType, bodyString); + } + return new RequestBody(data, length.intValue(), contentType, bodyString); } @Override public String toString() { - return bodyString ? new String(data, StandardCharsets.UTF_8) : "<<>>"; + return bodyString; } } @@ -803,21 +999,27 @@ public static class RequestBody extends okhttp3.RequestBody { private byte[] bytes; private long length; private MediaType contentType; + private String bodyString; /** Creates RequestBody for byte array. */ public RequestBody( - @Nonnull final byte[] bytes, final int length, @Nonnull final MediaType contentType) { + @Nonnull final byte[] bytes, + final int length, + @Nonnull final MediaType contentType, + final String bodyString) { this.bytes = Utils.validateNotNull(bytes, "data bytes"); if (length < 0) throw new IllegalArgumentException("length must not be negative value"); this.length = length; this.contentType = Utils.validateNotNull(contentType, "content type"); + this.bodyString = bodyString; } /** Creates RequestBody for RandomAccessFile. */ public RequestBody( @Nonnull final RandomAccessFile file, final long length, - @Nonnull final MediaType contentType) + @Nonnull final MediaType contentType, + final String bodyString) throws MinioException { this.file = Utils.validateNotNull(file, "randome access file"); if (length < 0) throw new IllegalArgumentException("length must not be negative value"); @@ -828,13 +1030,41 @@ public RequestBody( } catch (IOException e) { throw new MinioException(e); } + this.bodyString = bodyString; } /** Creates RequestBody for ByteBuffer. */ - public RequestBody(@Nonnull final ByteBuffer buffer, @Nonnull final MediaType contentType) { + public RequestBody( + @Nonnull final ByteBuffer buffer, + @Nonnull final MediaType contentType, + final String bodyString) { this.buffer = Utils.validateNotNull(buffer, "buffer"); this.length = buffer.length(); this.contentType = Utils.validateNotNull(contentType, "content type"); + this.bodyString = bodyString; + } + + /** Creates RequestBody for byte array. */ + @Deprecated + public RequestBody( + @Nonnull final byte[] bytes, final int length, @Nonnull final MediaType contentType) { + this(bytes, length, contentType, null); + } + + /** Creates RequestBody for RandomAccessFile. */ + @Deprecated + public RequestBody( + @Nonnull final RandomAccessFile file, + final long length, + @Nonnull final MediaType contentType) + throws MinioException { + this(file, length, contentType, null); + } + + /** Creates RequestBody for ByteBuffer. */ + @Deprecated + public RequestBody(@Nonnull final ByteBuffer buffer, @Nonnull final MediaType contentType) { + this(buffer, contentType, null); } /** Creates RequestBody for okhttp3 RequestBody. */ @@ -877,6 +1107,11 @@ public void writeTo(BufferedSink sink) throws IOException { sink.write(bytes, 0, (int) length); } } + + /** Get body trace string. */ + public String bodyString() { + return this.bodyString; + } } /** HTTP methods. */ @@ -886,6 +1121,15 @@ public static enum Method { POST, PUT, DELETE; + + public static Method fromString(String value) { + if (value == null) return null; + try { + return Method.valueOf(value.toUpperCase(Locale.US).trim()); + } catch (IllegalArgumentException e) { + return null; + } + } } /** HTTP headers. */ @@ -1592,31 +1836,7 @@ private Request toRequest( } } - StringBuilder traceBuilder = new StringBuilder(); - traceBuilder.append("---------START-HTTP---------\n"); - String encodedPath = request.url().encodedPath(); - String encodedQuery = request.url().encodedQuery(); - if (encodedQuery != null) encodedPath += "?" + encodedQuery; - traceBuilder.append(request.method()).append(" ").append(encodedPath).append(" HTTP/1.1\n"); - traceBuilder.append( - request - .headers() - .toString() - .replaceAll("Signature=([0-9a-f]+)", "Signature=*REDACTED*") - .replaceAll("Credential=([^/]+)", "Credential=*REDACTED*")); - String lastTwoChars = traceBuilder.substring(traceBuilder.length() - 2); - if (lastTwoChars.charAt(1) != '\n') { - traceBuilder.append("\n\n"); - } else if (lastTwoChars.charAt(0) != '\n') { - traceBuilder.append("\n"); - } - String value = body.toString(); - if (method == Method.PUT || method == Method.POST) { - traceBuilder.append(value); - if (!value.endsWith("\n")) traceBuilder.append("\n"); - } - - return new Request(request, traceBuilder.toString()); + return new Request(request, getRequestTraces(request, body.toString())); } public Request toRequest(BaseUrl baseUrl, String region, Credentials credentials) diff --git a/api/src/main/java/io/minio/MinioAsyncClient.java b/api/src/main/java/io/minio/MinioAsyncClient.java index d0e21a4f2..ca9c69d8f 100644 --- a/api/src/main/java/io/minio/MinioAsyncClient.java +++ b/api/src/main/java/io/minio/MinioAsyncClient.java @@ -3388,7 +3388,7 @@ public CompletableFuture putObjectFanOut(PutObjectFanOu multipartBuilder.addFormDataPart( "file", "fanout-content", - new Http.RequestBody(buffer, Http.DEFAULT_MEDIA_TYPE)); + new Http.RequestBody(buffer, Http.DEFAULT_MEDIA_TYPE, null)); return multipartBuilder.build(); } catch (JsonProcessingException e) { diff --git a/api/src/main/java/io/minio/MinioClient.java b/api/src/main/java/io/minio/MinioClient.java index 03dd587a9..e28667cb5 100644 --- a/api/src/main/java/io/minio/MinioClient.java +++ b/api/src/main/java/io/minio/MinioClient.java @@ -37,6 +37,7 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionException; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; @@ -1908,6 +1909,22 @@ public void setTimeout(long connectTimeout, long writeTimeout, long readTimeout) asyncClient.setTimeout(connectTimeout, writeTimeout, readTimeout); } + /** + * Sets request retry parameters. Any null/invalid values disable retry. + * + *
Example:{@code
+   * minioClient.setRetry(ImmutableSet.of(408, 504), 250, 3);
+   * }
+ * + * @param retryStatusCodes HTTP status codes to be retried. + * @param delayMs Delay between retries. + * @param maxRetries Maximum number of retry attempts. + */ + public synchronized void setRetry( + Set retryStatusCodes, Long delayMs, Integer maxRetries) { + asyncClient.setRetry(retryStatusCodes, delayMs, maxRetries); + } + /** * Ignores check on server certificate for HTTPS connection. * diff --git a/api/src/test/java/io/minio/MinioClientTest.java b/api/src/test/java/io/minio/MinioClientTest.java index edb18fb3a..cdee673f2 100644 --- a/api/src/test/java/io/minio/MinioClientTest.java +++ b/api/src/test/java/io/minio/MinioClientTest.java @@ -17,12 +17,16 @@ package io.minio; +import io.minio.errors.ErrorResponseException; import io.minio.errors.InvalidResponseException; import io.minio.errors.MinioException; +import io.minio.messages.ListAllMyBucketsResult; import java.io.IOException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.crypto.KeyGenerator; import okhttp3.OkHttpClient; @@ -584,4 +588,48 @@ public void testMakeBucketRegionConflicts() client.makeBucket(MakeBucketArgs.builder().bucket("mybucket").region("us-west-2").build()); Assert.fail("exception should be thrown"); } + + @Test + public void testStatusRetryInterceptor() + throws NoSuchAlgorithmException, IOException, InvalidKeyException, MinioException { + MockWebServer server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(408)); // OkHttp itself retries. + server.enqueue(new MockResponse().setResponseCode(429)); + server.enqueue(new MockResponse().setResponseCode(504)); + server.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader(CONTENT_LENGTH, "211") + .setBody( + new Buffer() + .writeUtf8( + "2019-12-11T23:32:47Zamzn-s3-demo-bucketAIDACKCEVSQ6C2EXAMPLE"))); + server.start(); + + MinioClient client = MinioClient.builder().endpoint(server.url("")).build(); + List buckets = client.listBuckets(); + for (ListAllMyBucketsResult.Bucket bucket : buckets) { + Assert.assertEquals("amzn-s3-demo-bucket", bucket.name()); + } + } + + @Test(expected = ErrorResponseException.class) + public void testSetRetryDisable() + throws NoSuchAlgorithmException, IOException, InvalidKeyException, MinioException { + MockWebServer server = new MockWebServer(); + server.enqueue( + new MockResponse() + .setResponseCode(500) + .setHeader(CONTENT_TYPE, "application/xml;utf-8") + .setHeader(CONTENT_LENGTH, "211") + .setBody( + new Buffer() + .writeUtf8( + "InternalErrorAn internal error occurred. Try again."))); + server.start(); + MinioClient client = MinioClient.builder().endpoint(server.url("")).build(); + client.setRetry(Collections.singleton(500), 100L, 1); + client.listBuckets(); + Assert.fail("exception should be thrown"); + } } diff --git a/functional/TestMinioClient.java b/functional/TestMinioClient.java index 4f5720c89..ed37ac187 100644 --- a/functional/TestMinioClient.java +++ b/functional/TestMinioClient.java @@ -92,7 +92,6 @@ import io.minio.messages.AccessControlPolicy; import io.minio.messages.CORSConfiguration; import io.minio.messages.DeleteRequest; -import io.minio.messages.DeleteResult; import io.minio.messages.ErrorResponse; import io.minio.messages.EventType; import io.minio.messages.Filter; @@ -122,10 +121,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import okhttp3.Headers; @@ -139,12 +136,6 @@ @edu.umd.cs.findbugs.annotations.SuppressFBWarnings( value = {"THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION", "REC_CATCH_EXCEPTION"}) public class TestMinioClient extends TestArgs { - private static final int MAX_DELETE_ATTEMPTS = 5; - private static final Set TRANSIENT_DELETE_CODES = - Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList("InternalError", "RequestTimeout", "ServiceUnavailable", "SlowDown"))); - private String bucketName = getRandomName(); private String bucketNameWithLock = getRandomName(); public boolean isQuickTest; @@ -1038,46 +1029,15 @@ public List createObjects(String bucketName, int count, int public void removeObjects(String bucketName, List results) throws Exception { List objects = results.stream() - .map(r -> new DeleteRequest.Object(r.object(), r.versionId())) + .map( + result -> { + return new DeleteRequest.Object(result.object(), result.versionId()); + }) .collect(Collectors.toList()); - boolean anyTransient = false; - for (int attempt = 0; attempt < MAX_DELETE_ATTEMPTS; attempt++) { - if (attempt > 0) { - try { - Thread.sleep(500L << attempt); // 1s / 2s / 4s / 8s - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw ie; - } - } - anyTransient = false; - for (Result r : - client.removeObjects( - RemoveObjectsArgs.builder().bucket(bucketName).objects(objects).build())) { - DeleteResult.Error err = r.get(); - String code = err.code(); - if (!TRANSIENT_DELETE_CODES.contains(code)) { - throw new Exception( - "non-transient delete error '" - + code - + "': " - + err.message() - + " on " - + err.objectName() - + " in bucket " - + bucketName); - } - anyTransient = true; - } - if (!anyTransient) break; - } - if (anyTransient) { - throw new Exception( - results.size() - + " object(s) not deleted after " - + MAX_DELETE_ATTEMPTS - + " attempts in bucket " - + bucketName); + for (Result r : + client.removeObjects( + RemoveObjectsArgs.builder().bucket(bucketName).objects(objects).build())) { + ignore(r.get()); } } @@ -1230,6 +1190,8 @@ public void testRemoveObjects(String testTags, List results mintSuccessLog(methodName, testTags, startTime); } catch (Exception e) { handleException(methodName, testTags, startTime, e); + } finally { + removeObjects(bucketName, results); } }