feat(connectors): Delta Lake Sink Connector #2783
kriti-sc wants to merge 17 commits into apache:master from
Conversation
The plugin exports a symbol called "open", which conflicts with the POSIX "open()" call to read a file. So when delta sink calls POSIX open() to read a file, the linker resolves it to the plugin's open instead of libc's open. We rename the symbols plugin exports to iggy_sink_open. Now when object_store calls POSIX open(), there's no local symbol called open to collide with, so it correctly resolves to libc's open().
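A minimal sketch of the rename, assuming the plugin entry point is an exported C symbol (the function body and signature here are illustrative, not the actual connector API). Once a cdylib exporting a symbol named `open` is loaded into the process, the dynamic linker can resolve later calls to POSIX `open()` against it; prefixing the export avoids the collision:

```rust
// Exporting a C symbol literally named `open` from a cdylib can shadow
// libc's open(2) for the whole process after the plugin is dlopen'd.
// Prefixing the exported name sidesteps the collision entirely.
#[no_mangle]
pub extern "C" fn iggy_sink_open() -> i32 {
    // A real plugin would construct and return the sink handle here.
    0
}

fn main() {
    // From Rust's point of view it is still an ordinary function.
    assert_eq!(iggy_sink_open(), 0);
}
```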
Codecov Report ❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##           master    #2783      +/-   ##
============================================
+ Coverage    68.30%   68.59%   +0.28%
  Complexity     656      656
============================================
  Files          741      746       +5
  Lines        62210    62925     +715
  Branches     58623    59338     +715
============================================
+ Hits         42495    43164     +669
- Misses       17601    17643      +42
- Partials      2114     2118       +4
```
Flags with carried forward coverage won't be shown.
```rust
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
```
This crate is a cdylib loaded via dlopen2 by the connector runtime, which already declares its own `#[global_allocator]`. Two global allocators in one process cause heap corruption (memory allocated by one is freed by the other). No other existing sink connector defines `#[global_allocator]`; remove this.
```rust
async fn close(&mut self) -> Result<(), Error> {
    info!("Delta Lake sink connector with ID: {} is closed.", self.id);
    Ok(())
}
```
`close()` is a no-op. The `JsonWriter` has internal Parquet buffers; if the runtime shuts down, or a prior `flush_and_commit` failed, buffered data is silently dropped, and the state (`Mutex<Option<SinkState>>`) is never set back to `None`. `close()` should attempt a final `flush_and_commit` and `take()` the state.
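A self-contained sketch of that shape, with stand-in types in place of the real `deltalake` writer (names like `SinkState` and `flush_and_commit` mirror the PR but the bodies are illustrative): `take()` removes the state under the lock so the sink cannot be reused, and a final flush runs before it is dropped.

```rust
use std::sync::Mutex;

// Stand-in for the sink's buffered writer state (illustrative, not the
// actual deltalake API).
struct SinkState {
    buffered_rows: Vec<String>,
}

impl SinkState {
    // Pretend flush: commits buffered rows, returns how many were written.
    fn flush_and_commit(&mut self) -> Result<usize, String> {
        let n = self.buffered_rows.len();
        self.buffered_rows.clear();
        Ok(n)
    }
}

struct DeltaSink {
    state: Mutex<Option<SinkState>>,
}

impl DeltaSink {
    // close() takes the state so it cannot be reused, and attempts a final
    // flush so buffered Parquet data is not silently dropped.
    fn close(&self) -> Result<(), String> {
        if let Some(mut state) = self.state.lock().unwrap().take() {
            let flushed = state.flush_and_commit()?;
            println!("final flush committed {flushed} buffered rows");
        }
        Ok(())
    }
}

fn main() {
    let sink = DeltaSink {
        state: Mutex::new(Some(SinkState {
            buffered_rows: vec!["r1".into(), "r2".into()],
        })),
    };
    sink.close().unwrap();
    // After close the state is gone; a second close is a harmless no-op.
    assert!(sink.state.lock().unwrap().is_none());
    sink.close().unwrap();
}
```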
```rust
Err(_) if !self.config.schema.is_empty() => {
    info!("Table does not exist, creating from configured schema...");
    create_table(&self.config.table_uri, storage_options, &self.config.schema)
        .await?
}
```
`Err(_) if !self.config.schema.is_empty()` catches network failures, auth errors, corrupted metadata, anything at all, and proceeds to create a new table. With wrong S3 credentials and a schema configured, it will attempt to create a new empty table, diverging from the real one. It must match on the specific "not found" error variant.
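The pattern being asked for, sketched with a stand-in error enum (the real code should match the `deltalake` crate's specific "table not found" variant; `TableError` here is illustrative):

```rust
// Stand-in error type; the point is the match guard shape, not the names.
#[derive(Debug)]
enum TableError {
    NotFound(String), // table genuinely absent: safe to create
    Io(String),       // network/auth/metadata failures: must propagate
}

fn open_or_create(
    load_result: Result<(), TableError>,
    has_schema: bool,
) -> Result<&'static str, TableError> {
    match load_result {
        Ok(()) => Ok("opened existing table"),
        // Only a confirmed "not found" plus a configured schema triggers creation.
        Err(TableError::NotFound(_)) if has_schema => Ok("created table from schema"),
        // Everything else (wrong credentials, transient network errors, corrupt
        // metadata) propagates instead of silently creating a divergent table.
        Err(e) => Err(e),
    }
}

fn main() {
    assert_eq!(
        open_or_create(Err(TableError::NotFound("t".into())), true).unwrap(),
        "created table from schema"
    );
    assert!(open_or_create(Err(TableError::Io("403 Forbidden".into())), true).is_err());
}
```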
```rust
let json_bytes = simd_json::to_vec(simd_value).map_err(|e| {
    error!("Failed to serialize JSON payload: {e}");
    Error::InvalidJsonPayload
})?;
let value: serde_json::Value =
    serde_json::from_slice(&json_bytes).map_err(|e| {
        error!("Failed to parse JSON payload: {e}");
        Error::InvalidJsonPayload
    })?;
json_values.push(value);
```
This is a double allocation: the value is serialized to bytes and immediately reparsed. The `elasticsearch_sink` already has a direct `owned_value_to_serde_json` converter for exactly this reason.
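The idea of a direct converter, sketched with stand-in enums (here `Owned` models `simd_json`'s owned value and `Json` models `serde_json::Value`; the real converter maps between those crate types): one recursive tree walk instead of a serialize-then-reparse round trip through an intermediate byte buffer.

```rust
// Stand-in "owned value" tree (models simd_json::OwnedValue).
enum Owned {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Arr(Vec<Owned>),
}

// Stand-in target tree (models serde_json::Value).
#[derive(Debug, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Arr(Vec<Json>),
}

// Direct structural conversion: no intermediate serialization, no reparse.
fn owned_to_json(v: Owned) -> Json {
    match v {
        Owned::Null => Json::Null,
        Owned::Bool(b) => Json::Bool(b),
        Owned::Num(n) => Json::Num(n),
        Owned::Str(s) => Json::Str(s),
        Owned::Arr(items) => Json::Arr(items.into_iter().map(owned_to_json).collect()),
    }
}

fn main() {
    let v = Owned::Arr(vec![Owned::Num(1.0), Owned::Str("a".into())]);
    assert_eq!(
        owned_to_json(v),
        Json::Arr(vec![Json::Num(1.0), Json::Str("a".into())])
    );
}
```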
```rust
// Write JSON values to internal Parquet buffers
state.writer.write(json_values).await.map_err(|e| {
    error!("Failed to write to Delta writer: {e}");
    Error::Storage(format!("Failed to write to Delta writer: {e}"))
})?;

// Flush buffers to object store and commit to Delta log
let version = state
    .writer
    .flush_and_commit(&mut state.table)
    .await
    .map_err(|e| {
        error!("Failed to flush and commit to Delta table: {e}");
        Error::Storage(format!("Failed to flush and commit: {e}"))
    })?;
```
When `flush_and_commit` fails, data is already in the writer's buffer. The runtime exits the consumer task on error (core/connectors/runtime/src/sink.rs:322-329), but the consumer group has already committed the offset, so those messages are permanently lost. Additionally, the writer buffer is left in an undefined state: there is no `reset()` call to clear it.
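A self-contained sketch of the recovery being asked for (the `Writer` here is a stand-in, not the `deltalake` `JsonWriter`): on a failed commit, the buffer is explicitly reset before the error is propagated, so a retry cannot double-write stale rows.

```rust
// Stand-in writer: `fail_commit` simulates a failed Delta log commit.
struct Writer {
    buffer: Vec<String>,
    fail_commit: bool,
}

impl Writer {
    fn flush_and_commit(&mut self) -> Result<u64, String> {
        if self.fail_commit {
            return Err("commit failed".into());
        }
        self.buffer.clear();
        Ok(1) // pretend Delta log version
    }

    fn consume_batch(&mut self, rows: Vec<String>) -> Result<u64, String> {
        self.buffer.extend(rows);
        match self.flush_and_commit() {
            Ok(version) => Ok(version),
            Err(e) => {
                // Reset the buffer so the writer is not left in an
                // undefined state, then surface the error to the runtime.
                self.buffer.clear();
                Err(e)
            }
        }
    }
}

fn main() {
    let mut w = Writer { buffer: Vec::new(), fail_commit: true };
    assert!(w.consume_batch(vec!["r1".into()]).is_err());
    // Buffer was reset on failure instead of being left half-written.
    assert!(w.buffer.is_empty());
}
```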
```rust
.map(|entry| {
    let parts: Vec<&str> = entry.split_whitespace().collect();
    if parts.len() != 2 {
        return Err(Error::InvalidConfig);
```
Log what went wrong here, similar to line 53.
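One way that could look, as a standalone sketch (the function name and error type are illustrative; the real code returns `Error::InvalidConfig`): include the offending entry and the expected shape in the error instead of an opaque failure.

```rust
// Parse one "name type" schema entry, reporting *what* was invalid.
fn parse_schema_entry(entry: &str) -> Result<(String, String), String> {
    let parts: Vec<&str> = entry.split_whitespace().collect();
    if parts.len() != 2 {
        // Name the offending entry and the expected shape in the error.
        return Err(format!(
            "invalid schema entry '{entry}': expected '<name> <type>', got {} token(s)",
            parts.len()
        ));
    }
    Ok((parts[0].to_string(), parts[1].to_string()))
}

fn main() {
    assert_eq!(
        parse_schema_entry("user_id string").unwrap(),
        ("user_id".to_string(), "string".to_string())
    );
    let err = parse_schema_entry("user_id").unwrap_err();
    // The error message carries the bad entry, making misconfiguration debuggable.
    assert!(err.contains("user_id"));
}
```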
```rust
let version = state
    .writer
    .flush_and_commit(&mut state.table)
```
With `poll_interval = "5ms"` and `batch_length = 100`, this produces thousands of tiny Parquet files per second. Delta Lake is not designed for that write frequency. This should support configurable flush thresholds (size, time, and row count).
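A minimal sketch of such a threshold policy (field names and defaults are hypothetical, not part of the PR): flush only when row count, buffered bytes, or elapsed time crosses a limit, so each commit produces a reasonably sized Parquet file.

```rust
use std::time::Duration;

// Hypothetical flush-threshold settings: commit when ANY limit is crossed,
// instead of writing one tiny Parquet file per poll.
struct FlushPolicy {
    max_rows: usize,
    max_bytes: usize,
    max_interval: Duration,
}

impl FlushPolicy {
    fn should_flush(&self, rows: usize, bytes: usize, since_last: Duration) -> bool {
        rows >= self.max_rows || bytes >= self.max_bytes || since_last >= self.max_interval
    }
}

fn main() {
    let policy = FlushPolicy {
        max_rows: 10_000,
        max_bytes: 128 * 1024 * 1024,
        max_interval: Duration::from_secs(60),
    };
    // A 100-row batch arriving every 5 ms does not trigger a flush on its own...
    assert!(!policy.should_flush(100, 50_000, Duration::from_millis(5)));
    // ...but accumulated rows or elapsed time eventually do.
    assert!(policy.should_flush(10_000, 50_000, Duration::from_millis(5)));
    assert!(policy.should_flush(100, 50_000, Duration::from_secs(61)));
}
```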
Which issue does this PR close?
Closes #1852
Rationale
Delta Lake is an open table format for data analytics, and very popular in modern streaming analytics architectures.
What changed?
Introduces a Delta Lake Sink Connector that enables writing data from Iggy to Delta Lake.
The Delta Lake writing logic is heavily inspired by the kafka-delta-ingest project, to start from proven ground for writing to Delta Lake.
Local Execution
Tested with the schema `user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime<Utc>, message: String` using the sample data producer.

AI Usage
If AI tools were used, please answer: