Skip to content

Commit 8acde72

Browse files
frailltTimonPost
authored andcommitted
Codebase improvements (#258)
1 parent b0121dc commit 8acde72

18 files changed

Lines changed: 1684 additions & 1393 deletions

src/either.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#[derive(Debug)]
12
pub(crate) enum Either<L, R> {
23
Left(L),
34
Right(R),

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ mod throughput;
3737
#[cfg(feature = "tester")]
3838
pub use self::throughput::ThroughputMonitoring;
3939

40+
#[cfg(test)]
41+
pub mod test_utils;
42+
4043
pub use self::config::Config;
4144
pub use self::error::{ErrorKind, Result};
4245
pub use self::net::{LinkConditioner, Socket, SocketEvent};

src/net.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
//! You can think of the socket, connection management, congestion control.
33
44
mod connection;
5+
mod connection_impl;
6+
mod connection_manager;
57
mod events;
68
mod link_conditioner;
79
mod quality;
@@ -10,6 +12,8 @@ mod virtual_connection;
1012

1113
pub mod constants;
1214

15+
pub use self::connection::{Connection, ConnectionEventAddress, ConnectionMessenger};
16+
pub use self::connection_manager::{ConnectionManager, DatagramSocket};
1317
pub use self::events::SocketEvent;
1418
pub use self::link_conditioner::LinkConditioner;
1519
pub use self::quality::{NetworkQuality, RttMeasurer};

src/net/connection.rs

Lines changed: 73 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -1,187 +1,73 @@
1-
pub use crate::net::{NetworkQuality, RttMeasurer, VirtualConnection};
2-
3-
use crate::config::Config;
4-
use crate::either::Either::{self, Left, Right};
5-
use std::{
6-
collections::HashMap,
7-
net::SocketAddr,
8-
time::{Duration, Instant},
9-
};
10-
11-
/// Maintains a registry of active "connections". Essentially, when we receive a packet on the
12-
/// socket from a particular `SocketAddr`, we will track information about it here.
13-
#[derive(Debug)]
14-
pub struct ActiveConnections {
15-
connections: HashMap<SocketAddr, VirtualConnection>,
16-
}
17-
18-
impl ActiveConnections {
19-
pub fn new() -> Self {
20-
Self {
21-
connections: HashMap::new(),
22-
}
23-
}
24-
25-
/// Try to get a `VirtualConnection` by address. If the connection does not exist, it will be
26-
/// inserted and returned.
27-
pub fn get_or_insert_connection(
28-
&mut self,
29-
address: SocketAddr,
30-
config: &Config,
31-
time: Instant,
32-
) -> &mut VirtualConnection {
33-
self.connections
34-
.entry(address)
35-
.or_insert_with(|| VirtualConnection::new(address, config, time))
36-
}
37-
38-
/// Try to get or create a [VirtualConnection] by address. If the connection does not exist, it will be
39-
/// created and returned, but not inserted into the table of active connections.
40-
pub(crate) fn get_or_create_connection(
41-
&mut self,
42-
address: SocketAddr,
43-
config: &Config,
44-
time: Instant,
45-
) -> Either<&mut VirtualConnection, VirtualConnection> {
46-
if let Some(connection) = self.connections.get_mut(&address) {
47-
Left(connection)
48-
} else {
49-
Right(VirtualConnection::new(address, config, time))
50-
}
51-
}
52-
53-
/// Removes the connection from `ActiveConnections` by socket address.
54-
pub fn remove_connection(
55-
&mut self,
56-
address: &SocketAddr,
57-
) -> Option<(SocketAddr, VirtualConnection)> {
58-
self.connections.remove_entry(address)
59-
}
60-
61-
/// Check for and return `VirtualConnection`s which have been idling longer than `max_idle_time`.
62-
pub fn idle_connections(&mut self, max_idle_time: Duration, time: Instant) -> Vec<SocketAddr> {
63-
self.connections
64-
.iter()
65-
.filter(|(_, connection)| connection.last_heard(time) >= max_idle_time)
66-
.map(|(address, _)| *address)
67-
.collect()
68-
}
69-
70-
/// Get a list of addresses of dead connections
71-
pub fn dead_connections(&mut self) -> Vec<SocketAddr> {
72-
self.connections
73-
.iter()
74-
.filter(|(_, connection)| connection.should_be_dropped())
75-
.map(|(address, _)| *address)
76-
.collect()
77-
}
78-
79-
/// Check for and return `VirtualConnection`s which have not sent anything for a duration of at least `heartbeat_interval`.
80-
pub fn heartbeat_required_connections(
81-
&mut self,
82-
heartbeat_interval: Duration,
83-
time: Instant,
84-
) -> impl Iterator<Item = &mut VirtualConnection> {
85-
self.connections
86-
.iter_mut()
87-
.filter(move |(_, connection)| connection.last_sent(time) >= heartbeat_interval)
88-
.map(|(_, connection)| connection)
89-
}
90-
91-
/// Returns true if the given connection exists.
92-
pub fn exists(&self, address: &SocketAddr) -> bool {
93-
self.connections.contains_key(&address)
94-
}
95-
96-
/// Returns the number of connected clients.
97-
#[cfg(test)]
98-
pub(crate) fn count(&self) -> usize {
99-
self.connections.len()
100-
}
101-
}
102-
103-
#[cfg(test)]
104-
mod tests {
105-
use super::{ActiveConnections, Config};
106-
use std::{
107-
sync::Arc,
108-
time::{Duration, Instant},
109-
};
110-
111-
const ADDRESS: &str = "127.0.0.1:12345";
112-
113-
#[test]
114-
fn connection_timed_out() {
115-
let mut connections = ActiveConnections::new();
116-
let config = Config::default();
117-
118-
let now = Instant::now();
119-
120-
// add 10 clients
121-
for i in 0..10 {
122-
connections.get_or_insert_connection(
123-
format!("127.0.0.1:122{}", i).parse().unwrap(),
124-
&config,
125-
now,
126-
);
127-
}
128-
129-
assert_eq!(connections.count(), 10);
130-
131-
let wait = Duration::from_millis(200);
132-
133-
#[cfg(not(windows))]
134-
let epsilon = Duration::from_nanos(1);
135-
#[cfg(windows)]
136-
let epsilon = Duration::from_millis(1);
137-
138-
let timed_out_connections = connections.idle_connections(wait, now + wait - epsilon);
139-
assert_eq!(timed_out_connections.len(), 0);
140-
141-
let timed_out_connections = connections.idle_connections(wait, now + wait + epsilon);
142-
assert_eq!(timed_out_connections.len(), 10);
143-
}
144-
145-
#[test]
146-
fn insert_connection() {
147-
let mut connections = ActiveConnections::new();
148-
let config = Config::default();
149-
150-
let address = ADDRESS.parse().unwrap();
151-
connections.get_or_insert_connection(address, &config, Instant::now());
152-
assert!(connections.connections.contains_key(&address));
153-
}
154-
155-
#[test]
156-
fn insert_existing_connection() {
157-
let mut connections = ActiveConnections::new();
158-
let config = Config::default();
159-
160-
let address = ADDRESS.parse().unwrap();
161-
connections.get_or_insert_connection(address, &config, Instant::now());
162-
assert!(connections.connections.contains_key(&address));
163-
connections.get_or_insert_connection(address, &config, Instant::now());
164-
assert!(connections.connections.contains_key(&address));
165-
}
166-
167-
#[test]
168-
fn remove_connection() {
169-
let mut connections = ActiveConnections::new();
170-
let config = Arc::new(Config::default());
171-
172-
let address = ADDRESS.parse().unwrap();
173-
connections.get_or_insert_connection(address, &config, Instant::now());
174-
assert!(connections.connections.contains_key(&address));
175-
connections.remove_connection(&address);
176-
assert!(!connections.connections.contains_key(&address));
177-
}
178-
179-
#[test]
180-
fn remove_non_existent_connection() {
181-
let mut connections = ActiveConnections::new();
182-
183-
let address = &ADDRESS.parse().unwrap();
184-
connections.remove_connection(address);
185-
assert!(!connections.connections.contains_key(address));
186-
}
187-
}
1+
use crate::config::Config;
2+
3+
use std::{self, fmt::Debug, net::SocketAddr, time::Instant};
4+
5+
/// Allows connection to send packet, send event and get global configuration.
6+
pub trait ConnectionMessenger<ReceiveEvent: Debug> {
7+
/// Returns global configuration.
8+
fn config(&self) -> &Config;
9+
10+
/// Sends a connection event.
11+
fn send_event(&mut self, address: &SocketAddr, event: ReceiveEvent);
12+
/// Sends a packet.
13+
fn send_packet(&mut self, address: &SocketAddr, payload: &[u8]);
14+
}
15+
16+
/// Returns an address of an event.
17+
/// This is used by a `ConnectionManager`, because it doesn't know anything about connection events.
18+
pub trait ConnectionEventAddress {
19+
/// Returns event address
20+
fn address(&self) -> SocketAddr;
21+
}
22+
23+
/// Allows to implement actual connection.
24+
/// Defines a type of `Send` and `Receive` events, that will be used by a connection.
25+
pub trait Connection: Debug {
26+
/// Defines a user event type.
27+
type SendEvent: Debug + ConnectionEventAddress;
28+
/// Defines a connection event type.
29+
type ReceiveEvent: Debug + ConnectionEventAddress;
30+
31+
/// Creates new connection and initialize it by sending an connection event to the user.
32+
/// * messenger - allows to send packets and events, also provides a config.
33+
/// * address - defines a address that connection is associated with.
34+
/// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet.
35+
/// * initial_data - if initiated by remote host, this will hold that a packet data.
36+
fn create_connection(
37+
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
38+
address: SocketAddr,
39+
time: Instant,
40+
initial_data: Option<&[u8]>,
41+
) -> Self;
42+
43+
/// Determines if the connection should be dropped due to its state.
44+
fn should_drop(
45+
&mut self,
46+
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
47+
time: Instant,
48+
) -> bool;
49+
50+
/// Processes a received packet: parse it and emit an event.
51+
fn process_packet(
52+
&mut self,
53+
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
54+
payload: &[u8],
55+
time: Instant,
56+
);
57+
58+
/// Processes a received event and send a packet.
59+
fn process_event(
60+
&mut self,
61+
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
62+
event: Self::SendEvent,
63+
time: Instant,
64+
);
65+
66+
/// Processes various connection-related tasks: resend dropped packets, send heartbeat packet, etc...
67+
/// This function gets called frequently.
68+
fn update(
69+
&mut self,
70+
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
71+
time: Instant,
72+
);
73+
}

0 commit comments

Comments
 (0)