Skip to content

Commit 4f6c2b0

Browse files
OpenTelemetry Protcol (OTLP) identity integration (#180)
* [#158]: moved map_handlers module from identity crate to common crate * [#158]: simplified identity logic. removed duplicated code and functions * [#158]: added program handlers function in the common crate. Remove duplicated code in metrics module * [#168]: added load from program-handlers in identity user space implementation. Added a small doc in the conntracker/main.rs file * [#158]: fixed typos in the map names * [#158]: fixed bpf error: Error: the program is already loaded.Improved map handlers code * [#174]: added open telemetry (otel) logger for logs. Added otel daemonset with otel agent and collector * [#158]: improved docs for the conntracker data structures. removed useless conversion from u8 to 64 with .into() for state variable * [#174]: Added otel libraries and features in the common crate. .update identity kubernetes manifest with the otel env variables. * [#158]: imroved documentation in the user space for the identity (VethLog) data structure * [#158]: restored blocklist map initialization * [#158]: added better docs. Updated while true pattern with "loop" pattern. Code cleaning * [#174]: added prettify to logger * [#181]: added command to repair blocklist configmaps * [#182]: added GetTrackedVeth grpc endpoint definition * [#158]: added load_perf_event_array_from_mapdata function in map_handlers.rs * [#182]: added total monitored veth_events (tot_monitored_veth) * [#182]: added "cfcli monitoring veth" command frontend. added send_tracked_veth_requests function in api/requests.rs * [refactoring]: separate experimental service discovery from the helpers in the identity service * [refactoring]: created BufferType enum to centralize the event readers for PacketLog, VethLog and TcpPacketRegistry * updated common cargo.toml * [#158]: moved IpProtocols and network structures from the identity crate to the common crate. Added BufferType enum to list different buffers readers. Added buffer_type module in cortexbrain common (experimental) * [update]: cli packages update * [update]: IpProtocols update . Btter code formatting * [#158]: fixed typos from latest commit * [#182]: Added tracked events agent API implementation. Added VethEvent protobuffer message type * [proposal]: added batcher module intial bones * [158]: removed old deprecated functions to show veth logs. added packed representation for the VethLog structure. changed dev_addr type from [u32;8] to [u8;6] (see https://wiki.osdev.org/Address_Resolution_Protocol). Added unit tests to check VethLog structure bytes size. Changed "actual program" to "kernel_symbol" in the load_program function * [#158]: added a map_manager to call BpfMaps using the map name instead of relying on the declaration order during the startup * [#182]: added VethEvent protobuf message. Added log to see the veth event in the agent api * [docker] pushed new development images
1 parent 901fc3c commit 4f6c2b0

34 files changed

+2717
-979
lines changed

cli/Cargo.lock

Lines changed: 547 additions & 63 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ tonic = "0.14.2"
2222
tonic-reflection = "0.14.2"
2323
prost-types = "0.14.3"
2424
prost = "0.14.3"
25-
cortexflow_agent_api = {version = "0.1.1",features = ["client"]}
25+
cortexflow_agent_api = {path = "../core/api",features = ["client"]}
2626
kube = "2.0.1"
2727
k8s-openapi = {version = "0.26.0", features = ["v1_34"]}
2828

cli/src/install.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use crate::errors::CliError;
22
use crate::essential::{BASE_COMMAND, connect_to_client, create_config_file, create_configs};
33
use clap::{Args, Subcommand};
44
use colored::Colorize;
5-
use kube::Error;
5+
use k8s_openapi::api::core::v1::ConfigMap;
66
use kube::core::ErrorResponse;
7+
use kube::{Api, Client, Error};
8+
use std::thread::sleep;
79
use std::{process::Command, thread, time::Duration};
810

911
// docs:
@@ -38,6 +40,8 @@ pub enum InstallCommands {
3840
about = "Deploys a simple example contained in deploy-test-pod.yaml"
3941
)]
4042
TestPods,
43+
#[command(name = "blocklist", about = "Install or Repair blocklist configmap")]
44+
Blocklist,
4145
}
4246

4347
//install args
@@ -206,6 +210,84 @@ async fn install_simple_example_component() -> Result<(), CliError> {
206210
}
207211
}
208212

