diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java index 69a72c3186028..9a10af7f7f64d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java @@ -208,9 +208,11 @@ public void notifyOfError(Throwable error) { this.error = error; // this should wake up any blocking calls - try { - connectedSocket.close(); - } catch (Throwable ignored) { + if (connectedSocket != null) { + try { + connectedSocket.close(); + } catch (Throwable ignored) { + } } try { socket.close(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java index 63a0321851775..521f85e173aff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java @@ -104,4 +104,45 @@ public void run() { .isInstanceOf(RuntimeException.class) .hasMessageContaining("test"); } + + @Test + void testNotifyOfErrorBeforeConnection() throws Exception { + // Test that notifyOfError() can be called before connectedSocket is initialized + // (i.e., before any data has been read) + final SocketStreamIterator iterator = + new SocketStreamIterator<>(LongSerializer.INSTANCE); + + // Call notifyOfError before any connection is established + // This should not throw NullPointerException + iterator.notifyOfError(new Exception("early error")); + + // Verify that the iterator properly propagates the error + assertThatThrownBy(iterator::hasNext) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("early error"); + + // Clean up + iterator.close(); + } + + @Test + void testNotifyOfErrorMultipleTimes() throws Exception { + // Test that notifyOfError() handles multiple calls gracefully + final SocketStreamIterator iterator = + new SocketStreamIterator<>(LongSerializer.INSTANCE); + + // First error should be recorded + iterator.notifyOfError(new Exception("first error")); + + // Second error should be ignored (first error takes precedence) + iterator.notifyOfError(new Exception("second error")); + + // Verify that the first error is propagated + assertThatThrownBy(iterator::hasNext) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("first error"); + + // Clean up + iterator.close(); + } }