Skip to content

Commit 1d11d08

Browse files
committed
wip
1 parent ec53758 commit 1d11d08

1 file changed

Lines changed: 41 additions & 25 deletions

File tree

toad/src/step/block.rs

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,8 @@ impl<P, Pieces> Conversation<P, Pieces>
8686
Pieces: Default
8787
{
8888
/// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`)
89-
pub fn expect_response(expires_at: Instant<P::Clock>, msg: Message<P>) -> Self {
90-
Self { original: Some(msg),
91-
biggest_number_seen: None,
92-
pcs: Default::default(),
93-
expires_at }
94-
}
95-
96-
/// Create a new [`Conversation`] tracking a received Blocked request
97-
pub fn request(expires_at: Instant<P::Clock>) -> Self {
98-
Self { original: None,
89+
pub fn new(expires_at: Instant<P::Clock>, original: Option<Message<P>>) -> Self {
90+
Self { original,
9991
biggest_number_seen: None,
10092
pcs: Default::default(),
10193
expires_at }
@@ -280,21 +272,13 @@ impl<P, S, Endpoints, Conversations, Pieces> Block<P, S, Endpoints, Conversation
280272
})
281273
}
282274

283-
fn expect_response(&self, snap: &Snapshot<P>, addr: SocketAddr, req: &Message<P>) {
284-
let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis());
285-
self.get_or_create_endpoint(addr, |convs| {
286-
convs.insert((req.token, Role::Response, Direction::Inbound),
287-
Conversation::expect_response(exp, req.clone()))
288-
.unwrap();
289-
});
290-
}
291-
292-
fn insert_request(&self,
275+
fn insert(&self,
293276
snap: &Snapshot<P>,
277+
original: Option<&Message<P>>,
294278
(addr, token, role, dir): (SocketAddr, Token, Role, Direction)) {
295279
let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis());
296280
self.get_or_create_endpoint(addr, |convs| {
297-
convs.insert((token, role, dir), Conversation::request(exp))
281+
convs.insert((token, role, dir), Conversation::new(exp, original.cloned()))
298282
.unwrap();
299283
});
300284
}
@@ -391,7 +375,7 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
391375
},
392376
| Some(block) => {
393377
if !has_prev_pieces && block.num() == 0 && block.more() {
394-
self.insert_request(snap, k);
378+
self.insert(snap, None, k);
395379
self.map_mut(k, |conv| {
396380
conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap())
397381
});
@@ -467,7 +451,7 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
467451
},
468452
| Some(block) => {
469453
if !has_prev_pieces {
470-
// TODO: warn
454+
log!(Block::poll_resp, effects, log::Level::Warn, "Response received for token {:?} but we've never seen a request using that token. Ignoring this response despite it having {:?}", rep.data().msg().token, block);
471455
Some(Ok(rep))
472456
} else {
473457
self.map_mut(k, |conv| {
@@ -496,16 +480,48 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
496480
}
497481
}
498482

483+
fn before_message_sent(&self,
484+
snap: &platform::Snapshot<P>,
485+
effs: &mut P::Effects,
486+
msg: &mut Addrd<Message<P>>)
487+
-> Result<(), Self::Error> {
488+
self.prune(effs, snap.time);
489+
self.inner.before_message_sent(snap, effs, msg)?;
490+
491+
let block_size: usize = 1024;
492+
493+
let original_payload = msg.data().payload().0;
494+
495+
// TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE
496+
if msg.data().block1().is_none() && original_payload.len() > block_size {
497+
let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound);
498+
self.insert(snap, Some(msg.data()), k);
499+
self.map_mut(k, |conv| {
500+
let len = original_payload.len() as f32;
501+
let block_count = (len / block_size as f32).ceil() as u32;
502+
for n in 0..block_count {
503+
let mut msg_block = msg.clone();
504+
msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok();
505+
let mut p = P::MessagePayload::default();
506+
p.append_copy(original_payload[n * 1024..((n + 1) * 1024)]);
507+
msg_block.as_mut().payload = Payload(p);
508+
conv.have(snap.time, n, msg_block.unwrap());
509+
}
510+
}).unwrap();
511+
}
512+
Ok(())
513+
}
514+
499515
fn on_message_sent(&self,
500516
snap: &platform::Snapshot<P>,
501517
effs: &mut P::Effects,
502518
msg: &Addrd<Message<P>>)
503519
-> Result<(), Self::Error> {
504-
self.prune(effs, snap.time);
505520
self.inner.on_message_sent(snap, effs, msg)?;
506521
if msg.data().code.kind() == CodeKind::Request {
507-
self.expect_response(snap, msg.addr(), msg.data());
522+
self.insert(snap, Some(msg.data()), (msg.addr(), msg.data().token, Role::Response, Direction::Inbound));
508523
} else if msg.data().code.kind() == CodeKind::Response {
524+
// TODO: block outbound responses
509525
}
510526

511527
Ok(())

0 commit comments

Comments
 (0)