213+
// docs:
214+
pub async fn install_blocklist_configmap() -> Result<(), CliError> {
215+
match connect_to_client().await {
216+
Ok(client) => {
217+
println!(
218+
"{} {}",
219+
"=====>".blue().bold(),
220+
"Checking if the Blocklist configmap exists"
221+
);
222+
sleep(Duration::from_secs(1));
223+
let blocklist_exists = check_if_blocklist_exists(client).await?;
224+
if !blocklist_exists {
225+
println!(
226+
"{} {}",
227+
"=====>".blue().bold(),
228+
"Blocklist configmap does not exist".red().bold()
229+
);
230+
sleep(Duration::from_secs(1));
231+
println!("{} {}", "=====>".bold().blue(), "Creating configmap");
232+
let metdata_configs = create_configs();
233+
sleep(Duration::from_secs(1));
234+
match create_config_file(metdata_configs).await {
235+
Ok(_) => {
236+
println!(
237+
"{} {}",
238+
"=====>".bold().blue(),
239+
"Configmap created/repaired successfully".bold().green()
240+
)
241+
}
242+
Err(e) => {
243+
return Err(CliError::InstallerError {
244+
reason: e.to_string(),
245+
});
246+
}
247+
}
248+
return Ok(());
249+
} else {
250+
println!()
251+
}
252+
253+
Ok(())
254+
}
255+
Err(e) => {
256+
return Err(CliError::ClientError(Error::Api(ErrorResponse {
257+
status: "failed".to_string(),
258+
message: "Failed to connect to kubernetes client".to_string(),
259+
reason: e.to_string(),
260+
code: 404,
261+
})));
262+
}
263+
}
264+
}
265+
266+
// docs:
267+
async fn check_if_blocklist_exists(client: Client) -> Result<bool, CliError> {
268+
let namespace = "cortexflow";
269+
let name = "cortexbrain-client-config";
270+
let api: Api<ConfigMap> = Api::namespaced(client, namespace);
271+
match api.get(name).await {
272+
Ok(_) => {
273+
println!(
274+
"{} {}",
275+
"=====>".bold().blue(),
276+
"Blocklist configmap exists".green().bold()
277+
);
278+
Ok(true)
279+
}
280+
Err(_) => {
281+
println!(
282+
"{} {}",
283+
"=====>".bold().blue(),
284+
"Blocklist configmap doesn not exists".red().bold(),
285+
);
286+
Ok(false)
287+
}
288+
}
289+
}
290+
209291
//docs:
210292
//
211293
// This is an auxiliary function to help manage the cortexflow components during the installation

