11// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22// SPDX-License-Identifier: Apache-2.0
33
4- //! Implementation of the publisher part of the [OTEL process context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719)
4+ //! Implementation of the publisher part of the [OTEL process
5+ //! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719)
56//!
67//! # A note on race conditions
78//!
@@ -25,7 +26,7 @@ pub mod linux {
2526 ffi:: c_void,
2627 mem:: ManuallyDrop ,
2728 os:: fd:: AsFd as _,
28- ptr,
29+ ptr:: { self , addr_of_mut } ,
2930 sync:: {
3031 atomic:: { fence, AtomicU64 , Ordering } ,
3132 Mutex , MutexGuard ,
@@ -238,8 +239,9 @@ pub mod linux {
238239 . context ( "madvise MADVISE_DONTFORK failed" ) ?;
239240
240241 let published_at_ns = time_now_ns ( ) . ok_or_else ( || {
241- anyhow:: anyhow!( "fail to get current time for process context publication" )
242+ anyhow:: anyhow!( "failed to get current time for process context publication" )
242243 } ) ?;
244+
243245 let header = mapping. start_addr as * mut MappingHeader ;
244246
245247 unsafe {
@@ -281,12 +283,41 @@ pub mod linux {
281283 } )
282284 }
283285
284- /// Updates the context after initial publication. Currently unimplemented (always returns
285- /// `Err`).
286- fn update ( & mut self ) -> anyhow:: Result < ( ) > {
287- Err ( anyhow:: anyhow!(
288- "process context update isn't implemented yet"
289- ) )
286+ /// Updates the context after initial publication.
287+ fn update ( & mut self , payload : Vec < u8 > ) -> anyhow:: Result < ( ) > {
288+ let header = self . mapping . start_addr as * mut MappingHeader ;
289+
290+ // Note that setting `published_at_ns` to zero doesn't entirely avoid data races with
291+ // the reader in theory, which could have read a previous non-zero value just before we
292+ // flipped it but still see subsequent writes. However, since the reader is totally
293+ // unable to manifest itself to the updating process, we can't have a truly atomic
294+ // update of the whole header, and is the best we can do.
295+ let published_at_atomic =
296+ unsafe { AtomicU64 :: from_ptr ( addr_of_mut ! ( ( * header) . published_at_ns) ) } ;
297+
298+ // A process shouldn't try to concurrently update its own context, so this shouldn't
299+ // really happen.
300+ if published_at_atomic. swap ( 0 , Ordering :: Acquire ) == 0 {
301+ return Err ( anyhow:: anyhow!(
302+ "concurrent update of the process context is not supported"
303+ ) ) ;
304+ }
305+
306+ let published_at_ns = time_now_ns ( )
307+ . ok_or_else ( || anyhow:: anyhow!( "could not get the current timestamp" ) ) ?;
308+
309+ self . payload = payload;
310+
311+ unsafe {
312+ ( * header) . payload_ptr = self . payload . as_ptr ( ) ;
313+ ( * header) . payload_size = self . payload . len ( ) . try_into ( ) . map_err ( |_| {
314+ anyhow:: anyhow!( "couldn't update process protocol: new payload too large" )
315+ } ) ?;
316+ }
317+
318+ published_at_atomic. store ( published_at_ns, Ordering :: Release ) ;
319+
320+ Ok ( ( ) )
290321 }
291322 }
292323
@@ -320,7 +351,7 @@ pub mod linux {
320351 let mut guard = lock_context_handle ( ) ?;
321352
322353 match & mut * guard {
323- Some ( handler) => handler. update ( ) ,
354+ Some ( handler) => handler. update ( payload ) ,
324355 None => {
325356 * guard = Some ( ProcessContextHandle :: publish ( payload) ?) ;
326357 Ok ( ( ) )
0 commit comments