Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ jobs:
"quota_operations::test_bucket_quota_set_info_clear"
"option_behavior_operations::test_cp_dry_run_does_not_create_target_object"
"option_behavior_operations::test_head_bytes_returns_prefix_bytes"
"option_behavior_operations::test_find_print_outputs_full_remote_path"
"option_behavior_operations::test_find_exec_rejects_json_output"
"option_behavior_operations::test_mirror_parallel_zero_returns_usage_error"
)

for test_name in "${TESTS[@]}"; do
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ futures = "0.3"
async-trait = "0.1"
mime_guess = "2.0"
glob = "0.3"
shlex = "1.3"

# HTTP client for Admin API
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "rustls-tls-webpki-roots", "json"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jiff.workspace = true
humansize.workspace = true
mime_guess.workspace = true
glob.workspace = true
shlex.workspace = true

[features]
default = []
Expand All @@ -60,4 +61,3 @@ golden = []
[dev-dependencies]
tempfile.workspace = true
insta.workspace = true

150 changes: 145 additions & 5 deletions crates/cli/src/commands/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use clap::Args;
use rc_core::{AliasManager, ListOptions, ObjectStore as _, RemotePath};
use rc_s3::S3Client;
use serde::Serialize;
use std::io::Write as _;
use std::process::{Command, Output};

use crate::exit_code::ExitCode;
use crate::output::{Formatter, OutputConfig};
Expand Down Expand Up @@ -130,9 +132,66 @@ pub async fn execute(args: FindArgs, output_config: OutputConfig) -> ExitCode {
}
};

// Execute command for each match if requested.
// This mode is human-output only because command output cannot be embedded in JSON.
if let Some(exec_template) = args.exec.as_deref() {
if formatter.is_json() {
formatter.error("--exec cannot be used with --json output");
return ExitCode::UsageError;
}

let exec_argv_template = match parse_exec_template(exec_template) {
Ok(template) => template,
Err(e) => {
formatter.error(&e);
return ExitCode::UsageError;
}
};

for m in &matches {
let object_path = full_object_path(&alias_name, &bucket, &m.key);
let (program, exec_args, command_text) =
render_exec_command(&exec_argv_template, &object_path);
let output = match run_exec_command(&program, &exec_args) {
Ok(output) => output,
Err(e) => {
formatter.error(&format!(
"Failed to run command for {}: {} ({})",
object_path, command_text, e
));
return ExitCode::GeneralError;
}
};

if std::io::stdout().write_all(&output.stdout).is_err() {
formatter.error("Failed to write command stdout");
return ExitCode::GeneralError;
}
if std::io::stderr().write_all(&output.stderr).is_err() {
formatter.error("Failed to write command stderr");
return ExitCode::GeneralError;
}

if !output.status.success() {
formatter.error(&format!(
"Command failed for {} (status {}): {}",
object_path, output.status, command_text
));
return ExitCode::GeneralError;
}
}
}

let mut display_matches = matches;
if args.print {
for m in &mut display_matches {
m.key = full_object_path(&alias_name, &bucket, &m.key);
}
}

// Calculate totals
let total_count = matches.len();
let total_size: i64 = matches.iter().filter_map(|m| m.size_bytes).sum();
let total_count = display_matches.len();
let total_size: i64 = display_matches.iter().filter_map(|m| m.size_bytes).sum();

