From 2e1f1d286afd720b4be8486b05b26c7683c5d2b7 Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Fri, 27 Mar 2026 14:23:51 +0800 Subject: [PATCH 1/7] Add RabbitMQ connector and remove MSSQL connector Replace MSSQL connector with RabbitMQ stream connector supporting both input and output operations. Includes Rust connector implementation, Python bindings, integration tests, and an ETL example project. --- Cargo.lock | 355 ++++++++++++-- Cargo.toml | 1 + examples/projects/rabbitmq-ETL/Makefile | 25 + examples/projects/rabbitmq-ETL/README.md | 93 ++++ .../projects/rabbitmq-ETL/docker-compose.yml | 59 +++ .../rabbitmq-ETL/pathway-src/Dockerfile | 6 + .../projects/rabbitmq-ETL/pathway-src/etl.py | 270 +++++++++++ .../rabbitmq-ETL/producer-src/Dockerfile | 6 + .../producer-src/create-streams.py | 254 ++++++++++ .../projects/rabbitmq-ETL/sql/init-db.sql | 24 + integration_tests/kafka/test_rabbitmq.py | 432 ++++++++++++++++++ python/pathway/io/__init__.py | 2 + python/pathway/io/rabbitmq/__init__.py | 299 ++++++++++++ src/connectors/data_format.rs | 20 + src/connectors/data_storage.rs | 18 + src/connectors/metadata/mod.rs | 14 +- src/connectors/metadata/rabbitmq.rs | 20 + src/connectors/mod.rs | 1 + src/connectors/offset.rs | 9 +- src/connectors/rabbitmq.rs | 252 ++++++++++ src/python_api.rs | 149 +++++- 21 files changed, 2265 insertions(+), 44 deletions(-) create mode 100644 examples/projects/rabbitmq-ETL/Makefile create mode 100644 examples/projects/rabbitmq-ETL/README.md create mode 100644 examples/projects/rabbitmq-ETL/docker-compose.yml create mode 100644 examples/projects/rabbitmq-ETL/pathway-src/Dockerfile create mode 100644 examples/projects/rabbitmq-ETL/pathway-src/etl.py create mode 100644 examples/projects/rabbitmq-ETL/producer-src/Dockerfile create mode 100644 examples/projects/rabbitmq-ETL/producer-src/create-streams.py create mode 100644 examples/projects/rabbitmq-ETL/sql/init-db.sql create mode 100644 integration_tests/kafka/test_rabbitmq.py create mode 100644 
python/pathway/io/rabbitmq/__init__.py create mode 100644 src/connectors/metadata/rabbitmq.rs create mode 100644 src/connectors/rabbitmq.rs diff --git a/Cargo.lock b/Cargo.lock index 5909bb2c0..1b29dc439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -460,7 +460,7 @@ dependencies = [ "regex", "ring", "rustls-native-certs 0.7.3", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.7", "serde", "serde_json", @@ -469,7 +469,7 @@ dependencies = [ "thiserror 1.0.69", "time", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "tokio-websockets", "tracing", @@ -886,7 +886,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tower 0.5.2", "tracing", ] @@ -1666,6 +1666,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] + [[package]] name = "chrono" version = "0.4.43" @@ -1824,6 +1835,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1859,6 +1879,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32c" version = "0.6.8" @@ -1979,7 +2008,7 @@ 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "curve25519-dalek-derive", "digest", "fiat-crypto", @@ -3024,6 +3053,29 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "derive_more" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" +dependencies = [ + "convert_case 0.10.0", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.101", + "unicode-xid", +] + [[package]] name = "derive_utils" version = "0.15.0" @@ -3409,9 +3461,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ "percent-encoding", ] @@ -3613,11 +3665,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.2.0", "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.0", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.2" @@ -3997,7 +4063,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", 
+ "tokio-rustls 0.26.4", "tower-service", "webpki-roots 0.26.7", ] @@ -4314,9 +4380,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" dependencies = [ "idna_adapter", "smallvec", @@ -4552,6 +4618,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "levenshtein_automata" version = "0.2.1" @@ -5041,7 +5113,7 @@ dependencies = [ "bson", "chrono", "derive-where", - "derive_more", + "derive_more 0.99.18", "futures-core", "futures-executor", "futures-io", @@ -5367,18 +5439,19 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.7.3" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +checksum = "5d0bca838442ec211fa11de3a8b0e0e8f3a4522575b5c4c06ed722e005036f26" dependencies = [ "num_enum_derive", + "rustversion", ] [[package]] name = "num_enum_derive" -version = "0.7.3" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +checksum = "680998035259dcfcafe653688bf2aa6d3e2dc05e98be6ab46afb089dc84f1df8" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -5446,7 +5519,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "ring", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "serde", "serde_json", 
"serde_urlencoded", @@ -5844,6 +5917,7 @@ dependencies = [ "pyo3-log", "qdrant-client", "questdb-rs", + "rabbitmq-stream-client", "rand 0.9.2", "rayon", "rdkafka", @@ -5908,9 +5982,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "petgraph" @@ -6009,18 +6083,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", @@ -6443,7 +6517,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustls 0.22.4", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-pki-types", "ryu", "serde", @@ -6568,6 +6642,50 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "rabbitmq-stream-client" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f7e766fd5cccfba5caa077d0b00de81840f0553a15325f22b59e87322f235d10" +dependencies = [ + "async-trait", + "bytes", + "dashmap", + "futures", + "murmur3", + "pin-project", + "rabbitmq-stream-protocol", + "rand 0.10.0", + "rustls-pemfile 2.2.0", + "thiserror 2.0.18", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "rabbitmq-stream-protocol" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fedda81585bf2b5f8b91be49a592d2c91cb060c2c1927840617c52a67d1b7d" +dependencies = [ + "byteorder", + "chrono", + "derive_more 2.1.1", + "num_enum", + "ordered-float 4.6.0", + "uuid", +] + [[package]] name = "radium" version = "0.7.0" @@ -6609,6 +6727,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.0", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -6667,6 +6796,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + [[package]] name = "rand_distr" version = "0.4.3" @@ -6885,7 +7020,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -6983,7 +7118,7 @@ dependencies = [ "log", "native-tls", "rustls-native-certs 0.7.3", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.7", "thiserror 1.0.69", "tokio", @@ -7201,7 +7336,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ 
"openssl-probe", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", "security-framework 2.11.1", @@ -7230,11 +7365,10 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64 0.22.1", "rustls-pki-types", ] @@ -7555,7 +7689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -7566,7 +7700,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -7577,7 +7711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.16", "digest", ] @@ -8400,9 +8534,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.2" +version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ "rustls 0.23.31", "tokio", @@ -8449,7 +8583,7 @@ dependencies = [ "ring", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-util", "webpki-roots 0.26.7", ] @@ -8494,10 +8628,10 @@ dependencies = [ "pin-project", "prost", "rustls-native-certs 0.8.1", - "rustls-pemfile 2.1.3", + "rustls-pemfile 2.2.0", "socket2 0.5.10", "tokio", - "tokio-rustls 
0.26.2", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -8528,7 +8662,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "socket2 0.5.10", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.5.2", "tower-layer", @@ -8830,14 +8964,15 @@ dependencies = [ [[package]] name = "url" -version = "2.5.4" +version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" dependencies = [ "form_urlencoded", "idna", "percent-encoding", "serde", + "serde_derive", ] [[package]] @@ -8987,6 +9122,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasite" version = "1.0.2" @@ -9067,6 +9220,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.9.0", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -9080,6 +9255,18 @@ 
dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.9.1", + "hashbrown 0.15.2", + "indexmap 2.9.0", + "semver", +] + [[package]] name = "web-sys" version = "0.3.77" @@ -9635,6 +9822,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -9644,6 +9851,74 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.9.0", + "prettyplease", + "syn 2.0.101", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.101", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" 
+dependencies = [ + "anyhow", + "bitflags 2.9.1", + "indexmap 2.9.0", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.9.0", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 9d8f3d8b8..eea045d7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ pyo3 = { version = "0.25.0", features = ["abi3-py310", "multiple-pymethods"] } pyo3-async-runtimes = "0.25.0" pyo3-log = "0.12.4" qdrant-client = "1.15.0" +rabbitmq-stream-client = "0.11.0" questdb-rs = "4.0.5" rand = "0.9.1" rayon = "1.10.0" diff --git a/examples/projects/rabbitmq-ETL/Makefile b/examples/projects/rabbitmq-ETL/Makefile new file mode 100644 index 000000000..ae060e320 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/Makefile @@ -0,0 +1,25 @@ +build: + docker compose up -d + +stop: + docker compose down -v --remove-orphans + docker rmi etl-rabbitmq-pathway:latest + docker rmi etl-rabbitmq-stream-producer:latest + +connect: + docker compose exec -it pathway bash +connect-prod: + docker compose exec -it stream-producer bash +connect-rabbitmq: + docker compose exec -it rabbitmq bash +psql: + docker compose exec -it postgres psql -U pathway -d etl_db + +logs: + docker compose logs pathway +logs-prod: + docker compose logs stream-producer +logs-rabbitmq: + docker compose logs rabbitmq +logs-postgres: + docker compose logs postgres diff --git a/examples/projects/rabbitmq-ETL/README.md b/examples/projects/rabbitmq-ETL/README.md new file mode 100644 index 000000000..1f8a042a3 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/README.md 
@@ -0,0 +1,93 @@ +# ETL with RabbitMQ Streams in / PostgreSQL out + +A streaming ETL pipeline built with Pathway that reads from RabbitMQ Streams, +validates, transforms, aggregates, and joins data, then writes enriched results +to PostgreSQL in real time. + +## What Pathway replaces + +This example is inspired by three ETL architectures that traditionally require +many moving parts: + +| Traditional approach | What Pathway replaces | +|---|---| +| Python/Java workers consuming from queues | `pw.io.rabbitmq.read` | +| Staging databases for validation | UDF-based filtering | +| Airflow DAGs for orchestration | Reactive streaming engine | +| Celery task queues for processing | Single streaming process | +| Separate aggregation jobs | `groupby().reduce()` | +| Multi-step load pipelines | `pw.io.postgres.write` | + +**Result:** One Pathway process replaces 5+ services. + +## Architecture + +``` +Producer (rstream) Pathway (single process) PostgreSQL ++----------------+ +------------------------------+ +------------------+ +| employees_raw |--->| pw.io.rabbitmq.read | | | +| orders_raw |--->| -> validate (UDFs) |--->| enriched_employees| +| listings_raw |--->| -> transform | | enriched_listings | ++----------------+ | -> aggregate (groupby) | | (snapshot mode) | + | -> join (left joins) | +------------------+ + | pw.io.postgres.write | + +------------------------------+ +``` + +## Data sources + +- **Employees** (10 records): HR data with intentional quality issues (empty names, + invalid emails, negative salaries) to demonstrate validation +- **Orders** (15+ records): Purchase orders linked to employees by `employee_id` +- **Listings** (10+ records): Rental property listings linked to employees via + `agent_employee_id` (inspired by rental platform crawlers) + +## ETL pipeline + +1. **Read** three RabbitMQ streams via `pw.io.rabbitmq.read` +2. **Validate** using UDFs: email format, phone format, positive salary/price, non-empty fields +3. 
**Transform**: normalize emails/departments to lowercase, parse string amounts to float, + concatenate names, derive boolean flags +4. **Aggregate**: order stats per employee (`total_orders`, `total_spent`), + listing stats per agent (`total_listings`, `avg_listing_price`) +5. **Join**: left-join employees with order stats and listing stats; + left-join listings with agent names +6. **Write** to PostgreSQL snapshot tables (auto-updated as new data arrives) + +## Launching + +```bash +docker compose up -d +# or +make +``` + +## Checking results + +Connect to PostgreSQL: + +```bash +make psql +``` + +Then query the enriched tables: + +```sql +SELECT * FROM enriched_employees ORDER BY employee_id; +SELECT * FROM enriched_listings ORDER BY listing_id; +``` + +As the producer continues sending new orders and listings, re-running these queries +will show updated aggregates in real time. + +## Monitoring + +- **Pathway logs**: `make logs` +- **Producer logs**: `make logs-prod` +- **RabbitMQ Management UI**: http://localhost:15672 (guest/guest) + +## Stopping + +```bash +make stop +``` diff --git a/examples/projects/rabbitmq-ETL/docker-compose.yml b/examples/projects/rabbitmq-ETL/docker-compose.yml new file mode 100644 index 000000000..2eb5b34a5 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/docker-compose.yml @@ -0,0 +1,59 @@ +version: "3.7" +name: etl-rabbitmq +networks: + etl-rabbitmq-network: + driver: bridge +services: + rabbitmq: + image: rabbitmq:3.13-management + environment: + RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbitmq_stream advertised_host rabbitmq -rabbitmq_stream advertised_port 5552" + ports: + - 5552:5552 + - 5672:5672 + - 15672:15672 + command: bash -c "rabbitmq-plugins enable --offline rabbitmq_stream && rabbitmq-server" + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 10s + timeout: 5s + retries: 5 + networks: + - etl-rabbitmq-network + postgres: + image: postgres:16 + environment: + POSTGRES_USER: pathway + POSTGRES_PASSWORD: 
pathway + POSTGRES_DB: etl_db + ports: + - 5432:5432 + volumes: + - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U pathway"] + interval: 5s + timeout: 5s + retries: 5 + networks: + - etl-rabbitmq-network + stream-producer: + build: + context: . + dockerfile: ./producer-src/Dockerfile + depends_on: + rabbitmq: + condition: service_healthy + networks: + - etl-rabbitmq-network + pathway: + build: + context: . + dockerfile: ./pathway-src/Dockerfile + depends_on: + rabbitmq: + condition: service_healthy + postgres: + condition: service_healthy + networks: + - etl-rabbitmq-network diff --git a/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile b/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile new file mode 100644 index 000000000..817502d52 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/pathway-src/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.10 + +RUN pip install -U pathway +COPY ./pathway-src/etl.py etl.py + +CMD ["python", "etl.py"] diff --git a/examples/projects/rabbitmq-ETL/pathway-src/etl.py b/examples/projects/rabbitmq-ETL/pathway-src/etl.py new file mode 100644 index 000000000..a9de0997d --- /dev/null +++ b/examples/projects/rabbitmq-ETL/pathway-src/etl.py @@ -0,0 +1,270 @@ +# Copyright © 2026 Pathway +# +# ETL pipeline that reads from RabbitMQ Streams, validates, transforms, +# aggregates, joins, and writes enriched results to PostgreSQL. +# +# This single Pathway process replaces what traditionally required: +# - Celery/RabbitMQ workers (consuming) +# - Staging databases (validation) +# - Airflow DAGs (orchestration) +# - Separate Python/Java workers (transformation) + +import re +import time + +import pathway as pw + +# To use advanced features with Pathway Scale, get your free license key from +# https://pathway.com/features and paste it below. +# To use Pathway Community, comment out the line below. 
+pw.set_license_key("demo-license-key-with-telemetry") + +RABBITMQ_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552" + +postgres_settings = { + "host": "postgres", + "port": "5432", + "dbname": "etl_db", + "user": "pathway", + "password": "pathway", +} + + +# ── Schemas ────────────────────────────────────────────────────────────────── + + +class EmployeeSchema(pw.Schema): + employee_id: int + first_name: str + last_name: str + email: str + phone: str + department: str + salary: str + hire_date: str + status: str + + +class OrderSchema(pw.Schema): + order_id: int + employee_id: int + amount: str + order_date: str + product: str + quantity: int + + +class ListingSchema(pw.Schema): + listing_id: int + title: str + price: str + location: str + source: str + posted_date: str + agent_employee_id: int + + +# ── Validation UDFs ────────────────────────────────────────────────────────── + + +@pw.udf +def is_valid_email(email: str) -> bool: + return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email)) + + +@pw.udf +def is_valid_phone(phone: str) -> bool: + digits = re.sub(r"[\s\-\(\)\+]", "", phone) + return len(digits) >= 7 and digits.isdigit() + + +@pw.udf +def is_positive_number(value: str) -> bool: + try: + return float(value) > 0 + except (ValueError, TypeError): + return False + + +# ── Transformation UDFs ────────────────────────────────────────────────────── + + +@pw.udf +def normalize_lower(value: str) -> str: + return value.strip().lower() + + +@pw.udf +def parse_float(value: str) -> float: + return float(value) + + +# ── READ from RabbitMQ ─────────────────────────────────────────────────────── + +employees = pw.io.rabbitmq.read( + RABBITMQ_URI, + "employees_raw", + schema=EmployeeSchema, + format="json", + autocommit_duration_ms=500, +) + +orders = pw.io.rabbitmq.read( + RABBITMQ_URI, + "orders_raw", + schema=OrderSchema, + format="json", + autocommit_duration_ms=500, +) + +listings = pw.io.rabbitmq.read( + RABBITMQ_URI, + "listings_raw", + 
schema=ListingSchema, + format="json", + autocommit_duration_ms=500, +) + +# ── VALIDATE employees ─────────────────────────────────────────────────────── + +valid_employees = employees.filter( + (pw.this.first_name != "") + & is_valid_email(pw.this.email) + & is_valid_phone(pw.this.phone) + & is_positive_number(pw.this.salary) +) + +# ── TRANSFORM employees ───────────────────────────────────────────────────── + +transformed_employees = valid_employees.select( + employee_id=pw.this.employee_id, + full_name=pw.this.first_name + " " + pw.this.last_name, + email=normalize_lower(pw.this.email), + phone=pw.this.phone, + department=normalize_lower(pw.this.department), + salary=parse_float(pw.this.salary), + hire_date=pw.this.hire_date, + is_active=(pw.this.status == "active"), +) + +# ── VALIDATE & TRANSFORM orders ───────────────────────────────────────────── + +valid_orders = orders.filter( + is_positive_number(pw.this.amount) & (pw.this.quantity > 0) +) + +transformed_orders = valid_orders.select( + order_id=pw.this.order_id, + employee_id=pw.this.employee_id, + amount=parse_float(pw.this.amount), + order_date=pw.this.order_date, + product=pw.this.product, + quantity=pw.this.quantity, + line_total=parse_float(pw.this.amount) * pw.this.quantity, +) + +# ── VALIDATE & TRANSFORM listings ─────────────────────────────────────────── + +valid_listings = listings.filter( + (pw.this.title != "") & is_positive_number(pw.this.price) +) + +transformed_listings = valid_listings.select( + listing_id=pw.this.listing_id, + title=pw.this.title, + price=parse_float(pw.this.price), + location=pw.this.location, + source=pw.this.source, + posted_date=pw.this.posted_date, + agent_employee_id=pw.this.agent_employee_id, +) + +# ── AGGREGATE orders per employee ──────────────────────────────────────────── + +order_stats = transformed_orders.groupby(pw.this.employee_id).reduce( + employee_id=pw.this.employee_id, + total_orders=pw.reducers.count(), + 
total_spent=pw.reducers.sum(pw.this.line_total), +) + +# ── AGGREGATE listings per agent ───────────────────────────────────────────── + +listing_stats = transformed_listings.groupby(pw.this.agent_employee_id).reduce( + agent_employee_id=pw.this.agent_employee_id, + total_listings=pw.reducers.count(), + avg_listing_price=pw.reducers.avg(pw.this.price), +) + +# ── JOIN: enrich employees with order stats ────────────────────────────────── + +employees_with_orders = transformed_employees.join_left( + order_stats, pw.left.employee_id == pw.right.employee_id +).select( + employee_id=pw.left.employee_id, + full_name=pw.left.full_name, + email=pw.left.email, + phone=pw.left.phone, + department=pw.left.department, + salary=pw.left.salary, + hire_date=pw.left.hire_date, + is_active=pw.left.is_active, + total_orders=pw.coalesce(pw.right.total_orders, 0), + total_spent=pw.coalesce(pw.right.total_spent, 0.0), +) + +# ── JOIN: enrich employees with listing stats ──────────────────────────────── + +enriched_employees = employees_with_orders.join_left( + listing_stats, pw.left.employee_id == pw.right.agent_employee_id +).select( + employee_id=pw.left.employee_id, + full_name=pw.left.full_name, + email=pw.left.email, + phone=pw.left.phone, + department=pw.left.department, + salary=pw.left.salary, + hire_date=pw.left.hire_date, + is_active=pw.left.is_active, + total_orders=pw.left.total_orders, + total_spent=pw.left.total_spent, + total_listings=pw.coalesce(pw.right.total_listings, 0), + avg_listing_price=pw.coalesce(pw.right.avg_listing_price, 0.0), +) + +# ── JOIN: enrich listings with agent name ──────────────────────────────────── + +enriched_listings = transformed_listings.join_left( + transformed_employees, pw.left.agent_employee_id == pw.right.employee_id +).select( + listing_id=pw.left.listing_id, + title=pw.left.title, + price=pw.left.price, + location=pw.left.location, + source=pw.left.source, + posted_date=pw.left.posted_date, + agent_name=pw.coalesce(pw.right.full_name, 
"Unknown"), +) + +# ── WRITE to PostgreSQL ────────────────────────────────────────────────────── + +pw.io.postgres.write( + enriched_employees, + postgres_settings, + "enriched_employees", + output_table_type="snapshot", + primary_key=[enriched_employees.employee_id], +) + +pw.io.postgres.write( + enriched_listings, + postgres_settings, + "enriched_listings", + output_table_type="snapshot", + primary_key=[enriched_listings.listing_id], +) + +# Wait for RabbitMQ streams to be created by the producer +time.sleep(15) + +# Launch the streaming computation +pw.run() diff --git a/examples/projects/rabbitmq-ETL/producer-src/Dockerfile b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile new file mode 100644 index 000000000..c0a72c8eb --- /dev/null +++ b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.10 + +RUN pip install rstream +COPY ./producer-src/create-streams.py create-streams.py + +CMD ["python", "create-streams.py"] diff --git a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py new file mode 100644 index 000000000..a2d12ebb5 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py @@ -0,0 +1,254 @@ +# Copyright © 2026 Pathway + +import asyncio +import json +import random +import time + +RABBITMQ_HOST = "rabbitmq" +RABBITMQ_PORT = 5552 +RABBITMQ_USER = "guest" +RABBITMQ_PASSWORD = "guest" + +STREAMS = ["employees_raw", "orders_raw", "listings_raw"] + +# Sample employees - some with intentional data quality issues for validation demo +EMPLOYEES = [ + { + "employee_id": 1, + "first_name": "Alice", + "last_name": "Smith", + "email": "ALICE.SMITH@COMPANY.COM", + "phone": "+1 (555) 123-4567", + "department": "Engineering", + "salary": "95000", + "hire_date": "2022-01-15", + "status": "active", + }, + { + "employee_id": 2, + "first_name": "Bob", + "last_name": "Jones", + "email": "bob.jones@company.com", + "phone": 
"555-234-5678", + "department": "Sales", + "salary": "72000", + "hire_date": "2021-06-20", + "status": "active", + }, + { + # Bad record: empty first_name, invalid email, negative salary + "employee_id": 3, + "first_name": "", + "last_name": "Garcia", + "email": "invalid-email", + "phone": "555-345-6789", + "department": "Marketing", + "salary": "-5000", + "hire_date": "2023-03-01", + "status": "active", + }, + { + "employee_id": 4, + "first_name": "Diana", + "last_name": "Lee", + "email": "diana.lee@company.com", + "phone": "+44 20 7946 0958", + "department": "ENGINEERING", + "salary": "105000", + "hire_date": "2020-11-10", + "status": "active", + }, + { + "employee_id": 5, + "first_name": "Erik", + "last_name": "Martinez", + "email": "erik.martinez@company.com", + "phone": "555-456-7890", + "department": "Sales", + "salary": "68000", + "hire_date": "2023-08-05", + "status": "terminated", + }, + { + # Bad record: invalid phone + "employee_id": 6, + "first_name": "Fatima", + "last_name": "Ahmed", + "email": "fatima.ahmed@company.com", + "phone": "abc", + "department": "engineering", + "salary": "88000", + "hire_date": "2022-04-18", + "status": "active", + }, + { + "employee_id": 7, + "first_name": "George", + "last_name": "Wilson", + "email": "george.wilson@company.com", + "phone": "+1-555-567-8901", + "department": "Real Estate", + "salary": "78000", + "hire_date": "2021-09-30", + "status": "active", + }, + { + "employee_id": 8, + "first_name": "Hannah", + "last_name": "Brown", + "email": "hannah.brown@company.com", + "phone": "555-678-9012", + "department": "Real Estate", + "salary": "82000", + "hire_date": "2022-07-12", + "status": "active", + }, + { + # Bad record: empty salary + "employee_id": 9, + "first_name": "Ivan", + "last_name": "Petrov", + "email": "ivan.petrov@company.com", + "phone": "+7 495 123 4567", + "department": "Marketing", + "salary": "", + "hire_date": "2023-01-22", + "status": "active", + }, + { + "employee_id": 10, + "first_name": "Julia", 
+ "last_name": "Chen", + "email": "julia.chen@company.com", + "phone": "+86 10 1234 5678", + "department": "Engineering", + "salary": "115000", + "hire_date": "2019-05-14", + "status": "active", + }, +] + +# Sample orders - multiple per employee +ORDERS = [ + {"order_id": 101, "employee_id": 1, "amount": "1250.50", "order_date": "2024-11-01", "product": "Laptop Stand", "quantity": 2}, + {"order_id": 102, "employee_id": 2, "amount": "340.00", "order_date": "2024-11-05", "product": "Keyboard", "quantity": 5}, + {"order_id": 103, "employee_id": 1, "amount": "89.99", "order_date": "2024-11-08", "product": "Mouse Pad", "quantity": 10}, + {"order_id": 104, "employee_id": 4, "amount": "2100.00", "order_date": "2024-11-10", "product": "Monitor", "quantity": 1}, + {"order_id": 105, "employee_id": 5, "amount": "450.00", "order_date": "2024-11-12", "product": "Headset", "quantity": 3}, + {"order_id": 106, "employee_id": 2, "amount": "175.25", "order_date": "2024-11-15", "product": "Webcam", "quantity": 2}, + {"order_id": 107, "employee_id": 7, "amount": "560.00", "order_date": "2024-11-18", "product": "Desk Lamp", "quantity": 4}, + {"order_id": 108, "employee_id": 10, "amount": "3200.00", "order_date": "2024-11-20", "product": "Standing Desk", "quantity": 1}, + {"order_id": 109, "employee_id": 4, "amount": "125.50", "order_date": "2024-11-22", "product": "Cable Kit", "quantity": 8}, + {"order_id": 110, "employee_id": 1, "amount": "780.00", "order_date": "2024-11-25", "product": "Chair Mat", "quantity": 2}, + {"order_id": 111, "employee_id": 8, "amount": "95.00", "order_date": "2024-11-28", "product": "Notebook", "quantity": 15}, + {"order_id": 112, "employee_id": 10, "amount": "420.00", "order_date": "2024-12-01", "product": "USB Hub", "quantity": 6}, + {"order_id": 113, "employee_id": 7, "amount": "1800.00", "order_date": "2024-12-03", "product": "Printer", "quantity": 1}, + {"order_id": 114, "employee_id": 2, "amount": "65.99", "order_date": "2024-12-05", "product": 
"Mouse", "quantity": 3}, + {"order_id": 115, "employee_id": 4, "amount": "290.00", "order_date": "2024-12-08", "product": "Speakers", "quantity": 2}, +] + +# Sample rental listings - inspired by ldtrungmark rental platform +LISTINGS = [ + {"listing_id": 1001, "title": "Modern 2BR Apartment Downtown", "price": "2500.00", "location": "District 1", "source": "website_a", "posted_date": "2024-11-01", "agent_employee_id": 7}, + {"listing_id": 1002, "title": "Cozy Studio Near Park", "price": "1200.00", "location": "District 3", "source": "website_b", "posted_date": "2024-11-03", "agent_employee_id": 8}, + {"listing_id": 1003, "title": "Spacious 3BR Family Home", "price": "3800.00", "location": "District 2", "source": "website_a", "posted_date": "2024-11-05", "agent_employee_id": 7}, + # Bad record: negative price + {"listing_id": 1004, "title": "Invalid Listing", "price": "-500.00", "location": "District 5", "source": "website_c", "posted_date": "2024-11-07", "agent_employee_id": 8}, + {"listing_id": 1005, "title": "Luxury Penthouse with View", "price": "8500.00", "location": "District 1", "source": "website_b", "posted_date": "2024-11-10", "agent_employee_id": 7}, + {"listing_id": 1006, "title": "Renovated 1BR Near Metro", "price": "1800.00", "location": "District 4", "source": "website_a", "posted_date": "2024-11-12", "agent_employee_id": 8}, + {"listing_id": 1007, "title": "Garden Villa with Pool", "price": "12000.00", "location": "District 7", "source": "website_b", "posted_date": "2024-11-15", "agent_employee_id": 7}, + {"listing_id": 1008, "title": "Budget Room in Shared House", "price": "450.00", "location": "District 9", "source": "website_a", "posted_date": "2024-11-18", "agent_employee_id": 8}, + # Bad record: empty title + {"listing_id": 1009, "title": "", "price": "1500.00", "location": "District 3", "source": "website_c", "posted_date": "2024-11-20", "agent_employee_id": 7}, + {"listing_id": 1010, "title": "Furnished Office Space", "price": "5000.00", 
"location": "District 1", "source": "website_a", "posted_date": "2024-11-22", "agent_employee_id": 8}, +] + +PRODUCTS = ["Laptop Stand", "Keyboard", "Mouse", "Monitor", "Headset", "Webcam", "USB Hub", "Chair Mat"] +LOCATIONS = ["District 1", "District 2", "District 3", "District 4", "District 7", "District 9"] +SOURCES = ["website_a", "website_b"] + + +async def main(): + from rstream import Producer + + print("Connecting to RabbitMQ Streams...") + producer = await Producer.create( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + username=RABBITMQ_USER, + password=RABBITMQ_PASSWORD, + ) + + # Create all streams + for stream in STREAMS: + try: + await producer.create_stream(stream) + print(f"Created stream: {stream}") + except Exception: + print(f"Stream already exists: {stream}") + + # Publish initial employees + print("Publishing employee records...") + for emp in EMPLOYEES: + await producer.send("employees_raw", json.dumps(emp).encode()) + await asyncio.sleep(0.3) + print(f"Published {len(EMPLOYEES)} employees") + + # Publish initial orders + print("Publishing order records...") + for order in ORDERS: + await producer.send("orders_raw", json.dumps(order).encode()) + await asyncio.sleep(0.2) + print(f"Published {len(ORDERS)} orders") + + # Publish initial listings + print("Publishing listing records...") + for listing in LISTINGS: + await producer.send("listings_raw", json.dumps(listing).encode()) + await asyncio.sleep(0.2) + print(f"Published {len(LISTINGS)} listings") + + # Continue producing new data periodically to demonstrate streaming + print("Starting continuous data production...") + order_id = 200 + listing_id = 2000 + valid_employee_ids = [1, 2, 4, 5, 7, 8, 10] + agent_ids = [7, 8] + + while True: + await asyncio.sleep(5) + + # New order + order_id += 1 + new_order = { + "order_id": order_id, + "employee_id": random.choice(valid_employee_ids), + "amount": f"{random.uniform(50, 2000):.2f}", + "order_date": "2024-12-10", + "product": random.choice(PRODUCTS), 
+ "quantity": random.randint(1, 10), + } + await producer.send("orders_raw", json.dumps(new_order).encode()) + print(f"Published new order: {order_id}") + + await asyncio.sleep(3) + + # New listing + listing_id += 1 + new_listing = { + "listing_id": listing_id, + "title": f"New Listing #{listing_id}", + "price": f"{random.uniform(500, 10000):.2f}", + "location": random.choice(LOCATIONS), + "source": random.choice(SOURCES), + "posted_date": "2024-12-10", + "agent_employee_id": random.choice(agent_ids), + } + await producer.send("listings_raw", json.dumps(new_listing).encode()) + print(f"Published new listing: {listing_id}") + + +if __name__ == "__main__": + time.sleep(5) # Extra wait for RabbitMQ streams plugin to initialize + asyncio.run(main()) diff --git a/examples/projects/rabbitmq-ETL/sql/init-db.sql b/examples/projects/rabbitmq-ETL/sql/init-db.sql new file mode 100644 index 000000000..b76cd5284 --- /dev/null +++ b/examples/projects/rabbitmq-ETL/sql/init-db.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS enriched_employees ( + employee_id INTEGER PRIMARY KEY, + full_name TEXT NOT NULL, + email TEXT NOT NULL, + phone TEXT NOT NULL, + department TEXT NOT NULL, + salary DOUBLE PRECISION NOT NULL, + hire_date TEXT NOT NULL, + is_active BOOLEAN NOT NULL, + total_orders BIGINT NOT NULL, + total_spent DOUBLE PRECISION NOT NULL, + total_listings BIGINT NOT NULL, + avg_listing_price DOUBLE PRECISION NOT NULL +); + +CREATE TABLE IF NOT EXISTS enriched_listings ( + listing_id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + location TEXT NOT NULL, + source TEXT NOT NULL, + posted_date TEXT NOT NULL, + agent_name TEXT NOT NULL +); diff --git a/integration_tests/kafka/test_rabbitmq.py b/integration_tests/kafka/test_rabbitmq.py new file mode 100644 index 000000000..a0faad2cf --- /dev/null +++ b/integration_tests/kafka/test_rabbitmq.py @@ -0,0 +1,432 @@ +import asyncio +import json +import pathlib +import threading +import time +from types 
import TracebackType +from uuid import uuid4 + +import pytest + +import pathway as pw +from pathway.internals.parse_graph import G +from pathway.tests.utils import ( + CsvLinesNumberChecker, + wait_result_with_checker, +) + +RABBITMQ_STREAM_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552/" +RABBITMQ_HOST = "rabbitmq" +RABBITMQ_PORT = 5552 +RABBITMQ_USER = "guest" +RABBITMQ_PASSWORD = "guest" +WAIT_TIMEOUT_SECS = 30 + + +class RabbitmqStreamManager: + """Context manager that creates and cleans up a RabbitMQ stream for testing.""" + + def __init__(self, uri: str, stream_name: str): + self.uri = uri + self.stream_name = stream_name + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread(target=self._loop.run_forever, daemon=True) + self._thread.start() + + def __enter__(self): + fut = asyncio.run_coroutine_threadsafe(self._create_stream(), self._loop) + fut.result() + return self + + def __exit__( + self, + type_: type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ): + fut = asyncio.run_coroutine_threadsafe(self._cleanup(), self._loop) + fut.result() + self._loop.call_soon_threadsafe(self._loop.stop) + self._thread.join() + + async def _create_stream(self): + from rstream import Producer + + self._producer = await Producer.create( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + username=RABBITMQ_USER, + password=RABBITMQ_PASSWORD, + ) + try: + await self._producer.create_stream(self.stream_name) + except Exception: + pass # stream may already exist + + async def _cleanup(self): + try: + await self._producer.delete_stream(self.stream_name) + except Exception: + pass + await self._producer.close() + + def send(self, message: str) -> None: + fut = asyncio.run_coroutine_threadsafe( + self._send_async(message), + self._loop, + ) + fut.result() + + async def _send_async(self, message: str) -> None: + await self._producer.send( + self.stream_name, + message.encode(), + ) + + def send_with_properties( + self, 
message: str, properties: dict[str, str] + ) -> None: + """Send a message with AMQP 1.0 application properties.""" + fut = asyncio.run_coroutine_threadsafe( + self._send_with_properties_async(message, properties), + self._loop, + ) + fut.result() + + async def _send_with_properties_async( + self, message: str, properties: dict[str, str] + ) -> None: + from rstream import AMQPMessage + + amqp_message = AMQPMessage( + body=message.encode(), + application_properties=properties, + ) + await self._producer.send( + self.stream_name, + amqp_message, + ) + + +def run_identity_program( + input_file: pathlib.Path, + output_file: pathlib.Path, + stream_name: str, + new_entries: list[str], +) -> None: + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + table = pw.io.plaintext.read(input_file, mode="static") + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + ) + + class InputSchema(pw.Schema): + data: str + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, len(new_entries)), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_simple(tmp_path: pathlib.Path): + stream_name = f"rmq-{uuid4()}" + input_file = tmp_path / "input.txt" + output_file = tmp_path / "output.txt" + + with open(input_file, "w") as f: + f.write("one\ntwo\nthree\nfour\n") + run_identity_program( + input_file, + output_file, + stream_name, + ["one", "two", "three", "four"], + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_write_json(tmp_path: pathlib.Path): + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + class InputSchema(pw.Schema): + name: str + age: int + + # Send test messages + 
mgr.send('{"name": "Alice", "age": 30}') + mgr.send('{"name": "Bob", "age": 25}') + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_raw_format(tmp_path: pathlib.Path): + """Test reading messages in raw binary format.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("hello raw") + mgr.send("world raw") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="raw", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_plaintext_format(tmp_path: pathlib.Path): + """Test reading messages in plaintext format.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("hello plaintext") + mgr.send("world plaintext") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="plaintext", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_metadata(tmp_path: pathlib.Path): + """Test with_metadata=True produces _metadata column with offset and stream_name.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + mgr.send("message1") + mgr.send("message2") + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + 
stream_name=stream_name, + format="plaintext", + with_metadata=True, + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + # Verify metadata column contents + import pandas as pd + + result = pd.read_csv(output_file) + assert "_metadata" in result.columns, "Expected _metadata column" + for _, row in result.iterrows(): + metadata = json.loads(row["_metadata"]) + assert "offset" in metadata + assert "stream_name" in metadata + assert metadata["stream_name"] == stream_name + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_key_via_application_properties(tmp_path: pathlib.Path): + """Test key extraction from AMQP 1.0 application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + G.clear() + + # Send messages with "key" in application_properties + mgr.send_with_properties("payload1", {"key": "key_a"}) + mgr.send_with_properties("payload2", {"key": "key_b"}) + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="plaintext", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + # Verify key column contains the app property values + import pandas as pd + + result = pd.read_csv(output_file) + assert "key" in result.columns, "Expected key column" + keys = sorted(result["key"].tolist()) + assert keys == ["key_a", "key_b"] + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_static_mode(tmp_path: pathlib.Path): + """Test mode='static' reads existing messages and stops.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name) as mgr: + # Pre-populate the stream before starting the reader + for i in range(3): + mgr.send(f'{{"value": "msg{i}"}}') + + # Give time for 
messages to be committed + time.sleep(1) + + G.clear() + + class InputSchema(pw.Schema): + value: str + + table = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + mode="static", + ) + pw.io.csv.write(table, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 3), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_write_with_key(tmp_path: pathlib.Path): + """Test writing messages with key parameter stores key in application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + + table = pw.debug.table_from_markdown( + """ + name | age + Alice | 30 + Bob | 25 + """ + ) + + # Write with key=name column + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + key=table.name, + ) + + # Read back with key support + class InputSchema(pw.Schema): + name: str + age: int + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) + + +@pytest.mark.flaky(reruns=5) +def test_rabbitmq_write_with_headers(tmp_path: pathlib.Path): + """Test writing messages with headers parameter stores in application_properties.""" + stream_name = f"rmq-{uuid4()}" + output_file = tmp_path / "output.txt" + + with RabbitmqStreamManager(RABBITMQ_STREAM_URI, stream_name): + G.clear() + + table = pw.debug.table_from_markdown( + """ + name | age + Alice | 30 + Bob | 25 + """ + ) + + # Write with headers + pw.io.rabbitmq.write( + table, + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + format="json", + headers=[table.name], + ) + + # Read back to verify messages were written + class 
InputSchema(pw.Schema): + name: str + age: int + + table_reread = pw.io.rabbitmq.read( + uri=RABBITMQ_STREAM_URI, + stream_name=stream_name, + schema=InputSchema, + format="json", + ) + pw.io.csv.write(table_reread, output_file) + + wait_result_with_checker( + CsvLinesNumberChecker(output_file, 2), + WAIT_TIMEOUT_SECS, + ) diff --git a/python/pathway/io/__init__.py b/python/pathway/io/__init__.py index 625380b31..6f8cdaec3 100644 --- a/python/pathway/io/__init__.py +++ b/python/pathway/io/__init__.py @@ -29,6 +29,7 @@ pyfilesystem, python, questdb, + rabbitmq, redpanda, s3, slack, @@ -82,6 +83,7 @@ "register_input_synchronization_group", "mqtt", "questdb", + "rabbitmq", "dynamodb", "kinesis", "mysql", diff --git a/python/pathway/io/rabbitmq/__init__.py b/python/pathway/io/rabbitmq/__init__.py new file mode 100644 index 000000000..9535d7d29 --- /dev/null +++ b/python/pathway/io/rabbitmq/__init__.py @@ -0,0 +1,299 @@ +# Copyright © 2026 Pathway + +from __future__ import annotations + +from typing import Iterable, Literal + +from pathway.internals import api, datasink, datasource +from pathway.internals.expression import ColumnReference +from pathway.internals.runtime_type_check import check_arg_types +from pathway.internals.schema import KEY_SOURCE_COMPONENT, PAYLOAD_SOURCE_COMPONENT, Schema +from pathway.internals.table import Table +from pathway.internals.table_io import table_from_datasource +from pathway.internals.trace import trace_user_frame +from pathway.io._utils import ( + MessageQueueOutputFormat, + _get_unique_name, + check_raw_and_plaintext_only_kwargs_for_message_queues, + construct_schema_and_data_format, + internal_connector_mode, +) + + +@check_arg_types +@trace_user_frame +def read( + uri: str, + stream_name: str, + *, + schema: type[Schema] | None = None, + format: Literal["plaintext", "raw", "json"] = "raw", + mode: Literal["streaming", "static"] = "streaming", + autocommit_duration_ms: int | None = 1500, + json_field_paths: dict[str, str] | None = 
None, + autogenerate_key: bool = False, + with_metadata: bool = False, + start_from_timestamp_ms: int | None = None, + parallel_readers: int | None = None, + name: str | None = None, + max_backlog_size: int | None = None, + debug_data=None, + **kwargs, +) -> Table: + """Reads data from a specified RabbitMQ stream. + + It supports three formats: ``"plaintext"``, ``"raw"``, and ``"json"``. + + For the ``"raw"`` format, the payload is read as raw bytes and added directly to the + table. In the ``"plaintext"`` format, the payload is decoded from UTF-8 and stored as + plain text. In both cases, the table will have a ``"key"`` column and a ``"data"`` + column representing the payload. + + If you select the ``"json"`` format, the connector parses the message payload as JSON + and creates table columns based on the schema provided in the ``schema`` parameter. + + Args: + uri: The URI of the RabbitMQ server with Streams enabled, e.g. + ``"rabbitmq-stream://guest:guest@localhost:5552"``. + stream_name: The name of the RabbitMQ stream to read data from. The stream + must already exist on the server. + schema: The table schema, used only when the format is set to ``"json"``. + format: The input data format, which can be ``"raw"``, ``"plaintext"``, or + ``"json"``. + mode: The reading mode, which can be ``"streaming"`` or ``"static"``. In + ``"streaming"`` mode, the connector reads messages continuously. In + ``"static"`` mode, it reads all existing messages and then stops. + autocommit_duration_ms: The time interval (in milliseconds) between commits. + After this time, the updates received by the connector are committed and + added to Pathway's computation graph. + json_field_paths: For the ``"json"`` format, this allows mapping field names to + paths within the JSON structure. Use the format ``<field_name>: <path>`` + where the path follows the + `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901/>`_. 
+ autogenerate_key: If True, always auto-generate the primary key instead of + using the message key from ``application_properties``. + with_metadata: If True, adds a ``_metadata`` column containing a dict with + ``offset``, ``stream_name``, and ``timestamp_millis`` fields. Note: + ``timestamp_millis`` is currently always null due to protocol library + limitations. + start_from_timestamp_ms: If specified, start reading from messages at or after + this timestamp (in milliseconds since epoch). + parallel_readers: The number of reader instances running in parallel. + name: A unique name for the connector. If provided, this name will be used in + logs and monitoring dashboards. Additionally, if persistence is enabled, it + will be used as the name for the snapshot that stores the connector's progress. + max_backlog_size: Limit on the number of entries read from the input source and + kept in processing at any moment. + debug_data: Static data replacing original one when debug mode is active. + + Returns: + Table: The table read. + + Example: + + If your RabbitMQ server with Streams enabled is running on ``localhost`` using the + default Streams port ``5552``, you can stream the ``"events"`` stream to a Pathway + table like this: + + >>> import pathway as pw + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... ) + + You can also parse messages as UTF-8 during reading: + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="plaintext", + ... ) + + Read and parse a JSON table with schema: + + >>> class InputSchema(pw.Schema): + ... user_id: int = pw.column_definition(primary_key=True) + ... username: str + ... phone: str + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="json", + ... schema=InputSchema, + ... 
) + + Read with metadata column: + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... with_metadata=True, + ... ) + + Read in static mode (bounded snapshot): + + >>> table = pw.io.rabbitmq.read( + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... mode="static", + ... ) + """ + data_storage = api.DataStorage( + storage_type="rabbitmq", + path=uri, + topic=stream_name, + parallel_readers=parallel_readers, + mode=internal_connector_mode(mode), + start_from_timestamp_ms=start_from_timestamp_ms, + ) + schema, data_format = construct_schema_and_data_format( + "binary" if format == "raw" else format, + schema=schema, + json_field_paths=json_field_paths, + with_native_record_key=True, + autogenerate_key=autogenerate_key, + with_metadata=with_metadata, + _stacklevel=5, + ) + data_source_options = datasource.DataSourceOptions( + commit_duration_ms=autocommit_duration_ms, + unique_name=_get_unique_name(name, kwargs), + max_backlog_size=max_backlog_size, + ) + return table_from_datasource( + datasource.GenericDataSource( + datastorage=data_storage, + dataformat=data_format, + data_source_options=data_source_options, + schema=schema, + datasource_name="rabbitmq", + ), + debug_datasource=datasource.debug_datasource(debug_data), + supported_components=( + KEY_SOURCE_COMPONENT, + PAYLOAD_SOURCE_COMPONENT, + ), + ) + + +@check_raw_and_plaintext_only_kwargs_for_message_queues +@check_arg_types +@trace_user_frame +def write( + table: Table, + uri: str, + stream_name: str | ColumnReference, + *, + format: Literal["json", "dsv", "plaintext", "raw"] = "json", + delimiter: str = ",", + key: ColumnReference | None = None, + value: ColumnReference | None = None, + headers: Iterable[ColumnReference] | None = None, + name: str | None = None, + sort_by: Iterable[ColumnReference] | None = None, +) -> None: + """Writes data into the specified RabbitMQ stream. 
+ + The produced messages consist of the payload, corresponding to the values of the + table that are serialized according to the chosen format. Two AMQP 1.0 application + properties are always added: ``pathway_time`` (processing time) and ``pathway_diff`` + (either ``1`` or ``-1``). If ``headers`` parameter is used, additional properties + can be added to the message. + + There are several serialization formats supported: ``"json"``, ``"dsv"``, + ``"plaintext"`` and ``"raw"``. + + If the selected format is either ``"plaintext"`` or ``"raw"``, you also need to + specify which column of the table corresponds to the payload of the produced message. + It can be done by providing the ``value`` parameter. + + Args: + table: The table for output. + uri: The URI of the RabbitMQ server with Streams enabled, e.g. + ``"rabbitmq-stream://guest:guest@localhost:5552"``. + stream_name: The RabbitMQ stream where data will be written. The stream must + already exist on the server. + format: Format in which the data is put into RabbitMQ. Currently ``"json"``, + ``"plaintext"``, ``"raw"`` and ``"dsv"`` are supported. + delimiter: Field delimiter to be used in case of delimiter-separated values + format. + key: Reference to a column that should be used as the message key. The key is + stored as an AMQP 1.0 application property named ``"key"``. On the read + side, this property is extracted when ``autogenerate_key`` is False. + value: Reference to the column that should be used as a payload in the produced + message in ``"plaintext"`` or ``"raw"`` format. + headers: References to the table fields that must be provided as message + application properties. These properties are named in the same way as fields + that are forwarded and correspond to the string representations of the + respective values encoded in UTF-8. + name: A unique name for the connector. If provided, this name will be used in + logs and monitoring dashboards. 
+ sort_by: If specified, the output will be sorted in ascending order based on the + values of the given columns within each minibatch. + + Example: + + Assume you have the RabbitMQ server with Streams enabled running locally on the + default Streams port, ``5552``. + + First, you'll need to create a Pathway table: + + >>> import pathway as pw + ... + >>> table = pw.debug.table_from_markdown(''' + ... age | owner | pet + ... 10 | Alice | dog + ... 9 | Bob | cat + ... 8 | Alice | cat + ... ''') + + To output the table's contents in JSON format: + + >>> pw.io.rabbitmq.write( + ... table, + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="json", + ... ) + + You can also use a single column from the table as the payload with headers: + + >>> pw.io.rabbitmq.write( + ... table, + ... "rabbitmq-stream://guest:guest@localhost:5552", + ... "events", + ... format="plaintext", + ... value=table.owner, + ... headers=[table.age, table.pet], + ... ) + """ + output_format = MessageQueueOutputFormat.construct( + table, + format=format, + delimiter=delimiter, + key=key, + value=value, + headers=headers, + topic_name=stream_name if isinstance(stream_name, ColumnReference) else None, + ) + table = output_format.table + + data_storage = api.DataStorage( + storage_type="rabbitmq", + path=uri, + topic=stream_name if isinstance(stream_name, str) else None, + topic_name_index=output_format.topic_name_index, + key_field_index=output_format.key_field_index, + header_fields=list(output_format.header_fields.items()), + ) + + table.to( + datasink.GenericDataSink( + data_storage, + output_format.data_format, + datasink_name="rabbitmq", + unique_name=name, + sort_by=sort_by, + ) + ) diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs index 60dff9efe..59763478e 100644 --- a/src/connectors/data_format.rs +++ b/src/connectors/data_format.rs @@ -423,6 +423,26 @@ impl FormatterContext { } nats_headers } + + pub fn construct_rabbitmq_properties( 
+ &self, + header_fields: &[(String, usize)], + ) -> Vec<(String, String)> { + let raw_headers = self.construct_message_headers(header_fields, true); + raw_headers + .into_iter() + .map(|h| { + let value = h + .value + .map(|v| { + String::from_utf8(v) + .expect("all prepared headers must be UTF-8 serializable") + }) + .unwrap_or_else(|| Value::None.to_string()); + (h.key, value) + }) + .collect() + } } #[derive(Debug, thiserror::Error)] diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index ddd564f97..fa5724dd1 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -101,6 +101,8 @@ pub use super::elasticsearch::ElasticSearchWriter; pub use super::mongodb::{MongoReader, MongoWriter}; pub use super::nats::NatsReader; pub use super::nats::NatsWriter; +pub use super::rabbitmq::RabbitmqReader; +pub use super::rabbitmq::RabbitmqWriter; pub use super::postgres::{ PsqlReader, PsqlWriter, ReplicationError as PostgresReplicationError, SslError, }; @@ -377,6 +379,9 @@ pub enum ReadError { #[error("failed to acknowledge read nats message: {0}")] NatsMessageAck(async_nats::Error), + + #[error("RabbitMQ consumer error: {0}")] + Rabbitmq(String), } // Allow `?` on `mongodb::error::Error` in functions returning `Result<_, ReadError>`. 
@@ -435,6 +440,7 @@ pub enum StorageType { Kinesis, Postgres, MongoDb, + Rabbitmq, } impl StorageType { @@ -459,6 +465,7 @@ impl StorageType { StorageType::Kinesis => KinesisReader::merge_two_frontiers(lhs, rhs), StorageType::Postgres => PsqlReader::merge_two_frontiers(lhs, rhs), StorageType::MongoDb => MongoReader::merge_two_frontiers(lhs, rhs), + StorageType::Rabbitmq => RabbitmqReader::merge_two_frontiers(lhs, rhs), } } } @@ -578,6 +585,14 @@ pub trait Reader { { result.advance_offset(offset_key.clone(), other_value.clone()); } + ( + OffsetValue::RabbitmqOffset(offset_position), + OffsetValue::RabbitmqOffset(other_position), + ) => { + if other_position > offset_position { + result.advance_offset(offset_key.clone(), other_value.clone()); + } + } (_, _) => { error!("Incomparable offsets in the frontier: {offset_value:?} and {other_value:?}"); } @@ -672,6 +687,9 @@ pub enum WriteError { #[error(transparent)] NatsFlush(#[from] NatsFlushError), + #[error("RabbitMQ publish error: {0}")] + RabbitmqPublish(String), + #[error(transparent)] JetStream(#[from] NatsError), diff --git a/src/connectors/metadata/mod.rs b/src/connectors/metadata/mod.rs index 5d1d53827..fec875552 100644 --- a/src/connectors/metadata/mod.rs +++ b/src/connectors/metadata/mod.rs @@ -4,6 +4,7 @@ pub mod kafka; pub mod mongodb; pub mod parquet; pub mod postgres; +pub mod rabbitmq; pub mod sqlite; #[allow(clippy::module_name_repetitions)] @@ -24,6 +25,9 @@ pub use parquet::ParquetMetadata; #[allow(clippy::module_name_repetitions)] pub use postgres::PostgresMetadata; +#[allow(clippy::module_name_repetitions)] +pub use rabbitmq::RabbitmqMetadata; + #[allow(clippy::module_name_repetitions)] pub use sqlite::SQLiteMetadata; @@ -37,6 +41,7 @@ pub enum SourceMetadata { Iceberg(IcebergMetadata), Parquet(ParquetMetadata), Postgres(PostgresMetadata), + Rabbitmq(RabbitmqMetadata), } impl From for SourceMetadata { @@ -75,6 +80,12 @@ impl From for SourceMetadata { } } +impl From for SourceMetadata { + fn 
from(impl_: RabbitmqMetadata) -> Self {
+        Self::Rabbitmq(impl_)
+    }
+}
+
 impl From<SQLiteMetadata> for SourceMetadata {
     fn from(impl_: SQLiteMetadata) -> Self {
         Self::SQLite(impl_)
@@ -91,6 +102,7 @@ impl SourceMetadata {
             Self::Iceberg(meta) => serde_json::to_value(meta),
             Self::Parquet(meta) => serde_json::to_value(meta),
             Self::Postgres(meta) => serde_json::to_value(meta),
+            Self::Rabbitmq(meta) => serde_json::to_value(meta),
         }
         .expect("Internal JSON serialization error")
     }
@@ -103,7 +115,7 @@ impl SourceMetadata {
             | Self::Iceberg(_)
             | Self::Parquet(_)
             | Self::Postgres(_) => false,
-            Self::Kafka(_) => true,
+            Self::Kafka(_) | Self::Rabbitmq(_) => true,
         }
     }
 }
diff --git a/src/connectors/metadata/rabbitmq.rs b/src/connectors/metadata/rabbitmq.rs
new file mode 100644
index 000000000..c808cd7b6
--- /dev/null
+++ b/src/connectors/metadata/rabbitmq.rs
@@ -0,0 +1,20 @@
+// Copyright © 2026 Pathway
+
+use serde::Serialize;
+
+#[derive(Debug, Serialize)]
+pub struct RabbitmqMetadata {
+    pub offset: u64,
+    pub stream_name: String,
+    pub timestamp_millis: Option<u64>,
+}
+
+impl RabbitmqMetadata {
+    pub fn new(offset: u64, stream_name: String, timestamp_millis: Option<u64>) -> Self {
+        Self {
+            offset,
+            stream_name,
+            timestamp_millis,
+        }
+    }
+}
diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs
index ba2ccbcb4..d41beb245 100644
--- a/src/connectors/mod.rs
+++ b/src/connectors/mod.rs
@@ -30,6 +30,7 @@ pub mod mongodb;
 pub mod monitoring;
 pub mod nats;
 pub mod offset;
+pub mod rabbitmq;
 pub mod posix_like;
 pub mod postgres;
 pub mod scanner;
diff --git a/src/connectors/offset.rs b/src/connectors/offset.rs
index a435bae4c..f6af9c73b 100644
--- a/src/connectors/offset.rs
+++ b/src/connectors/offset.rs
@@ -17,6 +17,7 @@ use crate::persistence::cached_object_storage::CachedObjectVersion;
 pub enum OffsetKey {
     Kafka(ArcStr, i32),
     Nats(usize),
+    Rabbitmq(usize),
     Empty,
     Kinesis(ArcStr),
     MongoDb,
@@ -29,7 +30,9 @@ impl HashInto for OffsetKey {
                hasher.update(topic_name.as_bytes());
partition.hash_into(hasher); } - OffsetKey::Nats(worker_index) => worker_index.hash_into(hasher), + OffsetKey::Nats(worker_index) | OffsetKey::Rabbitmq(worker_index) => { + worker_index.hash_into(hasher); + } OffsetKey::Kinesis(shard) => hasher.update(shard.as_bytes()), OffsetKey::Empty | OffsetKey::MongoDb => {} } @@ -68,6 +71,7 @@ pub enum OffsetValue { snapshot_id: IcebergSnapshotId, }, NatsReadEntriesCount(usize), + RabbitmqOffset(u64), MqttReadEntriesCount(usize), PostgresReadEntriesCount(usize), KinesisOffset(String), @@ -149,6 +153,9 @@ impl HashInto for OffsetValue { | OffsetValue::PostgresReadEntriesCount(count) => { count.hash_into(hasher); } + OffsetValue::RabbitmqOffset(offset) => { + offset.hash_into(hasher); + } OffsetValue::IcebergSnapshot { snapshot_id } => { snapshot_id.hash_into(hasher); } diff --git a/src/connectors/rabbitmq.rs b/src/connectors/rabbitmq.rs new file mode 100644 index 000000000..f430d901f --- /dev/null +++ b/src/connectors/rabbitmq.rs @@ -0,0 +1,252 @@ +// Copyright © 2026 Pathway + +use log::error; +use std::borrow::Cow; + +use futures::StreamExt; +use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue}; +use rabbitmq_stream_client::{Consumer as RmqConsumer, Environment, Producer as RmqProducer}; +use tokio::runtime::Runtime as TokioRuntime; + +use crate::connectors::data_format::FormatterContext; +use crate::connectors::data_storage::MessageQueueTopic; +use crate::connectors::metadata::RabbitmqMetadata; +use crate::connectors::{ + OffsetKey, OffsetValue, ReadError, ReadResult, Reader, ReaderContext, StorageType, WriteError, + Writer, +}; +use crate::engine::Value; +use crate::persistence::frontier::OffsetAntichain; + +/// AMQP 1.0 application property name used to carry the message key. +/// Writer stores the key here; reader extracts it from this property. 
+const RABBITMQ_KEY_PROPERTY: &str = "key"; + +// --- Writer --- + +#[allow(clippy::module_name_repetitions)] +pub struct RabbitmqWriter { + runtime: TokioRuntime, + producer: RmqProducer, + topic: MessageQueueTopic, + header_fields: Vec<(String, usize)>, + key_field_index: Option, +} + +impl Writer for RabbitmqWriter { + fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> { + self.runtime.block_on(async { + let properties = data.construct_rabbitmq_properties(&self.header_fields); + + // Extract key from data values if key_field_index is set + let key_value: Option = self.key_field_index.map(|idx| { + match &data.values[idx] { + Value::Bytes(b) => String::from_utf8_lossy(b).into_owned(), + Value::String(s) => s.to_string(), + other => other.to_string(), + } + }); + + let has_app_props = !properties.is_empty() || key_value.is_some(); + + for payload in data.payloads.into_iter() { + let payload_bytes = payload.into_raw_bytes()?; + let message = if has_app_props { + let mut app_props = Message::builder().application_properties(); + if let Some(ref key) = key_value { + app_props = app_props.insert(RABBITMQ_KEY_PROPERTY, key.as_str()); + } + for (name, value) in &properties { + app_props = app_props.insert(name.as_str(), value.as_str()); + } + app_props.message_builder().body(payload_bytes).build() + } else { + Message::builder().body(payload_bytes).build() + }; + self.producer + .send_with_confirm(message) + .await + .map_err(|e| WriteError::RabbitmqPublish(e.to_string()))?; + } + Ok(()) + }) + } + + fn flush(&mut self, _forced: bool) -> Result<(), WriteError> { + // send_with_confirm already waits for broker acknowledgment + Ok(()) + } + + fn retriable(&self) -> bool { + true + } + + fn single_threaded(&self) -> bool { + false + } + + fn name(&self) -> String { + format!("RabbitMQ({})", self.topic) + } +} + +impl Drop for RabbitmqWriter { + fn drop(&mut self) { + self.flush(true).expect("failed to send the final messages"); + } +} + +impl 
RabbitmqWriter { + pub fn new( + runtime: TokioRuntime, + producer: RmqProducer, + topic: MessageQueueTopic, + header_fields: Vec<(String, usize)>, + key_field_index: Option, + ) -> Self { + RabbitmqWriter { + runtime, + producer, + topic, + header_fields, + key_field_index, + } + } +} + +// --- Reader --- + +#[allow(clippy::module_name_repetitions)] +pub struct RabbitmqReader { + runtime: TokioRuntime, + consumer: RmqConsumer, + environment: Environment, + worker_index: usize, + stream_name: String, + end_offset: Option, + deferred_read_result: Option, +} + +impl Reader for RabbitmqReader { + fn read(&mut self) -> Result { + // Return deferred result from previous call (metadata pattern) + if let Some(deferred) = self.deferred_read_result.take() { + return Ok(deferred); + } + + let delivery = self + .runtime + .block_on(async { self.consumer.next().await }); + match delivery { + Some(Ok(delivery)) => { + let stream_offset = delivery.offset(); + + // Static mode: check if we've reached the end + if let Some(end) = self.end_offset { + if stream_offset > end { + return Ok(ReadResult::Finished); + } + } + + let message = delivery.message(); + + // Extract key from application_properties if present + let key: Option> = message + .application_properties() + .and_then(|props| props.get(RABBITMQ_KEY_PROPERTY)) + .map(|v| match v { + SimpleValue::String(s) => s.as_bytes().to_vec(), + SimpleValue::Binary(b) => b.clone(), + other => format!("{other:?}").into_bytes(), + }); + let body = message.data().map(|d| d.to_vec()); + + // Use key-value context to support native record keys + let payload = ReaderContext::from_key_value(key, body); + + let offset = ( + OffsetKey::Rabbitmq(self.worker_index), + OffsetValue::RabbitmqOffset(stream_offset), + ); + + // Create metadata and use deferred pattern (like Kafka). + // Timestamp extraction is not possible due to private field in + // rabbitmq_stream_protocol::Timestamp. 
+ let metadata = RabbitmqMetadata::new( + stream_offset, + self.stream_name.clone(), + None, + ); + self.deferred_read_result = Some(ReadResult::Data(payload, offset)); + Ok(ReadResult::NewSource(metadata.into())) + } + Some(Err(e)) => Err(ReadError::Rabbitmq(e.to_string())), + None => Ok(ReadResult::Finished), + } + } + + fn seek(&mut self, frontier: &OffsetAntichain) -> Result<(), ReadError> { + let offset_value = frontier.get_offset(&OffsetKey::Rabbitmq(self.worker_index)); + if let Some(offset) = offset_value { + if let OffsetValue::RabbitmqOffset(last_offset) = offset { + // Rebuild consumer starting from the next offset after the persisted one + let next_offset = last_offset.checked_add(1).ok_or_else(|| { + ReadError::Rabbitmq( + "Offset overflow: cannot seek past u64::MAX".to_string(), + ) + })?; + let new_consumer = self + .runtime + .block_on(async { + self.environment + .consumer() + .offset(OffsetSpecification::Offset(next_offset)) + .build(&self.stream_name) + .await + }) + .map_err(|e| { + ReadError::Rabbitmq(format!( + "Failed to rebuild consumer for seek: {e}" + )) + })?; + self.consumer = new_consumer; + } else { + error!("Unexpected offset type for RabbitMQ reader: {offset:?}"); + } + } + Ok(()) + } + + fn storage_type(&self) -> StorageType { + StorageType::Rabbitmq + } + + fn max_allowed_consecutive_errors(&self) -> usize { + 32 + } + + fn short_description(&self) -> Cow<'static, str> { + format!("RabbitMQ({})", self.stream_name).into() + } +} + +impl RabbitmqReader { + pub fn new( + runtime: TokioRuntime, + consumer: RmqConsumer, + environment: Environment, + worker_index: usize, + stream_name: String, + end_offset: Option, + ) -> RabbitmqReader { + RabbitmqReader { + runtime, + consumer, + environment, + worker_index, + stream_name, + end_offset, + deferred_read_result: None, + } + } +} diff --git a/src/python_api.rs b/src/python_api.rs index 4c71d8068..1abeb4413 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -114,8 +114,8 @@ use 
crate::connectors::data_storage::{
     KafkaWriter, LakeWriter, MessageQueueTopic, MongoReader, MongoWriter, MqttReader, MqttWriter,
     MysqlWriter, NatsReader, NatsWriter, NullWriter, ObjectDownloader, PsqlReader, PsqlWriter,
     PythonConnectorEventType, PythonReaderBuilder, QuestDBAtColumnPolicy, QuestDBWriter,
-    RdkafkaWatermark, ReadError, ReadMethod, ReaderBuilder, SqliteReader, TableWriterInitMode,
-    WriteError, Writer, MQTT_CLIENT_MAX_CHANNEL_SIZE,
+    RabbitmqReader, RabbitmqWriter, RdkafkaWatermark, ReadError, ReadMethod, ReaderBuilder,
+    SqliteReader, TableWriterInitMode, WriteError, Writer, MQTT_CLIENT_MAX_CHANNEL_SIZE,
 };
 use crate::connectors::data_tokenize::{BufReaderTokenizer, CsvTokenizer, Tokenize};
 use crate::connectors::nats;
@@ -160,6 +160,56 @@ mod external_index_wrappers;
 mod logging;
 pub mod threads;
 
+/// Parse a RabbitMQ Streams URI and build an Environment.
+/// URI format: `rabbitmq-stream://user:pass@host:port/vhost`
+async fn build_rabbitmq_environment(
+    uri: &str,
+) -> PyResult<rabbitmq_stream_client::Environment> {
+    // Parse URI: rabbitmq-stream://user:pass@host:port/vhost
+    let uri_str = uri
+        .strip_prefix("rabbitmq-stream://")
+        .unwrap_or(uri);
+    let mut builder = rabbitmq_stream_client::Environment::builder();
+
+    // Split userinfo from host
+    if let Some((userinfo, rest)) = uri_str.split_once('@') {
+        if let Some((user, pass)) = userinfo.split_once(':') {
+            builder = builder.username(user).password(pass);
+        }
+        // Parse host:port/vhost
+        let (hostport, vhost) = rest.split_once('/').unwrap_or((rest, ""));
+        if let Some((host, port_str)) = hostport.split_once(':') {
+            builder = builder.host(host);
+            if let Ok(port) = port_str.parse::<u16>() {
+                builder = builder.port(port);
+            }
+        } else {
+            builder = builder.host(hostport);
+        }
+        if !vhost.is_empty() {
+            builder = builder.virtual_host(vhost);
+        }
+    } else {
+        // No userinfo, just host:port
+        let (hostport, vhost) = uri_str.split_once('/').unwrap_or((uri_str, ""));
+        if let Some((host, port_str)) = hostport.split_once(':') {
builder = builder.host(host); + if let Ok(port) = port_str.parse::() { + builder = builder.port(port); + } + } else { + builder = builder.host(uri_str); + } + if !vhost.is_empty() { + builder = builder.virtual_host(vhost); + } + } + + builder.build().await.map_err(|e| { + PyIOError::new_err(format!("Failed to connect to RabbitMQ: {e}")) + }) +} + static CONVERT: GILOnceCell> = GILOnceCell::new(); fn get_convert_python_module(py: Python<'_>) -> &Bound<'_, PyModule> { @@ -6131,6 +6181,73 @@ impl DataStorage { Ok((Box::new(reader), properties.max_parallel_readers(scope))) } + fn construct_rabbitmq_reader( + &self, + scope: &Scope, + properties: &ConnectorProperties, + ) -> PyResult<(Box, usize)> { + let uri = self.path()?; + let stream_name = self.message_queue_fixed_topic()?; + let runtime = create_async_tokio_runtime()?; + + let offset_spec = match self.start_from_timestamp_ms { + Some(ts) => rabbitmq_stream_client::types::OffsetSpecification::Timestamp(ts), + None => rabbitmq_stream_client::types::OffsetSpecification::First, + }; + + let (environment, consumer, end_offset) = runtime.block_on(async { + let environment = build_rabbitmq_environment(uri).await?; + let consumer = environment + .consumer() + .offset(offset_spec) + .build(&stream_name) + .await + .map_err(|e| { + PyIOError::new_err(format!("Failed to create RabbitMQ consumer: {e}")) + })?; + + // For static mode, try to determine the last offset to know when to stop. + // We create a temporary consumer at the Last offset to discover the boundary. 
+ let end_offset = if self.mode == ConnectorMode::Static { + let mut probe = environment + .consumer() + .offset(rabbitmq_stream_client::types::OffsetSpecification::Last) + .build(&stream_name) + .await + .ok(); + if let Some(ref mut p) = probe { + use futures::StreamExt; + // Read one message to get the last offset + const STATIC_MODE_PROBE_TIMEOUT_SECS: u64 = 5; + match tokio::time::timeout( + std::time::Duration::from_secs(STATIC_MODE_PROBE_TIMEOUT_SECS), + p.next(), + ) + .await + { + Ok(Some(Ok(delivery))) => Some(delivery.offset()), + _ => None, // empty stream or timeout + } + } else { + None + } + } else { + None + }; + Ok::<_, PyErr>((environment, consumer, end_offset)) + })?; + + let reader = RabbitmqReader::new( + runtime, + consumer, + environment, + scope.worker_index(), + stream_name.clone(), + end_offset, + ); + Ok((Box::new(reader), properties.max_parallel_readers(scope))) + } + fn construct_iceberg_reader( &self, py: pyo3::Python, @@ -6331,6 +6448,7 @@ impl DataStorage { "sqlite" => self.construct_sqlite_reader(py, data_format), "deltalake" => self.construct_deltalake_reader(py, data_format, scope), "nats" => self.construct_nats_reader(py, scope, properties), + "rabbitmq" => self.construct_rabbitmq_reader(scope, properties), "iceberg" => self.construct_iceberg_reader(py, data_format, scope), "mqtt" => self.construct_mqtt_reader(), "kinesis" => self.construct_kinesis_reader(scope, properties), @@ -6621,6 +6739,32 @@ impl DataStorage { Ok(Box::new(writer)) } + fn construct_rabbitmq_writer(&self) -> PyResult> { + let uri = self.path()?; + let topic = self.message_queue_topic()?; + let stream_name = self.message_queue_fixed_topic()?; + let runtime = create_async_tokio_runtime()?; + let producer = runtime.block_on(async { + let environment = build_rabbitmq_environment(uri).await?; + let producer = environment + .producer() + .build(&stream_name) + .await + .map_err(|e| { + PyIOError::new_err(format!("Failed to create RabbitMQ producer: {e}")) + })?; + 
Ok::<_, PyErr>(producer) + })?; + let writer = RabbitmqWriter::new( + runtime, + producer, + topic, + self.header_fields.clone(), + self.key_field_index, + ); + Ok(Box::new(writer)) + } + fn construct_mongodb_writer(&self) -> PyResult> { let uri = self.connection_string()?; let client = MongoClient::with_uri_str(uri) @@ -6806,6 +6950,7 @@ impl DataStorage { "mongodb" => self.construct_mongodb_writer(), "null" => Ok(Box::new(NullWriter::new())), "nats" => self.construct_nats_writer(), + "rabbitmq" => self.construct_rabbitmq_writer(), "iceberg" => self.construct_iceberg_writer(py, data_format, license), "mqtt" => self.construct_mqtt_writer(), "questdb" => self.construct_questdb_writer(py, data_format, license), From 7563868921a65a70ab86b38984bb9aba275f8dde Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Sat, 28 Mar 2026 14:02:56 +0800 Subject: [PATCH 2/7] Fix RabbitMQ connector: TLS support, rstream API update, AMQP message framing - Add TLS parameters to DataStorage for RabbitMQ connections - Update rstream Producer API (Producer() + start() instead of Producer.create()) - Send AMQP 1.0 framed messages from producer for compatibility with Rust consumer - Add PYTHONUNBUFFERED to producer Dockerfile - Remap postgres port to avoid conflicts --- CHANGELOG.md | 1 + .../projects/rabbitmq-ETL/docker-compose.yml | 2 +- .../projects/rabbitmq-ETL/pathway-src/etl.py | 8 +- .../rabbitmq-ETL/producer-src/Dockerfile | 1 + .../producer-src/create-streams.py | 15 +-- integration_tests/db_connectors/conftest.py | 1 + integration_tests/db_connectors/utils.py | 2 + python/pathway/engine.pyi | 4 + python/pathway/io/rabbitmq/__init__.py | 106 +++++++++++++++++- src/connectors/rabbitmq.rs | 62 +++++++++- src/python_api.rs | 59 +++++++++- 11 files changed, 243 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dfed517d..8047299d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0.htm ### Added - `pw.io.milvus.write` connector, which writes a Pathway table to a Milvus collection. Row additions are sent as upserts and row deletions are sent as deletes keyed on the configured primary key column. Requires a Pathway Scale license. +- `pw.io.rabbitmq` connector for reading from and writing to RabbitMQ Streams. Supports JSON, plaintext, and raw formats with TLS/mutual TLS configuration. Includes `pw.io.rabbitmq.simple_read` for quick-start usage. ## [0.30.0] - 2026-03-24 diff --git a/examples/projects/rabbitmq-ETL/docker-compose.yml b/examples/projects/rabbitmq-ETL/docker-compose.yml index 2eb5b34a5..c67dd67df 100644 --- a/examples/projects/rabbitmq-ETL/docker-compose.yml +++ b/examples/projects/rabbitmq-ETL/docker-compose.yml @@ -27,7 +27,7 @@ services: POSTGRES_PASSWORD: pathway POSTGRES_DB: etl_db ports: - - 5432:5432 + - 5433:5432 volumes: - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql healthcheck: diff --git a/examples/projects/rabbitmq-ETL/pathway-src/etl.py b/examples/projects/rabbitmq-ETL/pathway-src/etl.py index a9de0997d..011bc922e 100644 --- a/examples/projects/rabbitmq-ETL/pathway-src/etl.py +++ b/examples/projects/rabbitmq-ETL/pathway-src/etl.py @@ -17,13 +17,13 @@ # To use advanced features with Pathway Scale, get your free license key from # https://pathway.com/features and paste it below. # To use Pathway Community, comment out the line below. 
-pw.set_license_key("demo-license-key-with-telemetry") +# pw.set_license_key("demo-license-key-with-telemetry") -RABBITMQ_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552" +RABBITMQ_URI = "rabbitmq-stream://guest:guest@0.0.0.0:5552" postgres_settings = { - "host": "postgres", - "port": "5432", + "host": "0.0.0.0", + "port": "5433", "dbname": "etl_db", "user": "pathway", "password": "pathway", diff --git a/examples/projects/rabbitmq-ETL/producer-src/Dockerfile b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile index c0a72c8eb..cc7bffc4a 100644 --- a/examples/projects/rabbitmq-ETL/producer-src/Dockerfile +++ b/examples/projects/rabbitmq-ETL/producer-src/Dockerfile @@ -1,5 +1,6 @@ FROM python:3.10 +ENV PYTHONUNBUFFERED=1 RUN pip install rstream COPY ./producer-src/create-streams.py create-streams.py diff --git a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py index a2d12ebb5..182762983 100644 --- a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py +++ b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py @@ -170,15 +170,16 @@ async def main(): - from rstream import Producer + from rstream import AMQPMessage, Producer print("Connecting to RabbitMQ Streams...") - producer = await Producer.create( + producer = Producer( host=RABBITMQ_HOST, port=RABBITMQ_PORT, username=RABBITMQ_USER, password=RABBITMQ_PASSWORD, ) + await producer.start() # Create all streams for stream in STREAMS: @@ -191,21 +192,21 @@ async def main(): # Publish initial employees print("Publishing employee records...") for emp in EMPLOYEES: - await producer.send("employees_raw", json.dumps(emp).encode()) + await producer.send("employees_raw", AMQPMessage(body=json.dumps(emp).encode())) await asyncio.sleep(0.3) print(f"Published {len(EMPLOYEES)} employees") # Publish initial orders print("Publishing order records...") for order in ORDERS: - await producer.send("orders_raw", 
json.dumps(order).encode()) + await producer.send("orders_raw", AMQPMessage(body=json.dumps(order).encode())) await asyncio.sleep(0.2) print(f"Published {len(ORDERS)} orders") # Publish initial listings print("Publishing listing records...") for listing in LISTINGS: - await producer.send("listings_raw", json.dumps(listing).encode()) + await producer.send("listings_raw", AMQPMessage(body=json.dumps(listing).encode())) await asyncio.sleep(0.2) print(f"Published {len(LISTINGS)} listings") @@ -229,7 +230,7 @@ async def main(): "product": random.choice(PRODUCTS), "quantity": random.randint(1, 10), } - await producer.send("orders_raw", json.dumps(new_order).encode()) + await producer.send("orders_raw", AMQPMessage(body=json.dumps(new_order).encode())) print(f"Published new order: {order_id}") await asyncio.sleep(3) @@ -245,7 +246,7 @@ async def main(): "posted_date": "2024-12-10", "agent_employee_id": random.choice(agent_ids), } - await producer.send("listings_raw", json.dumps(new_listing).encode()) + await producer.send("listings_raw", AMQPMessage(body=json.dumps(new_listing).encode())) print(f"Published new listing: {listing_id}") diff --git a/integration_tests/db_connectors/conftest.py b/integration_tests/db_connectors/conftest.py index 591bd6a7f..c1b44b7d9 100644 --- a/integration_tests/db_connectors/conftest.py +++ b/integration_tests/db_connectors/conftest.py @@ -54,6 +54,7 @@ def mysql(): return MySQLContext() + @pytest.fixture def milvus(tmp_path): ctx = MilvusContext(str(tmp_path / "milvus.db")) diff --git a/integration_tests/db_connectors/utils.py b/integration_tests/db_connectors/utils.py index 72dad3f34..366d513c6 100644 --- a/integration_tests/db_connectors/utils.py +++ b/integration_tests/db_connectors/utils.py @@ -100,6 +100,8 @@ def is_mysql_reachable(): return True + + @dataclass(frozen=True) class ColumnProperties: type_name: str diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index ff4dc13a1..9ef15abe6 100644 --- 
a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -930,6 +930,10 @@ class DataStorage: ssl_cert_path: str | None = None, psql_replication: PsqlReplicationSettings | None = None, schema_name: str | None = None, + rabbitmq_tls_root_certificates: str | None = None, + rabbitmq_tls_client_cert: str | None = None, + rabbitmq_tls_client_key: str | None = None, + rabbitmq_tls_trust_certificates: bool = False, ) -> None: ... def delta_s3_storage_options(self, *args, **kwargs): ... diff --git a/python/pathway/io/rabbitmq/__init__.py b/python/pathway/io/rabbitmq/__init__.py index 9535d7d29..db18b279f 100644 --- a/python/pathway/io/rabbitmq/__init__.py +++ b/python/pathway/io/rabbitmq/__init__.py @@ -37,7 +37,12 @@ def read( parallel_readers: int | None = None, name: str | None = None, max_backlog_size: int | None = None, + tls_root_certificates: str | None = None, + tls_client_cert: str | None = None, + tls_client_key: str | None = None, + tls_trust_certificates: bool = False, debug_data=None, + _stacklevel: int = 5, **kwargs, ) -> Table: """Reads data from a specified RabbitMQ stream. @@ -84,6 +89,10 @@ def read( will be used as the name for the snapshot that stores the connector's progress. max_backlog_size: Limit on the number of entries read from the input source and kept in processing at any moment. + tls_root_certificates: Path to the root CA certificate file for TLS connections. + tls_client_cert: Path to the client certificate file for mutual TLS. + tls_client_key: Path to the client private key file for mutual TLS. + tls_trust_certificates: If True, trust server certificates without verification. debug_data: Static data replacing original one when debug mode is active. Returns: @@ -139,6 +148,15 @@ def read( ... mode="static", ... 
) """ + tls_kwargs: dict = {} + if tls_root_certificates is not None: + tls_kwargs["rabbitmq_tls_root_certificates"] = tls_root_certificates + if tls_client_cert is not None: + tls_kwargs["rabbitmq_tls_client_cert"] = tls_client_cert + if tls_client_key is not None: + tls_kwargs["rabbitmq_tls_client_key"] = tls_client_key + if tls_trust_certificates: + tls_kwargs["rabbitmq_tls_trust_certificates"] = tls_trust_certificates data_storage = api.DataStorage( storage_type="rabbitmq", path=uri, @@ -146,6 +164,7 @@ def read( parallel_readers=parallel_readers, mode=internal_connector_mode(mode), start_from_timestamp_ms=start_from_timestamp_ms, + **tls_kwargs, ) schema, data_format = construct_schema_and_data_format( "binary" if format == "raw" else format, @@ -154,7 +173,7 @@ def read( with_native_record_key=True, autogenerate_key=autogenerate_key, with_metadata=with_metadata, - _stacklevel=5, + _stacklevel=_stacklevel, ) data_source_options = datasource.DataSourceOptions( commit_duration_ms=autocommit_duration_ms, @@ -177,6 +196,73 @@ def read( ) +@check_arg_types +@trace_user_frame +def simple_read( + host: str, + stream_name: str, + *, + port: int = 5552, + schema: type[Schema] | None = None, + format: Literal["plaintext", "raw", "json"] = "raw", + autocommit_duration_ms: int | None = 1500, + json_field_paths: dict[str, str] | None = None, + parallel_readers: int | None = None, + name: str | None = None, + debug_data=None, + **kwargs, +) -> Table: + """Simplified method to read data from a RabbitMQ stream. Only requires the + server host and the stream name. Uses default credentials (``guest:guest``). + If you need TLS, authentication, or fine-tuning, use :py:func:`read` instead. + + Read starts from the beginning of the stream, unless the ``read_only_new`` + parameter is set to True. + + There are three formats currently supported: ``"plaintext"``, ``"raw"``, and + ``"json"``. If ``"raw"`` is chosen, the key and the payload are read as raw bytes. 
+ If ``"plaintext"`` is chosen, they are parsed from UTF-8. In both cases the table + has a ``"data"`` column. If ``"json"`` is chosen, the payload is parsed according + to the ``schema`` parameter. + + Args: + host: Hostname of the RabbitMQ server with Streams enabled. + stream_name: Name of the RabbitMQ stream to read from. + port: Port of the RabbitMQ Streams endpoint (default ``5552``). + schema: Schema of the resulting table (used with ``"json"`` format). + format: Input data format: ``"raw"``, ``"plaintext"``, or ``"json"``. + autocommit_duration_ms: The maximum time between two commits in milliseconds. + json_field_paths: For ``"json"`` format, mapping of field names to JSON + Pointer paths. + parallel_readers: Number of reader instances running in parallel. + name: A unique name for the connector. + debug_data: Static data replacing original one when debug mode is active. + + Returns: + Table: The table read. + + Example: + + >>> import pathway as pw + >>> t = pw.io.rabbitmq.simple_read("localhost", "events") + """ + uri = f"rabbitmq-stream://guest:guest@{host}:{port}" + return read( + uri=uri, + stream_name=stream_name, + schema=schema, + format=format, + mode="streaming", + autocommit_duration_ms=autocommit_duration_ms, + json_field_paths=json_field_paths, + parallel_readers=parallel_readers, + name=name, + debug_data=debug_data, + _stacklevel=7, + **kwargs, + ) + + @check_raw_and_plaintext_only_kwargs_for_message_queues @check_arg_types @trace_user_frame @@ -192,6 +278,10 @@ def write( headers: Iterable[ColumnReference] | None = None, name: str | None = None, sort_by: Iterable[ColumnReference] | None = None, + tls_root_certificates: str | None = None, + tls_client_cert: str | None = None, + tls_client_key: str | None = None, + tls_trust_certificates: bool = False, ) -> None: """Writes data into the specified RabbitMQ stream. @@ -231,6 +321,10 @@ def write( logs and monitoring dashboards. 
sort_by: If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. + tls_root_certificates: Path to the root CA certificate file for TLS connections. + tls_client_cert: Path to the client certificate file for mutual TLS. + tls_client_key: Path to the client private key file for mutual TLS. + tls_trust_certificates: If True, trust server certificates without verification. Example: @@ -279,6 +373,15 @@ def write( ) table = output_format.table + tls_kwargs: dict = {} + if tls_root_certificates is not None: + tls_kwargs["rabbitmq_tls_root_certificates"] = tls_root_certificates + if tls_client_cert is not None: + tls_kwargs["rabbitmq_tls_client_cert"] = tls_client_cert + if tls_client_key is not None: + tls_kwargs["rabbitmq_tls_client_key"] = tls_client_key + if tls_trust_certificates: + tls_kwargs["rabbitmq_tls_trust_certificates"] = tls_trust_certificates data_storage = api.DataStorage( storage_type="rabbitmq", path=uri, @@ -286,6 +389,7 @@ def write( topic_name_index=output_format.topic_name_index, key_field_index=output_format.key_field_index, header_fields=list(output_format.header_fields.items()), + **tls_kwargs, ) table.to( diff --git a/src/connectors/rabbitmq.rs b/src/connectors/rabbitmq.rs index f430d901f..4c0cd3d4c 100644 --- a/src/connectors/rabbitmq.rs +++ b/src/connectors/rabbitmq.rs @@ -2,6 +2,9 @@ use log::error; use std::borrow::Cow; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use futures::StreamExt; use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue}; @@ -22,6 +25,9 @@ use crate::persistence::frontier::OffsetAntichain; /// Writer stores the key here; reader extracts it from this property. 
const RABBITMQ_KEY_PROPERTY: &str = "key"; +const FLUSH_TIMEOUT: Duration = Duration::from_secs(30); +const FLUSH_POLL_INTERVAL: Duration = Duration::from_millis(10); + // --- Writer --- #[allow(clippy::module_name_repetitions)] @@ -31,10 +37,20 @@ pub struct RabbitmqWriter { topic: MessageQueueTopic, header_fields: Vec<(String, usize)>, key_field_index: Option, + pending_confirms: Arc, + send_errors: Arc>>, } impl Writer for RabbitmqWriter { fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> { + // Check for errors accumulated from previous async sends + { + let errs = self.send_errors.lock().unwrap(); + if !errs.is_empty() { + return Err(WriteError::RabbitmqPublish(errs.join("; "))); + } + } + self.runtime.block_on(async { let properties = data.construct_rabbitmq_properties(&self.header_fields); @@ -63,8 +79,30 @@ impl Writer for RabbitmqWriter { } else { Message::builder().body(payload_bytes).build() }; + + self.pending_confirms.fetch_add(1, Ordering::Release); + let counter = self.pending_confirms.clone(); + let errs = self.send_errors.clone(); self.producer - .send_with_confirm(message) + .send(message, move |result| { + let c = counter; + let e = errs; + async move { + c.fetch_sub(1, Ordering::Release); + match result { + Ok(confirm) if !confirm.confirmed() => { + e.lock().unwrap().push(format!( + "message not confirmed (publishing_id={})", + confirm.publishing_id() + )); + } + Err(err) => { + e.lock().unwrap().push(format!("{err}")); + } + _ => {} + } + } + }) .await .map_err(|e| WriteError::RabbitmqPublish(e.to_string()))?; } @@ -73,7 +111,21 @@ impl Writer for RabbitmqWriter { } fn flush(&mut self, _forced: bool) -> Result<(), WriteError> { - // send_with_confirm already waits for broker acknowledgment + let start = Instant::now(); + while self.pending_confirms.load(Ordering::Acquire) > 0 { + if start.elapsed() > FLUSH_TIMEOUT { + return Err(WriteError::RabbitmqPublish(format!( + "flush timed out after {}s with {} messages still 
pending", + FLUSH_TIMEOUT.as_secs(), + self.pending_confirms.load(Ordering::Acquire) + ))); + } + std::thread::sleep(FLUSH_POLL_INTERVAL); + } + let errs = self.send_errors.lock().unwrap(); + if !errs.is_empty() { + return Err(WriteError::RabbitmqPublish(errs.join("; "))); + } Ok(()) } @@ -92,7 +144,9 @@ impl Writer for RabbitmqWriter { impl Drop for RabbitmqWriter { fn drop(&mut self) { - self.flush(true).expect("failed to send the final messages"); + if let Err(e) = self.flush(true) { + error!("RabbitMQ flush failed on drop: {e}"); + } } } @@ -110,6 +164,8 @@ impl RabbitmqWriter { topic, header_fields, key_field_index, + pending_confirms: Arc::new(AtomicUsize::new(0)), + send_errors: Arc::new(Mutex::new(Vec::new())), } } } diff --git a/src/python_api.rs b/src/python_api.rs index 1abeb4413..5672b4757 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -164,6 +164,10 @@ pub mod threads; /// URI format: `rabbitmq-stream://user:pass@host:port/vhost` async fn build_rabbitmq_environment( uri: &str, + tls_root_certs: Option<&str>, + tls_client_cert: Option<&str>, + tls_client_key: Option<&str>, + tls_trust: bool, ) -> PyResult { // Parse URI: rabbitmq-stream://user:pass@host:port/vhost let uri_str = uri @@ -205,6 +209,27 @@ async fn build_rabbitmq_environment( } } + // Configure TLS if any TLS parameters are provided + if tls_root_certs.is_some() || tls_trust || tls_client_cert.is_some() { + let mut tls_builder = rabbitmq_stream_client::TlsConfiguration::builder(); + if let Some(root) = tls_root_certs { + // add_root_certificates enables TLS and sets the CA cert path + tls_builder = tls_builder.add_root_certificates(root.to_string()); + } + if let (Some(cert), Some(key)) = (tls_client_cert, tls_client_key) { + tls_builder = + tls_builder.add_client_certificates_keys(cert.to_string(), key.to_string()); + } + if tls_trust && tls_root_certs.is_none() { + // enable(true) without root certs produces TlsConfiguration::Untrusted + tls_builder = tls_builder.enable(true); 
+ } + let tls_config = tls_builder.build().map_err(|e| { + PyValueError::new_err(format!("Failed to build RabbitMQ TLS configuration: {e}")) + })?; + builder = builder.tls(tls_config); + } + builder.build().await.map_err(|e| { PyIOError::new_err(format!("Failed to connect to RabbitMQ: {e}")) }) @@ -4649,6 +4674,10 @@ pub struct DataStorage { ssl_cert_path: Option, psql_replication: Option, schema_name: Option, + rabbitmq_tls_root_certificates: Option, + rabbitmq_tls_client_cert: Option, + rabbitmq_tls_client_key: Option, + rabbitmq_tls_trust_certificates: bool, } #[allow(clippy::doc_markdown)] @@ -5211,6 +5240,10 @@ impl DataStorage { ssl_cert_path = None, psql_replication = None, schema_name = None, + rabbitmq_tls_root_certificates = None, + rabbitmq_tls_client_cert = None, + rabbitmq_tls_client_key = None, + rabbitmq_tls_trust_certificates = false, ))] #[allow(clippy::too_many_arguments)] #[allow(clippy::fn_params_excessive_bools)] @@ -5257,6 +5290,10 @@ impl DataStorage { ssl_cert_path: Option, psql_replication: Option, schema_name: Option, + rabbitmq_tls_root_certificates: Option, + rabbitmq_tls_client_cert: Option, + rabbitmq_tls_client_key: Option, + rabbitmq_tls_trust_certificates: bool, ) -> Self { DataStorage { storage_type, @@ -5301,6 +5338,10 @@ impl DataStorage { ssl_cert_path, psql_replication, schema_name, + rabbitmq_tls_root_certificates, + rabbitmq_tls_client_cert, + rabbitmq_tls_client_key, + rabbitmq_tls_trust_certificates, } } @@ -6196,7 +6237,14 @@ impl DataStorage { }; let (environment, consumer, end_offset) = runtime.block_on(async { - let environment = build_rabbitmq_environment(uri).await?; + let environment = build_rabbitmq_environment( + uri, + self.rabbitmq_tls_root_certificates.as_deref(), + self.rabbitmq_tls_client_cert.as_deref(), + self.rabbitmq_tls_client_key.as_deref(), + self.rabbitmq_tls_trust_certificates, + ) + .await?; let consumer = environment .consumer() .offset(offset_spec) @@ -6745,7 +6793,14 @@ impl DataStorage { let 
stream_name = self.message_queue_fixed_topic()?; let runtime = create_async_tokio_runtime()?; let producer = runtime.block_on(async { - let environment = build_rabbitmq_environment(uri).await?; + let environment = build_rabbitmq_environment( + uri, + self.rabbitmq_tls_root_certificates.as_deref(), + self.rabbitmq_tls_client_cert.as_deref(), + self.rabbitmq_tls_client_key.as_deref(), + self.rabbitmq_tls_trust_certificates, + ) + .await?; let producer = environment .producer() .build(&stream_name) From 3912f708c1118ddab289667814b62ce1f2d40043 Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Sat, 28 Mar 2026 14:18:29 +0800 Subject: [PATCH 3/7] Fix linting: isort, black, and cargo fmt formatting --- integration_tests/kafka/test_rabbitmq.py | 4 +--- python/pathway/io/rabbitmq/__init__.py | 6 +++++- src/connectors/data_storage.rs | 4 ++-- src/connectors/mod.rs | 2 +- src/connectors/rabbitmq.rs | 25 +++++++----------------- 5 files changed, 16 insertions(+), 25 deletions(-) diff --git a/integration_tests/kafka/test_rabbitmq.py b/integration_tests/kafka/test_rabbitmq.py index a0faad2cf..d71ff4b66 100644 --- a/integration_tests/kafka/test_rabbitmq.py +++ b/integration_tests/kafka/test_rabbitmq.py @@ -83,9 +83,7 @@ async def _send_async(self, message: str) -> None: message.encode(), ) - def send_with_properties( - self, message: str, properties: dict[str, str] - ) -> None: + def send_with_properties(self, message: str, properties: dict[str, str]) -> None: """Send a message with AMQP 1.0 application properties.""" fut = asyncio.run_coroutine_threadsafe( self._send_with_properties_async(message, properties), diff --git a/python/pathway/io/rabbitmq/__init__.py b/python/pathway/io/rabbitmq/__init__.py index db18b279f..b50373e3d 100644 --- a/python/pathway/io/rabbitmq/__init__.py +++ b/python/pathway/io/rabbitmq/__init__.py @@ -7,7 +7,11 @@ from pathway.internals import api, datasink, datasource from pathway.internals.expression import ColumnReference from 
pathway.internals.runtime_type_check import check_arg_types -from pathway.internals.schema import KEY_SOURCE_COMPONENT, PAYLOAD_SOURCE_COMPONENT, Schema +from pathway.internals.schema import ( + KEY_SOURCE_COMPONENT, + PAYLOAD_SOURCE_COMPONENT, + Schema, +) from pathway.internals.table import Table from pathway.internals.table_io import table_from_datasource from pathway.internals.trace import trace_user_frame diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index fa5724dd1..fde7312cb 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -101,11 +101,11 @@ pub use super::elasticsearch::ElasticSearchWriter; pub use super::mongodb::{MongoReader, MongoWriter}; pub use super::nats::NatsReader; pub use super::nats::NatsWriter; -pub use super::rabbitmq::RabbitmqReader; -pub use super::rabbitmq::RabbitmqWriter; pub use super::postgres::{ PsqlReader, PsqlWriter, ReplicationError as PostgresReplicationError, SslError, }; +pub use super::rabbitmq::RabbitmqReader; +pub use super::rabbitmq::RabbitmqWriter; pub use super::sqlite::SqliteReader; #[derive(Clone, Debug, Eq, PartialEq, Copy)] diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs index d41beb245..dd79d295f 100644 --- a/src/connectors/mod.rs +++ b/src/connectors/mod.rs @@ -30,9 +30,9 @@ pub mod mongodb; pub mod monitoring; pub mod nats; pub mod offset; -pub mod rabbitmq; pub mod posix_like; pub mod postgres; +pub mod rabbitmq; pub mod scanner; pub mod sqlite; pub mod synchronization; diff --git a/src/connectors/rabbitmq.rs b/src/connectors/rabbitmq.rs index 4c0cd3d4c..413144eae 100644 --- a/src/connectors/rabbitmq.rs +++ b/src/connectors/rabbitmq.rs @@ -55,13 +55,12 @@ impl Writer for RabbitmqWriter { let properties = data.construct_rabbitmq_properties(&self.header_fields); // Extract key from data values if key_field_index is set - let key_value: Option = self.key_field_index.map(|idx| { - match &data.values[idx] { + let key_value: Option = + 
self.key_field_index.map(|idx| match &data.values[idx] { Value::Bytes(b) => String::from_utf8_lossy(b).into_owned(), Value::String(s) => s.to_string(), other => other.to_string(), - } - }); + }); let has_app_props = !properties.is_empty() || key_value.is_some(); @@ -190,9 +189,7 @@ impl Reader for RabbitmqReader { return Ok(deferred); } - let delivery = self - .runtime - .block_on(async { self.consumer.next().await }); + let delivery = self.runtime.block_on(async { self.consumer.next().await }); match delivery { Some(Ok(delivery)) => { let stream_offset = delivery.offset(); @@ -228,11 +225,7 @@ impl Reader for RabbitmqReader { // Create metadata and use deferred pattern (like Kafka). // Timestamp extraction is not possible due to private field in // rabbitmq_stream_protocol::Timestamp. - let metadata = RabbitmqMetadata::new( - stream_offset, - self.stream_name.clone(), - None, - ); + let metadata = RabbitmqMetadata::new(stream_offset, self.stream_name.clone(), None); self.deferred_read_result = Some(ReadResult::Data(payload, offset)); Ok(ReadResult::NewSource(metadata.into())) } @@ -247,9 +240,7 @@ impl Reader for RabbitmqReader { if let OffsetValue::RabbitmqOffset(last_offset) = offset { // Rebuild consumer starting from the next offset after the persisted one let next_offset = last_offset.checked_add(1).ok_or_else(|| { - ReadError::Rabbitmq( - "Offset overflow: cannot seek past u64::MAX".to_string(), - ) + ReadError::Rabbitmq("Offset overflow: cannot seek past u64::MAX".to_string()) })?; let new_consumer = self .runtime @@ -261,9 +252,7 @@ impl Reader for RabbitmqReader { .await }) .map_err(|e| { - ReadError::Rabbitmq(format!( - "Failed to rebuild consumer for seek: {e}" - )) + ReadError::Rabbitmq(format!("Failed to rebuild consumer for seek: {e}")) })?; self.consumer = new_consumer; } else { From 8693adb92e4ce6d2583c882a02edfec3313c353f Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Sat, 28 Mar 2026 15:51:44 +0800 Subject: [PATCH 4/7] Fix linting: isort, black, 
flake8, cargo fmt, cargo clippy --- .../producer-src/create-streams.py | 267 ++++++++++++++++-- integration_tests/db_connectors/conftest.py | 1 - integration_tests/db_connectors/utils.py | 2 - src/connectors/rabbitmq.rs | 4 +- src/python_api.rs | 13 +- 5 files changed, 245 insertions(+), 42 deletions(-) diff --git a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py index 182762983..46eec8c11 100644 --- a/examples/projects/rabbitmq-ETL/producer-src/create-streams.py +++ b/examples/projects/rabbitmq-ETL/producer-src/create-streams.py @@ -131,41 +131,242 @@ # Sample orders - multiple per employee ORDERS = [ - {"order_id": 101, "employee_id": 1, "amount": "1250.50", "order_date": "2024-11-01", "product": "Laptop Stand", "quantity": 2}, - {"order_id": 102, "employee_id": 2, "amount": "340.00", "order_date": "2024-11-05", "product": "Keyboard", "quantity": 5}, - {"order_id": 103, "employee_id": 1, "amount": "89.99", "order_date": "2024-11-08", "product": "Mouse Pad", "quantity": 10}, - {"order_id": 104, "employee_id": 4, "amount": "2100.00", "order_date": "2024-11-10", "product": "Monitor", "quantity": 1}, - {"order_id": 105, "employee_id": 5, "amount": "450.00", "order_date": "2024-11-12", "product": "Headset", "quantity": 3}, - {"order_id": 106, "employee_id": 2, "amount": "175.25", "order_date": "2024-11-15", "product": "Webcam", "quantity": 2}, - {"order_id": 107, "employee_id": 7, "amount": "560.00", "order_date": "2024-11-18", "product": "Desk Lamp", "quantity": 4}, - {"order_id": 108, "employee_id": 10, "amount": "3200.00", "order_date": "2024-11-20", "product": "Standing Desk", "quantity": 1}, - {"order_id": 109, "employee_id": 4, "amount": "125.50", "order_date": "2024-11-22", "product": "Cable Kit", "quantity": 8}, - {"order_id": 110, "employee_id": 1, "amount": "780.00", "order_date": "2024-11-25", "product": "Chair Mat", "quantity": 2}, - {"order_id": 111, "employee_id": 8, "amount": 
"95.00", "order_date": "2024-11-28", "product": "Notebook", "quantity": 15}, - {"order_id": 112, "employee_id": 10, "amount": "420.00", "order_date": "2024-12-01", "product": "USB Hub", "quantity": 6}, - {"order_id": 113, "employee_id": 7, "amount": "1800.00", "order_date": "2024-12-03", "product": "Printer", "quantity": 1}, - {"order_id": 114, "employee_id": 2, "amount": "65.99", "order_date": "2024-12-05", "product": "Mouse", "quantity": 3}, - {"order_id": 115, "employee_id": 4, "amount": "290.00", "order_date": "2024-12-08", "product": "Speakers", "quantity": 2}, + { + "order_id": 101, + "employee_id": 1, + "amount": "1250.50", + "order_date": "2024-11-01", + "product": "Laptop Stand", + "quantity": 2, + }, + { + "order_id": 102, + "employee_id": 2, + "amount": "340.00", + "order_date": "2024-11-05", + "product": "Keyboard", + "quantity": 5, + }, + { + "order_id": 103, + "employee_id": 1, + "amount": "89.99", + "order_date": "2024-11-08", + "product": "Mouse Pad", + "quantity": 10, + }, + { + "order_id": 104, + "employee_id": 4, + "amount": "2100.00", + "order_date": "2024-11-10", + "product": "Monitor", + "quantity": 1, + }, + { + "order_id": 105, + "employee_id": 5, + "amount": "450.00", + "order_date": "2024-11-12", + "product": "Headset", + "quantity": 3, + }, + { + "order_id": 106, + "employee_id": 2, + "amount": "175.25", + "order_date": "2024-11-15", + "product": "Webcam", + "quantity": 2, + }, + { + "order_id": 107, + "employee_id": 7, + "amount": "560.00", + "order_date": "2024-11-18", + "product": "Desk Lamp", + "quantity": 4, + }, + { + "order_id": 108, + "employee_id": 10, + "amount": "3200.00", + "order_date": "2024-11-20", + "product": "Standing Desk", + "quantity": 1, + }, + { + "order_id": 109, + "employee_id": 4, + "amount": "125.50", + "order_date": "2024-11-22", + "product": "Cable Kit", + "quantity": 8, + }, + { + "order_id": 110, + "employee_id": 1, + "amount": "780.00", + "order_date": "2024-11-25", + "product": "Chair Mat", + "quantity": 
2, + }, + { + "order_id": 111, + "employee_id": 8, + "amount": "95.00", + "order_date": "2024-11-28", + "product": "Notebook", + "quantity": 15, + }, + { + "order_id": 112, + "employee_id": 10, + "amount": "420.00", + "order_date": "2024-12-01", + "product": "USB Hub", + "quantity": 6, + }, + { + "order_id": 113, + "employee_id": 7, + "amount": "1800.00", + "order_date": "2024-12-03", + "product": "Printer", + "quantity": 1, + }, + { + "order_id": 114, + "employee_id": 2, + "amount": "65.99", + "order_date": "2024-12-05", + "product": "Mouse", + "quantity": 3, + }, + { + "order_id": 115, + "employee_id": 4, + "amount": "290.00", + "order_date": "2024-12-08", + "product": "Speakers", + "quantity": 2, + }, ] # Sample rental listings - inspired by ldtrungmark rental platform LISTINGS = [ - {"listing_id": 1001, "title": "Modern 2BR Apartment Downtown", "price": "2500.00", "location": "District 1", "source": "website_a", "posted_date": "2024-11-01", "agent_employee_id": 7}, - {"listing_id": 1002, "title": "Cozy Studio Near Park", "price": "1200.00", "location": "District 3", "source": "website_b", "posted_date": "2024-11-03", "agent_employee_id": 8}, - {"listing_id": 1003, "title": "Spacious 3BR Family Home", "price": "3800.00", "location": "District 2", "source": "website_a", "posted_date": "2024-11-05", "agent_employee_id": 7}, + { + "listing_id": 1001, + "title": "Modern 2BR Apartment Downtown", + "price": "2500.00", + "location": "District 1", + "source": "website_a", + "posted_date": "2024-11-01", + "agent_employee_id": 7, + }, + { + "listing_id": 1002, + "title": "Cozy Studio Near Park", + "price": "1200.00", + "location": "District 3", + "source": "website_b", + "posted_date": "2024-11-03", + "agent_employee_id": 8, + }, + { + "listing_id": 1003, + "title": "Spacious 3BR Family Home", + "price": "3800.00", + "location": "District 2", + "source": "website_a", + "posted_date": "2024-11-05", + "agent_employee_id": 7, + }, # Bad record: negative price - 
{"listing_id": 1004, "title": "Invalid Listing", "price": "-500.00", "location": "District 5", "source": "website_c", "posted_date": "2024-11-07", "agent_employee_id": 8}, - {"listing_id": 1005, "title": "Luxury Penthouse with View", "price": "8500.00", "location": "District 1", "source": "website_b", "posted_date": "2024-11-10", "agent_employee_id": 7}, - {"listing_id": 1006, "title": "Renovated 1BR Near Metro", "price": "1800.00", "location": "District 4", "source": "website_a", "posted_date": "2024-11-12", "agent_employee_id": 8}, - {"listing_id": 1007, "title": "Garden Villa with Pool", "price": "12000.00", "location": "District 7", "source": "website_b", "posted_date": "2024-11-15", "agent_employee_id": 7}, - {"listing_id": 1008, "title": "Budget Room in Shared House", "price": "450.00", "location": "District 9", "source": "website_a", "posted_date": "2024-11-18", "agent_employee_id": 8}, + { + "listing_id": 1004, + "title": "Invalid Listing", + "price": "-500.00", + "location": "District 5", + "source": "website_c", + "posted_date": "2024-11-07", + "agent_employee_id": 8, + }, + { + "listing_id": 1005, + "title": "Luxury Penthouse with View", + "price": "8500.00", + "location": "District 1", + "source": "website_b", + "posted_date": "2024-11-10", + "agent_employee_id": 7, + }, + { + "listing_id": 1006, + "title": "Renovated 1BR Near Metro", + "price": "1800.00", + "location": "District 4", + "source": "website_a", + "posted_date": "2024-11-12", + "agent_employee_id": 8, + }, + { + "listing_id": 1007, + "title": "Garden Villa with Pool", + "price": "12000.00", + "location": "District 7", + "source": "website_b", + "posted_date": "2024-11-15", + "agent_employee_id": 7, + }, + { + "listing_id": 1008, + "title": "Budget Room in Shared House", + "price": "450.00", + "location": "District 9", + "source": "website_a", + "posted_date": "2024-11-18", + "agent_employee_id": 8, + }, # Bad record: empty title - {"listing_id": 1009, "title": "", "price": "1500.00", 
"location": "District 3", "source": "website_c", "posted_date": "2024-11-20", "agent_employee_id": 7}, - {"listing_id": 1010, "title": "Furnished Office Space", "price": "5000.00", "location": "District 1", "source": "website_a", "posted_date": "2024-11-22", "agent_employee_id": 8}, + { + "listing_id": 1009, + "title": "", + "price": "1500.00", + "location": "District 3", + "source": "website_c", + "posted_date": "2024-11-20", + "agent_employee_id": 7, + }, + { + "listing_id": 1010, + "title": "Furnished Office Space", + "price": "5000.00", + "location": "District 1", + "source": "website_a", + "posted_date": "2024-11-22", + "agent_employee_id": 8, + }, ] -PRODUCTS = ["Laptop Stand", "Keyboard", "Mouse", "Monitor", "Headset", "Webcam", "USB Hub", "Chair Mat"] -LOCATIONS = ["District 1", "District 2", "District 3", "District 4", "District 7", "District 9"] +PRODUCTS = [ + "Laptop Stand", + "Keyboard", + "Mouse", + "Monitor", + "Headset", + "Webcam", + "USB Hub", + "Chair Mat", +] +LOCATIONS = [ + "District 1", + "District 2", + "District 3", + "District 4", + "District 7", + "District 9", +] SOURCES = ["website_a", "website_b"] @@ -206,7 +407,9 @@ async def main(): # Publish initial listings print("Publishing listing records...") for listing in LISTINGS: - await producer.send("listings_raw", AMQPMessage(body=json.dumps(listing).encode())) + await producer.send( + "listings_raw", AMQPMessage(body=json.dumps(listing).encode()) + ) await asyncio.sleep(0.2) print(f"Published {len(LISTINGS)} listings") @@ -230,7 +433,9 @@ async def main(): "product": random.choice(PRODUCTS), "quantity": random.randint(1, 10), } - await producer.send("orders_raw", AMQPMessage(body=json.dumps(new_order).encode())) + await producer.send( + "orders_raw", AMQPMessage(body=json.dumps(new_order).encode()) + ) print(f"Published new order: {order_id}") await asyncio.sleep(3) @@ -246,7 +451,9 @@ async def main(): "posted_date": "2024-12-10", "agent_employee_id": random.choice(agent_ids), } - await 
producer.send("listings_raw", AMQPMessage(body=json.dumps(new_listing).encode())) + await producer.send( + "listings_raw", AMQPMessage(body=json.dumps(new_listing).encode()) + ) print(f"Published new listing: {listing_id}") diff --git a/integration_tests/db_connectors/conftest.py b/integration_tests/db_connectors/conftest.py index c1b44b7d9..591bd6a7f 100644 --- a/integration_tests/db_connectors/conftest.py +++ b/integration_tests/db_connectors/conftest.py @@ -54,7 +54,6 @@ def mysql(): return MySQLContext() - @pytest.fixture def milvus(tmp_path): ctx = MilvusContext(str(tmp_path / "milvus.db")) diff --git a/integration_tests/db_connectors/utils.py b/integration_tests/db_connectors/utils.py index 366d513c6..72dad3f34 100644 --- a/integration_tests/db_connectors/utils.py +++ b/integration_tests/db_connectors/utils.py @@ -100,8 +100,6 @@ def is_mysql_reachable(): return True - - @dataclass(frozen=True) class ColumnProperties: type_name: str diff --git a/src/connectors/rabbitmq.rs b/src/connectors/rabbitmq.rs index 413144eae..97fe65065 100644 --- a/src/connectors/rabbitmq.rs +++ b/src/connectors/rabbitmq.rs @@ -64,7 +64,7 @@ impl Writer for RabbitmqWriter { let has_app_props = !properties.is_empty() || key_value.is_some(); - for payload in data.payloads.into_iter() { + for payload in data.payloads { let payload_bytes = payload.into_raw_bytes()?; let message = if has_app_props { let mut app_props = Message::builder().application_properties(); @@ -212,7 +212,7 @@ impl Reader for RabbitmqReader { SimpleValue::Binary(b) => b.clone(), other => format!("{other:?}").into_bytes(), }); - let body = message.data().map(|d| d.to_vec()); + let body = message.data().map(<[u8]>::to_vec); // Use key-value context to support native record keys let payload = ReaderContext::from_key_value(key, body); diff --git a/src/python_api.rs b/src/python_api.rs index 5672b4757..422de3dde 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -160,7 +160,7 @@ mod external_index_wrappers; mod 
logging; pub mod threads; -/// Parse a RabbitMQ Streams URI and build an Environment. +/// Parse a `RabbitMQ` Streams URI and build an Environment. /// URI format: `rabbitmq-stream://user:pass@host:port/vhost` async fn build_rabbitmq_environment( uri: &str, @@ -170,9 +170,7 @@ async fn build_rabbitmq_environment( tls_trust: bool, ) -> PyResult { // Parse URI: rabbitmq-stream://user:pass@host:port/vhost - let uri_str = uri - .strip_prefix("rabbitmq-stream://") - .unwrap_or(uri); + let uri_str = uri.strip_prefix("rabbitmq-stream://").unwrap_or(uri); let mut builder = rabbitmq_stream_client::Environment::builder(); // Split userinfo from host @@ -230,9 +228,10 @@ async fn build_rabbitmq_environment( builder = builder.tls(tls_config); } - builder.build().await.map_err(|e| { - PyIOError::new_err(format!("Failed to connect to RabbitMQ: {e}")) - }) + builder + .build() + .await + .map_err(|e| PyIOError::new_err(format!("Failed to connect to RabbitMQ: {e}"))) } static CONVERT: GILOnceCell> = GILOnceCell::new(); From 26d790171d3041c39e5fba17b81b61985fbcec2e Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Sat, 28 Mar 2026 16:54:35 +0800 Subject: [PATCH 5/7] Fix isort: collapse single-line import in test_rabbitmq.py --- integration_tests/kafka/test_rabbitmq.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/integration_tests/kafka/test_rabbitmq.py b/integration_tests/kafka/test_rabbitmq.py index d71ff4b66..dd001977a 100644 --- a/integration_tests/kafka/test_rabbitmq.py +++ b/integration_tests/kafka/test_rabbitmq.py @@ -10,10 +10,7 @@ import pathway as pw from pathway.internals.parse_graph import G -from pathway.tests.utils import ( - CsvLinesNumberChecker, - wait_result_with_checker, -) +from pathway.tests.utils import CsvLinesNumberChecker, wait_result_with_checker RABBITMQ_STREAM_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552/" RABBITMQ_HOST = "rabbitmq" From eddb731602d7652650f8d8913eab67ca72a13674 Mon Sep 17 00:00:00 2001 From: Daniel Du Date: 
Sat, 28 Mar 2026 17:39:19 +0800 Subject: [PATCH 6/7] Fix cargo clippy: map_unwrap_or, too_many_lines, identical match arms --- src/connectors/data_format.rs | 10 +++++----- src/connectors/data_storage.rs | 13 +++++-------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs index 59763478e..32a6c2e27 100644 --- a/src/connectors/data_format.rs +++ b/src/connectors/data_format.rs @@ -432,13 +432,13 @@ impl FormatterContext { raw_headers .into_iter() .map(|h| { - let value = h - .value - .map(|v| { + let value = h.value.map_or_else( + || Value::None.to_string(), + |v| { String::from_utf8(v) .expect("all prepared headers must be UTF-8 serializable") - }) - .unwrap_or_else(|| Value::None.to_string()); + }, + ); (h.key, value) }) .collect() diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index fde7312cb..82cf80ea9 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -488,6 +488,7 @@ pub trait Reader { Ok(()) } + #[allow(clippy::too_many_lines)] fn merge_two_frontiers(lhs: &OffsetAntichain, rhs: &OffsetAntichain) -> OffsetAntichain where Self: Sized, @@ -499,6 +500,10 @@ pub trait Reader { ( OffsetValue::KafkaOffset(offset_position), OffsetValue::KafkaOffset(other_position), + ) + | ( + OffsetValue::RabbitmqOffset(offset_position), + OffsetValue::RabbitmqOffset(other_position), ) => { if other_position > offset_position { result.advance_offset(offset_key.clone(), other_value.clone()); @@ -585,14 +590,6 @@ pub trait Reader { { result.advance_offset(offset_key.clone(), other_value.clone()); } - ( - OffsetValue::RabbitmqOffset(offset_position), - OffsetValue::RabbitmqOffset(other_position), - ) => { - if other_position > offset_position { - result.advance_offset(offset_key.clone(), other_value.clone()); - } - } (_, _) => { error!("Incomparable offsets in the frontier: {offset_value:?} and {other_value:?}"); } From 
c256a049eaad0602f71790d9bc8c27a3ed550754 Mon Sep 17 00:00:00 2001 From: Daniel Du Date: Sat, 28 Mar 2026 18:14:39 +0800 Subject: [PATCH 7/7] Fix clippy: separate RabbitmqOffset arm (u64 vs i64 type mismatch) --- src/connectors/data_storage.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index 82cf80ea9..0f9d671ff 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -488,7 +488,7 @@ pub trait Reader { Ok(()) } - #[allow(clippy::too_many_lines)] + #[allow(clippy::too_many_lines, clippy::match_same_arms)] fn merge_two_frontiers(lhs: &OffsetAntichain, rhs: &OffsetAntichain) -> OffsetAntichain where Self: Sized, @@ -500,8 +500,12 @@ pub trait Reader { ( OffsetValue::KafkaOffset(offset_position), OffsetValue::KafkaOffset(other_position), - ) - | ( + ) => { + if other_position > offset_position { + result.advance_offset(offset_key.clone(), other_value.clone()); + } + } + ( OffsetValue::RabbitmqOffset(offset_position), OffsetValue::RabbitmqOffset(other_position), ) => {