diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dfed517d..8047299d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - `pw.io.milvus.write` connector, which writes a Pathway table to a Milvus collection. Row additions are sent as upserts and row deletions are sent as deletes keyed on the configured primary key column. Requires a Pathway Scale license. +- `pw.io.rabbitmq` connector for reading from and writing to RabbitMQ Streams. Supports JSON, plaintext, and raw formats with TLS/mutual TLS configuration. Includes `pw.io.rabbitmq.simple_read` for quick-start usage. ## [0.30.0] - 2026-03-24 diff --git a/Cargo.lock b/Cargo.lock index 5909bb2c0..1b29dc439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -460,7 +460,7 @@ dependencies = [ "regex", "ring", "rustls-native-certs 0.7.3", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.7", "serde", "serde_json", @@ -469,7 +469,7 @@ dependencies = [ "thiserror 1.0.69", "time", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "tokio-websockets", "tracing", @@ -886,7 +886,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tower 0.5.2", "tracing", ] @@ -1666,6 +1666,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] + [[package]] name = "chrono" version = "0.4.43" @@ -1824,6 +1835,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.10.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1859,6 +1879,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32c" version = "0.6.8" @@ -1979,7 +2008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "curve25519-dalek-derive", "digest", "fiat-crypto", @@ -3024,6 +3053,29 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "derive_more" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" +dependencies = [ + "convert_case 0.10.0", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.101", + "unicode-xid", +] + [[package]] name = "derive_utils" version = "0.15.0" @@ -3409,9 +3461,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ "percent-encoding", ] @@ -3613,11 +3665,25 @@ 
dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.2.0", "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.0", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.2" @@ -3997,7 +4063,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots 0.26.7", ] @@ -4314,9 +4380,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" dependencies = [ "idna_adapter", "smallvec", @@ -4552,6 +4618,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "levenshtein_automata" version = "0.2.1" @@ -5041,7 +5113,7 @@ dependencies = [ "bson", "chrono", "derive-where", - "derive_more", + "derive_more 0.99.18", "futures-core", "futures-executor", "futures-io", @@ -5367,18 +5439,19 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.7.3" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +checksum = 
"5d0bca838442ec211fa11de3a8b0e0e8f3a4522575b5c4c06ed722e005036f26" dependencies = [ "num_enum_derive", + "rustversion", ] [[package]] name = "num_enum_derive" -version = "0.7.3" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +checksum = "680998035259dcfcafe653688bf2aa6d3e2dc05e98be6ab46afb089dc84f1df8" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -5446,7 +5519,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "ring", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "serde", "serde_json", "serde_urlencoded", @@ -5844,6 +5917,7 @@ dependencies = [ "pyo3-log", "qdrant-client", "questdb-rs", + "rabbitmq-stream-client", "rand 0.9.2", "rayon", "rdkafka", @@ -5908,9 +5982,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "petgraph" @@ -6009,18 +6083,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", @@ -6443,7 +6517,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustls 0.22.4", - "rustls-pemfile 2.1.3", + "rustls-pemfile 
2.2.0", "rustls-pki-types", "ryu", "serde", @@ -6568,6 +6642,50 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "rabbitmq-stream-client" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7e766fd5cccfba5caa077d0b00de81840f0553a15325f22b59e87322f235d10" +dependencies = [ + "async-trait", + "bytes", + "dashmap", + "futures", + "murmur3", + "pin-project", + "rabbitmq-stream-protocol", + "rand 0.10.0", + "rustls-pemfile 2.2.0", + "thiserror 2.0.18", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "rabbitmq-stream-protocol" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fedda81585bf2b5f8b91be49a592d2c91cb060c2c1927840617c52a67d1b7d" +dependencies = [ + "byteorder", + "chrono", + "derive_more 2.1.1", + "num_enum", + "ordered-float 4.6.0", + "uuid", +] + [[package]] name = "radium" version = "0.7.0" @@ -6609,6 +6727,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.0", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -6667,6 +6796,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + [[package]] name = "rand_distr" version = 
"0.4.3" @@ -6885,7 +7020,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -6983,7 +7118,7 @@ dependencies = [ "log", "native-tls", "rustls-native-certs 0.7.3", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.7", "thiserror 1.0.69", "tokio", @@ -7201,7 +7336,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", "security-framework 2.11.1", @@ -7230,11 +7365,10 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64 0.22.1", "rustls-pki-types", ] @@ -7555,7 +7689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -7566,7 +7700,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -7577,7 +7711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -8400,9 +8534,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.2" +version = "0.26.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ "rustls 0.23.31", "tokio", @@ -8449,7 +8583,7 @@ dependencies = [ "ring", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "webpki-roots 0.26.7", ] @@ -8494,10 +8628,10 @@ dependencies = [ "pin-project", "prost", "rustls-native-certs 0.8.1", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "socket2 0.5.10", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -8528,7 +8662,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "socket2 0.5.10", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.5.2", "tower-layer", @@ -8830,14 +8964,15 @@ dependencies = [ [[package]] name = "url" -version = "2.5.4" +version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" dependencies = [ "form_urlencoded", "idna", "percent-encoding", "serde", + "serde_derive", ] [[package]] @@ -8987,6 +9122,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasite" version = "1.0.2" @@ -9067,6 +9220,28 @@ dependencies = [ "unicode-ident", ] +[[package]] 
+name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.9.0", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -9080,6 +9255,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.9.1", + "hashbrown 0.15.2", + "indexmap 2.9.0", + "semver", +] + [[package]] name = "web-sys" version = "0.3.77" @@ -9635,6 +9822,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -9644,6 +9851,74 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.9.0", + "prettyplease", + "syn 2.0.101", + "wasm-metadata", 
+ "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.101", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.9.1", + "indexmap 2.9.0", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.9.0", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 9d8f3d8b8..eea045d7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ pyo3 = { version = "0.25.0", features = ["abi3-py310", "multiple-pymethods"] } pyo3-async-runtimes = "0.25.0" pyo3-log = "0.12.4" qdrant-client = "1.15.0" +rabbitmq-stream-client = "0.11.0" questdb-rs = "4.0.5" rand = "0.9.1" rayon = "1.10.0" diff --git a/examples/projects/rabbitmq-ETL/Makefile b/examples/projects/rabbitmq-ETL/Makefile new file mode 100644 index 000000000..ae060e320 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/Makefile @@ -0,0 +1,25 @@ +build: + docker compose up -d + +stop: + docker compose down -v --remove-orphans + docker rmi etl-rabbitmq-pathway:latest + docker rmi etl-rabbitmq-stream-producer:latest + +connect: + docker 
compose exec -it pathway bash +connect-prod: + docker compose exec -it stream-producer bash +connect-rabbitmq: + docker compose exec -it rabbitmq bash +psql: + docker compose exec -it postgres psql -U pathway -d etl_db + +logs: + docker compose logs pathway +logs-prod: + docker compose logs stream-producer +logs-rabbitmq: + docker compose logs rabbitmq +logs-postgres: + docker compose logs postgres diff --git a/examples/projects/rabbitmq-ETL/README.md b/examples/projects/rabbitmq-ETL/README.md new file mode 100644 index 000000000..1f8a042a3 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/README.md @@ -0,0 +1,93 @@ +# ETL with RabbitMQ Streams in / PostgreSQL out + +A streaming ETL pipeline built with Pathway that reads from RabbitMQ Streams, +validates, transforms, aggregates, and joins data, then writes enriched results +to PostgreSQL in real time. + +## What Pathway replaces + +This example is inspired by three ETL architectures that traditionally require +many moving parts: + +| Traditional approach | What Pathway replaces | +|---|---| +| Python/Java workers consuming from queues | `pw.io.rabbitmq.read` | +| Staging databases for validation | UDF-based filtering | +| Airflow DAGs for orchestration | Reactive streaming engine | +| Celery task queues for processing | Single streaming process | +| Separate aggregation jobs | `groupby().reduce()` | +| Multi-step load pipelines | `pw.io.postgres.write` | + +**Result:** One Pathway process replaces 5+ services. 
+ +## Architecture + +``` +Producer (rstream) Pathway (single process) PostgreSQL ++----------------+ +------------------------------+ +------------------+ +| employees_raw |--->| pw.io.rabbitmq.read | | | +| orders_raw |--->| -> validate (UDFs) |--->| enriched_employees| +| listings_raw |--->| -> transform | | enriched_listings | ++----------------+ | -> aggregate (groupby) | | (snapshot mode) | + | -> join (left joins) | +------------------+ + | pw.io.postgres.write | + +------------------------------+ +``` + +## Data sources + +- **Employees** (10 records): HR data with intentional quality issues (empty names, + invalid emails, negative salaries) to demonstrate validation +- **Orders** (15+ records): Purchase orders linked to employees by `employee_id` +- **Listings** (10+ records): Rental property listings linked to employees via + `agent_employee_id` (inspired by rental platform crawlers) + +## ETL pipeline + +1. **Read** three RabbitMQ streams via `pw.io.rabbitmq.read` +2. **Validate** using UDFs: email format, phone format, positive salary/price, non-empty fields +3. **Transform**: normalize emails/departments to lowercase, parse string amounts to float, + concatenate names, derive boolean flags +4. **Aggregate**: order stats per employee (`total_orders`, `total_spent`), + listing stats per agent (`total_listings`, `avg_listing_price`) +5. **Join**: left-join employees with order stats and listing stats; + left-join listings with agent names +6. **Write** to PostgreSQL snapshot tables (auto-updated as new data arrives) + +## Launching + +```bash +docker compose up -d +# or +make +``` + +## Checking results + +Connect to PostgreSQL: + +```bash +make psql +``` + +Then query the enriched tables: + +```sql +SELECT * FROM enriched_employees ORDER BY employee_id; +SELECT * FROM enriched_listings ORDER BY listing_id; +``` + +As the producer continues sending new orders and listings, re-running these queries +will show updated aggregates in real time. 
+ +## Monitoring + +- **Pathway logs**: `make logs` +- **Producer logs**: `make logs-prod` +- **RabbitMQ Management UI**: http://localhost:15672 (guest/guest) + +## Stopping + +```bash +make stop +``` diff --git a/examples/projects/rabbitmq-ETL/docker-compose.yml b/examples/projects/rabbitmq-ETL/docker-compose.yml new file mode 100644 index 000000000..c67dd67df --- /dev/null +++ b/examples/projects/rabbitmq-ETL/docker-compose.yml @@ -0,0 +1,59 @@ +version: "3.7" +name: etl-rabbitmq +networks: + etl-rabbitmq-network: + driver: bridge +services: + rabbitmq: + image: rabbitmq:3.13-management + environment: + RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbitmq_stream advertised_host rabbitmq -rabbitmq_stream advertised_port 5552" + ports: + - 5552:5552 + - 5672:5672 + - 15672:15672 + command: bash -c "rabbitmq-plugins enable --offline rabbitmq_stream && rabbitmq-server" + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 10s + timeout: 5s + retries: 5 + networks: + - etl-rabbitmq-network + postgres: + image: postgres:16 + environment: + POSTGRES_USER: pathway + POSTGRES_PASSWORD: pathway + POSTGRES_DB: etl_db + ports: + - 5433:5432 + volumes: + - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U pathway"] + interval: 5s + timeout: 5s + retries: 5 + networks: + - etl-rabbitmq-network + stream-producer: + build: + context: . + dockerfile: ./producer-src/Dockerfile + depends_on: + rabbitmq: + condition: service_healthy + networks: + - etl-rabbitmq-network + pathway: + build: + context: . 
+ dockerfile: ./pathway-src/Dockerfile + depends_on: + rabbitmq: + condition: service_healthy + postgres: + condition: service_healthy + networks: + - etl-rabbitmq-network diff --git a/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile b/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile new file mode 100644 index 000000000..817502d52 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.10 + +RUN pip install -U pathway +COPY ./pathway-src/etl.py etl.py + +CMD ["python", "etl.py"] diff --git a/examples/projects/rabbitmq-ETL/pathway-src/etl.py b/examples/projects/rabbitmq-ETL/pathway-src/etl.py new file mode 100644 index 000000000..011bc922e --- /dev/null +++ b/examples/projects/rabbitmq-ETL/pathway-src/etl.py @@ -0,0 +1,270 @@ +# Copyright © 2026 Pathway +# +# ETL pipeline that reads from RabbitMQ Streams, validates, transforms, +# aggregates, joins, and writes enriched results to PostgreSQL. +# +# This single Pathway process replaces what traditionally required: +# - Celery/RabbitMQ workers (consuming) +# - Staging databases (validation) +# - Airflow DAGs (orchestration) +# - Separate Python/Java workers (transformation) + +import re +import time + +import pathway as pw + +# To use advanced features with Pathway Scale, get your free license key from +# https://pathway.com/features and paste it below. +# To use Pathway Community, comment out the line below. 
+# pw.set_license_key("demo-license-key-with-telemetry") + +RABBITMQ_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552" + +postgres_settings = { + "host": "postgres", + "port": "5432", + "dbname": "etl_db", + "user": "pathway", + "password": "pathway", +} + + +# ── Schemas ────────────────────────────────────────────────────────────────── + + +class EmployeeSchema(pw.Schema): + employee_id: int + first_name: str + last_name: str + email: str + phone: str + department: str + salary: str + hire_date: str + status: str + + +class OrderSchema(pw.Schema): + order_id: int + employee_id: int + amount: str + order_date: str + product: str + quantity: int + + +class ListingSchema(pw.Schema): + listing_id: int + title: str + price: str + location: str + source: str + posted_date: str + agent_employee_id: int + + +# ── Validation UDFs ────────────────────────────────────────────────────────── + + +@pw.udf +def is_valid_email(email: str) -> bool: + return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email)) + + +@pw.udf +def is_valid_phone(phone: str) -> bool: + digits = re.sub(r"[\s\-\(\)\+]", "", phone) + return len(digits) >= 7 and digits.isdigit() + + +@pw.udf +def is_positive_number(value: str) -> bool: + try: + return float(value) > 0 + except (ValueError, TypeError): + return False + + +# ── Transformation UDFs ────────────────────────────────────────────────────── + + +@pw.udf +def normalize_lower(value: str) -> str: + return value.strip().lower() + + +@pw.udf +def parse_float(value: str) -> float: + return float(value) + + +# ── READ from RabbitMQ ─────────────────────────────────────────────────────── + +employees = pw.io.rabbitmq.read( + RABBITMQ_URI, + "employees_raw", + schema=EmployeeSchema, + format="json", + autocommit_duration_ms=500, +) + +orders = pw.io.rabbitmq.read( + RABBITMQ_URI, + "orders_raw", + schema=OrderSchema, + format="json", + autocommit_duration_ms=500, +) + +listings = pw.io.rabbitmq.read( + RABBITMQ_URI, + "listings_raw", + 
schema=ListingSchema, + format="json", + autocommit_duration_ms=500, +) + +# ── VALIDATE employees ─────────────────────────────────────────────────────── + +valid_employees = employees.filter( + (pw.this.first_name != "") + & is_valid_email(pw.this.email) + & is_valid_phone(pw.this.phone) + & is_positive_number(pw.this.salary) +) + +# ── TRANSFORM employees ───────────────────────────────────────────────────── + +transformed_employees = valid_employees.select( + employee_id=pw.this.employee_id, + full_name=pw.this.first_name + " " + pw.this.last_name, + email=normalize_lower(pw.this.email), + phone=pw.this.phone, + department=normalize_lower(pw.this.department), + salary=parse_float(pw.this.salary), + hire_date=pw.this.hire_date, + is_active=(pw.this.status == "active"), +) + +# ── VALIDATE & TRANSFORM orders ───────────────────────────────────────────── + +valid_orders = orders.filter( + is_positive_number(pw.this.amount) & (pw.this.quantity > 0) +) + +transformed_orders = valid_orders.select( + order_id=pw.this.order_id, + employee_id=pw.this.employee_id, + amount=parse_float(pw.this.amount), + order_date=pw.this.order_date, + product=pw.this.product, + quantity=pw.this.quantity, + line_total=parse_float(pw.this.amount) * pw.this.quantity, +) + +# ── VALIDATE & TRANSFORM listings ─────────────────────────────────────────── + +valid_listings = listings.filter( + (pw.this.title != "") & is_positive_number(pw.this.price) +) + +transformed_listings = valid_listings.select( + listing_id=pw.this.listing_id, + title=pw.this.title, + price=parse_float(pw.this.price), + location=pw.this.location, + source=pw.this.source, + posted_date=pw.this.posted_date, + agent_employee_id=pw.this.agent_employee_id, +) + +# ── AGGREGATE orders per employee ──────────────────────────────────────────── + +order_stats = transformed_orders.groupby(pw.this.employee_id).reduce( + employee_id=pw.this.employee_id, + total_orders=pw.reducers.count(), + 
total_spent=pw.reducers.sum(pw.this.line_total), +) + +# ── AGGREGATE listings per agent ───────────────────────────────────────────── + +listing_stats = transformed_listings.groupby(pw.this.agent_employee_id).reduce( + agent_employee_id=pw.this.agent_employee_id, + total_listings=pw.reducers.count(), + avg_listing_price=pw.reducers.avg(pw.this.price), +) + +# ── JOIN: enrich employees with order stats ────────────────────────────────── + +employees_with_orders = transformed_employees.join_left( + order_stats, pw.left.employee_id == pw.right.employee_id +).select( + employee_id=pw.left.employee_id, + full_name=pw.left.full_name, + email=pw.left.email, + phone=pw.left.phone, + department=pw.left.department, + salary=pw.left.salary, + hire_date=pw.left.hire_date, + is_active=pw.left.is_active, + total_orders=pw.coalesce(pw.right.total_orders, 0), + total_spent=pw.coalesce(pw.right.total_spent, 0.0), +) + +# ── JOIN: enrich employees with listing stats ──────────────────────────────── + +enriched_employees = employees_with_orders.join_left( + listing_stats, pw.left.employee_id == pw.right.agent_employee_id +).select( + employee_id=pw.left.employee_id, + full_name=pw.left.full_name, + email=pw.left.email, + phone=pw.left.phone, + department=pw.left.department, + salary=pw.left.salary, + hire_date=pw.left.hire_date, + is_active=pw.left.is_active, + total_orders=pw.left.total_orders, + total_spent=pw.left.total_spent, + total_listings=pw.coalesce(pw.right.total_listings, 0), + avg_listing_price=pw.coalesce(pw.right.avg_listing_price, 0.0), +) + +# ── JOIN: enrich listings with agent name ──────────────────────────────────── + +enriched_listings = transformed_listings.join_left( + transformed_employees, pw.left.agent_employee_id == pw.right.employee_id +).select( + listing_id=pw.left.listing_id, + title=pw.left.title, + price=pw.left.price, + location=pw.left.location, + source=pw.left.source, + posted_date=pw.left.posted_date, + agent_name=pw.coalesce(pw.right.full_name, 
"Unknown"), +) + +# ── WRITE to PostgreSQL ────────────────────────────────────────────────────── + +pw.io.postgres.write( + enriched_employees, + postgres_settings, + "enriched_employees", + output_table_type="snapshot", + primary_key=[enriched_employees.employee_id], +) + +pw.io.postgres.write( + enriched_listings, + postgres_settings, + "enriched_listings", + output_table_type="snapshot", + primary_key=[enriched_listings.listing_id], +) + +# Wait for RabbitMQ streams to be created by the producer +time.sleep(15) + +# Launch the streaming computation +pw.run() diff --git a/examples/projects/rabbitmq-ETL/producer-src/Dockerfile b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile new file mode 100644 index 000000000..cc7bffc4a --- /dev/null +++ b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.10 + +ENV PYTHONUNBUFFERED=1 +RUN pip install rstream +COPY ./producer-src/create-streams.py create-streams.py + +CMD ["python", "create-streams.py"] diff --git a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py new file mode 100644 index 000000000..46eec8c11 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py @@ -0,0 +1,462 @@ +# Copyright © 2026 Pathway + +import asyncio +import json +import random +import time + +RABBITMQ_HOST = "rabbitmq" +RABBITMQ_PORT = 5552 +RABBITMQ_USER = "guest" +RABBITMQ_PASSWORD = "guest" + +STREAMS = ["employees_raw", "orders_raw", "listings_raw"] + +# Sample employees - some with intentional data quality issues for validation demo +EMPLOYEES = [ + { + "employee_id": 1, + "first_name": "Alice", + "last_name": "Smith", + "email": "ALICE.SMITH@COMPANY.COM", + "phone": "+1 (555) 123-4567", + "department": "Engineering", + "salary": "95000", + "hire_date": "2022-01-15", + "status": "active", + }, + { + "employee_id": 2, + "first_name": "Bob", + "last_name": "Jones", + "email": "bob.jones@company.com", + 
"phone": "555-234-5678", + "department": "Sales", + "salary": "72000", + "hire_date": "2021-06-20", + "status": "active", + }, + { + # Bad record: empty first_name, invalid email, negative salary + "employee_id": 3, + "first_name": "", + "last_name": "Garcia", + "email": "invalid-email", + "phone": "555-345-6789", + "department": "Marketing", + "salary": "-5000", + "hire_date": "2023-03-01", + "status": "active", + }, + { + "employee_id": 4, + "first_name": "Diana", + "last_name": "Lee", + "email": "diana.lee@company.com", + "phone": "+44 20 7946 0958", + "department": "ENGINEERING", + "salary": "105000", + "hire_date": "2020-11-10", + "status": "active", + }, + { + "employee_id": 5, + "first_name": "Erik", + "last_name": "Martinez", + "email": "erik.martinez@company.com", + "phone": "555-456-7890", + "department": "Sales", + "salary": "68000", + "hire_date": "2023-08-05", + "status": "terminated", + }, + { + # Bad record: invalid phone + "employee_id": 6, + "first_name": "Fatima", + "last_name": "Ahmed", + "email": "fatima.ahmed@company.com", + "phone": "abc", + "department": "engineering", + "salary": "88000", + "hire_date": "2022-04-18", + "status": "active", + }, + { + "employee_id": 7, + "first_name": "George", + "last_name": "Wilson", + "email": "george.wilson@company.com", + "phone": "+1-555-567-8901", + "department": "Real Estate", + "salary": "78000", + "hire_date": "2021-09-30", + "status": "active", + }, + { + "employee_id": 8, + "first_name": "Hannah", + "last_name": "Brown", + "email": "hannah.brown@company.com", + "phone": "555-678-9012", + "department": "Real Estate", + "salary": "82000", + "hire_date": "2022-07-12", + "status": "active", + }, + { + # Bad record: empty salary + "employee_id": 9, + "first_name": "Ivan", + "last_name": "Petrov", + "email": "ivan.petrov@company.com", + "phone": "+7 495 123 4567", + "department": "Marketing", + "salary": "", + "hire_date": "2023-01-22", + "status": "active", + }, + { + "employee_id": 10, + "first_name": 
"Julia", + "last_name": "Chen", + "email": "julia.chen@company.com", + "phone": "+86 10 1234 5678", + "department": "Engineering", + "salary": "115000", + "hire_date": "2019-05-14", + "status": "active", + }, +] + +# Sample orders - multiple per employee +ORDERS = [ + { + "order_id": 101, + "employee_id": 1, + "amount": "1250.50", + "order_date": "2024-11-01", + "product": "Laptop Stand", + "quantity": 2, + }, + { + "order_id": 102, + "employee_id": 2, + "amount": "340.00", + "order_date": "2024-11-05", + "product": "Keyboard", + "quantity": 5, + }, + { + "order_id": 103, + "employee_id": 1, + "amount": "89.99", + "order_date": "2024-11-08", + "product": "Mouse Pad", + "quantity": 10, + }, + { + "order_id": 104, + "employee_id": 4, + "amount": "2100.00", + "order_date": "2024-11-10", + "product": "Monitor", + "quantity": 1, + }, + { + "order_id": 105, + "employee_id": 5, + "amount": "450.00", + "order_date": "2024-11-12", + "product": "Headset", + "quantity": 3, + }, + { + "order_id": 106, + "employee_id": 2, + "amount": "175.25", + "order_date": "2024-11-15", + "product": "Webcam", + "quantity": 2, + }, + { + "order_id": 107, + "employee_id": 7, + "amount": "560.00", + "order_date": "2024-11-18", + "product": "Desk Lamp", + "quantity": 4, + }, + { + "order_id": 108, + "employee_id": 10, + "amount": "3200.00", + "order_date": "2024-11-20", + "product": "Standing Desk", + "quantity": 1, + }, + { + "order_id": 109, + "employee_id": 4, + "amount": "125.50", + "order_date": "2024-11-22", + "product": "Cable Kit", + "quantity": 8, + }, + { + "order_id": 110, + "employee_id": 1, + "amount": "780.00", + "order_date": "2024-11-25", + "product": "Chair Mat", + "quantity": 2, + }, + { + "order_id": 111, + "employee_id": 8, + "amount": "95.00", + "order_date": "2024-11-28", + "product": "Notebook", + "quantity": 15, + }, + { + "order_id": 112, + "employee_id": 10, + "amount": "420.00", + "order_date": "2024-12-01", + "product": "USB Hub", + "quantity": 6, + }, + { + 
"order_id": 113, + "employee_id": 7, + "amount": "1800.00", + "order_date": "2024-12-03", + "product": "Printer", + "quantity": 1, + }, + { + "order_id": 114, + "employee_id": 2, + "amount": "65.99", + "order_date": "2024-12-05", + "product": "Mouse", + "quantity": 3, + }, + { + "order_id": 115, + "employee_id": 4, + "amount": "290.00", + "order_date": "2024-12-08", + "product": "Speakers", + "quantity": 2, + }, +] + +# Sample rental listings - inspired by ldtrungmark rental platform +LISTINGS = [ + { + "listing_id": 1001, + "title": "Modern 2BR Apartment Downtown", + "price": "2500.00", + "location": "District 1", + "source": "website_a", + "posted_date": "2024-11-01", + "agent_employee_id": 7, + }, + { + "listing_id": 1002, + "title": "Cozy Studio Near Park", + "price": "1200.00", + "location": "District 3", + "source": "website_b", + "posted_date": "2024-11-03", + "agent_employee_id": 8, + }, + { + "listing_id": 1003, + "title": "Spacious 3BR Family Home", + "price": "3800.00", + "location": "District 2", + "source": "website_a", + "posted_date": "2024-11-05", + "agent_employee_id": 7, + }, + # Bad record: negative price + { + "listing_id": 1004, + "title": "Invalid Listing", + "price": "-500.00", + "location": "District 5", + "source": "website_c", + "posted_date": "2024-11-07", + "agent_employee_id": 8, + }, + { + "listing_id": 1005, + "title": "Luxury Penthouse with View", + "price": "8500.00", + "location": "District 1", + "source": "website_b", + "posted_date": "2024-11-10", + "agent_employee_id": 7, + }, + { + "listing_id": 1006, + "title": "Renovated 1BR Near Metro", + "price": "1800.00", + "location": "District 4", + "source": "website_a", + "posted_date": "2024-11-12", + "agent_employee_id": 8, + }, + { + "listing_id": 1007, + "title": "Garden Villa with Pool", + "price": "12000.00", + "location": "District 7", + "source": "website_b", + "posted_date": "2024-11-15", + "agent_employee_id": 7, + }, + { + "listing_id": 1008, + "title": "Budget Room in 
Shared House", + "price": "450.00", + "location": "District 9", + "source": "website_a", + "posted_date": "2024-11-18", + "agent_employee_id": 8, + }, + # Bad record: empty title + { + "listing_id": 1009, + "title": "", + "price": "1500.00", + "location": "District 3", + "source": "website_c", + "posted_date": "2024-11-20", + "agent_employee_id": 7, + }, + { + "listing_id": 1010, + "title": "Furnished Office Space", + "price": "5000.00", + "location": "District 1", + "source": "website_a", + "posted_date": "2024-11-22", + "agent_employee_id": 8, + }, +] + +PRODUCTS = [ + "Laptop Stand", + "Keyboard", + "Mouse", + "Monitor", + "Headset", + "Webcam", + "USB Hub", + "Chair Mat", +] +LOCATIONS = [ + "District 1", + "District 2", + "District 3", + "District 4", + "District 7", + "District 9", +] +SOURCES = ["website_a", "website_b"] + + +async def main(): + from rstream import AMQPMessage, Producer + + print("Connecting to RabbitMQ Streams...") + producer = Producer( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + username=RABBITMQ_USER, + password=RABBITMQ_PASSWORD, + ) + await producer.start() + + # Create all streams + for stream in STREAMS: + try: + await producer.create_stream(stream) + print(f"Created stream: {stream}") + except Exception: + print(f"Stream already exists: {stream}") + + # Publish initial employees + print("Publishing employee records...") + for emp in EMPLOYEES: + await producer.send("employees_raw", AMQPMessage(body=json.dumps(emp).encode())) + await asyncio.sleep(0.3) + print(f"Published {len(EMPLOYEES)} employees") + + # Publish initial orders + print("Publishing order records...") + for order in ORDERS: + await producer.send("orders_raw", AMQPMessage(body=json.dumps(order).encode())) + await asyncio.sleep(0.2) + print(f"Published {len(ORDERS)} orders") + + # Publish initial listings + print("Publishing listing records...") + for listing in LISTINGS: + await producer.send( + "listings_raw", AMQPMessage(body=json.dumps(listing).encode()) + ) + 
await asyncio.sleep(0.2) + print(f"Published {len(LISTINGS)} listings") + + # Continue producing new data periodically to demonstrate streaming + print("Starting continuous data production...") + order_id = 200 + listing_id = 2000 + valid_employee_ids = [1, 2, 4, 5, 7, 8, 10] + agent_ids = [7, 8] + + while True: + await asyncio.sleep(5) + + # New order + order_id += 1 + new_order = { + "order_id": order_id, + "employee_id": random.choice(valid_employee_ids), + "amount": f"{random.uniform(50, 2000):.2f}", + "order_date": "2024-12-10", + "product": random.choice(PRODUCTS), + "quantity": random.randint(1, 10), + } + await producer.send( + "orders_raw", AMQPMessage(body=json.dumps(new_order).encode()) + ) + print(f"Published new order: {order_id}") + + await asyncio.sleep(3) + + # New listing + listing_id += 1 + new_listing = { + "listing_id": listing_id, + "title": f"New Listing #{listing_id}", + "price": f"{random.uniform(500, 10000):.2f}", + "location": random.choice(LOCATIONS), + "source": random.choice(SOURCES), + "posted_date": "2024-12-10", + "agent_employee_id": random.choice(agent_ids), + } + await producer.send( + "listings_raw", AMQPMessage(body=json.dumps(new_listing).encode()) + ) + print(f"Published new listing: {listing_id}") + + +if __name__ == "__main__": + time.sleep(5) # Extra wait for RabbitMQ streams plugin to initialize + asyncio.run(main()) diff --git a/examples/projects/rabbitmq-ETL/sql/init-db.sql b/examples/projects/rabbitmq-ETL/sql/init-db.sql new file mode 100644 index 000000000..b76cd5284 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/sql/init-db.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS enriched_employees ( + employee_id INTEGER PRIMARY KEY, + full_name TEXT NOT NULL, + email TEXT NOT NULL, + phone TEXT NOT NULL, + department TEXT NOT NULL, + salary DOUBLE PRECISION NOT NULL, + hire_date TEXT NOT NULL, + is_active BOOLEAN NOT NULL, + total_orders BIGINT NOT NULL, + total_spent DOUBLE PRECISION NOT NULL, + total_listings BIGINT 
NOT NULL, + avg_listing_price DOUBLE PRECISION NOT NULL +); + +CREATE TABLE IF NOT EXISTS enriched_listings ( + listing_id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + location TEXT NOT NULL, + source TEXT NOT NULL, + posted_date TEXT NOT NULL, + agent_name TEXT NOT NULL +); diff --git a/integration_tests/kafka/test_rabbitmq.py b/integration_tests/kafka/test_rabbitmq.py new file mode 100644 index 000000000..dd001977a --- /dev/null +++ b/integration_tests/kafka/test_rabbitmq.py @@ -0,0 +1,427 @@ +import asyncio +import json +import pathlib +import threading +import time +from types import TracebackType +from uuid import uuid4 + +import pytest + +import pathway as pw +from pathway.internals.parse_graph import G +from pathway.tests.utils import CsvLinesNumberChecker, wait_result_with_checker + +RABBITMQ_STREAM_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552/" +RABBITMQ_HOST = "rabbitmq" +RABBITMQ_PORT = 5552 +RABBITMQ_USER = "guest" +RABBITMQ_PASSWORD = "guest" +WAIT_TIMEOUT_SECS = 30 + + +class RabbitmqStreamManager: + """Context manager that creates and cleans up a RabbitMQ stream for testing.""" + + def __init__(self, uri: str, stream_name: str): + self.uri = uri + self.stream_name = stream_name + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread(target=self._loop.run_forever, daemon=True) + self._thread.start() + + def __enter__(self): + fut = asyncio.run_coroutine_threadsafe(self._create_stream(), self._loop) + fut.result() + return self + + def __exit__( + self, + type_: type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ): + fut = asyncio.run_coroutine_threadsafe(self._cleanup(), self._loop) + fut.result() + self._loop.call_soon_threadsafe(self._loop.stop) + self._thread.join() + + async def _create_stream(self): + from rstream import Producer + + self._producer = await Producer.create( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + username=RABBITMQ_USER, + 
password=RABBITMQ_PASSWORD, + ) + try: + await self._producer.create_stream(self.stream_name) + except Exception: + pass # stream may already exist + + async def _cleanup(self): + try: + await self._producer.delete_stream(self.stream_name) + except Exception: + pass + await self._producer.close() + + def send(self, message: str) -> None: + fut = asyncio.run_coroutine_threadsafe( + self._send_async(message), + self._loop, + ) + fut.result() + + async def _send_async(self, message: str) -> None: + await self._producer.send( + self.stream_name, + message.encode(), + ) + + def send_with_properties(self, message: str, properties: dict[str, str]) -> None: + """Send a message with AMQP 1.0 application properties.""" + fut = asyncio.run_coroutine_threadsafe( + self._send_with_properties_async(message, properties), + self._loop, + ) + fut.result() + + async def _send_with_properties_async( + self, message: str, properties: dict[str, str] + ) -> None: + from rstream import AMQPMessage + + amqp_message = AMQPMessage( + body=message.encode(), + application_properties=properties, + ) + await self._producer.send( + self.stream_name, + amqp_message, + ) + + +def run_identity_program( + input_file: pathlib.Path, + output_file: pathlib.Path, + stream_name: str, + new_entries: list[str], +) -> None: + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + table = pw.io.plaintext.read(input_file, mode="static") + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + ) + + class InputSchema(pw.Schema): + data: str + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, len(new_entries)), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_simple(tmp_path: pathlib.Path): + stream_name = 
f"rmq-{uuid4()}" + input_file = tmp_path / "input.txt" + output_file = tmp_path / "output.txt" + + with open(input_file, "w") as f: + f.write("one\ntwo\nthree\nfour\n") + run_identity_program( + input_file, + output_file, + stream_name, + ["one", "two", "three", "four"], + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_write_json(tmp_path: pathlib.Path): + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + class InputSchema(pw.Schema): + name: str + age: int + + # Send test messages + mgr.send('{"name": "Alice", "age": 30}') + mgr.send('{"name": "Bob", "age": 25}') + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_raw_format(tmp_path: pathlib.Path): + """Test reading messages in raw binary format.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("hello raw") + mgr.send("world raw") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="raw", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_plaintext_format(tmp_path: pathlib.Path): + """Test reading messages in plaintext format.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("hello plaintext") + mgr.send("world plaintext") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + 
format="plaintext", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_metadata(tmp_path: pathlib.Path): + """Test with_metadata=True produces _metadata column with offset and stream_name.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("message1") + mgr.send("message2") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="plaintext", + with_metadata=True, + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + # Verify metadata column contents + import pandas as pd + + result = pd.read_csv(output_file) + assert "_metadata" in result.columns, "Expected _metadata column" + for _, row in result.iterrows(): + metadata = json.loads(row["_metadata"]) + assert "offset" in metadata + assert "stream_name" in metadata + assert metadata["stream_name"] == stream_name + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_key_via_application_properties(tmp_path: pathlib.Path): + """Test key extraction from AMQP 1.0 application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + # Send messages with "key" in application_properties + mgr.send_with_properties("payload1", {"key": "key_a"}) + mgr.send_with_properties("payload2", {"key": "key_b"}) + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="plaintext", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + # Verify key column contains the app property values + import pandas as pd + + 
result = pd.read_csv(output_file) + assert "key" in result.columns, "Expected key column" + keys = sorted(result["key"].tolist()) + assert keys == ["key_a", "key_b"] + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_static_mode(tmp_path: pathlib.Path): + """Test mode='static' reads existing messages and stops.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + # Pre-populate the stream before starting the reader + for i in range(3): + mgr.send(f'{{"value": "msg{i}"}}') + + # Give time for messages to be committed + time.sleep(1) + + G.clear() + + class InputSchema(pw.Schema): + value: str + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + mode="static", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 3), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_write_with_key(tmp_path: pathlib.Path): + """Test writing messages with key parameter stores key in application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + + table = pw.debug.table_from_markdown( + """ + name | age + Alice | 30 + Bob | 25 + """ + ) + + # Write with key=name column + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + key=table.name, + ) + + # Read back with key support + class InputSchema(pw.Schema): + name: str + age: int + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def 
test_rabbitmq_write_with_headers(tmp_path: pathlib.Path): + """Test writing messages with headers parameter stores in application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + + table = pw.debug.table_from_markdown( + """ + name | age + Alice | 30 + Bob | 25 + """ + ) + + # Write with headers + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + headers=[table.name], + ) + + # Read back to verify messages were written + class InputSchema(pw.Schema): + name: str + age: int + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index ff4dc13a1..9ef15abe6 100644 --- a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -930,6 +930,10 @@ class DataStorage: ssl_cert_path: str | None = None, psql_replication: PsqlReplicationSettings | None = None, schema_name: str | None = None, + rabbitmq_tls_root_certificates: str | None = None, + rabbitmq_tls_client_cert: str | None = None, + rabbitmq_tls_client_key: str | None = None, + rabbitmq_tls_trust_certificates: bool = False, ) -> None: ... def delta_s3_storage_options(self, *args, **kwargs): ... 
diff --git a/python/pathway/io/__init__.py b/python/pathway/io/__init__.py index 625380b31..6f8cdaec3 100644 --- a/python/pathway/io/__init__.py +++ b/python/pathway/io/__init__.py @@ -29,6 +29,7 @@ pyfilesystem, python, questdb, + rabbitmq, redpanda, s3, slack, @@ -82,6 +83,7 @@ "register_input_synchronization_group", "mqtt", "questdb", + "rabbitmq", "dynamodb", "kinesis", "mysql", diff --git a/python/pathway/io/rabbitmq/__init__.py b/python/pathway/io/rabbitmq/__init__.py new file mode 100644 index 000000000..b50373e3d --- /dev/null +++ b/python/pathway/io/rabbitmq/__init__.py @@ -0,0 +1,407 @@ +# Copyright © 2026 Pathway + +from __future__ import annotations + +from typing import Iterable, Literal + +from pathway.internals import api, datasink, datasource +from pathway.internals.expression import ColumnReference +from pathway.internals.runtime_type_check import check_arg_types +from pathway.internals.schema import ( + KEY_SOURCE_COMPONENT, + PAYLOAD_SOURCE_COMPONENT, + Schema, +) +from pathway.internals.table import Table +from pathway.internals.table_io import table_from_datasource +from pathway.internals.trace import trace_user_frame +from pathway.io._utils import ( + MessageQueueOutputFormat, + _get_unique_name, + check_raw_and_plaintext_only_kwargs_for_message_queues, + construct_schema_and_data_format, + internal_connector_mode, +) + + +@check_arg_types +@trace_user_frame +def read( + uri: str, + stream_name: str, + *, + schema: type[Schema] | None = None, + format: Literal["plaintext", "raw", "json"] = "raw", + mode: Literal["streaming", "static"] = "streaming", + autocommit_duration_ms: int | None = 1500, + json_field_paths: dict[str, str] | None = None, + autogenerate_key: bool = False, + with_metadata: bool = False, + start_from_timestamp_ms: int | None = None, + parallel_readers: int | None = None, + name: str | None = None, + max_backlog_size: int | None = None, + tls_root_certificates: str | None = None, + tls_client_cert: str | None = None, + 
tls_client_key: str | None = None, + tls_trust_certificates: bool = False, + debug_data=None, + _stacklevel: int = 5, + **kwargs, +) -> Table: + """Reads data from a specified RabbitMQ stream. + + It supports three formats: ``"plaintext"``, ``"raw"``, and ``"json"``. + + For the ``"raw"`` format, the payload is read as raw bytes and added directly to the + table. In the ``"plaintext"`` format, the payload is decoded from UTF-8 and stored as + plain text. In both cases, the table will have a ``"key"`` column and a ``"data"`` + column representing the payload. + + If you select the ``"json"`` format, the connector parses the message payload as JSON + and creates table columns based on the schema provided in the ``schema`` parameter. + + Args: + uri: The URI of the RabbitMQ server with Streams enabled, e.g. + ``"rabbitmq-stream://guest:guest@localhost:5552"``. + stream_name: The name of the RabbitMQ stream to read data from. The stream + must already exist on the server. + schema: The table schema, used only when the format is set to ``"json"``. + format: The input data format, which can be ``"raw"``, ``"plaintext"``, or + ``"json"``. + mode: The reading mode, which can be ``"streaming"`` or ``"static"``. In + ``"streaming"`` mode, the connector reads messages continuously. In + ``"static"`` mode, it reads all existing messages and then stops. + autocommit_duration_ms: The time interval (in milliseconds) between commits. + After this time, the updates received by the connector are committed and + added to Pathway's computation graph. + json_field_paths: For the ``"json"`` format, this allows mapping field names to + paths within the JSON structure. Use the format ``: `` + where the path follows the + `JSON Pointer (RFC 6901) `_. + autogenerate_key: If True, always auto-generate the primary key instead of + using the message key from ``application_properties``. 
+ with_metadata: If True, adds a ``_metadata`` column containing a dict with + ``offset``, ``stream_name``, and ``timestamp_millis`` fields. Note: + ``timestamp_millis`` is currently always null due to protocol library + limitations. + start_from_timestamp_ms: If specified, start reading from messages at or after + this timestamp (in milliseconds since epoch). + parallel_readers: The number of reader instances running in parallel. + name: A unique name for the connector. If provided, this name will be used in + logs and monitoring dashboards. Additionally, if persistence is enabled, it + will be used as the name for the snapshot that stores the connector's progress. + max_backlog_size: Limit on the number of entries read from the input source and + kept in processing at any moment. + tls_root_certificates: Path to the root CA certificate file for TLS connections. + tls_client_cert: Path to the client certificate file for mutual TLS. + tls_client_key: Path to the client private key file for mutual TLS. + tls_trust_certificates: If True, trust server certificates without verification. + debug_data: Static data replacing original one when debug mode is active. + + Returns: + Table: The table read. + + Example: + + If your RabbitMQ server with Streams enabled is running on ``localhost`` using the + default Streams port ``5552``, you can stream the ``"events"`` stream to a Pathway + table like this: + + >>> import pathway as pw + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... ) + + You can also parse messages as UTF-8 during reading: + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="plaintext", + ... ) + + Read and parse a JSON table with schema: + + >>> class InputSchema(pw.Schema): + ... user_id: int = pw.column_definition(primary_key=True) + ... username: str + ... phone: str + + >>> table = pw.io.rabbitmq.read( + ... 
"rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="json", + ... schema=InputSchema, + ... ) + + Read with metadata column: + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... with_metadata=True, + ... ) + + Read in static mode (bounded snapshot): + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... mode="static", + ... ) + """ + tls_kwargs: dict = {} + if tls_root_certificates is not None: + tls_kwargs["rabbitmq_tls_root_certificates"] = tls_root_certificates + if tls_client_cert is not None: + tls_kwargs["rabbitmq_tls_client_cert"] = tls_client_cert + if tls_client_key is not None: + tls_kwargs["rabbitmq_tls_client_key"] = tls_client_key + if tls_trust_certificates: + tls_kwargs["rabbitmq_tls_trust_certificates"] = tls_trust_certificates + data_storage = api.DataStorage( + storage_type="rabbitmq", + path=uri, + topic=stream_name, + parallel_readers=parallel_readers, + mode=internal_connector_mode(mode), + start_from_timestamp_ms=start_from_timestamp_ms, + **tls_kwargs, + ) + schema, data_format = construct_schema_and_data_format( + "binary" if format == "raw" else format, + schema=schema, + json_field_paths=json_field_paths, + with_native_record_key=True, + autogenerate_key=autogenerate_key, + with_metadata=with_metadata, + _stacklevel=_stacklevel, + ) + data_source_options = datasource.DataSourceOptions( + commit_duration_ms=autocommit_duration_ms, + unique_name=_get_unique_name(name, kwargs), + max_backlog_size=max_backlog_size, + ) + return table_from_datasource( + datasource.GenericDataSource( + datastorage=data_storage, + dataformat=data_format, + data_source_options=data_source_options, + schema=schema, + datasource_name="rabbitmq", + ), + debug_datasource=datasource.debug_datasource(debug_data), + supported_components=( + KEY_SOURCE_COMPONENT, + PAYLOAD_SOURCE_COMPONENT, + ), + ) + + +@check_arg_types 
+@trace_user_frame +def simple_read( + host: str, + stream_name: str, + *, + port: int = 5552, + schema: type[Schema] | None = None, + format: Literal["plaintext", "raw", "json"] = "raw", + autocommit_duration_ms: int | None = 1500, + json_field_paths: dict[str, str] | None = None, + parallel_readers: int | None = None, + name: str | None = None, + debug_data=None, + **kwargs, +) -> Table: + """Simplified method to read data from a RabbitMQ stream. Only requires the + server host and the stream name. Uses default credentials (``guest:guest``). + If you need TLS, authentication, or fine-tuning, use :py:func:`read` instead. + + Read starts from the beginning of the stream, unless the ``read_only_new`` + parameter is set to True. + + There are three formats currently supported: ``"plaintext"``, ``"raw"``, and + ``"json"``. If ``"raw"`` is chosen, the key and the payload are read as raw bytes. + If ``"plaintext"`` is chosen, they are parsed from UTF-8. In both cases the table + has a ``"data"`` column. If ``"json"`` is chosen, the payload is parsed according + to the ``schema`` parameter. + + Args: + host: Hostname of the RabbitMQ server with Streams enabled. + stream_name: Name of the RabbitMQ stream to read from. + port: Port of the RabbitMQ Streams endpoint (default ``5552``). + schema: Schema of the resulting table (used with ``"json"`` format). + format: Input data format: ``"raw"``, ``"plaintext"``, or ``"json"``. + autocommit_duration_ms: The maximum time between two commits in milliseconds. + json_field_paths: For ``"json"`` format, mapping of field names to JSON + Pointer paths. + parallel_readers: Number of reader instances running in parallel. + name: A unique name for the connector. + debug_data: Static data replacing original one when debug mode is active. + + Returns: + Table: The table read. 
+ + Example: + + >>> import pathway as pw + >>> t = pw.io.rabbitmq.simple_read("localhost", "events") + """ + uri = f"rabbitmq-stream://guest:guest@{host}:{port}" + return read( + uri=uri, + stream_name=stream_name, + schema=schema, + format=format, + mode="streaming", + autocommit_duration_ms=autocommit_duration_ms, + json_field_paths=json_field_paths, + parallel_readers=parallel_readers, + name=name, + debug_data=debug_data, + _stacklevel=7, + **kwargs, + ) + + +@check_raw_and_plaintext_only_kwargs_for_message_queues +@check_arg_types +@trace_user_frame +def write( + table: Table, + uri: str, + stream_name: str | ColumnReference, + *, + format: Literal["json", "dsv", "plaintext", "raw"] = "json", + delimiter: str = ",", + key: ColumnReference | None = None, + value: ColumnReference | None = None, + headers: Iterable[ColumnReference] | None = None, + name: str | None = None, + sort_by: Iterable[ColumnReference] | None = None, + tls_root_certificates: str | None = None, + tls_client_cert: str | None = None, + tls_client_key: str | None = None, + tls_trust_certificates: bool = False, +) -> None: + """Writes data into the specified RabbitMQ stream. + + The produced messages consist of the payload, corresponding to the values of the + table that are serialized according to the chosen format. Two AMQP 1.0 application + properties are always added: ``pathway_time`` (processing time) and ``pathway_diff`` + (either ``1`` or ``-1``). If ``headers`` parameter is used, additional properties + can be added to the message. + + There are several serialization formats supported: ``"json"``, ``"dsv"``, + ``"plaintext"`` and ``"raw"``. + + If the selected format is either ``"plaintext"`` or ``"raw"``, you also need to + specify which column of the table corresponds to the payload of the produced message. + It can be done by providing the ``value`` parameter. + + Args: + table: The table for output. + uri: The URI of the RabbitMQ server with Streams enabled, e.g. 
+ ``"rabbitmq-stream://guest:guest@localhost:5552"``. + stream_name: The RabbitMQ stream where data will be written. The stream must + already exist on the server. + format: Format in which the data is put into RabbitMQ. Currently ``"json"``, + ``"plaintext"``, ``"raw"`` and ``"dsv"`` are supported. + delimiter: Field delimiter to be used in case of delimiter-separated values + format. + key: Reference to a column that should be used as the message key. The key is + stored as an AMQP 1.0 application property named ``"key"``. On the read + side, this property is extracted when ``autogenerate_key`` is False. + value: Reference to the column that should be used as a payload in the produced + message in ``"plaintext"`` or ``"raw"`` format. + headers: References to the table fields that must be provided as message + application properties. These properties are named in the same way as fields + that are forwarded and correspond to the string representations of the + respective values encoded in UTF-8. + name: A unique name for the connector. If provided, this name will be used in + logs and monitoring dashboards. + sort_by: If specified, the output will be sorted in ascending order based on the + values of the given columns within each minibatch. + tls_root_certificates: Path to the root CA certificate file for TLS connections. + tls_client_cert: Path to the client certificate file for mutual TLS. + tls_client_key: Path to the client private key file for mutual TLS. + tls_trust_certificates: If True, trust server certificates without verification. + + Example: + + Assume you have the RabbitMQ server with Streams enabled running locally on the + default Streams port, ``5552``. + + First, you'll need to create a Pathway table: + + >>> import pathway as pw + ... + >>> table = pw.debug.table_from_markdown(''' + ... age | owner | pet + ... 10 | Alice | dog + ... 9 | Bob | cat + ... 8 | Alice | cat + ... 
''') + + To output the table's contents in JSON format: + + >>> pw.io.rabbitmq.write( + ... table, + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="json", + ... ) + + You can also use a single column from the table as the payload with headers: + + >>> pw.io.rabbitmq.write( + ... table, + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="plaintext", + ... value=table.owner, + ... headers=[table.age, table.pet], + ... ) + """ + output_format = MessageQueueOutputFormat.construct( + table, + format=format, + delimiter=delimiter, + key=key, + value=value, + headers=headers, + topic_name=stream_name if isinstance(stream_name, ColumnReference) else None, + ) + table = output_format.table + + tls_kwargs: dict = {} + if tls_root_certificates is not None: + tls_kwargs["rabbitmq_tls_root_certificates"] = tls_root_certificates + if tls_client_cert is not None: + tls_kwargs["rabbitmq_tls_client_cert"] = tls_client_cert + if tls_client_key is not None: + tls_kwargs["rabbitmq_tls_client_key"] = tls_client_key + if tls_trust_certificates: + tls_kwargs["rabbitmq_tls_trust_certificates"] = tls_trust_certificates + data_storage = api.DataStorage( + storage_type="rabbitmq", + path=uri, + topic=stream_name if isinstance(stream_name, str) else None, + topic_name_index=output_format.topic_name_index, + key_field_index=output_format.key_field_index, + header_fields=list(output_format.header_fields.items()), + **tls_kwargs, + ) + + table.to( + datasink.GenericDataSink( + data_storage, + output_format.data_format, + datasink_name="rabbitmq", + unique_name=name, + sort_by=sort_by, + ) + ) diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs index 60dff9efe..32a6c2e27 100644 --- a/src/connectors/data_format.rs +++ b/src/connectors/data_format.rs @@ -423,6 +423,26 @@ impl FormatterContext { } nats_headers } + + pub fn construct_rabbitmq_properties( + &self, + header_fields: &[(String, usize)], + ) -> 
Vec<(String, String)> { + let raw_headers = self.construct_message_headers(header_fields, true); + raw_headers + .into_iter() + .map(|h| { + let value = h.value.map_or_else( + || Value::None.to_string(), + |v| { + String::from_utf8(v) + .expect("all prepared headers must be UTF-8 serializable") + }, + ); + (h.key, value) + }) + .collect() + } } #[derive(Debug, thiserror::Error)] diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index ddd564f97..0f9d671ff 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -104,6 +104,8 @@ pub use super::nats::NatsWriter; pub use super::postgres::{ PsqlReader, PsqlWriter, ReplicationError as PostgresReplicationError, SslError, }; +pub use super::rabbitmq::RabbitmqReader; +pub use super::rabbitmq::RabbitmqWriter; pub use super::sqlite::SqliteReader; #[derive(Clone, Debug, Eq, PartialEq, Copy)] @@ -377,6 +379,9 @@ pub enum ReadError { #[error("failed to acknowledge read nats message: {0}")] NatsMessageAck(async_nats::Error), + + #[error("RabbitMQ consumer error: {0}")] + Rabbitmq(String), } // Allow `?` on `mongodb::error::Error` in functions returning `Result<_, ReadError>`. 
@@ -435,6 +440,7 @@ pub enum StorageType { Kinesis, Postgres, MongoDb, + Rabbitmq, } impl StorageType { @@ -459,6 +465,7 @@ impl StorageType { StorageType::Kinesis => KinesisReader::merge_two_frontiers(lhs, rhs), StorageType::Postgres => PsqlReader::merge_two_frontiers(lhs, rhs), StorageType::MongoDb => MongoReader::merge_two_frontiers(lhs, rhs), + StorageType::Rabbitmq => RabbitmqReader::merge_two_frontiers(lhs, rhs), } } } @@ -481,6 +488,7 @@ pub trait Reader { Ok(()) } + #[allow(clippy::too_many_lines, clippy::match_same_arms)] fn merge_two_frontiers(lhs: &OffsetAntichain, rhs: &OffsetAntichain) -> OffsetAntichain where Self: Sized, @@ -497,6 +505,14 @@ pub trait Reader { result.advance_offset(offset_key.clone(), other_value.clone()); } } + ( + OffsetValue::RabbitmqOffset(offset_position), + OffsetValue::RabbitmqOffset(other_position), + ) => { + if other_position > offset_position { + result.advance_offset(offset_key.clone(), other_value.clone()); + } + } ( OffsetValue::PythonCursor { total_entries_read: offset_position, @@ -672,6 +688,9 @@ pub enum WriteError { #[error(transparent)] NatsFlush(#[from] NatsFlushError), + #[error("RabbitMQ publish error: {0}")] + RabbitmqPublish(String), + #[error(transparent)] JetStream(#[from] NatsError), diff --git a/src/connectors/metadata/mod.rs b/src/connectors/metadata/mod.rs index 5d1d53827..fec875552 100644 --- a/src/connectors/metadata/mod.rs +++ b/src/connectors/metadata/mod.rs @@ -4,6 +4,7 @@ pub mod kafka; pub mod mongodb; pub mod parquet; pub mod postgres; +pub mod rabbitmq; pub mod sqlite; #[allow(clippy::module_name_repetitions)] @@ -24,6 +25,9 @@ pub use parquet::ParquetMetadata; #[allow(clippy::module_name_repetitions)] pub use postgres::PostgresMetadata; +#[allow(clippy::module_name_repetitions)] +pub use rabbitmq::RabbitmqMetadata; + #[allow(clippy::module_name_repetitions)] pub use sqlite::SQLiteMetadata; @@ -37,6 +41,7 @@ pub enum SourceMetadata { Iceberg(IcebergMetadata), Parquet(ParquetMetadata), 
Postgres(PostgresMetadata), + Rabbitmq(RabbitmqMetadata), } impl From for SourceMetadata { @@ -75,6 +80,12 @@ impl From for SourceMetadata { } } +impl From for SourceMetadata { + fn from(impl_: RabbitmqMetadata) -> Self { + Self::Rabbitmq(impl_) + } +} + impl From for SourceMetadata { fn from(impl_: SQLiteMetadata) -> Self { Self::SQLite(impl_) @@ -91,6 +102,7 @@ impl SourceMetadata { Self::Iceberg(meta) => serde_json::to_value(meta), Self::Parquet(meta) => serde_json::to_value(meta), Self::Postgres(meta) => serde_json::to_value(meta), + Self::Rabbitmq(meta) => serde_json::to_value(meta), } .expect("Internal JSON serialization error") } @@ -103,7 +115,7 @@ impl SourceMetadata { | Self::Iceberg(_) | Self::Parquet(_) | Self::Postgres(_) => false, - Self::Kafka(_) => true, + Self::Kafka(_) | Self::Rabbitmq(_) => true, } } } diff --git a/src/connectors/metadata/rabbitmq.rs b/src/connectors/metadata/rabbitmq.rs new file mode 100644 index 000000000..c808cd7b6 --- /dev/null +++ b/src/connectors/metadata/rabbitmq.rs @@ -0,0 +1,20 @@ +// Copyright © 2026 Pathway + +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub struct RabbitmqMetadata { + pub offset: u64, + pub stream_name: String, + pub timestamp_millis: Option, +} + +impl RabbitmqMetadata { + pub fn new(offset: u64, stream_name: String, timestamp_millis: Option) -> Self { + Self { + offset, + stream_name, + timestamp_millis, + } + } +} diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs index ba2ccbcb4..dd79d295f 100644 --- a/src/connectors/mod.rs +++ b/src/connectors/mod.rs @@ -32,6 +32,7 @@ pub mod nats; pub mod offset; pub mod posix_like; pub mod postgres; +pub mod rabbitmq; pub mod scanner; pub mod sqlite; pub mod synchronization; diff --git a/src/connectors/offset.rs b/src/connectors/offset.rs index a435bae4c..f6af9c73b 100644 --- a/src/connectors/offset.rs +++ b/src/connectors/offset.rs @@ -17,6 +17,7 @@ use crate::persistence::cached_object_storage::CachedObjectVersion; pub enum OffsetKey { 
Kafka(ArcStr, i32), Nats(usize), + Rabbitmq(usize), Empty, Kinesis(ArcStr), MongoDb, @@ -29,7 +30,9 @@ impl HashInto for OffsetKey { hasher.update(topic_name.as_bytes()); partition.hash_into(hasher); } - OffsetKey::Nats(worker_index) => worker_index.hash_into(hasher), + OffsetKey::Nats(worker_index) | OffsetKey::Rabbitmq(worker_index) => { + worker_index.hash_into(hasher); + } OffsetKey::Kinesis(shard) => hasher.update(shard.as_bytes()), OffsetKey::Empty | OffsetKey::MongoDb => {} } @@ -68,6 +71,7 @@ pub enum OffsetValue { snapshot_id: IcebergSnapshotId, }, NatsReadEntriesCount(usize), + RabbitmqOffset(u64), MqttReadEntriesCount(usize), PostgresReadEntriesCount(usize), KinesisOffset(String), @@ -149,6 +153,9 @@ impl HashInto for OffsetValue { | OffsetValue::PostgresReadEntriesCount(count) => { count.hash_into(hasher); } + OffsetValue::RabbitmqOffset(offset) => { + offset.hash_into(hasher); + } OffsetValue::IcebergSnapshot { snapshot_id } => { snapshot_id.hash_into(hasher); } diff --git a/src/connectors/rabbitmq.rs b/src/connectors/rabbitmq.rs new file mode 100644 index 000000000..97fe65065 --- /dev/null +++ b/src/connectors/rabbitmq.rs @@ -0,0 +1,297 @@ +// Copyright © 2026 Pathway + +use log::error; +use std::borrow::Cow; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use futures::StreamExt; +use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue}; +use rabbitmq_stream_client::{Consumer as RmqConsumer, Environment, Producer as RmqProducer}; +use tokio::runtime::Runtime as TokioRuntime; + +use crate::connectors::data_format::FormatterContext; +use crate::connectors::data_storage::MessageQueueTopic; +use crate::connectors::metadata::RabbitmqMetadata; +use crate::connectors::{ + OffsetKey, OffsetValue, ReadError, ReadResult, Reader, ReaderContext, StorageType, WriteError, + Writer, +}; +use crate::engine::Value; +use crate::persistence::frontier::OffsetAntichain; + +/// 
AMQP 1.0 application property name used to carry the message key. +/// Writer stores the key here; reader extracts it from this property. +const RABBITMQ_KEY_PROPERTY: &str = "key"; + +const FLUSH_TIMEOUT: Duration = Duration::from_secs(30); +const FLUSH_POLL_INTERVAL: Duration = Duration::from_millis(10); + +// --- Writer --- + +#[allow(clippy::module_name_repetitions)] +pub struct RabbitmqWriter { + runtime: TokioRuntime, + producer: RmqProducer, + topic: MessageQueueTopic, + header_fields: Vec<(String, usize)>, + key_field_index: Option, + pending_confirms: Arc, + send_errors: Arc>>, +} + +impl Writer for RabbitmqWriter { + fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> { + // Check for errors accumulated from previous async sends + { + let errs = self.send_errors.lock().unwrap(); + if !errs.is_empty() { + return Err(WriteError::RabbitmqPublish(errs.join("; "))); + } + } + + self.runtime.block_on(async { + let properties = data.construct_rabbitmq_properties(&self.header_fields); + + // Extract key from data values if key_field_index is set + let key_value: Option = + self.key_field_index.map(|idx| match &data.values[idx] { + Value::Bytes(b) => String::from_utf8_lossy(b).into_owned(), + Value::String(s) => s.to_string(), + other => other.to_string(), + }); + + let has_app_props = !properties.is_empty() || key_value.is_some(); + + for payload in data.payloads { + let payload_bytes = payload.into_raw_bytes()?; + let message = if has_app_props { + let mut app_props = Message::builder().application_properties(); + if let Some(ref key) = key_value { + app_props = app_props.insert(RABBITMQ_KEY_PROPERTY, key.as_str()); + } + for (name, value) in &properties { + app_props = app_props.insert(name.as_str(), value.as_str()); + } + app_props.message_builder().body(payload_bytes).build() + } else { + Message::builder().body(payload_bytes).build() + }; + + self.pending_confirms.fetch_add(1, Ordering::Release); + let counter = 
self.pending_confirms.clone(); + let errs = self.send_errors.clone(); + self.producer + .send(message, move |result| { + let c = counter; + let e = errs; + async move { + c.fetch_sub(1, Ordering::Release); + match result { + Ok(confirm) if !confirm.confirmed() => { + e.lock().unwrap().push(format!( + "message not confirmed (publishing_id={})", + confirm.publishing_id() + )); + } + Err(err) => { + e.lock().unwrap().push(format!("{err}")); + } + _ => {} + } + } + }) + .await + .map_err(|e| WriteError::RabbitmqPublish(e.to_string()))?; + } + Ok(()) + }) + } + + fn flush(&mut self, _forced: bool) -> Result<(), WriteError> { + let start = Instant::now(); + while self.pending_confirms.load(Ordering::Acquire) > 0 { + if start.elapsed() > FLUSH_TIMEOUT { + return Err(WriteError::RabbitmqPublish(format!( + "flush timed out after {}s with {} messages still pending", + FLUSH_TIMEOUT.as_secs(), + self.pending_confirms.load(Ordering::Acquire) + ))); + } + std::thread::sleep(FLUSH_POLL_INTERVAL); + } + let errs = self.send_errors.lock().unwrap(); + if !errs.is_empty() { + return Err(WriteError::RabbitmqPublish(errs.join("; "))); + } + Ok(()) + } + + fn retriable(&self) -> bool { + true + } + + fn single_threaded(&self) -> bool { + false + } + + fn name(&self) -> String { + format!("RabbitMQ({})", self.topic) + } +} + +impl Drop for RabbitmqWriter { + fn drop(&mut self) { + if let Err(e) = self.flush(true) { + error!("RabbitMQ flush failed on drop: {e}"); + } + } +} + +impl RabbitmqWriter { + pub fn new( + runtime: TokioRuntime, + producer: RmqProducer, + topic: MessageQueueTopic, + header_fields: Vec<(String, usize)>, + key_field_index: Option, + ) -> Self { + RabbitmqWriter { + runtime, + producer, + topic, + header_fields, + key_field_index, + pending_confirms: Arc::new(AtomicUsize::new(0)), + send_errors: Arc::new(Mutex::new(Vec::new())), + } + } +} + +// --- Reader --- + +#[allow(clippy::module_name_repetitions)] +pub struct RabbitmqReader { + runtime: TokioRuntime, + 
consumer: RmqConsumer, + environment: Environment, + worker_index: usize, + stream_name: String, + end_offset: Option, + deferred_read_result: Option, +} + +impl Reader for RabbitmqReader { + fn read(&mut self) -> Result { + // Return deferred result from previous call (metadata pattern) + if let Some(deferred) = self.deferred_read_result.take() { + return Ok(deferred); + } + + let delivery = self.runtime.block_on(async { self.consumer.next().await }); + match delivery { + Some(Ok(delivery)) => { + let stream_offset = delivery.offset(); + + // Static mode: check if we've reached the end + if let Some(end) = self.end_offset { + if stream_offset > end { + return Ok(ReadResult::Finished); + } + } + + let message = delivery.message(); + + // Extract key from application_properties if present + let key: Option> = message + .application_properties() + .and_then(|props| props.get(RABBITMQ_KEY_PROPERTY)) + .map(|v| match v { + SimpleValue::String(s) => s.as_bytes().to_vec(), + SimpleValue::Binary(b) => b.clone(), + other => format!("{other:?}").into_bytes(), + }); + let body = message.data().map(<[u8]>::to_vec); + + // Use key-value context to support native record keys + let payload = ReaderContext::from_key_value(key, body); + + let offset = ( + OffsetKey::Rabbitmq(self.worker_index), + OffsetValue::RabbitmqOffset(stream_offset), + ); + + // Create metadata and use deferred pattern (like Kafka). + // Timestamp extraction is not possible due to private field in + // rabbitmq_stream_protocol::Timestamp. 
+ let metadata = RabbitmqMetadata::new(stream_offset, self.stream_name.clone(), None); + self.deferred_read_result = Some(ReadResult::Data(payload, offset)); + Ok(ReadResult::NewSource(metadata.into())) + } + Some(Err(e)) => Err(ReadError::Rabbitmq(e.to_string())), + None => Ok(ReadResult::Finished), + } + } + + fn seek(&mut self, frontier: &OffsetAntichain) -> Result<(), ReadError> { + let offset_value = frontier.get_offset(&OffsetKey::Rabbitmq(self.worker_index)); + if let Some(offset) = offset_value { + if let OffsetValue::RabbitmqOffset(last_offset) = offset { + // Rebuild consumer starting from the next offset after the persisted one + let next_offset = last_offset.checked_add(1).ok_or_else(|| { + ReadError::Rabbitmq("Offset overflow: cannot seek past u64::MAX".to_string()) + })?; + let new_consumer = self + .runtime + .block_on(async { + self.environment + .consumer() + .offset(OffsetSpecification::Offset(next_offset)) + .build(&self.stream_name) + .await + }) + .map_err(|e| { + ReadError::Rabbitmq(format!("Failed to rebuild consumer for seek: {e}")) + })?; + self.consumer = new_consumer; + } else { + error!("Unexpected offset type for RabbitMQ reader: {offset:?}"); + } + } + Ok(()) + } + + fn storage_type(&self) -> StorageType { + StorageType::Rabbitmq + } + + fn max_allowed_consecutive_errors(&self) -> usize { + 32 + } + + fn short_description(&self) -> Cow<'static, str> { + format!("RabbitMQ({})", self.stream_name).into() + } +} + +impl RabbitmqReader { + pub fn new( + runtime: TokioRuntime, + consumer: RmqConsumer, + environment: Environment, + worker_index: usize, + stream_name: String, + end_offset: Option, + ) -> RabbitmqReader { + RabbitmqReader { + runtime, + consumer, + environment, + worker_index, + stream_name, + end_offset, + deferred_read_result: None, + } + } +} diff --git a/src/python_api.rs b/src/python_api.rs index 4c71d8068..422de3dde 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -114,8 +114,8 @@ use 
crate::connectors::data_storage::{ KafkaWriter, LakeWriter, MessageQueueTopic, MongoReader, MongoWriter, MqttReader, MqttWriter, MysqlWriter, NatsReader, NatsWriter, NullWriter, ObjectDownloader, PsqlReader, PsqlWriter, PythonConnectorEventType, PythonReaderBuilder, QuestDBAtColumnPolicy, QuestDBWriter, - RdkafkaWatermark, ReadError, ReadMethod, ReaderBuilder, SqliteReader, TableWriterInitMode, - WriteError, Writer, MQTT_CLIENT_MAX_CHANNEL_SIZE, + RabbitmqReader, RabbitmqWriter, RdkafkaWatermark, ReadError, ReadMethod, ReaderBuilder, + SqliteReader, TableWriterInitMode, WriteError, Writer, MQTT_CLIENT_MAX_CHANNEL_SIZE, }; use crate::connectors::data_tokenize::{BufReaderTokenizer, CsvTokenizer, Tokenize}; use crate::connectors::nats; @@ -160,6 +160,80 @@ mod external_index_wrappers; mod logging; pub mod threads; +/// Parse a `RabbitMQ` Streams URI and build an Environment. +/// URI format: `rabbitmq-stream://user:pass@host:port/vhost` +async fn build_rabbitmq_environment( + uri: &str, + tls_root_certs: Option<&str>, + tls_client_cert: Option<&str>, + tls_client_key: Option<&str>, + tls_trust: bool, +) -> PyResult { + // Parse URI: rabbitmq-stream://user:pass@host:port/vhost + let uri_str = uri.strip_prefix("rabbitmq-stream://").unwrap_or(uri); + let mut builder = rabbitmq_stream_client::Environment::builder(); + + // Split userinfo from host + if let Some((userinfo, rest)) = uri_str.split_once('@') { + if let Some((user, pass)) = userinfo.split_once(':') { + builder = builder.username(user).password(pass); + } + // Parse host:port/vhost + let (hostport, vhost) = rest.split_once('/').unwrap_or((rest, "")); + if let Some((host, port_str)) = hostport.split_once(':') { + builder = builder.host(host); + if let Ok(port) = port_str.parse::() { + builder = builder.port(port); + } + } else { + builder = builder.host(hostport); + } + if !vhost.is_empty() { + builder = builder.virtual_host(vhost); + } + } else { + // No userinfo, just host:port + let (hostport, vhost) = 
uri_str.split_once('/').unwrap_or((uri_str, "")); + if let Some((host, port_str)) = hostport.split_once(':') { + builder = builder.host(host); + if let Ok(port) = port_str.parse::() { + builder = builder.port(port); + } + } else { + builder = builder.host(uri_str); + } + if !vhost.is_empty() { + builder = builder.virtual_host(vhost); + } + } + + // Configure TLS if any TLS parameters are provided + if tls_root_certs.is_some() || tls_trust || tls_client_cert.is_some() { + let mut tls_builder = rabbitmq_stream_client::TlsConfiguration::builder(); + if let Some(root) = tls_root_certs { + // add_root_certificates enables TLS and sets the CA cert path + tls_builder = tls_builder.add_root_certificates(root.to_string()); + } + if let (Some(cert), Some(key)) = (tls_client_cert, tls_client_key) { + tls_builder = + tls_builder.add_client_certificates_keys(cert.to_string(), key.to_string()); + } + if tls_trust && tls_root_certs.is_none() { + // enable(true) without root certs produces TlsConfiguration::Untrusted + tls_builder = tls_builder.enable(true); + } + let tls_config = tls_builder.build().map_err(|e| { + PyValueError::new_err(format!("Failed to build RabbitMQ TLS configuration: {e}")) + })?; + builder = builder.tls(tls_config); + } + + builder + .build() + .await + .map_err(|e| PyIOError::new_err(format!("Failed to connect to RabbitMQ: {e}"))) +} + static CONVERT: GILOnceCell> = GILOnceCell::new(); fn get_convert_python_module(py: Python<'_>) -> &Bound<'_, PyModule> { @@ -4599,6 +4673,10 @@ pub struct DataStorage { ssl_cert_path: Option, psql_replication: Option, schema_name: Option, + rabbitmq_tls_root_certificates: Option, + rabbitmq_tls_client_cert: Option, + rabbitmq_tls_client_key: Option, + rabbitmq_tls_trust_certificates: bool, } #[allow(clippy::doc_markdown)] @@ -5161,6 +5239,10 @@ impl DataStorage { ssl_cert_path = None, psql_replication = None, schema_name = None, + rabbitmq_tls_root_certificates = None, + rabbitmq_tls_client_cert = None, + 
rabbitmq_tls_client_key = None, + rabbitmq_tls_trust_certificates = false, ))] #[allow(clippy::too_many_arguments)] #[allow(clippy::fn_params_excessive_bools)] @@ -5207,6 +5289,10 @@ impl DataStorage { ssl_cert_path: Option, psql_replication: Option, schema_name: Option, + rabbitmq_tls_root_certificates: Option, + rabbitmq_tls_client_cert: Option, + rabbitmq_tls_client_key: Option, + rabbitmq_tls_trust_certificates: bool, ) -> Self { DataStorage { storage_type, @@ -5251,6 +5337,10 @@ impl DataStorage { ssl_cert_path, psql_replication, schema_name, + rabbitmq_tls_root_certificates, + rabbitmq_tls_client_cert, + rabbitmq_tls_client_key, + rabbitmq_tls_trust_certificates, } } @@ -6131,6 +6221,80 @@ impl DataStorage { Ok((Box::new(reader), properties.max_parallel_readers(scope))) } + fn construct_rabbitmq_reader( + &self, + scope: &Scope, + properties: &ConnectorProperties, + ) -> PyResult<(Box, usize)> { + let uri = self.path()?; + let stream_name = self.message_queue_fixed_topic()?; + let runtime = create_async_tokio_runtime()?; + + let offset_spec = match self.start_from_timestamp_ms { + Some(ts) => rabbitmq_stream_client::types::OffsetSpecification::Timestamp(ts), + None => rabbitmq_stream_client::types::OffsetSpecification::First, + }; + + let (environment, consumer, end_offset) = runtime.block_on(async { + let environment = build_rabbitmq_environment( + uri, + self.rabbitmq_tls_root_certificates.as_deref(), + self.rabbitmq_tls_client_cert.as_deref(), + self.rabbitmq_tls_client_key.as_deref(), + self.rabbitmq_tls_trust_certificates, + ) + .await?; + let consumer = environment + .consumer() + .offset(offset_spec) + .build(&stream_name) + .await + .map_err(|e| { + PyIOError::new_err(format!("Failed to create RabbitMQ consumer: {e}")) + })?; + + // For static mode, try to determine the last offset to know when to stop. + // We create a temporary consumer at the Last offset to discover the boundary. 
+ let end_offset = if self.mode == ConnectorMode::Static { + let mut probe = environment + .consumer() + .offset(rabbitmq_stream_client::types::OffsetSpecification::Last) + .build(&stream_name) + .await + .ok(); + if let Some(ref mut p) = probe { + use futures::StreamExt; + // Read one message to get the last offset + const STATIC_MODE_PROBE_TIMEOUT_SECS: u64 = 5; + match tokio::time::timeout( + std::time::Duration::from_secs(STATIC_MODE_PROBE_TIMEOUT_SECS), + p.next(), + ) + .await + { + Ok(Some(Ok(delivery))) => Some(delivery.offset()), + _ => None, // empty stream or timeout + } + } else { + None + } + } else { + None + }; + Ok::<_, PyErr>((environment, consumer, end_offset)) + })?; + + let reader = RabbitmqReader::new( + runtime, + consumer, + environment, + scope.worker_index(), + stream_name.clone(), + end_offset, + ); + Ok((Box::new(reader), properties.max_parallel_readers(scope))) + } + fn construct_iceberg_reader( &self, py: pyo3::Python, @@ -6331,6 +6495,7 @@ impl DataStorage { "sqlite" => self.construct_sqlite_reader(py, data_format), "deltalake" => self.construct_deltalake_reader(py, data_format, scope), "nats" => self.construct_nats_reader(py, scope, properties), + "rabbitmq" => self.construct_rabbitmq_reader(scope, properties), "iceberg" => self.construct_iceberg_reader(py, data_format, scope), "mqtt" => self.construct_mqtt_reader(), "kinesis" => self.construct_kinesis_reader(scope, properties), @@ -6621,6 +6786,39 @@ impl DataStorage { Ok(Box::new(writer)) } + fn construct_rabbitmq_writer(&self) -> PyResult> { + let uri = self.path()?; + let topic = self.message_queue_topic()?; + let stream_name = self.message_queue_fixed_topic()?; + let runtime = create_async_tokio_runtime()?; + let producer = runtime.block_on(async { + let environment = build_rabbitmq_environment( + uri, + self.rabbitmq_tls_root_certificates.as_deref(), + self.rabbitmq_tls_client_cert.as_deref(), + self.rabbitmq_tls_client_key.as_deref(), + self.rabbitmq_tls_trust_certificates, + ) 
+ .await?; + let producer = environment + .producer() + .build(&stream_name) + .await + .map_err(|e| { + PyIOError::new_err(format!("Failed to create RabbitMQ producer: {e}")) + })?; + Ok::<_, PyErr>(producer) + })?; + let writer = RabbitmqWriter::new( + runtime, + producer, + topic, + self.header_fields.clone(), + self.key_field_index, + ); + Ok(Box::new(writer)) + } + fn construct_mongodb_writer(&self) -> PyResult> { let uri = self.connection_string()?; let client = MongoClient::with_uri_str(uri) @@ -6806,6 +7004,7 @@ impl DataStorage { "mongodb" => self.construct_mongodb_writer(), "null" => Ok(Box::new(NullWriter::new())), "nats" => self.construct_nats_writer(), + "rabbitmq" => self.construct_rabbitmq_writer(), "iceberg" => self.construct_iceberg_writer(py, data_format, license), "mqtt" => self.construct_mqtt_writer(), "questdb" => self.construct_questdb_writer(py, data_format, license),