
[FLINK-38835][postgres] Fix timestamp conversion to LocalDateTime for dates before 1970-01-01. #4214

Open
lvyanquan wants to merge 1 commit into apache:master from lvyanquan:FLINK-38835

@lvyanquan

The time offsets applied to timestamps before and after 1970-01-01 may be inconsistent, so the full (snapshot) phase and the incremental phase can return different values for the same data.
This change performs the conversion to LocalDateTime with Timestamp.toLocalDateTime, which bypasses the time zone issue.
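
To make the two conversion routes concrete, here is a minimal standalone sketch. It is not the connector code: the class and method names (`PreEpochTimestampDemo`, `viaInstant`, `viaFields`) are hypothetical, and the epoch-based route is only an assumption about how an Instant-based conversion typically looks. Whether the two printed values differ depends on the JVM's default time zone and its time zone data, so the sketch asserts nothing about the size of any discrepancy.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;

class PreEpochTimestampDemo {
    /** Epoch-based route: go through an Instant and re-apply the zone's offset rules. */
    static LocalDateTime viaInstant(Timestamp ts, ZoneId zone) {
        return LocalDateTime.ofInstant(ts.toInstant(), zone);
    }

    /** Field-based route: read the wall-clock fields directly from the Timestamp. */
    static LocalDateTime viaFields(Timestamp ts) {
        return ts.toLocalDateTime();
    }

    public static void main(String[] args) {
        // Same pre-1970 value as the row added to column_type_test.sql.
        Timestamp ts = Timestamp.valueOf("1900-01-01 00:00:00.123");
        System.out.println("fields : " + viaFields(ts));
        System.out.println("instant: " + viaInstant(ts, ZoneId.systemDefault()));
    }
}
```

Running it under different `-Duser.timezone=...` settings is one way to see whether a given zone is affected.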

Copilot AI left a comment

Pull request overview

This pull request fixes a timestamp conversion issue for PostgreSQL CDC that caused inconsistent data when handling dates before 1970-01-01. The fix addresses timezone offset inconsistencies between the full snapshot phase and incremental CDC phase.

Changes:

  • Introduced CustomPostgresValueConverter that uses Timestamp.toLocalDateTime() to properly handle pre-1970 dates
  • Updated PostgresConnection and PostgresObjectUtils to use the custom converter
  • Added integration test with pre-1970 date (1900-01-01) to verify the fix

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| CustomPostgresValueConverter.java | New converter class that overrides timestamp-to-LocalDateTime conversion to fix timezone handling for pre-1970 dates |
| PostgresConnection.java | Updated to instantiate CustomPostgresValueConverter instead of PostgresValueConverter |
| PostgresObjectUtils.java | Updated to use CustomPostgresValueConverter in the value converter builder |
| column_type_test.sql | Added second test row with timestamp '1900-01-01 00:00:00.123' to test pre-1970 date handling |
| PostgreSQLConnectorITCase.java | Updated test to handle two rows and changed assertion to support unordered results for parallel processing |


"+I(2,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
- Assertions.assertThat(actual).isEqualTo(expected);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);

Copilot AI Jan 16, 2026

The change from isEqualTo to containsExactlyInAnyOrderElementsOf weakens the test by no longer verifying event ordering. While this is appropriate for parallel processing scenarios (as indicated by the parallelismSnapshot parameter), consider documenting why order verification was removed or adding a separate test that verifies correct ordering when parallelismSnapshot is false.
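
One way to keep order verification for the serial case, sketched in plain Java so it runs without AssertJ. The helper `matches` and its `parallelSnapshot` flag are hypothetical and not part of the PR; in the actual test the same split could presumably be expressed by choosing between `isEqualTo` and `containsExactlyInAnyOrderElementsOf`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ResultOrderCheck {
    /**
     * Hypothetical helper: strict order when the snapshot runs serially,
     * order-insensitive (but still duplicate-aware) comparison when it runs in parallel.
     */
    static boolean matches(List<String> expected, List<String> actual, boolean parallelSnapshot) {
        if (!parallelSnapshot) {
            return expected.equals(actual); // element-by-element, order matters
        }
        // Sort copies so the comparison ignores order but still counts duplicates.
        List<String> sortedExpected = new ArrayList<>(expected);
        List<String> sortedActual = new ArrayList<>(actual);
        Collections.sort(sortedExpected);
        Collections.sort(sortedActual);
        return sortedExpected.equals(sortedActual);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("+I(1,...)", "+I(2,...)");
        List<String> swapped = Arrays.asList("+I(2,...)", "+I(1,...)");
        System.out.println(matches(expected, swapped, true));
        System.out.println(matches(expected, swapped, false));
    }
}
```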

Comment on lines +82 to +97
protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
    if (data == null) {
        return null;
    }
    if (!(data instanceof Timestamp)) {
        return data;
    }
    final Timestamp timestamp = (Timestamp) data;

    if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
        return POSITIVE_INFINITY_LOCAL_DATE_TIME;
    } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
        return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
    }

    return timestamp.toLocalDateTime();

Copilot AI Jan 16, 2026

The overridden method doesn't use the column and fieldDefn parameters. If the parent class implementation uses these parameters for specific logic (such as handling different column types or field definitions), removing this logic could cause unexpected behavior. Consider adding a comment explaining why these parameters can be safely ignored in this implementation.

connectorConfig.getUnavailableValuePlaceholder(),
connectorConfig.moneyFractionDigits());
}

Copilot AI Jan 16, 2026

The overridden method convertTimestampToLocalDateTime should include a JavaDoc comment explaining why this override is necessary and how it differs from the parent implementation. Specifically, it should document that this method uses Timestamp.toLocalDateTime() to properly handle dates before 1970-01-01 by avoiding timezone offset inconsistencies.

Suggested change
/**
 * Overrides {@link PostgresValueConverter#convertTimestampToLocalDateTime(Column, Field, Object)}
 * to use {@link Timestamp#toLocalDateTime()} directly.
 *
 * <p>The default implementation derives the {@code LocalDateTime} value using epoch-based
 * calculations, which can apply timezone offsets inconsistently for timestamps before
 * {@code 1970-01-01}. By delegating to {@code Timestamp.toLocalDateTime()}, this override
 * avoids those pre-epoch timezone offset inconsistencies while preserving the behavior for
 * timestamps on or after the epoch.</p>
 */

Comment on lines +81 to +98
@Override
protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
    if (data == null) {
        return null;
    }
    if (!(data instanceof Timestamp)) {
        return data;
    }
    final Timestamp timestamp = (Timestamp) data;

    if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
        return POSITIVE_INFINITY_LOCAL_DATE_TIME;
    } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
        return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
    }

    return timestamp.toLocalDateTime();
}

Copilot AI Jan 16, 2026

The CustomPostgresValueConverter class lacks dedicated unit tests. While integration tests are added, unit tests should verify the convertTimestampToLocalDateTime method's behavior with various edge cases including null values, non-Timestamp objects, infinity timestamps, and crucially, timestamps both before and after 1970-01-01 to ensure the timezone handling fix works correctly.
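
A rough sketch of the edge cases such a unit test could cover. Because constructing the real converter needs the Debezium classes, this standalone version copies the branch logic of the override into a hypothetical `convert` method and uses stand-in sentinel values; the four `*_INFINITY_*` constants below are placeholders chosen for illustration, not the values the parent class defines. A real test would call `convertTimestampToLocalDateTime` on a `CustomPostgresValueConverter` instance instead.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

class TimestampConversionSketch {
    // Stand-in sentinels for illustration only; NOT the values defined by the parent converter.
    static final Timestamp POSITIVE_INFINITY_TIMESTAMP = new Timestamp(4_000_000_000_000_000L);
    static final Timestamp NEGATIVE_INFINITY_TIMESTAMP = new Timestamp(-4_000_000_000_000_000L);
    static final LocalDateTime POSITIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.MAX;
    static final LocalDateTime NEGATIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.MIN;

    /** Mirrors the branches of the override shown in the diff, without the unused parameters. */
    static Object convert(Object data) {
        if (data == null) {
            return null;
        }
        if (!(data instanceof Timestamp)) {
            return data; // non-Timestamp values pass through unchanged
        }
        Timestamp timestamp = (Timestamp) data;
        if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return POSITIVE_INFINITY_LOCAL_DATE_TIME;
        } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
        }
        return timestamp.toLocalDateTime();
    }

    static void check(boolean condition, String label) {
        if (!condition) {
            throw new AssertionError(label);
        }
    }

    public static void main(String[] args) {
        check(convert(null) == null, "null stays null");
        check("text".equals(convert("text")), "non-Timestamp passes through");
        check(convert(POSITIVE_INFINITY_TIMESTAMP) == POSITIVE_INFINITY_LOCAL_DATE_TIME, "+infinity");
        check(convert(NEGATIVE_INFINITY_TIMESTAMP) == NEGATIVE_INFINITY_LOCAL_DATE_TIME, "-infinity");
        // Pre-epoch and post-epoch values should both keep their wall-clock fields.
        check(LocalDateTime.of(1900, 1, 1, 0, 0, 0, 123_000_000)
                .equals(convert(Timestamp.valueOf("1900-01-01 00:00:00.123"))), "pre-1970");
        check(LocalDateTime.of(2020, 7, 17, 18, 0, 22)
                .equals(convert(Timestamp.valueOf("2020-07-17 18:00:22"))), "post-1970");
        System.out.println("all edge cases passed");
    }
}
```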
