Skip to content

Commit 6598cc3

Browse files
committed
feat: add process context publication
1 parent bfceeaf commit 6598cc3

3 files changed

Lines changed: 366 additions & 1 deletion

File tree

libdd-library-config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ anyhow = "1.0"
2222
rand = "0.8.3"
2323
rmp = "0.8.14"
2424
rmp-serde = "1.3.0"
25-
rustix = { version = "1.1.3", features = ["param", "mm"] }
25+
rustix = { version = "1.1.3", features = ["param", "mm", "process"] }
2626

2727
[dev-dependencies]
2828
tempfile = { version = "3.3" }

libdd-library-config/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
3+
pub mod process_context;
34
pub mod tracer_metadata;
45

56
use std::borrow::Cow;
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Implementation of the publisher part of the [process sharing protocol](https://github.com/open-telemetry/opentelemetry-specification/pull/4719)
5+
//!
6+
//! # A note on race conditions
7+
//!
8+
//! Process context sharing implies concurrently writing to a memory area that another process
9+
//! might be actively reading. However, reading isn't done as direct memory accesses but goes through
10+
//! the OS, so the Rust definition of race conditions doesn't really apply.
11+
//!
12+
//! Still, we want to prevent the compiler and the hardware from re-ordering the write to the
13+
//! signature (which should be last according to the specification) with the writes to other fields
14+
//! of the header.
15+
//!
16+
//! To do so, we implement synchronization during publication _as if the reader were another thread
17+
//! of this program_, using atomics.
18+
19+
/// Version number of the process context format, written into the published header.
pub const PROCESS_CTX_VERSION: u32 = 2;
/// Eight-byte signature used to identify a process context mapping.
pub const SIGNATURE: &[u8; 8] = b"OTEL_CTX";
/// Name given to the memory mapping so that it can be discovered.
pub const MAPPING_NAME: &str = "OTEL_CTX";
25+
26+
#[cfg(target_os = "linux")]
27+
#[cfg(target_has_atomic = "64")]
28+
pub mod linux {
29+
use super::{MAPPING_NAME, PROCESS_CTX_VERSION, SIGNATURE};
30+
31+
use std::{
32+
ffi::c_void,
33+
mem::ManuallyDrop,
34+
os::fd::{AsFd as _, OwnedFd},
35+
ptr,
36+
sync::{
37+
atomic::{AtomicU64, Ordering},
38+
Mutex, MutexGuard,
39+
},
40+
time::{SystemTime, UNIX_EPOCH},
41+
};
42+
43+
use anyhow::Context;
44+
45+
use rustix::{
46+
fs::{ftruncate, memfd_create, MemfdFlags},
47+
mm::{madvise, mmap, mmap_anonymous, munmap, Advice, MapFlags, ProtFlags},
48+
param::page_size,
49+
process::{getpid, set_virtual_memory_region_name, Pid},
50+
};
51+
52+
/// Header located at the very beginning of the mapping. Its layout must match the C layout
/// given by the specification, hence `#[repr(C, packed)]`.
///
/// # Atomic accesses
///
/// Publication needs some form of synchronization. Fences, and more generally any
/// synchronization that doesn't go through the OS, only have an effect when combined with
/// atomic accesses (see [Mandatory
/// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic)).
///
/// `signature` acts as the release notification for the initial publication, and
/// `published_at_ns` plays the same role for updates. Both would ideally be declared as
/// `AtomicU64`, but an `AtomicU64` can't live in a packed structure because of its alignment
/// requirement (which, on some platforms, might even be larger than the alignment of `u64`).
///
/// In practice, the page alignment of the mapping combined with the layout below makes both
/// fields suitably aligned (this is checked before publishing anyway), so an atomic view of
/// them can be obtained through [`AtomicU64::from_ptr`] whenever synchronization is required.
#[repr(C, packed)]
struct MappingHeader {
    /// Signature bytes of the mapping; written last during publication.
    signature: [u8; 8],
    /// Version of the process context format.
    version: u32,
    /// Length in bytes of the payload pointed to by `payload_ptr`.
    payload_size: u32,
    /// Publication timestamp, in nanoseconds since the Unix epoch.
    published_at_ns: u64,
    /// Address of the first byte of the payload.
    payload_ptr: *const u8,
}
78+
79+
/// Owner of the shared memory-mapped area the context is published to. The region is unmapped
/// automatically when the [MemMapping] instance is dropped.
///
/// # Safety
///
/// [MemMapping] guarantees the following invariants, which MUST always hold for safety:
/// - `start_addr` is non-null, was returned by a previous call to `mmap` with a size of
///   [mapping_size], and hasn't been unmapped since.
/// - once `self` has been dropped, the memory previously pointed to by `start_addr` must never
///   be accessed again.
struct MemMapping {
    /// Start address of the mapped region.
    start_addr: *mut c_void,
    /// The file descriptor, if the mapping was successfully created from `memfd`.
    fd: Option<OwnedFd>,
}

// Safety: a MemMapping owns the mapped region; the internal pointer is never leaked nor
// shared. Dropping it (`munmap`) from another thread is safe as well.
unsafe impl Send for MemMapping {}
98+
99+
/// The global instance of the context for the current process.
100+
///
101+
/// We need a mutex to have a static handle and avoid bothering the users of this API with
102+
/// keeping a handle around and ensure it's not dropped, but we don't expect this mutex to
103+
/// actually be contended in practice. Ideally a single thread should handle context updates,
104+
/// even if it's not strictly required.
105+
static PROCESS_CONTEXT_HANDLER: Mutex<Option<ProcessContextHandle>> = Mutex::new(None);
106+
107+
impl MemMapping {
108+
/// Creates a suitable memory mapping for the context protocol to be published.
109+
///
110+
/// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping on
111+
/// old kernels that don't support `memfd` (or if `memfd` failed).
112+
fn new() -> anyhow::Result<Self> {
113+
let size = mapping_size();
114+
115+
memfd_create(
116+
MAPPING_NAME,
117+
MemfdFlags::CLOEXEC | MemfdFlags::NOEXEC_SEAL | MemfdFlags::ALLOW_SEALING,
118+
)
119+
.or_else(|_| memfd_create(MAPPING_NAME, MemfdFlags::CLOEXEC | MemfdFlags::ALLOW_SEALING))
120+
.and_then(|fd| {
121+
ftruncate(fd.as_fd(), mapping_size() as u64)?;
122+
// Safety: we pass a null pointer to mmap which is unconditionally ok
123+
let start_addr = unsafe {
124+
mmap(
125+
ptr::null_mut(),
126+
size,
127+
ProtFlags::WRITE | ProtFlags::READ,
128+
MapFlags::SHARED,
129+
fd.as_fd(),
130+
0,
131+
)?
132+
};
133+
134+
Ok(MemMapping {
135+
start_addr,
136+
fd: Some(fd),
137+
})
138+
})
139+
// If any previous step failed, we fallback to an anonymous mapping
140+
.or_else(|_| {
141+
// Safety: we pass a null pointer to mmap, no precondition to uphold
142+
let start_addr = unsafe {
143+
mmap_anonymous(
144+
ptr::null_mut(),
145+
size,
146+
ProtFlags::WRITE | ProtFlags::READ,
147+
MapFlags::PRIVATE,
148+
)
149+
.context(
150+
"Couldn't create a memfd or anonymous mmapped region for process context publication",
151+
)?
152+
};
153+
154+
Ok(MemMapping { start_addr, fd: None })
155+
})
156+
}
157+
158+
/// Makes this mapping discoverable by giving it a name. This is not required for a
159+
/// memfd-backed mapping.
160+
fn set_name(&mut self) -> anyhow::Result<()> {
161+
// Safety: the invariants of `MemMapping` ensures that `start` is non null and comes
162+
// from a previous call to `mmap` of size `mapping_size()`
163+
set_virtual_memory_region_name(
164+
unsafe { std::slice::from_raw_parts(self.start_addr as *const u8, mapping_size()) },
165+
Some(
166+
std::ffi::CString::new(MAPPING_NAME)
167+
.context("unexpected null byte in process context mapping name")?
168+
.as_c_str(),
169+
),
170+
)?;
171+
Ok(())
172+
}
173+
174+
/// Unmaps the underlying memory region and close the memfd file descriptor, if set. This
175+
/// has same effect as dropping `self`, but propagates potential errors.
176+
fn free(mut self) -> anyhow::Result<()> {
177+
// Safety: We put `self` in a `ManuallyDrop`, which prevents drop and future calls to
178+
// `free()`.
179+
unsafe {
180+
self.unmap()?;
181+
}
182+
183+
// Ensure `fd` is dropped and thus closed
184+
self.fd = None;
185+
// Prevent `Self::drop` from being called
186+
let _ = ManuallyDrop::new(self);
187+
188+
Ok(())
189+
}
190+
191+
/// Unmaps the underlying memory region. For internal use only; prefer `free()` or `drop()`.
192+
///
193+
/// # Safety
194+
///
195+
/// This method must only be called once. After calling `unmap()`, no other method of
196+
/// `MemMapping` must be ever called on `self` again, including `unmap()` and `drop()`.
197+
///
198+
/// Practically, `self` must be put in a `ManuallyDrop` wrapper and forgotten.
199+
unsafe fn unmap(&mut self) -> anyhow::Result<()> {
200+
unsafe {
201+
munmap(self.start_addr, mapping_size()).map_err(|errno| {
202+
anyhow::anyhow!(
203+
"munmap failed when freeing the process context with error {errno}"
204+
)
205+
})
206+
}
207+
}
208+
}
209+
210+
impl Drop for MemMapping {
211+
fn drop(&mut self) {
212+
// Safety: `self` is being dropped
213+
let _ = unsafe { self.unmap() };
214+
}
215+
}
216+
217+
/// Handle for future updates of a published process context.
218+
#[cfg(target_os = "linux")]
219+
struct ProcessContextHandle {
220+
mapping: MemMapping,
221+
/// Once published, and until the next update is complete, the backing allocation of
222+
/// `payload` might be read by external processes and thus most not move (e.g. by resizing
223+
/// or drop).
224+
#[allow(unused)]
225+
payload: Vec<u8>,
226+
#[allow(unused)]
227+
publisher_pid: Pid,
228+
}
229+
230+
impl ProcessContextHandle {
231+
/// Initial publication of the process context. Creates an appropriate memory mapping.
232+
fn publish(payload: Vec<u8>) -> anyhow::Result<Self> {
233+
let mut mapping = MemMapping::new()?;
234+
let size = mapping_size();
235+
236+
// Checks that the layout allow us to access `signature` and `published_at_ns` as
237+
// atomics u64. Page size is at minimum 4KB and will be always 8 bytes aligned even on
238+
// exotic platforms. The respective offsets of `signature` and `published_at_ns` are
239+
// 0 and 8 bytes, so it suffices for `AtomicU64` to require an alignment of at most 8
240+
// (which is the expected alignment anyway).
241+
//
242+
// Note that `align_of` is a `const fn`, so this is in fact a compile-time check and
243+
// will be optimized away, hence the `allow(unreachable_code)`.
244+
#[allow(unreachable_code)]
245+
if std::mem::align_of::<AtomicU64>() > 8 {
246+
return Err(anyhow::anyhow!("alignment constraints forbid the use of atomics for publishing the protocol context"));
247+
}
248+
249+
// Safety: the invariants of MemMapping ensures `start_addr` is not null and comes
250+
// from a previous call to `mmap`
251+
unsafe { madvise(mapping.start_addr, size, Advice::LinuxDontFork) }
252+
.context("madvise MADVISE_DONTFORK failed")?;
253+
254+
let published_at_ns = time_now_ns();
255+
256+
if published_at_ns == 0 {
257+
return Err(anyhow::anyhow!(
258+
"failed to get current time for process context publication"
259+
));
260+
}
261+
262+
let header = mapping.start_addr as *mut MappingHeader;
263+
264+
unsafe {
265+
// Safety: MappingHeader is packed, thus have no alignment requirement. It points
266+
// to a freshly mmaped region which is valid for writing at least PAGE_SIZE bytes,
267+
// which is greater than the size of MappingHeader.
268+
ptr::write(
269+
header,
270+
MappingHeader {
271+
// signature will be set atomically at last
272+
signature: [0; 8],
273+
version: PROCESS_CTX_VERSION,
274+
payload_size: payload
275+
.len()
276+
.try_into()
277+
.context("payload size overflowed")?,
278+
published_at_ns,
279+
payload_ptr: payload.as_ptr(),
280+
},
281+
);
282+
// Signature is set last, which means that all the previous stores happens-before it
283+
// (program order on a given single thread). Any fence or atomic load from the
284+
// reader side which loads the completed signature with at least
285+
// `Acquire` ordering will create a happens-before relationship with
286+
// `signature`, ensuring the header is seen as fully initialized on
287+
// their side.
288+
AtomicU64::from_ptr((*header).signature.as_mut_ptr().cast::<u64>())
289+
.store(u64::from_be_bytes(SIGNATURE.clone()), Ordering::Release);
290+
}
291+
292+
// For anonymous mappings, try to name it (optional, may fail on older kernels).
293+
// `memfd` mappings don't need this - the name shows in /proc/pid/maps automatically
294+
if mapping.fd.is_none() {
295+
let _ = mapping.set_name();
296+
}
297+
298+
Ok(ProcessContextHandle {
299+
mapping,
300+
payload,
301+
publisher_pid: getpid(),
302+
})
303+
}
304+
305+
/// Updates the context after initial publication. Currently unimplemented (always returns
306+
/// `Err`).
307+
fn update(&mut self) -> anyhow::Result<()> {
308+
Err(anyhow::anyhow!(
309+
"process context update isn't implemented yet"
310+
))
311+
}
312+
}
313+
314+
/// Returns the current time as a number of nanoseconds since the Unix epoch, or 0 if the
/// system clock is set before the epoch or if the value doesn't fit in a `u64`.
fn time_now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(0),
        Err(_) => 0,
    }
}
321+
322+
fn mapping_size() -> usize {
323+
page_size() * 2
324+
}
325+
326+
/// Locks the context handle. Returns a uniform error if the lock has been poisoned.
327+
fn lock_context_handle<'a>() -> anyhow::Result<MutexGuard<'a, Option<ProcessContextHandle>>> {
328+
PROCESS_CONTEXT_HANDLER.lock().map_err(|_| {
329+
anyhow::anyhow!("a thread panicked while operating on the process context handler")
330+
})
331+
}
332+
333+
/// Publishes or updates the process context for it to be visible by external readers.
334+
///
335+
/// If this is the first publication, or if [unpublish] has been called last, this will follow
336+
/// the Publish protocol of the process context specification.
337+
///
338+
/// Otherwise, the context is updated following the Update protocol.
339+
pub fn publish(payload: Vec<u8>) -> anyhow::Result<()> {
340+
let mut guard = lock_context_handle()?;
341+
342+
match &mut *guard {
343+
Some(handler) => handler.update(),
344+
None => {
345+
*guard = Some(ProcessContextHandle::publish(payload)?);
346+
Ok(())
347+
}
348+
}
349+
}
350+
351+
/// Unmaps the region used to share the process context and close the associated file
352+
/// descriptor, if any. If no context has ever been published, this is no-op.
353+
///
354+
/// A call to [publish] following an [unpublish] will create a new mapping.
355+
pub fn unpublish() -> anyhow::Result<()> {
356+
let mut guard = lock_context_handle()?;
357+
358+
if let Some(ProcessContextHandle { mapping, .. }) = guard.take() {
359+
mapping.free()?;
360+
}
361+
362+
Ok(())
363+
}
364+
}

0 commit comments

Comments
 (0)