From 6ff00d6d2b68b4536a968161378d45c46754e269 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Mon, 9 Mar 2026 20:15:58 +0530 Subject: [PATCH 01/10] Changes for async writer issues for rate limiting strategy --- .../base/sink/writer/AsyncSinkWriter.java | 2 +- .../sink/writer/AsyncSinkWriterYieldTest.java | 207 ++++++++++++++++++ .../base-connector-examples/pom.xml | 93 ++++++++ .../org/apache/flink/AsyncSinkHangDemo.java | 73 ++++++ .../java/org/apache/flink/DummyAsyncSink.java | 105 +++++++++ .../org/apache/flink/TokenBucketProvider.java | 42 ++++ .../TokenBucketRateLimitingStrategy.java | 99 +++++++++ flink-examples/pom.xml | 1 + 8 files changed, 621 insertions(+), 1 deletion(-) create mode 100755 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterYieldTest.java create mode 100755 flink-examples/base-connector-examples/pom.xml create mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java create mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java create mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java create mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index 1376d87db5781..d5134addc1d02 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -353,7 +353,7 @@ private BasicRequestInfo createRequestInfo() { private void flush() throws InterruptedException { RequestInfo requestInfo = createRequestInfo(); while (rateLimitingStrategy.shouldBlock(requestInfo)) { - mailboxExecutor.yield(); + yieldIfThereExistsInFlightRequests(); requestInfo = createRequestInfo(); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterYieldTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterYieldTest.java new file mode 100755 index 0000000000000..691fd16548009 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterYieldTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for conditional yielding behavior in AsyncSinkWriter flush method. Verifies that + * yieldIfThereExistsInFlightRequests() only yields when there are actual in-flight requests. + */ +class AsyncSinkWriterYieldTest { + + private TestSinkInitContext sinkInitContext; + private final List results = new ArrayList<>(); + private AtomicInteger yieldCallCount; + + @BeforeEach + void setup() { + yieldCallCount = new AtomicInteger(0); + sinkInitContext = new TestSinkInitContext(); + results.clear(); + } + + @Test + void testFlushWithEmptyBufferDoesNotYield() throws Exception { + TrackingMailboxExecutor trackingMailbox = new TrackingMailboxExecutor(yieldCallCount); + TestSinkInitContext contextWithTracking = + new TestSinkInitContextWithCustomMailbox(trackingMailbox); + + TestAsyncSinkWriter sink = + new TestAsyncSinkWriter(contextWithTracking, 10, 100, 1000, results); + + int yieldCountBefore = yieldCallCount.get(); + sink.flush(false); + int yieldCountAfter = yieldCallCount.get(); + + assertThat(yieldCountAfter).isEqualTo(yieldCountBefore); + assertThat(yieldCountAfter).isEqualTo(0); + } + + @Test + void testFlushWithBufferedElementsButNoInFlightRequestsDoesNotYield() throws Exception { + TrackingMailboxExecutor trackingMailbox = new TrackingMailboxExecutor(yieldCallCount); + TestSinkInitContext contextWithTracking = + new TestSinkInitContextWithCustomMailbox(trackingMailbox); + + TestAsyncSinkWriter sink = + new TestAsyncSinkWriter(contextWithTracking, 10, 100, 1000, results); + + sink.write("1"); + sink.write("2"); + + int yieldCountBefore = yieldCallCount.get(); + sink.flush(false); + int yieldCountAfter = yieldCallCount.get(); + + assertThat(yieldCountAfter).isEqualTo(yieldCountBefore); + assertThat(yieldCountAfter).isEqualTo(0); + } + + @Test + void testFlushWithTrueFlushesAllElementsWithoutYielding() throws Exception { + TrackingMailboxExecutor trackingMailbox = new TrackingMailboxExecutor(yieldCallCount); + TestSinkInitContext contextWithTracking = + new TestSinkInitContextWithCustomMailbox(trackingMailbox); + + TestAsyncSinkWriter sink = + new TestAsyncSinkWriter(contextWithTracking, 10, 100, 1000, results); + + sink.write("1"); + sink.write("2"); + sink.write("3"); + + sink.flush(true); + + assertThat(results).containsExactly(1, 2, 3); + assertThat(yieldCallCount.get()).isEqualTo(0); + } + + private static class TestAsyncSinkWriter extends AsyncSinkWriter { + private final List results; + + TestAsyncSinkWriter( + TestSinkInitContext context, + int maxBatchSize, + int maxBufferedRequests, + long maxBatchSizeInBytes, + List results) { + super( + (elem, ctx) -> Integer.parseInt(elem), + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(1) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(1000) + .setMaxRecordSizeInBytes(100) + .setRateLimitingStrategy( + CongestionControlRateLimitingStrategy.builder() + .setInitialMaxInFlightMessages(maxBatchSize) + .setMaxInFlightRequests(1) + .setScalingStrategy( + AIMDScalingStrategy.builder(maxBatchSize) + .build()) + .build()) + .build(), + Collections.emptyList()); + this.results = results; + } + + @Override + protected void submitRequestEntries( + List requestEntries, ResultHandler resultHandler) { + results.addAll(requestEntries); + resultHandler.complete(); + } + + @Override + protected long getSizeInBytes(Integer requestEntry) { + return 4; + } + + public void write(String val) throws IOException, InterruptedException { + write(val, null); + } + } + + private static class TrackingMailboxExecutor implements MailboxExecutor { + private final AtomicInteger yieldCount; + + TrackingMailboxExecutor(AtomicInteger yieldCount) { + this.yieldCount = yieldCount; + } + + @Override + public void execute( + MailOptions mailOptions, + ThrowingRunnable command, + String descriptionFormat, + Object... descriptionArgs) { + try { + command.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void yield() throws InterruptedException { + yieldCount.incrementAndGet(); + } + + @Override + public boolean tryYield() { + return false; + } + + @Override + public boolean shouldInterrupt() { + return false; + } + } + + private static class TestSinkInitContextWithCustomMailbox extends TestSinkInitContext { + private final MailboxExecutor customMailbox; + + TestSinkInitContextWithCustomMailbox(MailboxExecutor mailbox) { + this.customMailbox = mailbox; + } + + @Override + public MailboxExecutor getMailboxExecutor() { + return customMailbox; + } + } +} diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml new file mode 100755 index 0000000000000..946cb30164b14 --- /dev/null +++ b/flink-examples/base-connector-examples/pom.xml @@ -0,0 +1,93 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-examples + 2.3-SNAPSHOT + + + flink-examples-base-connector + Flink : Examples : Base Connector + + + + org.apache.flink + flink-streaming-java + ${project.version} + provided + + + org.apache.flink + flink-clients + ${project.version} + provided + + + org.apache.flink + flink-connector-base + ${project.version} + + + org.apache.flink + flink-connector-files + ${project.version} + + + org.apache.flink + flink-connector-datagen + ${project.version} + + + com.github.vladimir-bukhtoyarov + bucket4j-core + 7.6.0 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.github.vladimir-bukhtoyarov:bucket4j-core + + + + + + + + + diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java new file mode 100755 index 0000000000000..2a0065d401744 --- /dev/null +++ b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class AsyncSinkHangDemo { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(5000); // Checkpoint every 5 seconds - will timeout due to hang + + // Generate continuous stream of data using DataGeneratorSource + DataGeneratorSource source = + new DataGeneratorSource<>( + (Long index) -> "Record-" + index, + 1000, // records per second + Types.STRING); + + DataStream stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGenerator"); + + // AsyncSink with rate limiting - demonstrates the hang when rate limiting blocks with no + // in-flight requests + // Configuration: maxInFlightRequests=1, tokensPerSecond=5, tokensPerMinute=100 + // This will cause tokens to be exhausted quickly, triggering the hang scenario: + // 1. Tokens get consumed by initial requests + // 2. shouldBlock() returns true (no tokens available) + // 3. currentInFlightRequests drops to 0 (requests complete fast) + // 4. flush() loops on mailboxExecutor.yield() indefinitely + // 5. Checkpoint cannot complete -> timeout -> job failure + TokenBucketRateLimitingStrategy rateLimiter = + new TokenBucketRateLimitingStrategy(1, 5, 100); + DummyAsyncSink sink = new DummyAsyncSink(rateLimiter); + stream.sinkTo(sink); + + // File sink - original implementation (commented out) + /* + FileSink sink = FileSink + .forRowFormat(new Path("/tmp/flink-output"), new SimpleStringEncoder("UTF-8")) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withRolloverInterval(Duration.ofMinutes(1)) + .withInactivityInterval(Duration.ofSeconds(30)) + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .build()) + .build(); + + stream.sinkTo(sink); + */ + + env.execute("AsyncSink Hang Demo"); + } +} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java new file mode 100755 index 0000000000000..1a3326d70a46b --- /dev/null +++ b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink; + +import org.apache.flink.api.connector.sink2.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ResultHandler; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class DummyAsyncSink extends AsyncSinkBase { + + private final TokenBucketRateLimitingStrategy rateLimitingStrategy; + + public DummyAsyncSink(TokenBucketRateLimitingStrategy rateLimitingStrategy) { + super((element, context) -> element, 10, 1, 100, 1024 * 1024, 1000, 1024); + this.rateLimitingStrategy = rateLimitingStrategy; + } + + @Override + public StatefulSinkWriter> createWriter( + WriterInitContext context) throws IOException { + return new AsyncSinkWriter( + getElementConverter(), + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(getMaxBatchSize()) + .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) + .setMaxInFlightRequests(getMaxInFlightRequests()) + .setMaxBufferedRequests(getMaxBufferedRequests()) + .setMaxTimeInBufferMS(getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) + .setRateLimitingStrategy(rateLimitingStrategy) + .build(), + Collections.emptyList()) { + + @Override + protected void submitRequestEntries( + List requestEntries, ResultHandler resultHandler) { + // Simulate async request - complete immediately + resultHandler.complete(); + } + + @Override + protected long getSizeInBytes(String requestEntry) { + return requestEntry.length(); + } + }; + } + + @Override + public StatefulSinkWriter> restoreWriter( + WriterInitContext context, Collection> recoveredState) + throws IOException { + return createWriter(context); + } + + @Override + public SimpleVersionedSerializer> getWriterStateSerializer() { + return new AsyncSinkWriterStateSerializer() { + @Override + protected void serializeRequestToStream(String request, DataOutputStream out) + throws IOException { + out.writeUTF(request); + } + + @Override + protected String deserializeRequestFromStream(long requestSize, DataInputStream in) + throws IOException { + return in.readUTF(); + } + + @Override + public int getVersion() { + return 1; + } + }; + } +} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java new file mode 100755 index 0000000000000..ac3e42661e301 --- /dev/null +++ b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TokenBucketProvider { + private static final Map buckets = new ConcurrentHashMap<>(); + + public static Bucket getInstance(String name, long tokensPerSecond, long tokensPerMinute) { + return buckets.computeIfAbsent( + name, + k -> { + Bandwidth limit = + Bandwidth.classic( + tokensPerMinute, + Refill.intervally(tokensPerMinute, Duration.ofMinutes(1))); + return Bucket.builder().addLimit(limit).build(); + }); + } +} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java new file mode 100755 index 0000000000000..5d09f5c122652 --- /dev/null +++ b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink; + +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo; +import org.apache.flink.connector.base.sink.writer.strategy.ResultInfo; + +import io.github.bucket4j.Bucket; + +import java.io.Serializable; + +public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy, Serializable { + private static final long serialVersionUID = 1L; + private final int maxInFlightRequests; + private final long tokensPerSecond; + private final long tokensPerMinute; + private transient int currentInFlightRequests = 0; + private transient Bucket bucket; + + public TokenBucketRateLimitingStrategy( + int maxInFlightRequests, long tokensPerSecond, long tokensPerMinute) { + this.maxInFlightRequests = maxInFlightRequests; + this.tokensPerSecond = tokensPerSecond; + this.tokensPerMinute = tokensPerMinute; + } + + private Bucket getBucket() { + if (bucket == null) { + bucket = + TokenBucketProvider.getInstance( + "TokenBucketRateLimitingStrategy", tokensPerSecond, tokensPerMinute); + } + return bucket; + } + + @Override + public boolean shouldBlock(RequestInfo requestInfo) { + // This is the problematic condition: blocks when tokens unavailable OR max in-flight + // reached + // When currentInFlightRequests == 0 AND tokens are exhausted, this returns true + // but nothing will unblock it since no requests will complete + return currentInFlightRequests >= maxInFlightRequests || areTokensNotAvailable(requestInfo); + } + + private boolean areTokensNotAvailable(RequestInfo requestInfo) { + int batchSize = requestInfo.getBatchSize(); + if (batchSize <= 0) { + return false; + } + // Check if tokens are available - if not, shouldBlock returns true + // This causes the hang when currentInFlightRequests == 0 + return !getBucket().estimateAbilityToConsume(batchSize).canBeConsumed(); + } + + @Override + public void registerInFlightRequest(RequestInfo requestInfo) { + currentInFlightRequests++; + int batchSize = requestInfo.getBatchSize(); + if (batchSize > 0) { + getBucket().tryConsume(batchSize); + } + } + + @Override + public void registerCompletedRequest(ResultInfo resultInfo) { + // Only decrements counter - doesn't refill tokens + // Tokens refill based on time, but if shouldBlock() returns true + // when currentInFlightRequests == 0, nothing triggers a recheck + currentInFlightRequests--; + } + + @Override + public int getMaxBatchSize() { + return 100; + } + + private void readObject(java.io.ObjectInputStream in) + throws java.io.IOException, ClassNotFoundException { + in.defaultReadObject(); + currentInFlightRequests = 0; + bucket = null; + } +} diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml index e9bae761259b7..f6af878a040f0 100644 --- a/flink-examples/pom.xml +++ b/flink-examples/pom.xml @@ -39,6 +39,7 @@ under the License. flink-examples-streaming flink-examples-table flink-examples-build-helper + base-connector-examples From 5cc1fc0595316505c7b1ff5f6a0051d6bb10ce40 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 10:36:50 +0530 Subject: [PATCH 02/10] Changes for async writer issues for rate limiting strategy 1 --- flink-examples/base-connector-examples/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index 946cb30164b14..9bf8b265d0ab0 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -63,6 +63,7 @@ under the License. com.github.vladimir-bukhtoyarov bucket4j-core 7.6.0 + ${flink.markBundledAsOptional} From 39173f29f30e394c007bb28b5ebc46eba4d2711e Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 12:38:34 +0530 Subject: [PATCH 03/10] Changes for async writer issues for rate limiting strategy 2 --- .../src/main/resources/META-INF/NOTICE | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100755 flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE diff --git a/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE b/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE new file mode 100755 index 0000000000000..4147237e93f01 --- /dev/null +++ b/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE @@ -0,0 +1,9 @@ +flink-examples-base-connector +Copyright 2014-2026 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.github.vladimir-bukhtoyarov:bucket4j-core:7.6.0 From c53b7274d0555a4dd92ceb24e8302f5c7d4e4a98 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 13:43:28 +0530 Subject: [PATCH 04/10] Changes for async writer issues for rate limiting strategy 3 --- .../base-connector-examples/src/main/resources/META-INF/NOTICE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE b/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE index 4147237e93f01..b5157a4ba0416 100755 --- a/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE +++ b/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE @@ -1,4 +1,4 @@ -flink-examples-base-connector +base-connector-examples Copyright 2014-2026 The Apache Software Foundation This product includes software developed at From 19bd905e066569e4b4d209a7a427cf176861b4e8 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 15:34:21 +0530 Subject: [PATCH 05/10] Changes for async writer issues for rate limiting strategy 4 --- flink-examples/base-connector-examples/pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index 9bf8b265d0ab0..a66602a75f6a3 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -85,6 +85,9 @@ under the License. com.github.vladimir-bukhtoyarov:bucket4j-core + + + From 1a64a3ba6ba635eda9b4305a3c0043bb947e38cf Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 17:50:34 +0530 Subject: [PATCH 06/10] Changes for async writer issues for rate limiting strategy 5 --- flink-examples/base-connector-examples/pom.xml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index a66602a75f6a3..dfa888aafc608 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -68,6 +68,11 @@ under the License. + + + src/main/resources + + org.apache.maven.plugins @@ -85,9 +90,6 @@ under the License. com.github.vladimir-bukhtoyarov:bucket4j-core - - - From 1252233d8c0af9ba29ea6c575b10dc4d73281e86 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 18:54:33 +0530 Subject: [PATCH 07/10] Changes for async writer issues for rate limiting strategy 6 --- .../base-connector-examples/pom.xml | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index dfa888aafc608..7029a9fd62f28 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -77,9 +77,9 @@ under the License. org.apache.maven.plugins maven-shade-plugin - 3.2.4 + shade-flink package shade @@ -87,9 +87,25 @@ under the License. - com.github.vladimir-bukhtoyarov:bucket4j-core + *:* + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + From 46821414df644120758f207e7da57050324c71ec Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 20:06:56 +0530 Subject: [PATCH 08/10] Changes for async writer issues for rate limiting strategy 7 --- flink-examples/base-connector-examples/pom.xml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index 7029a9fd62f28..ef88f614a9747 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -89,23 +89,7 @@ under the License. *:* - - org.apache.flink:flink-shaded-force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - From 98db9a8121bee0665dd52f700f44e451029ac49c Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 21:10:56 +0530 Subject: [PATCH 09/10] Changes for async writer issues for rate limiting strategy 8 --- flink-examples/base-connector-examples/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml index ef88f614a9747..b1a06f78a7625 100755 --- a/flink-examples/base-connector-examples/pom.xml +++ b/flink-examples/base-connector-examples/pom.xml @@ -87,7 +87,7 @@ under the License. - *:* + com.github.vladimir-bukhtoyarov:bucket4j-core From bcb8c3b79a98683b136a400d9151e8e70adf9ad6 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Tue, 10 Mar 2026 22:18:15 +0530 Subject: [PATCH 10/10] Changes for async writer issues for rate limiting strategy 9 --- .../base-connector-examples/pom.xml | 99 ----------------- .../org/apache/flink/AsyncSinkHangDemo.java | 73 ------------ .../java/org/apache/flink/DummyAsyncSink.java | 105 ------------------ .../org/apache/flink/TokenBucketProvider.java | 42 ------- .../TokenBucketRateLimitingStrategy.java | 99 ----------------- .../src/main/resources/META-INF/NOTICE | 9 -- flink-examples/pom.xml | 1 - 7 files changed, 428 deletions(-) delete mode 100755 flink-examples/base-connector-examples/pom.xml delete mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java delete mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java delete mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java delete mode 100755 flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java delete mode 100755 flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE diff --git a/flink-examples/base-connector-examples/pom.xml b/flink-examples/base-connector-examples/pom.xml deleted file mode 100755 index b1a06f78a7625..0000000000000 --- a/flink-examples/base-connector-examples/pom.xml +++ /dev/null @@ -1,99 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-examples - 2.3-SNAPSHOT - - - flink-examples-base-connector - Flink : Examples : Base Connector - - - - org.apache.flink - flink-streaming-java - ${project.version} - provided - - - org.apache.flink - flink-clients - ${project.version} - provided - - - org.apache.flink - flink-connector-base - ${project.version} - - - org.apache.flink - flink-connector-files - ${project.version} - - - org.apache.flink - flink-connector-datagen - ${project.version} - - - com.github.vladimir-bukhtoyarov - bucket4j-core - 7.6.0 - ${flink.markBundledAsOptional} - - - - - - - src/main/resources - - - - - org.apache.maven.plugins - maven-shade-plugin - - - shade-flink - package - - shade - - - - - com.github.vladimir-bukhtoyarov:bucket4j-core - - - - - - - - - diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java deleted file mode 100755 index 2a0065d401744..0000000000000 --- a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink; - -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.connector.datagen.source.DataGeneratorSource; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -public class AsyncSinkHangDemo { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); // Checkpoint every 5 seconds - will timeout due to hang - - // Generate continuous stream of data using DataGeneratorSource - DataGeneratorSource source = - new DataGeneratorSource<>( - (Long index) -> "Record-" + index, - 1000, // records per second - Types.STRING); - - DataStream stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGenerator"); - - // AsyncSink with rate limiting - demonstrates the hang when rate limiting blocks with no - // in-flight requests - // Configuration: maxInFlightRequests=1, tokensPerSecond=5, tokensPerMinute=100 - // This will cause tokens to be exhausted quickly, triggering the hang scenario: - // 1. Tokens get consumed by initial requests - // 2. shouldBlock() returns true (no tokens available) - // 3. currentInFlightRequests drops to 0 (requests complete fast) - // 4. flush() loops on mailboxExecutor.yield() indefinitely - // 5. Checkpoint cannot complete -> timeout -> job failure - TokenBucketRateLimitingStrategy rateLimiter = - new TokenBucketRateLimitingStrategy(1, 5, 100); - DummyAsyncSink sink = new DummyAsyncSink(rateLimiter); - stream.sinkTo(sink); - - // File sink - original implementation (commented out) - /* - FileSink sink = FileSink - .forRowFormat(new Path("/tmp/flink-output"), new SimpleStringEncoder("UTF-8")) - .withRollingPolicy( - DefaultRollingPolicy.builder() - .withRolloverInterval(Duration.ofMinutes(1)) - .withInactivityInterval(Duration.ofSeconds(30)) - .withMaxPartSize(MemorySize.ofMebiBytes(1)) - .build()) - .build(); - - stream.sinkTo(sink); - */ - - env.execute("AsyncSink Hang Demo"); - } -} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java deleted file mode 100755 index 1a3326d70a46b..0000000000000 --- a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink; - -import org.apache.flink.api.connector.sink2.StatefulSinkWriter; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.connector.base.sink.AsyncSinkBase; -import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; -import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; -import org.apache.flink.connector.base.sink.writer.BufferedRequestState; -import org.apache.flink.connector.base.sink.writer.ResultHandler; -import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; -import org.apache.flink.core.io.SimpleVersionedSerializer; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -public class DummyAsyncSink extends AsyncSinkBase { - - private final TokenBucketRateLimitingStrategy rateLimitingStrategy; - - public DummyAsyncSink(TokenBucketRateLimitingStrategy rateLimitingStrategy) { - super((element, context) -> element, 10, 1, 100, 1024 * 1024, 1000, 1024); - this.rateLimitingStrategy = rateLimitingStrategy; - } - - @Override - public StatefulSinkWriter> createWriter( - WriterInitContext context) throws IOException { - return new AsyncSinkWriter( - getElementConverter(), - context, - AsyncSinkWriterConfiguration.builder() - .setMaxBatchSize(getMaxBatchSize()) - .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) - .setMaxInFlightRequests(getMaxInFlightRequests()) - .setMaxBufferedRequests(getMaxBufferedRequests()) - .setMaxTimeInBufferMS(getMaxTimeInBufferMS()) - .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) - .setRateLimitingStrategy(rateLimitingStrategy) - .build(), - Collections.emptyList()) { - - @Override - protected void submitRequestEntries( - List requestEntries, ResultHandler resultHandler) { - // Simulate async request - complete immediately - resultHandler.complete(); - } - - @Override - protected long getSizeInBytes(String requestEntry) { - return requestEntry.length(); - } - }; - } - - @Override - public StatefulSinkWriter> restoreWriter( - WriterInitContext context, Collection> recoveredState) - throws IOException { - return createWriter(context); - } - - @Override - public SimpleVersionedSerializer> getWriterStateSerializer() { - return new AsyncSinkWriterStateSerializer() { - @Override - protected void serializeRequestToStream(String request, DataOutputStream out) - throws IOException { - out.writeUTF(request); - } - - @Override - protected String deserializeRequestFromStream(long requestSize, DataInputStream in) - throws IOException { - return in.readUTF(); - } - - @Override - public int getVersion() { - return 1; - } - }; - } -} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java deleted file mode 100755 index ac3e42661e301..0000000000000 --- a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink; - -import io.github.bucket4j.Bandwidth; -import io.github.bucket4j.Bucket; -import io.github.bucket4j.Refill; - -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class TokenBucketProvider { - private static final Map buckets = new ConcurrentHashMap<>(); - - public static Bucket getInstance(String name, long tokensPerSecond, long tokensPerMinute) { - return buckets.computeIfAbsent( - name, - k -> { - Bandwidth limit = - Bandwidth.classic( - tokensPerMinute, - Refill.intervally(tokensPerMinute, Duration.ofMinutes(1))); - return Bucket.builder().addLimit(limit).build(); - }); - } -} diff --git a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java b/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java deleted file mode 100755 index 5d09f5c122652..0000000000000 --- a/flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink; - -import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; -import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo; -import org.apache.flink.connector.base.sink.writer.strategy.ResultInfo; - -import io.github.bucket4j.Bucket; - -import java.io.Serializable; - -public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy, Serializable { - private static final long serialVersionUID = 1L; - private final int maxInFlightRequests; - private final long tokensPerSecond; - private final long tokensPerMinute; - private transient int currentInFlightRequests = 0; - private transient Bucket bucket; - - public TokenBucketRateLimitingStrategy( - int maxInFlightRequests, long tokensPerSecond, long tokensPerMinute) { - this.maxInFlightRequests = maxInFlightRequests; - this.tokensPerSecond = tokensPerSecond; - this.tokensPerMinute = tokensPerMinute; - } - - private Bucket getBucket() { - if (bucket == null) { - bucket = - TokenBucketProvider.getInstance( - "TokenBucketRateLimitingStrategy", tokensPerSecond, tokensPerMinute); - } - return bucket; - } - - @Override - public boolean shouldBlock(RequestInfo requestInfo) { - // This is the problematic condition: blocks when tokens unavailable OR max in-flight - // reached - // When currentInFlightRequests == 0 AND tokens are exhausted, this returns true - // but nothing will unblock it since no requests will complete - return currentInFlightRequests >= maxInFlightRequests || areTokensNotAvailable(requestInfo); - } - - private boolean areTokensNotAvailable(RequestInfo requestInfo) { - int batchSize = requestInfo.getBatchSize(); - if (batchSize <= 0) { - return false; - } - // Check if tokens are available - if not, shouldBlock returns true - // This causes the hang when currentInFlightRequests == 0 - return !getBucket().estimateAbilityToConsume(batchSize).canBeConsumed(); - } - - @Override - public void registerInFlightRequest(RequestInfo requestInfo) { - currentInFlightRequests++; - int batchSize = requestInfo.getBatchSize(); - if (batchSize > 0) { - getBucket().tryConsume(batchSize); - } - } - - @Override - public void registerCompletedRequest(ResultInfo resultInfo) { - // Only decrements counter - doesn't refill tokens - // Tokens refill based on time, but if shouldBlock() returns true - // when currentInFlightRequests == 0, nothing triggers a recheck - currentInFlightRequests--; - } - - @Override - public int getMaxBatchSize() { - return 100; - } - - private void readObject(java.io.ObjectInputStream in) - throws java.io.IOException, ClassNotFoundException { - in.defaultReadObject(); - currentInFlightRequests = 0; - bucket = null; - } -} diff --git a/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE b/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE deleted file mode 100755 index b5157a4ba0416..0000000000000 --- a/flink-examples/base-connector-examples/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,9 +0,0 @@ -base-connector-examples -Copyright 2014-2026 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) - -- com.github.vladimir-bukhtoyarov:bucket4j-core:7.6.0 diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml index f6af878a040f0..e9bae761259b7 100644 --- a/flink-examples/pom.xml +++ b/flink-examples/pom.xml @@ -39,7 +39,6 @@ under the License. flink-examples-streaming flink-examples-table flink-examples-build-helper - base-connector-examples