@@ -13,7 +13,7 @@ extern crate uuid;
1313extern crate socket2;
1414
1515use std:: io:: Read ;
16- use std:: { thread} ;
16+ use std:: { array , thread} ;
1717use std:: thread:: sleep;
1818use std:: sync:: mpsc;
1919use std:: sync:: mpsc:: { Sender , SyncSender , Receiver , RecvTimeoutError } ;
@@ -2256,12 +2256,13 @@ fn test_receiver_source_limit_2_termination_check() {
22562256
22572257 let mut snd_threads = Vec :: new ( ) ;
22582258
2259- let ( snd_tx , snd_rx ) : ( SyncSender < ( ) > , Receiver < ( ) > ) = mpsc:: sync_channel ( 0 ) ; // Used for handshaking, allows syncing the sender states.
2259+ let sender_channels : [ ( SyncSender < ( ) > , Receiver < ( ) > ) ; SND_THREADS ] = array :: from_fn ( |_| mpsc:: sync_channel ( 0 ) ) ;
22602260
22612261 const BASE_UNIVERSE : u16 = 2 ;
22622262
22632263 for i in 0 .. SND_THREADS {
2264- let tx = snd_tx. clone ( ) ;
2264+ // use a unique tx,rx pair per send thread to prevent race condition when using the same tx,rx between threads
2265+ let tx = sender_channels[ i] . 0 . clone ( ) ;
22652266
22662267 let data = [ 1 , 2 , 3 ] ;
22672268
@@ -2295,14 +2296,16 @@ fn test_receiver_source_limit_2_termination_check() {
22952296 dmx_recv. listen_universes ( & [ i] ) . unwrap ( ) ;
22962297 }
22972298
2298- snd_rx. recv ( ) . unwrap ( ) ;
2299- snd_rx. recv ( ) . unwrap ( ) ;
2299+ for ( _, snd_rx) in & sender_channels {
2300+ snd_rx. recv ( ) . unwrap ( ) ;
2301+ }
23002302
23012303 // Asserts that the recv attempts are successful.
2302- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2303- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2304- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2305- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2304+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #1 failed." ) ;
2305+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #2 failed." ) ;
2306+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #3 failed." ) ;
2307+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #4 failed." ) ;
2308+
23062309
23072310 // The first source is held back from terminating but the second source should terminate.
23082311 let second_thread = snd_threads. remove ( 1 ) ;
@@ -2318,15 +2321,16 @@ fn test_receiver_source_limit_2_termination_check() {
23182321
23192322 // New source now sends twice which the receiver should receive.
23202323 src. send ( & [ BASE_UNIVERSE ] , & data, None , None , None ) . unwrap ( ) ;
2324+
23212325 src. send ( & [ BASE_UNIVERSE ] , & data, None , None , None ) . unwrap ( ) ;
23222326 } ) ;
23232327
23242328 // Asserts that the recv attempts are successful (no source exceeded).
2325- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2326- dmx_recv. recv ( RECV_TIMEOUT ) . unwrap ( ) ;
2329+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #5 failed." ) ;
2330+ dmx_recv. recv ( RECV_TIMEOUT ) . expect ( "dmx_recv.recv() #6 failed." ) ;
23272331
23282332 // Allow the first source to progress and finish.
2329- snd_rx . recv ( ) . unwrap ( ) ;
2333+ sender_channels [ 0 ] . 1 . recv ( ) . unwrap ( ) ;
23302334 let first_thread = snd_threads. remove ( 0 ) ;
23312335 first_thread. join ( ) . unwrap ( ) ;
23322336
0 commit comments