[CELEBORN-2319] Standalone LifecycleManager && rust sdk #3677
gavin9402 wants to merge 22 commits into
|
This is amazing. What compute engine do you use? |
@FMX We plan to integrate it into the Daft engine. |
|
This is really good @gavin9402 , thanks for contributing. I will drop some review comments soon if I have any. |
afterincomparableyum
left a comment
added some small comments, overall though looks good to me.
I am fine with the service --> client dependency leaking into master/worker, but others may feel the Daemon doesn't need to live in service and may suggest moving it to a new module that depends on both service and client, which would avoid these dependency leaks.
| shutdownLatch.await() | ||
| } | ||
|
|
||
| private[lifecyclemanager] def applyArgsToConf( |
The --host argument is basically ignored. LifecycleManagerDaemonArguments parses --host into host: Option[String], but applyArgsToConf only writes MASTER_ENDPOINTS and CLIENT_SHUFFLE_MANAGER_PORT. The host is never propagated. LifecycleManager binds to lifecycleHost = Utils.localHostName(conf) (LifecycleManager.scala:81), and Utils.localHostName only looks at the CELEBORN_LOCAL_HOSTNAME env or the auto resolved hostname. There's no conf key path for it.
So someone running start-lifecycle-manager.sh --host 10.0.0.5 gets the default hostname with no warning. Either drop the option, set CELEBORN_LOCAL_HOSTNAME from the script, or call Utils.setCustomHostname(host) before constructing LifecycleManager.
| /// already be torn down by `IOThreadPoolExecutor::join()`. | ||
| pub fn shutdown(mut self) -> Result<()> { | ||
| if let Some(pinned) = self.inner.as_mut() { | ||
| ffi::shutdown(pinned)?; |
If ffi::shutdown returns Err, the ? propagates and self is dropped normally. Drop::drop then calls ffi::shutdown(pinned) a second time.
The comment on shutdown says the handle is intentionally leaked to avoid a folly EventBase use-after-free during destruction, so calling shutdown twice could re-trigger that teardown path.
I propose either ensuring the handle is leaked even when ffi::shutdown returns an error (so Drop cannot call ffi::shutdown a second time), or adding a “shutdown attempted” flag so Drop skips the shutdown call and only leaks the handle.
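A minimal Rust sketch of the first option (leak the handle on both the success and error paths, so `Drop` can never issue a second shutdown). Everything here is a stand-in: `FakeHandle`, `ffi_shutdown`, and `Client` are hypothetical names that simulate the FFI call with a counter, not the PR's actual types.

```rust
use std::cell::Cell;
use std::mem;
use std::rc::Rc;

/// Hypothetical stand-in for the native handle. `calls` counts how often the
/// (simulated) FFI shutdown runs; `fail` forces it to return an error.
struct FakeHandle {
    calls: Rc<Cell<u32>>,
    fail: bool,
}

/// Simulated `ffi::shutdown`: records the call, then succeeds or fails.
fn ffi_shutdown(handle: &mut FakeHandle) -> Result<(), String> {
    handle.calls.set(handle.calls.get() + 1);
    if handle.fail {
        Err("simulated shutdown failure".to_string())
    } else {
        Ok(())
    }
}

struct Client {
    inner: Option<Box<FakeHandle>>,
}

impl Client {
    fn new(calls: Rc<Cell<u32>>, fail: bool) -> Self {
        Client { inner: Some(Box::new(FakeHandle { calls, fail })) }
    }

    /// Explicit shutdown. `take()` empties the slot before the FFI call, so
    /// `Drop` finds `None` afterwards whether the call succeeded or failed.
    fn shutdown(mut self) -> Result<(), String> {
        match self.inner.take() {
            Some(mut handle) => {
                let result = ffi_shutdown(&mut handle);
                mem::forget(handle); // leak on both the Ok and the Err path
                result
            }
            None => Ok(()),
        }
    }
}

impl Drop for Client {
    fn drop(&mut self) {
        // Only reached with `Some` when `shutdown` was never called.
        if let Some(mut handle) = self.inner.take() {
            let _ = ffi_shutdown(&mut handle);
            mem::forget(handle);
        }
    }
}
```

With this shape, a failing `shutdown()` still results in exactly one shutdown call, and a client that is merely dropped also gets exactly one.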
| rust::Vec<uint8_t> out; | ||
| out.reserve(64 * 1024); | ||
| std::vector<uint8_t> buf(64 * 1024); | ||
|
|
||
| while (true) { | ||
| int n = stream->read(buf.data(), 0, buf.size()); | ||
| if (n == -1) { | ||
| break; | ||
| } | ||
| if (n <= 0) { | ||
| throw std::runtime_error( | ||
| "celeborn-ffi: CelebornInputStream::read returned unexpected non-positive " + | ||
| std::to_string(n)); | ||
| } | ||
| for (int i = 0; i < n; ++i) { | ||
| out.push_back(buf[i]); |
Per-byte push_back into rust::Vec<uint8_t> can introduce noticeable overhead for large partition reads, and the initial reserve(64 * 1024) only avoids reallocations for the first buffer chunk.
Consider accumulating into a std::vector<uint8_t> (or appending in larger chunks) and copying once at the end, or resizing the destination per chunk and using memcpy instead of pushing one byte at a time.
This doesn’t need to be addressed in the current PR, but it may be worth leaving a TODO here to revisit for performance improvements.
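The chunk-wise append suggested above can be illustrated in Rust as a rough analogue; the snippet under review is C++, so this only shows the shape of the idea. `read_fully` is a hypothetical helper, and a generic `std::io::Read` stands in for `CelebornInputStream`.

```rust
use std::io::{self, Read};

/// Drains `stream` into one Vec, appending a whole chunk per read call
/// instead of pushing byte by byte.
fn read_fully<R: Read>(mut stream: R) -> io::Result<Vec<u8>> {
    let mut out = Vec::with_capacity(64 * 1024);
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let n = match stream.read(&mut buf) {
            Ok(0) => break, // end of stream
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        // One bulk copy per chunk; the Vec grows geometrically as needed.
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}
```

`extend_from_slice` copies each chunk in one step, which is the same effect the review asks for with a per-chunk resize plus `memcpy` on the C++ side.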
| return | ||
| } | ||
|
|
||
| if (daemonArgs.port < 1024) { |
LifecycleManagerDaemonArguments already calls sys.exit(1) on this condition, so this check is dead code.
|
@afterincomparableyum Thank you for your thorough review. I have made the requested changes per your suggestions above. |
391b486 to c08f576
|
@gavin9402 can you please fix the failing CI, particularly the style check, license check, cpp integration test (which is also a style failure), and the celeborn integration test? I took a look, and all you need to do is fix the format/style. |
afterincomparableyum
left a comment
fix format/style failures in CI pls.
- Add Apache license headers to all rust source files
- Fix clang-format violations in cpp source files
- Fix license header format in sbin/start-lifecycle-manager.sh
- Add cpp/build/** and rust/Cargo.lock to .rat-excludes
|
@afterincomparableyum Done. Fixed all format/style failures. Please re-trigger CI. Thanks! |
|
ping @SteNicholas or @FMX, could one of you please re-trigger CI? |
…red library
Replace the cxx-based bridge with a hand-written C ABI exported from
libceleborn_client.{so,dylib}. The shared library is now the sole external
link dependency — folly / protobuf / glog / abseil stay hidden inside it,
and Rust no longer needs cxx, rust::Vec set_len access tricks, or C++
header includes during the build.
C++ side:
- cpp/celeborn/ffi/CelebornFfi.{h,cc}: extern "C" surface
(celeborn_ffi_create_client / push_data / mapper_end / read_partition_full
/ open_partition_reader / read_partition_chunk / close_partition_reader / shutdown).
- cpp/exports.{map,txt} + cpp/dummy.cc: limit exported symbols to
celeborn::* and celeborn_ffi_* on both ld.bfd/lld (version script) and
macOS ld64 (-exported_symbols_list).
- cpp/CMakeLists.txt aggregates the static archives into one shared lib.
Rust side:
- celeborn-client-sys is now pure `extern "C"` bindings; wrapper.{h,cc}
are deleted.
- build.rs locates libceleborn_client.* via CELEBORN_CPP_PREFIX, then
rust/resource/lib/<target-triple>/, then falls back to a from-source
cmake build into OUT_DIR.
- celeborn-client re-implements the streaming open_partition /
PartitionReader (std::io::Read) on top of the new C ABI, so the
rust/examples/data_sum_reader.rs example keeps working.
Misc:
- rust/resource/lib/README.md documents the layout and lookup precedence.
- rust/.gitignore excludes prebuilt *.so / *.dylib / *.dll artifacts so
the resource/lib/ directory only holds README.md in source control.
Verified with `cargo check --workspace --examples`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Pull request overview
This PR extends Celeborn to better support non-JVM clients by (1) introducing a standalone JVM LifecycleManager daemon that can be launched independently of Spark/Flink drivers, and (2) adding a Rust SDK built on top of a C ABI shim and an aggregated C++ shared library.
Changes:
- Added `lifecycle-manager` Maven module with a `LifecycleManagerDaemon` and CLI argument parsing + tests.
- Added Rust workspace (`rust/`) with `celeborn-client-sys` (raw FFI) and `celeborn-client` (safe wrapper) plus example programs.
- Updated C++ build to produce an aggregated `libceleborn_client.{so,dylib}` and updated distribution packaging/scripts to include the new daemon.
Reviewed changes
Copilot reviewed 29 out of 30 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| sbin/stop-lifecycle-manager.sh | New stop script for standalone LifecycleManager daemon |
| sbin/start-lifecycle-manager.sh | New start script with classpath assembly and optional port selection |
| rust/resource/lib/README.md | Documents lookup/layout for prebuilt native client artifacts |
| rust/examples/data_sum_writer.rs | Rust writer example mirroring existing C++ integration program |
| rust/examples/data_sum_reader.rs | Rust reader example mirroring existing C++ integration program |
| rust/celeborn-client/src/lib.rs | Safe Rust wrapper API over the raw FFI |
| rust/celeborn-client/Cargo.toml | Rust wrapper crate manifest + example wiring |
| rust/celeborn-client/build.rs | Emits rpath for downstream artifacts based on sys crate metadata |
| rust/celeborn-client-sys/src/lib.rs | Raw extern "C" declarations and helper to take/free error strings |
| rust/celeborn-client-sys/Cargo.toml | Sys crate manifest (links = "celeborn_client") |
| rust/celeborn-client-sys/build.rs | Builds/locates libceleborn_client and sets link search + metadata |
| rust/Cargo.toml | Declares Rust workspace members |
| rust/.gitignore | Ignores Cargo build output and prebuilt native blobs |
| pom.xml | Adds lifecycle-manager module to the Maven reactor |
| lifecycle-manager/src/test/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArgumentsSuite.scala | Unit tests for daemon CLI parsing |
| lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArguments.scala | CLI args parser + usage text |
| lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemon.scala | Standalone daemon main + shutdown hook + auth constraint |
| lifecycle-manager/pom.xml | New Maven module definition and dependencies |
| cpp/exports.txt | macOS exported symbols list for the aggregated shared library |
| cpp/exports.map | ELF version-script controlling exports for the shared library |
| cpp/dummy.cc | Placeholder TU for creating the aggregate shared library target |
| cpp/CMakeLists.txt | Builds aggregated celeborn_client shared lib and adjusts link/export behavior |
| cpp/celeborn/ffi/CMakeLists.txt | Adds static celeborn_ffi target (C ABI shim) |
| cpp/celeborn/ffi/CelebornFfi.h | C ABI header for language bindings |
| cpp/celeborn/ffi/CelebornFfi.cc | C ABI implementation wrapping the C++ ShuffleClient |
| cpp/celeborn/CMakeLists.txt | Wires the new ffi/ subdirectory into the C++ build |
| cpp/celeborn/client/writer/PushMergedDataCallback.cpp | Minor formatting adjustment |
| cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp | Minor formatting adjustments in tests |
| build/make-distribution.sh | Packages lifecycle-manager jars into the distribution tarball |
| .rat-excludes | Excludes generated build dirs and Rust lockfile from RAT checks |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1d92a40 to cf8d472
|
This is awesome. I was looking for something similar at work and came across this. Just had a question, my understanding could be wrong here, ShuffleClient c++ implementation is thread safe for calling |
@chandnagautam You're absolutely right. We should make both the read and write APIs parallelizable. The current &mut self signatures force downstream consumers (e.g. Daft) to wrap the client in an Arc<Mutex<_>>, which serializes all concurrent push_data and read_partition calls and defeats multi-partition parallelism. |
SteNicholas
left a comment
Review
Thanks for this — the FFI boundary hardening, Drop/shutdown handling, and the documented Send/Sync rationale are all carefully done. A few findings below, grouped by severity. (Reviewed the full diff and verified the daemon wiring against the codebase: CLIENT_SHUFFLE_MANAGER_PORT → LifecycleManager binds via conf.shuffleManagerPort at LifecycleManager.scala:186, and getHost/getPort, authEnabledOnClient, Utils.setCustomHostname, loadDefaultCelebornProperties all exist as used.)
🔴 High — blocking
1. The sbt build is broken: lifecycle-manager has no sbt project.
build/make-distribution.sh (sbt path) now calls ...;celeborn-lifecycle-manager/copyJars and copies from lifecycle-manager/target/scala-$SCALA_VERSION/..., but project/CelebornBuild.scala is not modified and defines no celeborn-lifecycle-manager Project. As a result:
- `sbt celeborn-lifecycle-manager/copyJars` fails with an unknown-project error, breaking `sbt_build_service`.
- The new module (and `LifecycleManagerDaemonArgumentsSuite`) is never compiled / style-checked / tested under sbt, only under Maven via the root `pom.xml` `<module>`.
Fix: add a Project("celeborn-lifecycle-manager", file("lifecycle-manager")) with .dependsOn(service, client, common ...), copyDepsSettings/copyJars, and aggregate it like cli/master/worker.
🟠 Medium
2. C++ Abseil/Homebrew path discovery is non-portable (cpp/CMakeLists.txt). The abseil glob hardcodes APPLE → /opt/homebrew/lib, elseif(EXISTS /usr/local/lib) → /usr/local/lib, else → /usr/lib/x86_64-linux-gnu.
- aarch64 Linux ships absl in `/usr/lib/aarch64-linux-gnu`, and `/usr/local/lib` often exists-but-empty so the `elseif` wins → absl not found, only a `message(WARNING)` → link failure. The README advertises `aarch64-unknown-linux-gnu`.
- Intel macOS uses `/usr/local`, not `/opt/homebrew`; the `target_link_directories` / `INSTALL_RPATH` / openssl paths are Apple-Silicon-only.
Prefer find_package(absl) / pkg_check_modules or derive the multiarch dir from the toolchain; at minimum make the missing-absl case a FATAL_ERROR.
3. Concurrency claim needs a real stress test. unsafe impl Sync + &self on push_data/read_partition asserts the C++ ShuffleClientImpl is fully thread-safe for concurrent pushData. The examples don't exercise it: rand_simple in data_sum_writer.rs uses a thread_local seed fixed at 12345, so every mapper thread generates the identical byte stream, masking races. A test with distinct per-thread data under contention would validate the invariant the PR depends on.
4. Unauthenticated control plane. The standalone LM rejects auth and exposes shuffle registration / slot allocation to anything that can reach its RPC port. Documented as a limitation, but it warrants explicit deployment guidance (bind to a trusted network only).
5. Permanent client leak by design. celeborn_ffi_create_client's ClientImpl is never freed (shutdown stops the client but doesn't delete it; connect() leaks it entirely on setup_lifecycle_manager failure). Fine for one-shot processes, but any process creating many clients leaks conf/endpoint/client + thread pool each time. Worth documenting the per-process-client assumption prominently, or revisiting the folly teardown ordering that motivates the leak.
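For finding 3, a stress harness with distinct per-thread data might look roughly like the sketch below. It is an assumption-laden illustration: `InMemoryClient` is a hypothetical mutex-backed stand-in for the real `ShuffleClient` (so it cannot itself race), and `Lcg`, `mapper_stream`, and `run_mappers` are made-up helpers. Pointed at the real client, a mismatch between the pushed and stored sums would indicate lost or corrupted data.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory stand-in for the client; `push` takes `&self`.
struct InMemoryClient {
    partitions: Mutex<HashMap<u32, Vec<u8>>>,
}

impl InMemoryClient {
    fn new() -> Self {
        InMemoryClient { partitions: Mutex::new(HashMap::new()) }
    }

    fn push(&self, partition: u32, data: &[u8]) {
        let mut map = self.partitions.lock().unwrap();
        map.entry(partition).or_default().extend_from_slice(data);
    }

    /// Sum of every byte stored across all partitions.
    fn total_sum(&self) -> u64 {
        let map = self.partitions.lock().unwrap();
        map.values().flatten().map(|&b| b as u64).sum()
    }
}

/// Simple 64-bit linear congruential generator.
struct Lcg(u64);

impl Lcg {
    /// Mixes the mapper id into the base seed so each thread differs.
    fn for_mapper(mapper_id: u64) -> Self {
        Lcg(12345 ^ (mapper_id + 1).wrapping_mul(0x9E37_79B9_7F4A_7C15))
    }

    fn next_byte(&mut self) -> u8 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (self.0 >> 56) as u8
    }
}

/// Byte stream generated for one mapper.
fn mapper_stream(mapper_id: u64, len: usize) -> Vec<u8> {
    let mut rng = Lcg::for_mapper(mapper_id);
    (0..len).map(|_| rng.next_byte()).collect()
}

/// Spawns one thread per mapper, pushes its stream in 256-byte chunks through
/// the shared client, and returns the sum of all bytes that were pushed.
fn run_mappers(client: &Arc<InMemoryClient>, mappers: u64, len: usize) -> u64 {
    let handles: Vec<_> = (0..mappers)
        .map(|id| {
            let client = Arc::clone(client);
            thread::spawn(move || {
                let data = mapper_stream(id, len);
                for chunk in data.chunks(256) {
                    client.push((id % 4) as u32, chunk);
                }
                data.iter().map(|&b| b as u64).sum::<u64>()
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Comparing the value returned by `run_mappers` with `total_sum()` follows the same sum-comparison idea as the `data_sum_*` examples, but with a different stream per thread.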
🟡 Low / nits
- PR description is stale: it still mentions `service/pom.xml` + `CelebornBuild.scala` edits and `cxx`; the diff uses a separate `lifecycle-manager` module and a plain C ABI.
- Daemon shutdown (`LifecycleManagerDaemon`): the watchdog's `catch InterruptedException` is dead (nothing interrupts it), and `exitFn(0)` after `shutdownLatch.await()` runs during signal-driven shutdown, so `System.exit(0)` is redundant.
- `runUntilStopped` appears unused; `main` inlines the same logic.
- `-h` ⇒ `--host` collides with the conventional `-h` = help (`--help` is help); consider dropping the `-h` alias.
- Testability: the parser calls `sys.exit` directly, so help/unknown/missing-arg paths aren't unit-testable; Rust `connect()` mixes pure validation (codec/app_id/port) with FFI, so extracting validation would allow tests without a cluster.
- `exports.txt`: `*celeborn_ffi*` is redundant (subset of `*celeborn*`).
- Start script: the fixed `sleep 2` reports success once the PID is alive, but the RPC port may not be bound yet; consider polling. `netstat -anL` uses BSD/macOS flags (ok as last-resort fallback).
- No user-facing docs for a new component + SDK.
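The polling idea from the start-script nit also applies to a Rust caller that wants to wait for the daemon's RPC port before connecting. The sketch below is only illustrative: `wait_for_port` is a hypothetical helper, not part of the PR, and it checks TCP reachability only, not that the LifecycleManager endpoint is fully initialized.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `addr` until a TCP connection succeeds or `deadline` has elapsed.
/// Returns true as soon as the port accepts a connection.
fn wait_for_port(addr: SocketAddr, deadline: Duration, interval: Duration) -> bool {
    let start = Instant::now();
    loop {
        // Each attempt is bounded by `interval` so a silent host cannot stall us.
        if TcpStream::connect_timeout(&addr, interval).is_ok() {
            return true;
        }
        if start.elapsed() >= deadline {
            return false;
        }
        thread::sleep(interval);
    }
}
```

A caller could run this against the daemon's host and port with a deadline of a few seconds and fail fast with a clear message if it returns `false`.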
Test coverage
- ✅ `LifecycleManagerDaemonArguments` happy paths.
- ❌ No coverage for daemon lifecycle, `applyArgsToConf`, or the auth-rejection branch.
- ❌ No Rust unit tests; examples require a live cluster and don't stress the concurrency invariant.
- ❌ New module untested under sbt (see High #1).
Recommendation: the Maven side and runtime wiring look correct, but the missing sbt project (High #1) is a build/CI break that should be fixed, and the cross-platform CMake discovery (Medium #2) will bite the non-x86_64/Intel targets the README claims to support.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
What changes were proposed in this pull request?
This PR introduces two major features to support non-JVM (C++/Rust) clients using Apache Celeborn for shuffle:
1. Standalone LifecycleManager Daemon (Scala/JVM)
- `LifecycleManagerDaemon`: a standalone JVM process that hosts a `LifecycleManager` independently from any compute engine (Spark/Flink) Driver. It installs a shutdown hook (with a watchdog that force-halts if graceful stop exceeds the timeout) and blocks until SIGINT/SIGTERM.
- `LifecycleManagerDaemonArguments` for CLI argument parsing (`--app-id`, `--master-endpoints`, `--port`/`-p`, `--host`, `--properties-file`, `-h`/`--help`). Parsing is a pure function that throws `ArgumentParseException` (carrying an exit code) so every branch is unit-testable; `parseOrExit` wraps it for the process entry point.
- `sbin/start-lifecycle-manager.sh` launch script with classpath assembly, environment loading, required-argument validation, automatic free-port selection, and RPC-port polling to confirm the daemon is actually bound before reporting success.
- `lifecycle-manager` Maven/sbt module that depends on `celeborn-service`, `celeborn-client` and `celeborn-common`. Registered the module in both the root `pom.xml` and `project/CelebornBuild.scala` (`projectDefinitions`), and wired it into `build/make-distribution.sh` for both the Maven and sbt build paths.
2. Rust SDK via C++ FFI (`rust/` directory)
- `celeborn-client-sys`: Low-level FFI crate bridging Rust ↔ C++ via a plain C ABI (no `cxx`). The C++ side exposes `celeborn_ffi_*` functions (`create_client`, `setup_lifecycle_manager`, `shutdown`, `push_data`, `mapper_end`, partition reader open/read/close, etc.) returning status codes plus heap-allocated error strings.
- `build.rs`: links the single aggregated shared library `libceleborn_client.{so,dylib}` (which whole-archives all internal static libs and hides non-`celeborn` symbols), so downstream Rust never sees folly / protobuf / glog / abseil.
- `celeborn-client`: Safe, ergonomic Rust wrapper providing `ShuffleClient` with:
  - `validate_connect_args` (app_id non-empty, port > 0, codec ∈ {NONE, LZ4, ZSTD}), testable without a live cluster.
  - `Send + Sync` rationale (the C++ `ShuffleClientImpl` synchronizes internally), enabling `Arc<ShuffleClient>` sharing for concurrent `&self` push/read.
  - `Drop`-safe shutdown that nulls the handle to avoid a double `celeborn_ffi_shutdown`. The native handle is intentionally leaked after shutdown to dodge a folly `EventBase` teardown race; this implies a per-process-client usage model, which is documented prominently on the type.
- […] (`data_sum_writer.rs`, `data_sum_reader.rs`) mirroring the existing C++ `DataSumWithWriterClient` / `DataSumWithReaderClient` test programs. The writer seeds its RNG per mapper thread so each thread emits a distinct byte stream, genuinely exercising the concurrent push path.
3. C++ build portability (`cpp/CMakeLists.txt`)
- […] `brew --prefix` instead of hard-coding `/opt/homebrew` (so Intel macOS under `/usr/local` works too).
- […] `find_package(absl CONFIG)`, falling back to a toolchain-derived GNU multiarch dir (covers `aarch64-linux-gnu`, which the README advertises); a missing Abseil is now a `FATAL_ERROR` rather than a silent warning.
- […] `-msse4.2` flag by architecture so aarch64 builds compile.
Why are the changes needed?
Currently, `LifecycleManager` can only run embedded inside a JVM-based compute engine Driver (e.g., Spark Driver). This makes it impossible for non-JVM applications (Daft engine, etc.) to use Celeborn as their shuffle service, because:
- […] `LifecycleManager` to coordinate shuffle metadata (register shuffles, allocate slots, manage partition locations) with Celeborn Masters and Workers.
- […] `LifecycleManager`, non-JVM applications have no way to bootstrap this coordination layer.
By decoupling the `LifecycleManager` into a standalone daemon process, any client, regardless of language runtime, can connect to it via RPC. The Rust SDK then leverages this architecture to provide first-class Rust support by bridging to the existing, battle-tested C++ client implementation via FFI.
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
Yes.
- […] `LifecycleManager` daemon via `sbin/start-lifecycle-manager.sh --app-id <id> --master-endpoints <eps> [--port <port>] [--host <host>]`.
- […] `celeborn-client` crate to perform shuffle read/write operations against a Celeborn cluster.
- […] `LifecycleManager` does not support auth (`celeborn.auth.enabled` must be `false`), as the C++/Rust clients lack SASL support. Deploy it on a trusted network only.
How was this patch tested?
- `LifecycleManagerDaemonArgumentsSuite` covers the happy paths plus help / unknown-arg / missing-arg / invalid-port branches and `applyArgsToConf` (13 tests). Run via Maven (`mvn -pl lifecycle-manager test`) and compiled/style-checked under sbt as a first-class module.
- `validate_connect_args` is covered by `cargo test -p celeborn-client` (codec/app_id/port validation) without requiring a cluster.
- […] `data_sum_writer` / `data_sum_reader` examples (Rust ports of `DataSumWithWriterClient.cpp` / `DataSumWithReaderClient.cpp`), which write random numeric data across partitions and verify correctness by comparing partition sums between writer and reader. The `LifecycleManagerDaemon` was tested by starting it against a local Celeborn cluster (Master + Workers) and verifying the Rust examples connect, push, and read through the daemon.
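To make the cluster-free validation concrete, here is a rough sketch of what a pure argument check along those lines could look like. The signature, the `ConnectArgError` type, the `u16` port, and the exact-uppercase codec match are all assumptions for illustration; the PR's real `validate_connect_args` may differ.

```rust
/// Hypothetical error type; the PR's actual error representation may differ.
#[derive(Debug, PartialEq, Eq)]
enum ConnectArgError {
    EmptyAppId,
    ZeroPort,
    UnsupportedCodec(String),
}

/// Pure validation with no FFI involved, so it can run in plain unit tests.
/// Assumes the rules stated above: app_id non-empty, port > 0, and codec
/// one of NONE, LZ4, ZSTD (matched case-sensitively here).
fn validate_connect_args(app_id: &str, port: u16, codec: &str) -> Result<(), ConnectArgError> {
    if app_id.is_empty() {
        return Err(ConnectArgError::EmptyAppId);
    }
    if port == 0 {
        return Err(ConnectArgError::ZeroPort);
    }
    match codec {
        "NONE" | "LZ4" | "ZSTD" => Ok(()),
        other => Err(ConnectArgError::UnsupportedCodec(other.to_string())),
    }
}
```

Keeping the checks separate from the FFI call means each rejection branch can be asserted directly, without a Master, Workers, or a running daemon.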