Skip to content

Commit 988a032

Browse files
committed
treewide: fix multiple safety and correctness issues
- config: rewrite normalize_cron() to parse DOW values numerically, fixing false replacements like "10" -> "17" (in lists) or "10-12" -> "17-12" - scheduler: fix Concurrency::Wait blocking by spawning waiting jobs asynchronously instead of awaiting in the actor handler - git: improve sync_to_job_dir atomicity with backup/rename/remove pattern to minimize window where job_dir doesn't exist - executor: add path traversal validation for env_file paths - scheduler: extend is_job_due tolerance to ±1s for clock drift - git: add LC_ALL=C to git commands for consistent English output - git: add TempDirGuard RAII for reliable temp directory cleanup
1 parent 80a2080 commit 988a032

4 files changed

Lines changed: 202 additions & 27 deletions

File tree

src/config.rs

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,22 +168,63 @@ fn normalize_cron(cron: &str) -> String {
168168
}
169169

170170
// Day-of-week is the 5th field (index 4)
171-
// Replace standalone "0" with "7" for Sunday
172-
let dow = fields[4]
173-
.replace(",0,", ",7,")
174-
.replace(",0", ",7")
175-
.replace("0,", "7,")
176-
.replace("0-", "7-");
177-
178-
// Handle standalone "0"
179-
let dow = if dow == "0" { "7".to_string() } else { dow };
171+
// Parse each element properly to avoid false replacements (e.g., "10" -> "17")
172+
let dow = normalize_dow_field(fields[4]);
180173

181174
format!(
182175
"{} {} {} {} {}",
183176
fields[0], fields[1], fields[2], fields[3], dow
184177
)
185178
}
186179

180+
/// Normalize a single day-of-week field, converting 0 (POSIX Sunday) to 7 (cron crate Sunday).
181+
fn normalize_dow_field(field: &str) -> String {
182+
// Handle special cases
183+
if field == "*" || field == "?" {
184+
return field.to_string();
185+
}
186+
187+
// Split by comma for lists (e.g., "0,3,5")
188+
field
189+
.split(',')
190+
.map(|part| normalize_dow_part(part))
191+
.collect::<Vec<_>>()
192+
.join(",")
193+
}
194+
195+
/// Normalize a single part of a day-of-week field (handles ranges like "0-6" and steps like "0/2").
196+
fn normalize_dow_part(part: &str) -> String {
197+
// Handle step values (e.g., "0/2" or "0-6/2")
198+
if let Some((range_part, step)) = part.split_once('/') {
199+
let normalized_range = normalize_dow_range(range_part);
200+
return format!("{}/{}", normalized_range, step);
201+
}
202+
203+
normalize_dow_range(part)
204+
}
205+
206+
/// Normalize a range or single value (e.g., "0", "0-6").
207+
fn normalize_dow_range(part: &str) -> String {
208+
// Handle ranges (e.g., "0-6")
209+
if let Some((start, end)) = part.split_once('-') {
210+
let start_normalized = normalize_dow_value(start);
211+
let end_normalized = normalize_dow_value(end);
212+
return format!("{}-{}", start_normalized, end_normalized);
213+
}
214+
215+
// Single value
216+
normalize_dow_value(part)
217+
}
218+
219+
/// Normalize a single day-of-week token: `"0"` (POSIX Sunday) becomes `"7"`;
/// every other token is returned verbatim.
fn normalize_dow_value(val: &str) -> String {
    match val {
        "0" => "7".to_string(),
        other => other.to_string(),
    }
}
227+
187228
pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
188229
let config: Config = serde_yaml::from_str(content)
189230
.map_err(|e| anyhow!("Failed to parse YAML: {}", e))?;
@@ -948,4 +989,26 @@ jobs:
948989
let (_, jobs) = parse_config(yaml).unwrap();
949990
assert_eq!(jobs[0].webhook.len(), 3);
950991
}
992+
993+
#[test]
fn test_normalize_dow_field() {
    // Table of (input, expected) pairs covering wildcards, lists, ranges,
    // steps, and regression cases for the old substring-replace bug
    // (which mangled "10" into "17").
    let cases = [
        ("0", "7"),         // standalone POSIX Sunday -> cron-crate Sunday
        ("1", "1"),         // other values unchanged
        ("*", "*"),         // wildcards pass through
        ("?", "?"),
        ("0,3,5", "7,3,5"), // list starting with 0
        ("1,0,3", "1,7,3"), // list containing 0
        ("0-6", "7-6"),     // range starting with 0
        ("0/2", "7/2"),     // step value
        ("0-6/2", "7-6/2"), // range with step
        ("10", "10"),       // invalid but must not be mangled (regression)
        ("1-5", "1-5"),
        ("20", "20"),       // invalid but must not be mangled (regression)
    ];
    for &(input, expected) in cases.iter() {
        assert_eq!(normalize_dow_field(input), expected, "input: {}", input);
    }
}
9511014
}