cli/src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::install::{InstallArgs, InstallCommands, install_cortexflow, install_s
1818
use crate::logs::{LogsArgs, logs_command};
1919
use crate::monitoring::{
2020
MonitorArgs, MonitorCommands, list_features, monitor_dropped_packets, monitor_identity_events,
21-
monitor_latency_metrics,
21+
monitor_latency_metrics, monitor_tracked_veth,
2222
};
2323
use crate::policies::{
2424
PoliciesArgs, PoliciesCommands, check_blocklist, create_blocklist, remove_ip,
@@ -68,7 +68,7 @@ enum Commands {
6868
struct SetArgs {
6969
val: String,
7070
}
71-
71+
//TODO: add command for monitoring veth interfaces
7272
async fn args_parser() -> Result<(), CliError> {
7373
let args = Cli::parse();
7474
debug!("Arguments {:?}", args.cmd);
@@ -80,6 +80,10 @@ async fn args_parser() -> Result<(), CliError> {
8080
InstallCommands::TestPods => {
8181
install_simple_example().await?;
8282
}
83+
InstallCommands::Blocklist => {
84+
//install or repair blocklist configmap
85+
let _ = install::install_blocklist_configmap().await?;
86+
}
8387
},
8488
Some(Commands::Uninstall) => {
8589
uninstall().await?;
@@ -120,6 +124,9 @@ async fn args_parser() -> Result<(), CliError> {
120124
MonitorCommands::Droppedpackets => {
121125
let _ = monitor_dropped_packets().await?;
122126
}
127+
MonitorCommands::Veth => {
128+
let _ = monitor_tracked_veth().await?;
129+
}
123130
},
124131
Some(Commands::Policies(policies_args)) => {
125132
match policies_args.policy_cmd {

cli/src/monitoring.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use std::result::Result::Ok;
88
use tonic_reflection::pb::v1::server_reflection_response::MessageResponse;
99

1010
use agent_api::client::{connect_to_client, connect_to_server_reflection};
11-
use agent_api::requests::{get_all_features, send_active_connection_request};
11+
use agent_api::requests::{
12+
get_all_features, send_active_connection_request, send_dropped_packets_request,
13+
send_latency_metrics_request, send_tracked_veth_request,
14+
};
1215

1316
use crate::errors::CliError;
1417
use clap::{Args, Subcommand};
@@ -33,15 +36,18 @@ pub enum MonitorCommands {
3336
about = "Monitor the dropped packets metrics detected by the metrics service"
3437
)]
3538
Droppedpackets,
39+
#[command(
40+
name = "veth",
41+
about = "Monitor tracked veth interfaces from the identity service"
42+
)]
43+
Veth,
3644
}
3745

3846
// cfcli monitor <args>
3947
#[derive(Args, Debug, Clone)]
4048
pub struct MonitorArgs {
4149
#[command(subcommand)]
4250
pub monitor_cmd: MonitorCommands,
43-
//#[arg(long, short)]
44-
//pub flags: Option<String>,
4551
}
4652

4753
pub async fn list_features() -> Result<(), CliError> {
@@ -168,7 +174,7 @@ pub async fn monitor_latency_metrics() -> Result<(), CliError> {
168174
"Connected to CortexFlow Client".green()
169175
);
170176
//send request to get latency metrics
171-
match agent_api::requests::send_latency_metrics_request(client).await {
177+
match send_latency_metrics_request(client).await {
172178
Ok(response) => {
173179
let resp = response.into_inner();
174180
if resp.metrics.is_empty() {
@@ -237,7 +243,7 @@ pub async fn monitor_dropped_packets() -> Result<(), CliError> {
237243
"Connected to CortexFlow Client".green()
238244
);
239245
//send request to get dropped packets metrics
240-
match agent_api::requests::send_dropped_packets_request(client).await {
246+
match send_dropped_packets_request(client).await {
241247
Ok(response) => {
242248
let resp = response.into_inner();
243249
if resp.metrics.is_empty() {
@@ -291,6 +297,50 @@ pub async fn monitor_dropped_packets() -> Result<(), CliError> {
291297
Ok(())
292298
}
293299

300+
pub async fn monitor_tracked_veth() -> Result<(), CliError> {
301+
println!(
302+
"{} {}",
303+
"=====>".blue().bold(),
304+
"Connecting to cortexflow Client".white()
305+
);
306+
match connect_to_client().await {
307+
Ok(client) => match send_tracked_veth_request(client).await {
308+
Ok(response) => {
309+
let veth_response = response.into_inner();
310+
if veth_response.tot_monitored_veth == 0 {
311+
println!("{} {} ", "=====>".blue().bold(), "No tracked veth found");
312+
Ok(())
313+
} else {
314+
println!(
315+
"{} {} {} {} ",
316+
"=====>".blue().bold(),
317+
"Found:",
318+
&veth_response.tot_monitored_veth,
319+
"tracked veth"
320+
);
321+
for veth in veth_response.veth_names.iter() {
322+
println!("{} {}", "=====>".blue().bold(), &veth);
323+
}
324+
Ok(())
325+
}
326+
}
327+
Err(e) => {
328+
return Err(CliError::AgentError(
329+
tonic_reflection::server::Error::InvalidFileDescriptorSet(e.to_string()),
330+
));
331+
}
332+
},
333+
Err(e) => {
334+
return Err(CliError::ClientError(kube::Error::Api(ErrorResponse {
335+
status: "failed".to_string(),
336+
message: "Failed to connect to kubernetes client".to_string(),
337+
reason: e.to_string(),
338+
code: 404,
339+
})));
340+
}
341+
}
342+
}
343+
294344
fn convert_timestamp_to_date(timestamp: u64) -> String {
295345
DateTime::from_timestamp_micros(timestamp as i64)
296346
.map(|dt| dt.to_string())

0 commit comments

Comments
 (0)