本文档详细介绍 execute 库的高级功能,包括自定义执行器、健康检查、重试机制、超时控制等。
CommandExecutor trait 允许你实现自己的命令执行逻辑,比如集成异步运行时(Tokio)或添加自定义行为。
pub trait CommandExecutor: Send + Sync {
fn execute(&self, config: &CommandConfig) -> Result<Output, ExecuteError>;
}这是默认的执行器,使用 std::process::Command:
use execute::{CommandPool, CommandConfig, StdCommandExecutor};
use std::sync::Arc;
let pool = CommandPool::new();
let executor = Arc::new(StdCommandExecutor);
// 提交任务
pool.push_task(CommandConfig::new("echo", vec!["hello".to_string()]));
// 启动执行器(使用自定义执行器)
pool.start_executor_with_executor(std::time::Duration::from_millis(100), executor);在同步接口中使用异步功能:
use execute::{CommandConfig, ExecuteError, CommandExecutor};
use std::process::Output;
use tokio::process::Command;
pub struct TokioExecutor;
impl CommandExecutor for TokioExecutor {
fn execute(&self, config: &CommandConfig) -> Result<Output, ExecuteError> {
// 创建运行时来执行异步代码
let rt = tokio::runtime::Runtime::new()
.map_err(|e| ExecuteError::Io(std::io::Error::new(
std::io::ErrorKind::Other, e
)))?;
rt.block_on(async {
let mut cmd = Command::new(&config.program);
cmd.args(&config.args);
if let Some(dir) = &config.working_dir {
cmd.current_dir(dir);
}
// 设置环境变量
for (key, value) in &config.env.vars {
if let Some(val) = value {
cmd.env(key, val);
} else {
cmd.env_remove(key);
}
}
cmd.output().await.map_err(ExecuteError::Io)
})
}
}使用方式:
let pool = CommandPool::new();
let executor = Arc::new(TokioExecutor);
pool.push_task(CommandConfig::new("sleep", vec!["1".to_string()]));
pool.start_executor_with_executor(std::time::Duration::from_millis(100), executor);use execute::{CommandConfig, ExecuteError, CommandExecutor};
use std::process::Output;
use std::time::Duration;
use tokio::process::Command;
use tokio::time::timeout;
pub struct TimeoutExecutor;
impl CommandExecutor for TimeoutExecutor {
fn execute(&self, config: &CommandConfig) -> Result<Output, ExecuteError> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| ExecuteError::Io(std::io::Error::new(
std::io::ErrorKind::Other, e
)))?;
rt.block_on(async {
let mut cmd = Command::new(&config.program);
cmd.args(&config.args);
// 应用超时
match config.timeout {
Some(dur) => {
timeout(dur, cmd.output()).await
.map_err(|_| ExecuteError::Timeout(dur))?
.map_err(|e| ExecuteError::Io(e))
}
None => {
cmd.output().await.map_err(ExecuteError::Io)
}
}
})
}
}-
避免重复创建运行时:使用线程本地存储或全局运行时
use once_cell::sync::Lazy; static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| { tokio::runtime::Runtime::new().unwrap() }); impl CommandExecutor for YourExecutor { fn execute(&self, config: &CommandConfig) -> Result<Output, ExecuteError> { RUNTIME.block_on(async { /* ... */ }) } }
-
并发限制:使用信号量防止资源耗尽
pool.start_executor_with_executor_and_limit( Duration::from_millis(100), 8, // 8 个工作线程 4, // 最多同时执行 4 个外部进程 executor, );
-
选择合适的队列:
- 高并发场景:使用无锁队列
CommandPoolSeg - 一般场景:使用标准队列
CommandPool
use execute::CommandPoolSeg; let pool = CommandPoolSeg::new(); let executor = Arc::new(YourExecutor); pool.start_executor_with_executor(Duration::from_millis(100), executor);
- 高并发场景:使用无锁队列
use execute::{CommandPool, CommandConfig, CommandExecutor, ExecuteError};
use std::process::{Command, Output};
use std::sync::Arc;
use std::time::Duration;
struct CustomExecutor;
impl CommandExecutor for CustomExecutor {
fn execute(&self, config: &CommandConfig) -> Result<Output, ExecuteError> {
println!("执行:{} {:?}", config.program(), config.args());
let mut cmd = Command::new(config.program());
cmd.args(config.args());
if let Some(dir) = config.working_dir() {
cmd.current_dir(dir);
}
cmd.output().map_err(ExecuteError::Io)
}
}
fn main() -> Result<(), ExecuteError> {
let pool = CommandPool::new();
let executor = Arc::new(CustomExecutor);
// 添加任务
pool.push_task(CommandConfig::new("echo", vec!["hello".to_string()]));
pool.push_task(CommandConfig::new("date", vec![]));
// 启动执行器:4 个工作线程,最多 2 个并发
pool.start_executor_with_executor_and_limit(
Duration::from_millis(100),
4,
2,
executor,
);
// 等待任务完成
std::thread::sleep(Duration::from_secs(1));
// 优雅关闭
pool.shutdown().unwrap();
Ok(())
}运行此示例:
cargo run --example custom_executor_demo健康检查功能用于监控命令池的运行状态,适合集成到监控系统中。
pub enum HealthStatus {
Healthy, // 系统健康
Degraded { issues: Vec<String> }, // 系统降级(部分问题)
Unhealthy { issues: Vec<String> }, // 系统异常(严重问题)
}use execute::{CommandPool, PoolConfigBuilder, HealthStatus};
use std::time::Duration;
// 创建带健康检查的命令池
let pool = PoolConfigBuilder::new()
.thread_count(4)
.enable_health_check(true)
.build()
.unwrap();
// 执行一些任务
for i in 0..10 {
pool.push_task(CommandConfig::new("echo", vec![format!("task {}", i)]));
}
pool.start_executor();
// 检查健康状态
let health = pool.health_check();
match health.status {
HealthStatus::Healthy => {
println!("✅ 系统健康运行");
}
HealthStatus::Degraded { issues } => {
println!("⚠️ 系统性能下降:");
for issue in issues {
println!(" - {}", issue);
}
}
HealthStatus::Unhealthy { issues } => {
println!("❌ 系统异常:");
for issue in issues {
println!(" - {}", issue);
}
}
}
// 查看详细指标
println!("\n详细指标:");
println!(" 活跃工作线程:{}/{}",
health.details.workers_alive,
health.details.workers_total
);
println!(" 队列使用率:{:.1}%",
health.details.queue_usage * 100.0
);
println!(" 长时任务数:{}",
health.details.long_running_tasks
);系统会自动检测以下问题:
降级状态 (Degraded):
- 工作线程死亡超过 25%
- 队列使用率超过 80%
- 存在运行超过 5 分钟的任务
异常状态 (Unhealthy):
- 工作线程死亡超过 50%
- 队列已满,无法提交新任务
- 多个关键组件失败
use std::thread;
use std::time::Duration;
// 定期健康检查
thread::spawn(move || {
loop {
let health = pool.health_check();
match health.status {
HealthStatus::Unhealthy { issues } => {
// 发送紧急告警
send_alert(&format!("命令池异常:{:?}", issues));
}
HealthStatus::Degraded { issues } => {
// 发送警告
send_warning(&format!("命令池降级:{:?}", issues));
}
_ => {}
}
thread::sleep(Duration::from_secs(30));
}
});完整示例:examples/health_check_demo.rs
自动重试失败的任务,支持固定间隔和指数退避策略。
pub enum RetryStrategy {
FixedInterval(Duration),
ExponentialBackoff {
initial: Duration,
max: Duration,
multiplier: f64,
},
}use execute::{CommandConfig, RetryPolicy, RetryStrategy, execute_with_retry};
use std::time::Duration;
// 配置重试策略 - 指数退避
let policy = RetryPolicy::new(
3, // 最多重试 3 次
RetryStrategy::ExponentialBackoff {
initial: Duration::from_millis(100),
max: Duration::from_secs(10),
multiplier: 2.0,
}
);
let config = CommandConfig::new("curl", vec!["https://example.com".to_string()])
.with_retry(policy);
match execute_with_retry(&config, 1) {
Ok(output) => println!("成功: {}", String::from_utf8_lossy(&output.stdout)),
Err(e) => eprintln!("重试后仍失败: {}", e),
}let policy = RetryPolicy::new(
3,
RetryStrategy::FixedInterval(Duration::from_secs(1))
);精细控制命令启动超时和执行超时,便于错误诊断。
pub struct TimeoutConfig {
pub spawn_timeout: Option<Duration>, // 进程创建超时
pub execution_timeout: Option<Duration>, // 执行超时
}use execute::{CommandConfig, TimeoutConfig, execute_with_timeouts};
use std::time::Duration;
let timeout_config = TimeoutConfig::new()
.with_spawn_timeout(Duration::from_secs(5)) // 5秒启动超时
.with_execution_timeout(Duration::from_secs(30)); // 30秒执行超时
let config = CommandConfig::new("my-command", vec![])
.with_timeouts(timeout_config);
match execute_with_timeouts(&config, 1) {
Ok(output) => println!("成功: {:?}", output),
Err(CommandError::Timeout { context, configured_timeout, actual_duration }) => {
eprintln!("超时: 配置={:?}, 实际={:?}", configured_timeout, actual_duration);
}
Err(e) => eprintln!("错误: {}", e),
}let timeout_config = TimeoutConfig::new()
.with_execution_timeout(Duration::from_secs(10));
let retry_policy = RetryPolicy::new(
3,
RetryStrategy::FixedInterval(Duration::from_secs(1))
);
let config = CommandConfig::new("flaky-command", vec![])
.with_timeouts(timeout_config)
.with_retry(retry_policy);
let result = execute_with_retry(&config, 1);为命令配置自定义环境变量,支持继承、覆盖和清除。
pub struct EnvConfig {
pub inherit_parent: bool, // 是否继承父进程环境变量
pub vars: HashMap<String, Option<String>>, // 环境变量映射
}use execute::{CommandConfig, EnvConfig, execute_command_with_context};
// 设置环境变量
let env = EnvConfig::new()
.set("MY_VAR", "my_value")
.set("ANOTHER_VAR", "42");
let config = CommandConfig::new("printenv", vec!["MY_VAR".to_string()])
.with_env(env);
let result = execute_command_with_context(&config, 1)?;
// 清除特定环境变量
let env = EnvConfig::new()
.remove("TEMP_VAR");
// 不继承父进程环境变量
let env = EnvConfig::new()
.no_inherit()
.set("ONLY_VAR", "only_value");在任务执行前后插入自定义逻辑,用于性能分析、监控等。
pub trait ExecutionHook: Send + Sync {
fn before_execute(&self, ctx: &ExecutionContext);
fn after_execute(&self, ctx: &ExecutionContext, result: &HookTaskResult);
}use execute::{CommandConfig, execute_task_with_hooks};
use execute::{ExecutionHook, ExecutionContext, HookTaskResult};
use std::sync::Arc;
struct TimingHook;
impl ExecutionHook for TimingHook {
fn before_execute(&self, ctx: &ExecutionContext) {
println!("开始执行任务 {}", ctx.task_id);
}
fn after_execute(&self, ctx: &ExecutionContext, result: &HookTaskResult) {
println!("任务 {} 完成,耗时 {:?}", ctx.task_id, result.duration);
}
}
let config = CommandConfig::new("echo", vec!["hello".to_string()]);
let hooks = vec![Arc::new(TimingHook) as Arc<dyn ExecutionHook>];
let result = execute_task_with_hooks(&config, 1, 0, &hooks);钩子 panic 不会影响任务执行,系统会捕获 panic 并记录警告日志。
自动清理已终止的子进程,防止僵尸进程累积。
use execute::{CommandPool, ExecutionConfig};
use std::time::Duration;
// 创建带僵尸进程清理的池(每5秒检查一次)
let config = ExecutionConfig::new()
.with_workers(4)
.with_zombie_reaper_interval(Duration::from_secs(5));
let pool = CommandPool::with_config(config);
pool.start_executor();
// 僵尸进程会自动清理- Unix: 使用
waitpid(-1, WNOHANG)回收僵尸进程 - 非 Unix: 无操作实现
实时收集任务执行指标,用于监控和性能分析。
use execute::CommandPool;
let pool = CommandPool::new();
// ... 执行任务 ...
let metrics = pool.metrics();
println!("总任务数: {}", metrics.total_tasks);
println!("成功任务: {}", metrics.successful_tasks);
println!("失败任务: {}", metrics.failed_tasks);
println!("成功率: {:.2}%", metrics.success_rate * 100.0);
println!("平均执行时间: {:?}", metrics.avg_execution_time);- 任务计数(总数/成功/失败)
- 成功率
- 执行时间统计(最小/最大/平均/百分位数)
- 队列等待时间
支持取消队列中或执行中的任务。
use execute::{CommandPool, CommandConfig};
use std::time::Duration;
let pool = CommandPool::new();
// 提交任务并获取句柄
let config = CommandConfig::new("long_running_command", vec![]);
let handle = pool.submit_task(config)?;
// 取消任务
match handle.cancel() {
Ok(()) => println!("任务已取消"),
Err(e) => println!("取消失败: {}", e),
}限制命令的输出大小和内存使用。
use execute::{CommandConfig, ResourceLimits};
let limits = ResourceLimits::new()
.with_max_output_size(1024 * 1024) // 1MB 输出限制
.with_max_memory(100 * 1024 * 1024); // 100MB 内存限制
let config = CommandConfig::new("command", vec![])
.with_resource_limits(limits);execute 库提供了丰富的高级功能:
| 功能 | 用途 |
|---|---|
| 自定义执行器 | 集成不同运行时(Tokio 等) |
| 健康检查 | 监控命令池状态 |
| 重试机制 | 自动重试失败任务 |
| 分离超时 | 精细控制启动和执行超时 |
| 环境变量 | 配置命令执行环境 |
| 钩子系统 | 性能分析和监控 |
| 僵尸进程清理 | 防止资源泄漏 |
| 指标收集 | 实时监控和统计 |
| 任务取消 | 灵活的任务管理 |
| 资源限制 | 防止资源耗尽 |
更多示例请参考 examples/ 目录。