Skip to content

Commit 0dfae20

Browse files
committed
chore: Move flow expiration from PQ to timers
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
1 parent 1198f21 commit 0dfae20

11 files changed

Lines changed: 281 additions & 747 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dataplane/src/packet_processor/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::packet_processor::ipforward::IpForwarder;
1212

1313
use concurrency::sync::Arc;
1414

15-
use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable};
15+
use flow_entry::flow_table::{FlowLookup, FlowTable};
1616
use flow_filter::{FlowFilter, FlowFilterTableWriter};
1717

1818
use nat::portfw::{PortForwarder, PortFwTableWriter};
@@ -90,15 +90,14 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
9090
let stats_stage = Stats::new("stats", writer.clone());
9191
let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle());
9292
let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone());
93-
let flow_expirations_nf = ExpirationsNF::new(flow_table.clone());
9493
let portfw = PortForwarder::new(
9594
"port-forwarder",
9695
portfw_factory.handle(),
9796
flow_table.clone(),
9897
);
9998

10099
// Build the pipeline for a router. The composition of the pipeline (in stages) is currently
101-
// hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last
100+
// hard-coded. Flow expiration is handled by per-flow tokio timers; no ExpirationsNF needed.
102101
DynPipeline::new()
103102
.add_stage(stage_ingress)
104103
.add_stage(iprouter1)
@@ -109,7 +108,6 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
109108
.add_stage(stateful_nat)
110109
.add_stage(iprouter2)
111110
.add_stage(stage_egress)
112-
.add_stage(flow_expirations_nf)
113111
.add_stage(pktdump)
114112
.add_stage(stats_stage)
115113
};

flow-entry/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ etherparse = { workspace = true }
2020
linkme = { workspace = true }
2121
net = { workspace = true }
2222
pipeline = { workspace = true }
23-
priority-queue = { workspace = true }
2423
thiserror = { workspace = true }
25-
thread_local = { workspace = true }
24+
tokio = { workspace = true, features = ["rt", "time"] }
2625
tracectl = { workspace = true }
2726
tracing = { workspace = true }
2827

2928
[dev-dependencies]
3029
bolero = { workspace = true, default-features = false }
3130
net = { workspace = true, features = ["bolero"] }
31+
tokio = { workspace = true, features = ["macros", "rt", "time"] }
3232
tracing-test = { workspace = true, features = [] }
3333
shuttle = { workspace = true }
Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,31 @@
11
# Flow Table
22

3-
The current implementation of flow table uses `dash_map` and per-thread priority queue's (for timeouts) along with `Arc` and `Weak` to get a reasonable flow table with timeouts.
4-
However, it leaves a lot of room for optimizations.
3+
The flow table uses `DashMap` for concurrent key→value storage and per-flow
4+
tokio timers for expiration.
55

66
## Flow Table Implementation
77

8-
The main `DashMap` holds `Weak` references to all the flow entries so that the memory gets automatically deallocated when the entry times out.
9-
10-
The priority queue's hold `Arc` references to the flow entries to keep them alive when they are not in any packet meta data.
11-
When the entry times-out and is removed from the priority queue and the last packet referencing that flow is dropped, the memory for the entry is freed.
12-
13-
Note that in the current implementation, a flow is not removed from the flow table until the last Arc to the flow_info is dropped or the flow entry is replaced. This can be changed if needed, or even have it be an option on the flow as to whether timeout removes the flow or not.
14-
15-
## Optimizations
16-
17-
In the current implementation, there has to be periodic or on-timeout reaping the Weak reference in the hash table.
18-
This is better done by having a version of `DashMap` that can reap the dead `Weak` reference as it walks the table on lookups, instead of waiting for key collisions.
19-
The hope, for now, is that the entries in the hash table array will contain a small pointer and not take up too much extra memory.
20-
Those dead `Weak` pointers will prevent shrinking of the hash table though, if the implementation supports that.
21-
22-
Second, the `priority_queue` crate uses a `HashMap` inside the queue in order to allow fast removal and re-insertion.
23-
However, this wastes space and requires extra hashes.
24-
The better way to do this is to have a custom priority queue integrated with the custom weak-reaping hash map so that the same hash table can be used for both operations.
25-
This improves cache locality, reduces memory utlization, and avoids multiple hash table lookups in many cases.
26-
27-
However, in the interest of time to completion for the code, this module currently uses existing data structures instead of full custom implementations of everything.
28-
However, the interface should be able to hide a change from the current to the optimized implementation.
8+
The `DashMap` stores `Arc<FlowInfo>` values directly, so the table is the
9+
primary strong-reference keeper for each flow entry. Callers that need to
10+
retain a flow (e.g. pipeline stages that tag packets) clone the `Arc` out of
11+
`lookup()`.
12+
13+
When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
14+
present) that sleeps until the flow's deadline and then calls
15+
`update_status(FlowStatus::Expired)`. The DashMap entry is not removed by the
16+
timer; instead, `lookup()` performs lazy cleanup: if a looked-up entry is
17+
`Expired` or `Cancelled` it is removed from the map and `None` is returned.
18+
The same lazy path covers the case where a deadline passes without a timer
19+
firing (e.g. in non-tokio test contexts).
20+
21+
`FlowInfo::related` holds a `Weak<FlowInfo>` pointing to the reverse-direction
22+
flow of a bidirectional pair. This avoids reference cycles: the two entries
23+
are independent `Arc`s in the table, and the `Weak` merely lets one side
24+
observe the other without keeping it alive.
25+
26+
## Non-tokio contexts (shuttle / sync tests)
27+
28+
When no tokio runtime is present `Handle::try_current()` returns `Err` and no
29+
timer is spawned. Tests simulate expiration by calling
30+
`flow_info.update_status(FlowStatus::Expired)` directly, after which the next
31+
`lookup()` call performs the lazy removal.

flow-entry/src/flow_table/display.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ impl Display for FlowTable {
2323
Heading(format!("Flow Table ({} entries)", table.len())).fmt(f)?;
2424
for entry in table.iter() {
2525
let key = entry.key();
26-
match entry.value().upgrade() {
27-
Some(value) => writeln!(f, "key = {key}\ndata = {value}")?,
28-
None => writeln!(f, "key = {key} NONE")?,
29-
}
26+
let value = entry.value();
27+
writeln!(f, "key = {key}\ndata = {value}")?;
3028
}
3129
} else {
3230
write!(f, "Failed to lock flow table")?;

flow-entry/src/flow_table/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@
22
// Copyright Open Network Fabric Authors
33

44
mod display;
5-
pub mod nf_expirations;
65
pub mod nf_lookup;
76
pub mod table;
8-
mod thread_local_pq;
97

8+
pub use nf_lookup::FlowLookup;
109
pub use table::FlowTable;
1110

1211
pub use net::flows::atomic_instant::AtomicInstant;
1312
pub use net::flows::flow_info::*;
14-
pub use nf_expirations::ExpirationsNF;
15-
pub use nf_lookup::FlowLookup;
1613

1714
use tracectl::trace_target;
1815
trace_target!("flow-table", LevelFilter::INFO, &["pipeline"]);

flow-entry/src/flow_table/nf_expirations.rs

Lines changed: 0 additions & 142 deletions
This file was deleted.

0 commit comments

Comments
 (0)