[FLINK-38835][postgres] Fix timestamp conversion to LocalDateTime for dates before 1970-01-01. #4214
lvyanquan wants to merge 1 commit into apache:master
Pull request overview
This pull request fixes a timestamp conversion issue for PostgreSQL CDC that caused inconsistent data when handling dates before 1970-01-01. The fix addresses timezone offset inconsistencies between the full snapshot phase and incremental CDC phase.
Changes:
- Introduced CustomPostgresValueConverter that uses Timestamp.toLocalDateTime() to properly handle pre-1970 dates
- Updated PostgresConnection and PostgresObjectUtils to use the custom converter
- Added integration test with pre-1970 date (1900-01-01) to verify the fix
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| CustomPostgresValueConverter.java | New converter class that overrides timestamp-to-LocalDateTime conversion to fix timezone handling for pre-1970 dates |
| PostgresConnection.java | Updated to instantiate CustomPostgresValueConverter instead of PostgresValueConverter |
| PostgresObjectUtils.java | Updated to use CustomPostgresValueConverter in the value converter builder |
| column_type_test.sql | Added second test row with timestamp '1900-01-01 00:00:00.123' to test pre-1970 date handling |
| PostgreSQLConnectorITCase.java | Updated test to handle two rows and changed assertion to support unordered results for parallel processing |
    "+I(2,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
    List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
    Assertions.assertThat(actual).isEqualTo(expected);
    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
The change from isEqualTo to containsExactlyInAnyOrderElementsOf weakens the test by no longer verifying event ordering. While this is appropriate for parallel processing scenarios (as indicated by the parallelismSnapshot parameter), consider documenting why order verification was removed or adding a separate test that verifies correct ordering when parallelismSnapshot is false.
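One way to act on this suggestion is to make the ordering requirement an explicit parameter of the check. The sketch below is plain Java with no AssertJ dependency; `ResultOrderCheckSketch`, `matches`, and the row strings are hypothetical names for illustration and are not part of the PR or the Flink test utilities.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the PR.
final class ResultOrderCheckSketch {

    /**
     * Compares two result lists. When orderMatters is true the lists must be
     * equal element by element; otherwise they must hold the same elements
     * with the same multiplicities, in any order.
     */
    static boolean matches(List<String> actual, List<String> expected, boolean orderMatters) {
        if (orderMatters) {
            return actual.equals(expected);
        }
        // Sorting copies of both lists turns multiset equality into list equality.
        List<String> sortedActual = new ArrayList<>(actual);
        List<String> sortedExpected = new ArrayList<>(expected);
        Collections.sort(sortedActual);
        Collections.sort(sortedExpected);
        return sortedActual.equals(sortedExpected);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("row-1", "row-2");
        List<String> actual = Arrays.asList("row-2", "row-1");
        System.out.println(matches(actual, expected, false)); // true
        System.out.println(matches(actual, expected, true));  // false
    }
}
```

A test could pass `orderMatters = true` for the non-parallel snapshot case and `false` otherwise, so the strict check is kept wherever ordering is deterministic.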
    protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
        if (data == null) {
            return null;
        }
        if (!(data instanceof Timestamp)) {
            return data;
        }
        final Timestamp timestamp = (Timestamp) data;

        if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return POSITIVE_INFINITY_LOCAL_DATE_TIME;
        } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
        }

        return timestamp.toLocalDateTime();
The overridden method doesn't use the column and fieldDefn parameters. If the parent class implementation uses these parameters for specific logic (such as handling different column types or field definitions), removing this logic could cause unexpected behavior. Consider adding a comment explaining why these parameters can be safely ignored in this implementation.
                connectorConfig.getUnavailableValuePlaceholder(),
                connectorConfig.moneyFractionDigits());
    }
The overridden method convertTimestampToLocalDateTime should include a JavaDoc comment explaining why this override is necessary and how it differs from the parent implementation. Specifically, it should document that this method uses Timestamp.toLocalDateTime() to properly handle dates before 1970-01-01 by avoiding timezone offset inconsistencies.
    /**
     * Overrides {@link PostgresValueConverter#convertTimestampToLocalDateTime(Column, Field, Object)}
     * to use {@link Timestamp#toLocalDateTime()} directly.
     *
     * <p>The default implementation derives the {@code LocalDateTime} value using epoch-based
     * calculations, which can apply timezone offsets inconsistently for timestamps before
     * {@code 1970-01-01}. By delegating to {@code Timestamp.toLocalDateTime()}, this override
     * avoids those pre-epoch timezone offset inconsistencies while preserving the behavior for
     * timestamps on or after the epoch.</p>
     */
    @Override
    protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
        if (data == null) {
            return null;
        }
        if (!(data instanceof Timestamp)) {
            return data;
        }
        final Timestamp timestamp = (Timestamp) data;

        if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return POSITIVE_INFINITY_LOCAL_DATE_TIME;
        } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
        }

        return timestamp.toLocalDateTime();
    }
The CustomPostgresValueConverter class lacks dedicated unit tests. While integration tests are added, unit tests should verify the convertTimestampToLocalDateTime method's behavior with various edge cases including null values, non-Timestamp objects, infinity timestamps, and crucially, timestamps both before and after 1970-01-01 to ensure the timezone handling fix works correctly.
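The edge cases listed in this comment could be exercised along the following lines. This is a standalone sketch, not a test of the real class: `TimestampConversionSketch.convert` is a hypothetical stand-in that copies the branch structure of the override quoted in this review without extending Debezium's `PostgresValueConverter`, and the four sentinel constants are arbitrary placeholder values chosen for the sketch. A real unit test would instantiate `CustomPostgresValueConverter` and use the sentinels it inherits.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Stand-in for the override under review; all sentinel values are assumptions.
final class TimestampConversionSketch {

    static final Timestamp POSITIVE_INFINITY_TIMESTAMP = new Timestamp(9223372036825200000L);
    static final Timestamp NEGATIVE_INFINITY_TIMESTAMP = new Timestamp(-9223372036832400000L);
    static final LocalDateTime POSITIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.MAX;
    static final LocalDateTime NEGATIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.MIN;

    // Same branch order as the quoted override, minus the unused parameters.
    static Object convert(Object data) {
        if (data == null) {
            return null;
        }
        if (!(data instanceof Timestamp)) {
            return data;
        }
        final Timestamp timestamp = (Timestamp) data;
        if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return POSITIVE_INFINITY_LOCAL_DATE_TIME;
        } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
            return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
        }
        return timestamp.toLocalDateTime();
    }

    static void check(boolean condition, String label) {
        if (!condition) {
            throw new AssertionError(label);
        }
    }

    public static void main(String[] args) {
        check(convert(null) == null, "null passes through");
        check("text".equals(convert("text")), "non-Timestamp passes through");
        check(convert(POSITIVE_INFINITY_TIMESTAMP) == POSITIVE_INFINITY_LOCAL_DATE_TIME, "+infinity");
        check(convert(NEGATIVE_INFINITY_TIMESTAMP) == NEGATIVE_INFINITY_LOCAL_DATE_TIME, "-infinity");

        // Wall-clock values on both sides of 1970-01-01 should survive unchanged.
        LocalDateTime before = LocalDateTime.of(1900, 1, 1, 0, 0, 0, 123_000_000);
        LocalDateTime after = LocalDateTime.of(2020, 7, 17, 18, 0, 22, 123_000_000);
        check(before.equals(convert(Timestamp.valueOf(before))), "pre-1970 timestamp");
        check(after.equals(convert(Timestamp.valueOf(after))), "post-1970 timestamp");
        System.out.println("all checks passed");
    }
}
```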
The timezone offset applied to timestamps before 1970-01-01 may differ from the one applied to later timestamps, so the same row could be read with different values in the full (snapshot) phase and the incremental phase.
This change converts to LocalDateTime with Timestamp.toLocalDateTime(), which avoids the timezone offset calculation.
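To illustrate the kind of drift described here, the sketch below compares a field-based conversion with an epoch-based one. It is not the Debezium parent implementation: `PreEpochOffsetSketch`, `fieldBased`, and `epochBased` are hypothetical names, and the two offsets (+08:05:43 as a historic local-mean-time offset, +08:00 as the modern offset of the same zone) are assumed values picked only to make the effect visible.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrative only; names and offsets are assumptions, not code from the PR.
final class PreEpochOffsetSketch {

    // Field-based: reads the wall-clock fields the Timestamp already holds.
    static LocalDateTime fieldBased(Timestamp timestamp) {
        return timestamp.toLocalDateTime();
    }

    // Epoch-based: shifts the absolute instant by a single fixed offset.
    static LocalDateTime epochBased(Instant instant, ZoneOffset offset) {
        return LocalDateTime.ofInstant(instant, offset);
    }

    public static void main(String[] args) {
        LocalDateTime wallClock = LocalDateTime.of(1900, 1, 1, 0, 0, 0, 123_000_000);

        // Assumed offsets: historic local mean time vs. the modern whole-hour offset.
        ZoneOffset historic = ZoneOffset.ofHoursMinutesSeconds(8, 5, 43);
        ZoneOffset modern = ZoneOffset.ofHours(8);

        // The instant is produced with one offset ...
        Instant instant = wallClock.toInstant(historic);

        // ... and only the same offset maps it back to the original wall clock.
        System.out.println(epochBased(instant, historic)); // 1900-01-01T00:00:00.123
        System.out.println(epochBased(instant, modern));   // 1899-12-31T23:54:17.123

        // The field-based path never goes through an offset calculation.
        System.out.println(fieldBased(Timestamp.valueOf(wallClock))); // 1900-01-01T00:00:00.123
    }
}
```

When two code paths disagree on which offset applies to an old date, the epoch-based result shifts by the difference (5 minutes 43 seconds in this sketch), while the field-based result stays at the stored wall-clock value.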