if args.count {
// Only print count
Expand All @@ -153,16 +212,16 @@ pub async fn execute(args: FindArgs, output_config: OutputConfig) -> ExitCode {
}
} else if formatter.is_json() {
let output = FindOutput {
matches,
matches: display_matches,
total_count,
total_size_bytes: total_size,
total_size_human: humansize::format_size(total_size as u64, humansize::BINARY),
};
formatter.json(&output);
} else if matches.is_empty() {
} else if display_matches.is_empty() {
formatter.println("No matches found.");
} else {
for m in &matches {
for m in &display_matches {
let size = m.size_human.as_deref().unwrap_or("0B");
let styled_size = formatter.style_size(&format!("{:>10}", size));
let styled_key = formatter.style_file(&m.key);
Expand All @@ -179,6 +238,38 @@ pub async fn execute(args: FindArgs, output_config: OutputConfig) -> ExitCode {
ExitCode::Success
}

fn full_object_path(alias: &str, bucket: &str, key: &str) -> String {
RemotePath::new(alias, bucket, key).to_full_path()
}

fn parse_exec_template(exec_template: &str) -> Result<Vec<String>, String> {
let args = shlex::split(exec_template)
.ok_or_else(|| "Invalid --exec template: unbalanced quotes".to_string())?;
if args.is_empty() {
return Err("Invalid --exec template: command cannot be empty".to_string());
}

Ok(args)
}

fn render_exec_command(
argv_template: &[String],
object_path: &str,
) -> (String, Vec<String>, String) {
let rendered: Vec<String> = argv_template
.iter()
.map(|arg| arg.replace("{}", object_path))
.collect();
let program = rendered[0].clone();
let args = rendered[1..].to_vec();
let command_text = rendered.join(" ");
(program, args, command_text)
}

fn run_exec_command(program: &str, args: &[String]) -> std::io::Result<Output> {
Command::new(program).args(args).output()
}

/// Filters for find command
struct FindFilters {
name_pattern: Option<glob::Pattern>,
Expand Down Expand Up @@ -431,4 +522,53 @@ mod tests {
assert!(parse_find_path("").is_err());
assert!(parse_find_path("myalias").is_err());
}

#[test]
fn test_full_object_path() {
assert_eq!(
full_object_path("test", "bucket", "a/b.txt"),
"test/bucket/a/b.txt"
);
assert_eq!(full_object_path("test", "bucket", ""), "test/bucket");
}

#[test]
fn test_parse_exec_template() {
assert_eq!(
parse_exec_template("echo EXEC:{}").unwrap(),
vec!["echo".to_string(), "EXEC:{}".to_string()]
);
assert_eq!(
parse_exec_template(r#"printf '%s\n' "{}""#).unwrap(),
vec!["printf".to_string(), "%s\\n".to_string(), "{}".to_string()]
);
}

#[test]
fn test_parse_exec_template_errors() {
assert!(parse_exec_template("").is_err());
assert!(parse_exec_template("'unterminated").is_err());
}

#[test]
fn test_render_exec_command() {
let template = vec![
"echo".to_string(),
"prefix:{}".to_string(),
"{}".to_string(),
];
let (program, args, text) = render_exec_command(&template, "test/bucket/a.txt");
assert_eq!(program, "echo");
assert_eq!(
args,
vec![
"prefix:test/bucket/a.txt".to_string(),
"test/bucket/a.txt".to_string()
]
);
assert_eq!(
text,
"echo prefix:test/bucket/a.txt test/bucket/a.txt".to_string()
);
}
}
103 changes: 81 additions & 22 deletions crates/cli/src/commands/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use rc_s3::S3Client;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

use crate::commands::diff::{DiffEntry, DiffStatus};
use crate::exit_code::ExitCode;
Expand Down Expand Up @@ -66,6 +68,11 @@ struct FileInfo {
pub async fn execute(args: MirrorArgs, output_config: OutputConfig) -> ExitCode {
let formatter = Formatter::new(output_config);

if args.parallel == 0 {
formatter.error("--parallel must be greater than 0");
return ExitCode::UsageError;
}

// Parse both paths
let source_parsed = parse_path(&args.source);
let target_parsed = parse_path(&args.target);
Expand Down Expand Up @@ -253,7 +260,11 @@ pub async fn execute(args: MirrorArgs, output_config: OutputConfig) -> ExitCode
let mut copied = 0;
let mut errors = 0;

for (key, _) in &to_copy {
let parallel_limit = args.parallel.max(1);
let copy_semaphore = Arc::new(Semaphore::new(parallel_limit));
let mut copy_tasks: JoinSet<(String, Result<(), String>)> = JoinSet::new();

for (key, _) in to_copy {
let source_sep = if source_path.key.is_empty() || source_path.key.ends_with('/') {
""
} else {
Expand All @@ -276,26 +287,46 @@ pub async fn execute(args: MirrorArgs, output_config: OutputConfig) -> ExitCode
format!("{}{target_sep}{key}", target_path.key),
);

// Get object content and upload to target
match source_client.get_object(&source_full).await {
Ok(data) => match target_client.put_object(&target_full, data, None).await {
Ok(_) => {
copied += 1;
if !args.quiet && !formatter.is_json() {
formatter.println(&format!("+ {key}"));
}
let key = key.to_string();
let source_client = Arc::clone(&source_client);
let target_client = Arc::clone(&target_client);
let permit = copy_semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore should not be closed");
copy_tasks.spawn(async move {
let _permit = permit;
let result = match source_client.get_object(&source_full).await {
Ok(data) => target_client
.put_object(&target_full, data, None)
.await
.map(|_| ())
.map_err(|e| format!("Failed to upload {key}: {e}")),
Err(e) => Err(format!("Failed to download {key}: {e}")),
};
(key, result)
});
}

while let Some(task_result) = copy_tasks.join_next().await {
match task_result {
Ok((key, Ok(()))) => {
copied += 1;
if !args.quiet && !formatter.is_json() {
formatter.println(&format!("+ {key}"));
}
Err(e) => {
errors += 1;
if !formatter.is_json() {
formatter.error(&format!("Failed to upload {key}: {e}"));
}
}
Ok((_, Err(message))) => {
errors += 1;
if !formatter.is_json() {
formatter.error(&message);
}
},
Err(e) => {
}
Err(join_error) => {
errors += 1;
if !formatter.is_json() {
formatter.error(&format!("Failed to download {key}: {e}"));
formatter.error(&format!("Mirror copy worker failed: {join_error}"));
}
}
}
Expand All @@ -309,7 +340,10 @@ pub async fn execute(args: MirrorArgs, output_config: OutputConfig) -> ExitCode
let mut removed = 0;

if args.remove {
for key in &to_remove {
let remove_semaphore = Arc::new(Semaphore::new(parallel_limit));
let mut remove_tasks: JoinSet<(String, Result<(), String>)> = JoinSet::new();

for key in to_remove {
let sep = if target_path.key.is_empty() || target_path.key.ends_with('/') {
""
} else {
Expand All @@ -321,17 +355,42 @@ pub async fn execute(args: MirrorArgs, output_config: OutputConfig) -> ExitCode
format!("{}{sep}{key}", target_path.key),
);

match target_client.delete_object(&target_full).await {
Ok(_) => {
let key = key.to_string();
let target_client = Arc::clone(&target_client);
let permit = remove_semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore should not be closed");
remove_tasks.spawn(async move {
let _permit = permit;
let result = target_client
.delete_object(&target_full)
.await
.map(|_| ())
.map_err(|e| format!("Failed to remove {key}: {e}"));
(key, result)
});
}

while let Some(task_result) = remove_tasks.join_next().await {
match task_result {
Ok((key, Ok(()))) => {
removed += 1;
if !args.quiet && !formatter.is_json() {
formatter.println(&format!("- {key}"));
}
}
Err(e) => {
Ok((_, Err(message))) => {
errors += 1;
if !formatter.is_json() {
formatter.error(&message);
}
}
Err(join_error) => {
errors += 1;
if !formatter.is_json() {
formatter.error(&format!("Failed to remove {key}: {e}"));
formatter.error(&format!("Mirror remove worker failed: {join_error}"));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/tests/help_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ fn assert_help_case(case: &HelpCase) {
);

let stdout = String::from_utf8_lossy(&output.stdout);
let normalized_stdout = stdout.replace("Usage: rc.exe ", "Usage: rc ");
assert!(
stdout.contains(case.usage),
normalized_stdout.contains(case.usage),
"usage marker `{}` missing for {command_label}\nstdout:\n{}",
case.usage,
stdout
Expand Down
Loading