Severity: medium-high
timely/src/dataflow/operators/vec/broadcast.rs:31 — The broadcast operator is implemented as `flat_map(|x| (0..peers).map(|i| (i, x.clone()))).exchange(|ix| ix.0).map(|(_i, x)| x)`. Each record is cloned `peers` times, wrapped in tuples, exchanged, then unwrapped. This is O(peers * records) in allocations. The comment acknowledges: "Simplified implementation... Optimize once they have settled down."
Severity: medium
The inter-thread communication path does three operations per message push with no batching:
- communication/src/allocator/counters.rs:47-49 — `events.push(self.index)` appends to a shared Vec on every push, growing O(messages) between drains.
- communication/src/allocator/counters.rs:102 — `self.buzzer.buzz()` calls unpark/condvar on every push, even when the target thread is already awake.
- communication/src/allocator/process.rs:189-194 — `receive()` drains all pending mpsc messages into the events Vec in one shot, with no bound or backpressure.
Commented-out code in counters.rs:34-44 shows a batching strategy was considered but not completed.
Severity: medium
Multiple buffers grow to their high-water mark and never shrink:
- communication/src/allocator/zero_copy/bytes_slab.rs:106 — the `in_progress` Vec grows as buffers are retired and never shrinks; slow consumers cause monotonic growth.
- communication/src/allocator/zero_copy/bytes_exchange.rs:31 — the `MergeQueue` VecDeque grows without backpressure under producer-consumer imbalance.
- communication/src/allocator/zero_copy/allocator.rs:277-289 and allocator_process.rs:204-216 — per-channel `VecDeque<Bytes>` grows without limit if consumers are slow.
- communication/src/allocator/zero_copy/tcp.rs:53-56 — `staged`'s inner Vecs retain peak capacity.
- communication/src/allocator/zero_copy/allocator.rs:128 and allocator_process.rs:118 — the `staged` Vec retains high-water-mark capacity.
Severity: medium
- timely/src/dataflow/operators/capability.rs:154-161 — `try_downgrade` creates a new intermediate `Capability` (incrementing the ChangeBatch), then drops the old one (decrementing): two `borrow_mut` + `update` calls where one in-place update would suffice.
- timely/src/dataflow/operators/generic/notificator.rs:323 — `make_available` clones capabilities from `pending` instead of moving them. A TODO comment acknowledges this.
- timely/src/dataflow/operators/capability.rs:167-171 — `Capability::drop` clones the time to call `update(time.clone(), -1)` because `update` takes ownership.
Severity: medium
timely/src/worker.rs:391,401 — `self.logging()` is called multiple times per `step_or_park()`. Each call goes through `self.log_register()` → `borrow()` → `HashMap::get("timely")`, performing a string lookup on every worker step. The resolved logger should be cached in the `Worker` struct.
Severity: medium
timely/src/dataflow/operators/core/capture/event.rs:75 — every pushed event creates a new `Rc<EventLink>`. For high-throughput capture, this is one heap allocation per event. A pre-allocated ring buffer or arena would be more efficient.
Severity: medium
timely/src/dataflow/operators/core/reclock.rs:55-79 — the stash is a `Vec` scanned linearly per notification, after which `retain` shifts elements. With many distinct timestamps this becomes O(n^2). A `BTreeMap<T, Vec<C>>` would give O(log n) lookups and efficient range removal.
Severity: medium (when logging enabled)
- timely/src/progress/broadcast.rs:66-67,120-121 — every `send()`/`recv()` allocates two `Vec`s for logging that are transferred to the logger by ownership.
- timely/src/progress/reachability.rs:852,867 — `log_source_updates`/`log_target_updates` collect into new Vecs, cloning every timestamp.
- timely/src/logging.rs:51 — `BatchLogger::publish_batch` allocates a 2-element Vec per progress frontier advance.
Severity: medium
communication/src/initialize.rs:157 — the default refill closure creates `Box::new(vec![0_u8; size])`: the Vec already heap-allocates its buffer, then Box adds another heap allocation and pointer indirection. `vec![0u8; size].into_boxed_slice()` would eliminate the Vec metadata overhead.
Severity: low-medium
communication/src/allocator/zero_copy/tcp.rs:99-101 — `bytes.clone()` for every target in the range. For unicast messages (the common case, where `target_upper - target_lower == 1`), the original `bytes` could be moved instead of cloned, saving one atomic refcount increment/decrement pair per message.
Severity: low-medium
- timely/src/scheduling/activate.rs:280 — `SyncActivator::activate()` clones `self.path` (a `Vec<usize>`) on every call.
- timely/src/scheduling/activate.rs:87 — `activate_after` allocates `path.to_vec()` per delayed activation. Using `Rc<[usize]>` would avoid the per-call allocation.
Severity: low-medium
timely/src/dataflow/channels/pushers/exchange.rs:57,67 — `time.clone()` inside the per-container extraction loop. For complex product timestamps, this adds up.
Severity: low-medium
- timely/src/synchronization/sequence.rs:185 — the sink re-sorts the entire `recvd` vector on each invocation, including already-sorted elements. It should sort only the new elements and merge.
- timely/src/synchronization/sequence.rs:153 — clones each element `peers - 1` times; the last iteration could move.
Severity: low
communication/src/allocator/thread.rs:61 — the shared tuple contains two VecDeques, but the recycling code using the second one (lines 97-102) is commented out. The second VecDeque is allocated but never used.
Severity: low-medium
timely/src/dataflow/operators/core/partition.rs:61-67 — creates a `BTreeMap<u64, Vec<_>>` to buffer data per partition before pushing to outputs. Data could be pushed directly to per-output container builders without the intermediate collection.
- container/src/lib.rs:150 — the `CapacityContainerBuilder::pending` VecDeque grows but never shrinks; `relax()` is a no-op.
- container/src/lib.rs:216-219 — `ensure_capacity` computes `reserve(preferred - capacity())` but should use `reserve(preferred - len())`.
- Various `BinaryHeap` and `Vec` instances across the codebase drain but never shrink (the standard amortized pattern, acceptable in most cases).
- timely/src/synchronization/barrier.rs:23 — `Worker::clone()` deep-clones `Config` (which contains a `HashMap`), but `Config` could be `Arc`-wrapped.