src/git.rs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,30 @@ use anyhow::{Context, Result};
22
use std::path::{Path, PathBuf};
33
use std::process::Command;
44

5+
/// RAII guard that removes a directory on drop unless disarmed.
///
/// Create the guard right after creating a temporary directory so that every
/// early-return/error path cleans it up, and call [`TempDirGuard::disarm`]
/// once the directory has been handed off (e.g. renamed into place) and must
/// no longer be deleted.
struct TempDirGuard<'a> {
    /// Directory to remove on drop.
    path: &'a Path,
    /// When true, drop becomes a no-op and the directory is kept.
    keep: bool,
}

impl<'a> TempDirGuard<'a> {
    /// Create an armed guard for `path`; dropping it removes the directory.
    fn new(path: &'a Path) -> Self {
        Self { path, keep: false }
    }

    /// Disarm the guard so the directory survives the drop.
    fn disarm(&mut self) {
        self.keep = true;
    }
}

impl Drop for TempDirGuard<'_> {
    fn drop(&mut self) {
        if !self.keep {
            // Best-effort cleanup: the error (including "not found") is
            // deliberately ignored, so the previous `exists()` pre-check was
            // both redundant and racy (TOCTOU) — attempt removal directly.
            let _ = std::fs::remove_dir_all(self.path);
        }
    }
}
28+
529
/// Ensures repo is cloned/synced to cache. Returns (cache path, commit_range if updated).
630
pub fn ensure_repo(source: &str) -> Result<(PathBuf, Option<String>)> {
731
let cache_dir = get_cache_dir(source)?;
@@ -41,6 +65,7 @@ fn sync_repo(dest: &Path) -> Result<Option<String>> {
4165
let has_upstream = Command::new("git")
4266
.args(["rev-parse", "--abbrev-ref", "@{upstream}"])
4367
.current_dir(dest)
68+
.env("LC_ALL", "C") // Ensure consistent English output
4469
.output()
4570
.map(|o| o.status.success())
4671
.unwrap_or(false);
@@ -49,6 +74,7 @@ fn sync_repo(dest: &Path) -> Result<Option<String>> {
4974
let output = Command::new("git")
5075
.args(["pull", "--ff-only"])
5176
.current_dir(dest)
77+
.env("LC_ALL", "C") // Ensure consistent English output
5278
.output()?;
5379

5480
if !output.status.success() {
@@ -129,14 +155,17 @@ pub fn sync_to_job_dir(sot_path: &Path, job_dir: &Path) -> Result<()> {
129155
}
130156
std::fs::create_dir_all(&temp_dir)?;
131157

158+
// RAII guard ensures temp_dir is cleaned up on any error path
159+
let mut temp_guard = TempDirGuard::new(&temp_dir);
160+
132161
// Use git archive for all repos (both local and remote use git clone now)
133162
let archive = Command::new("git")
134163
.args(["archive", "HEAD"])
135164
.current_dir(sot_path)
136165
.output()?;
137166

138167
if !archive.status.success() {
139-
std::fs::remove_dir_all(&temp_dir)?;
168+
// temp_guard will clean up on drop
140169
let stderr = String::from_utf8_lossy(&archive.stderr);
141170
anyhow::bail!("git archive failed: {}", stderr);
142171
}
@@ -159,18 +188,39 @@ pub fn sync_to_job_dir(sot_path: &Path, job_dir: &Path) -> Result<()> {
159188

160189
let status = extract.wait()?;
161190
if !status.success() {
162-
std::fs::remove_dir_all(&temp_dir)?;
191+
// temp_guard will clean up on drop
163192
anyhow::bail!("tar extraction failed with exit code: {:?}", status.code());
164193
}
165194

166-
// Atomic swap: remove old, rename temp to target
195+
// Disarm the guard before rename - we'll handle cleanup manually from here
196+
temp_guard.disarm();
197+
198+
// Safe swap: rename old to backup, rename temp to target, then remove backup.
199+
// This minimizes the window where job_dir doesn't exist.
200+
let backup_dir = job_dir.with_extension("old");
201+
202+
// Clean up any leftover backup directory
203+
if backup_dir.exists() {
204+
let _ = std::fs::remove_dir_all(&backup_dir);
205+
}
206+
167207
if job_dir.exists() {
168-
std::fs::remove_dir_all(job_dir)?;
208+
// Move existing to backup first
209+
std::fs::rename(job_dir, &backup_dir).with_context(|| {
210+
format!("Failed to rename {} to backup", job_dir_str)
211+
})?;
169212
}
213+
214+
// Move new directory into place
170215
std::fs::rename(&temp_dir, job_dir).with_context(|| {
171216
format!("Failed to rename {} to {}", temp_dir_str, job_dir_str)
172217
})?;
173218

219+
// Remove backup (ignore errors as it's cleanup)
220+
if backup_dir.exists() {
221+
let _ = std::fs::remove_dir_all(&backup_dir);
222+
}
223+
174224
Ok(())
175225
}
176226

src/scheduler/executor.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,43 @@ async fn run_command(job: &Job, work_dir: &PathBuf, sot_path: &PathBuf, runner:
165165
}
166166
}
167167

168+
/// Validate that a path stays within the base directory (prevents path traversal).
169+
fn validate_env_file_path(
170+
env_file_path: &str,
171+
base_dir: &PathBuf,
172+
context: &str,
173+
) -> anyhow::Result<PathBuf> {
174+
let expanded = env::expand_string(env_file_path);
175+
let full_path = base_dir.join(&expanded);
176+
177+
// Canonicalize to resolve .. and symlinks
178+
match (full_path.canonicalize(), base_dir.canonicalize()) {
179+
(Ok(resolved), Ok(base)) if resolved.starts_with(&base) => Ok(resolved),
180+
(Ok(_), Ok(_)) => {
181+
anyhow::bail!(
182+
"{} env_file '{}': path traversal detected (must be within base directory)",
183+
context,
184+
env_file_path
185+
)
186+
}
187+
(Err(e), _) => {
188+
anyhow::bail!(
189+
"{} env_file '{}': file not found or inaccessible: {}",
190+
context,
191+
env_file_path,
192+
e
193+
)
194+
}
195+
(_, Err(e)) => {
196+
anyhow::bail!(
197+
"{} base directory not accessible: {}",
198+
context,
199+
e
200+
)
201+
}
202+
}
203+
}
204+
168205
fn merge_env_vars(
169206
job: &Job,
170207
work_dir: &PathBuf,
@@ -175,9 +212,8 @@ fn merge_env_vars(
175212

176213
// 1. Start with runner.env_file (loaded from sot_path)
177214
if let Some(env_file_path) = &runner.env_file {
178-
let expanded = env::expand_string(env_file_path);
179-
let full_path = sot_path.join(&expanded);
180-
let vars = env::load_env_from_path(&full_path)?;
215+
let validated_path = validate_env_file_path(env_file_path, sot_path, "runner")?;
216+
let vars = env::load_env_from_path(&validated_path)?;
181217
env_vars.extend(vars);
182218
}
183219

@@ -190,9 +226,8 @@ fn merge_env_vars(
190226

191227
// 3. Merge job.env_file (loaded from work_dir)
192228
if let Some(env_file_path) = &job.env_file {
193-
let expanded = env::expand_string(env_file_path);
194-
let full_path = work_dir.join(&expanded);
195-
let vars = env::load_env_from_path(&full_path)?;
229+
let validated_path = validate_env_file_path(env_file_path, work_dir, &format!("job:{}", job.id))?;
230+
let vars = env::load_env_from_path(&validated_path)?;
196231
env_vars.extend(vars);
197232
}
198233

src/scheduler/mod.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,11 @@ impl Scheduler {
192192
"{} Waiting for {} previous run(s) to complete",
193193
tag, running_count
194194
);
195-
if let Some(handles) = self.job_handles.remove(&job.id) {
196-
for handle in handles {
197-
let _ = handle.await;
198-
}
199-
}
200-
self.cleanup_finished_handles(&job.id);
195+
// Spawn waiting job asynchronously to avoid blocking the actor
196+
self.spawn_waiting_job(job, addr);
197+
} else {
198+
self.spawn_job(job, addr);
201199
}
202-
self.spawn_job(job, addr);
203200
}
204201
Concurrency::Skip => {
205202
if running_count > 0 {
@@ -238,6 +235,34 @@ impl Scheduler {
238235

239236
self.job_handles.entry(job_id).or_default().push(handle);
240237
}
238+
239+
/// Spawn a job that waits for previous runs to complete before executing.
240+
/// This avoids blocking the actor by spawning a separate task that handles the waiting.
241+
fn spawn_waiting_job(&mut self, job: Job, addr: Address<Scheduler, Weak>) {
242+
let job_id = job.id.clone();
243+
let job_id_for_log = job.id.clone();
244+
let work_dir = resolve_work_dir(&self.sot_path, &job.id, &job.working_dir);
245+
let sot_path = self.sot_path.clone();
246+
let runner = self.runner.clone();
247+
248+
// Take ownership of existing handles to wait on them
249+
let previous_handles = self.job_handles.remove(&job.id).unwrap_or_default();
250+
251+
let handle = tokio::spawn(async move {
252+
// Wait for all previous runs to complete
253+
for prev_handle in previous_handles {
254+
let _ = prev_handle.await;
255+
}
256+
257+
// Now execute the new job
258+
execute_job(&job, &work_dir, &sot_path, &runner).await;
259+
if let Err(e) = addr.send(JobCompleted).await {
260+
eprintln!("[job:{}] Failed to notify completion: {}", job_id_for_log, e);
261+
}
262+
});
263+
264+
self.job_handles.entry(job_id).or_default().push(handle);
265+
}
241266
}
242267

243268
// === Helper Functions ===
@@ -252,7 +277,9 @@ where
252277
let now = Utc::now().with_timezone(&tz);
253278
if let Some(next) = schedule.upcoming(tz).next() {
254279
let until_next = (next - now).num_milliseconds();
255-
until_next <= SCHEDULE_TOLERANCE_MS && until_next >= 0
280+
// Allow jobs to be triggered slightly after their scheduled time
281+
// (e.g., due to minor clock drift or processing delay)
282+
until_next <= SCHEDULE_TOLERANCE_MS && until_next >= -SCHEDULE_TOLERANCE_MS
256283
} else {
257284
false
258285
}

0 commit comments

Comments
 (0)