From a355be230869f2c271920d9c8e0a37e9ab41831f Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Fri, 5 Jun 2026 13:42:47 -0700 Subject: [PATCH 1/2] [FLINK-39873][streaming] Add null check for connectedSocket in SocketStreamIterator.notifyOfError() --- .../streaming/experimental/SocketStreamIterator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java index 69a72c3186028..9a10af7f7f64d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java @@ -208,9 +208,11 @@ public void notifyOfError(Throwable error) { this.error = error; // this should wake up any blocking calls - try { - connectedSocket.close(); - } catch (Throwable ignored) { + if (connectedSocket != null) { + try { + connectedSocket.close(); + } catch (Throwable ignored) { + } } try { socket.close(); From e2802287b00200bfbaa694f1d7375c16fd0a385b Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Fri, 5 Jun 2026 14:18:25 -0700 Subject: [PATCH 2/2] [FLINK-39873] Add two test cases to verify the null check fix in SocketStreamIterator --- .../SocketStreamIteratorTest.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java index 63a0321851775..521f85e173aff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java @@ -104,4 +104,45 @@ public void run() { .isInstanceOf(RuntimeException.class) .hasMessageContaining("test"); } + + @Test + void testNotifyOfErrorBeforeConnection() throws Exception { + // Test that notifyOfError() can be called before connectedSocket is initialized + // (i.e., before any data has been read) + final SocketStreamIterator iterator = + new SocketStreamIterator<>(LongSerializer.INSTANCE); + + // Call notifyOfError before any connection is established + // This should not throw NullPointerException + iterator.notifyOfError(new Exception("early error")); + + // Verify that the iterator properly propagates the error + assertThatThrownBy(iterator::hasNext) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("early error"); + + // Clean up + iterator.close(); + } + + @Test + void testNotifyOfErrorMultipleTimes() throws Exception { + // Test that notifyOfError() handles multiple calls gracefully + final SocketStreamIterator iterator = + new SocketStreamIterator<>(LongSerializer.INSTANCE); + + // First error should be recorded + iterator.notifyOfError(new Exception("first error")); + + // Second error should be ignored (first error takes precedence) + iterator.notifyOfError(new Exception("second error")); + + // Verify that the first error is propagated + assertThatThrownBy(iterator::hasNext) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("first error"); + + // Clean up + iterator.close(); + } }