1- use log:: debug;
21use proto:: scheduler:: {
32 node_service_server:: NodeService , NodeRegisterRequest , NodeRegisterResponse , NodeStatus ,
43 NodeUnregisterRequest , NodeUnregisterResponse ,
54} ;
65use tokio:: sync:: mpsc;
7- use tonic:: { Request , Response , Status , Streaming } ;
6+ use tonic:: { Request , Response , Streaming } ;
87
98use crate :: { manager:: Manager , Event } ;
109
1110#[ derive( Debug ) ]
12- #[ allow( dead_code) ]
1311pub struct NodeListener {
1412 sender : mpsc:: Sender < Event > ,
1513}
@@ -25,33 +23,34 @@ impl NodeService for NodeListener {
2523 async fn status (
2624 & self ,
2725 request : Request < Streaming < NodeStatus > > ,
28- ) -> Result < Response < ( ) > , Status > {
26+ ) -> Result < Response < ( ) > , tonic:: Status > {
27+ log:: debug!( "received gRPC request: {:?}" , request) ;
28+
2929 let mut stream = request. into_inner ( ) ;
30- let ( tx, mut rx) = Manager :: create_mpsc_channel ( ) ;
3130
31+ // send each status to the manager
3232 loop {
33+ let ( tx, mut rx) = Manager :: create_mpsc_channel ( ) ;
3334 let message = stream. message ( ) . await ?;
35+
3436 match message {
3537 Some ( node_status) => {
36- debug ! ( "Node status: {:?}" , node_status) ;
3738 self . sender
3839 . send ( Event :: NodeStatus ( node_status, tx. clone ( ) ) )
3940 . await
4041 . unwrap ( ) ;
4142
43+ // wait for the manager to respond
4244 if let Some ( res) = rx. recv ( ) . await {
4345 match res {
44- Ok ( ( ) ) => {
45- debug ! ( "Node status updated successfully" ) ;
46- }
47- Err ( err) => {
48- debug ! ( "Error updating node status: {:?}" , err) ;
49- return Err ( err) ;
50- }
46+ Ok ( _) => { }
47+ Err ( err) => return Err ( err) ,
5148 }
5249 }
5350 }
5451 None => {
52+ log:: error!( "Node status stream closed" ) ;
53+ // todo: emit node crash event (get the node id from the first status)
5554 return Ok ( Response :: new ( ( ) ) ) ;
5655 }
5756 }
@@ -61,29 +60,33 @@ impl NodeService for NodeListener {
6160 async fn register (
6261 & self ,
6362 request : Request < NodeRegisterRequest > ,
64- ) -> Result < Response < NodeRegisterResponse > , Status > {
65- debug ! ( "{:?}" , request) ;
63+ ) -> Result < Response < NodeRegisterResponse > , tonic:: Status > {
64+ log:: debug!( "received gRPC request: {:?}" , request) ;
65+
6666 let ( tx, rx) = Manager :: create_oneshot_channel ( ) ;
67+ let remote_addr = request. remote_addr ( ) . unwrap ( ) . ip ( ) ;
68+ log:: debug!( "Registering a new node from: {:?}" , remote_addr) ;
6769
6870 match self
6971 . sender
70- . send ( Event :: NodeRegister ( request. into_inner ( ) , tx) )
72+ . send ( Event :: NodeRegister ( request. into_inner ( ) , remote_addr , tx) )
7173 . await
7274 {
7375 Ok ( _) => {
7476 return rx. await . unwrap ( ) ;
7577 }
7678 Err ( _) => {
77- return Err ( Status :: internal ( "could not send event to manager" ) ) ;
79+ return Err ( tonic :: Status :: internal ( "could not send event to manager" ) ) ;
7880 }
7981 }
8082 }
8183
8284 async fn unregister (
8385 & self ,
8486 request : Request < NodeUnregisterRequest > ,
85- ) -> Result < Response < NodeUnregisterResponse > , Status > {
86- debug ! ( "{:?}" , request) ;
87+ ) -> Result < Response < NodeUnregisterResponse > , tonic:: Status > {
88+ log:: debug!( "received gRPC request: {:?}" , request) ;
89+
8790 let ( tx, rx) = Manager :: create_oneshot_channel ( ) ;
8891
8992 match self
@@ -95,7 +98,7 @@ impl NodeService for NodeListener {
9598 return rx. await . unwrap ( ) ;
9699 }
97100 Err ( _) => {
98- return Err ( Status :: internal ( "could not send event to manager" ) ) ;
101+ return Err ( tonic :: Status :: internal ( "could not send event to manager" ) ) ;
99102 }
100103 }
101104 }
0 commit comments