@@ -9,6 +9,7 @@ use cidr::Ipv4Inet;
99use log:: { debug, info, trace} ;
1010use network:: node:: request:: SetupNodeRequest ;
1111use tokio:: time;
12+ use tokio_stream:: wrappers:: ReceiverStream ;
1213use tonic:: { transport:: Server , Request , Response , Status } ;
1314use uuid:: Uuid ;
1415
@@ -17,13 +18,81 @@ mod config;
1718use config:: { GrpcServerConfig , NodeAgentConfig } ;
1819use network:: node:: setup_node;
1920use node_manager:: NodeSystem ;
21+ use workload_manager:: workload_manager:: WorkloadManager ;
2022
23+ use proto:: agent:: {
24+ instance_service_server:: InstanceService , instance_service_server:: InstanceServiceServer ,
25+ Instance , InstanceStatus , SignalInstruction ,
26+ } ;
2127use proto:: scheduler:: {
2228 node_service_client:: NodeServiceClient , NodeRegisterRequest , NodeRegisterResponse , NodeStatus ,
2329 Resource , ResourceSummary , Status as SchedulerStatus ,
2430} ;
2531
2632const NUMBER_OF_CONNECTION_ATTEMPTS : u16 = 10 ;
33+
34+ ///
35+ /// This Struct implement the Instance service from Node Agent proto file
36+ ///
37+ #[ derive( Debug ) ]
38+ pub struct InstanceServiceController {
39+ workload_manager : WorkloadManager ,
40+ }
41+
42+ impl InstanceServiceController {
43+ pub fn new ( node_id : String ) -> Self {
44+ Self {
45+ workload_manager : WorkloadManager :: new ( node_id) ,
46+ }
47+ }
48+ }
49+
50+ #[ tonic:: async_trait]
51+ impl InstanceService for InstanceServiceController {
52+ type createStream = ReceiverStream < Result < InstanceStatus , Status > > ;
53+
54+ async fn create (
55+ & self ,
56+ request : Request < Instance > ,
57+ ) -> Result < Response < Self :: createStream > , Status > {
58+ let instance = request. into_inner ( ) ;
59+ // let receiver = self.workload_manager.create(instance).await?;
60+ let receiver = self . workload_manager . create ( instance) . await ;
61+
62+ Ok ( Response :: new ( ReceiverStream :: new ( receiver) ) )
63+ }
64+
65+ async fn signal ( & self , request : Request < SignalInstruction > ) -> Result < Response < ( ) > , Status > {
66+ let signal_instruction = request. into_inner ( ) ;
67+
68+ Ok ( Response :: new (
69+ self . workload_manager . signal ( signal_instruction) . await ?,
70+ ) )
71+ }
72+ }
73+
74+ ///
75+ /// This function starts the grpc server of the Node Agent.
76+ /// The server listens and responds to requests from the Scheduler.
77+ /// The default port is 50053.
78+ ///
79+ fn create_grpc_server ( config : GrpcServerConfig , node_id : String ) -> tokio:: task:: JoinHandle < ( ) > {
80+ let addr = format ! ( "http://{}:{}" , config. host, config. port)
81+ . parse ( )
82+ . unwrap ( ) ;
83+ let instance_service_controller = InstanceServiceController :: new ( node_id) ;
84+
85+ info ! ( "Node Agent server listening on {}" , addr) ;
86+
87+ tokio:: spawn ( async move {
88+ Server :: builder ( )
89+ . add_service ( InstanceServiceServer :: new ( instance_service_controller) )
90+ . serve ( addr)
91+ . await
92+ . unwrap ( )
93+ } )
94+ }
95+
2796///
2897/// This function allows you to connect to the scheduler's grpc server.
2998///
@@ -183,6 +252,7 @@ fn create_grpc_client(config: GrpcServerConfig, node_id: String) -> tokio::task:
183252 }
184253 } )
185254}
255+
186256#[ tokio:: main]
187257async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
188258 env_logger:: init ( ) ;
@@ -205,8 +275,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
205275
206276 // start grpc server and client
207277 let client_handler = create_grpc_client ( config. client , node_id. clone ( ) ) ;
278+ let server_handler = create_grpc_server ( config. server , node_id. clone ( ) ) ;
208279
209280 client_handler. await ?;
281+ server_handler. await ?;
210282
211283 info ! ( "Shutting down node agent" ) ;
212284
0 commit comments