Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions crates/cloudreve-sync/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct AppConfig {
pub log_level: LogLevel,
/// Maximum number of log files to keep
pub log_max_files: usize,
/// Delay before starting upload after file stops changing (seconds). 0 disables the delay.
pub sync_delay_seconds: u64,
/// Language/locale setting (e.g., "en-US", "zh-CN"). None means use system default.
pub language: Option<String>,
}
Expand All @@ -74,6 +76,7 @@ impl Default for AppConfig {
log_to_file: true,
log_level: LogLevel::Debug,
log_max_files: 5,
sync_delay_seconds: 0,
language: None,
}
}
Expand Down Expand Up @@ -288,6 +291,21 @@ impl ConfigManager {
})
}

/// Get the sync delay in seconds
pub fn sync_delay_seconds(&self) -> u64 {
self.config
.read()
.map(|c| c.sync_delay_seconds)
.unwrap_or(0)
}

/// Set the sync delay in seconds
pub fn set_sync_delay_seconds(&self, seconds: u64) -> Result<()> {
self.update(|config| {
config.sync_delay_seconds = seconds;
})
}

/// Get the language setting
pub fn language(&self) -> Option<String> {
self.config.read().ok().and_then(|c| c.language.clone())
Expand Down
167 changes: 166 additions & 1 deletion crates/cloudreve-sync/src/tasks/queue.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::ConfigManager;
use crate::inventory::{InventoryDb, NewTaskRecord, TaskRecord, TaskStatus, TaskUpdate};
use crate::tasks::download::DownloadTask;
use crate::tasks::types::{TaskKind, TaskPayload, TaskProgress};
Expand All @@ -6,14 +7,17 @@ use anyhow::{Context, Result, anyhow};
use cloudreve_api::Client;
use dashmap::DashMap;
use serde_json::Value;
use std::path::PathBuf;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, SystemTime};
use tokio::sync::{
Mutex, Notify, Semaphore,
mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -48,6 +52,12 @@ pub struct TaskQueue {
task_paths: DashMap<String, String>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct FileChangeSignature {
size: u64,
modified: SystemTime,
}

impl TaskQueue {
pub async fn new(
drive_id: impl Into<String>,
Expand Down Expand Up @@ -501,6 +511,11 @@ impl TaskQueue {

match &task.payload.kind {
TaskKind::Upload => {
match self.wait_for_upload_stability(task).await? {
TaskRunState::Cancelled => return Ok(TaskRunState::Cancelled),
TaskRunState::Completed => {}
}

let mut task_executor = UploadTask::new(
self.inventory.clone(),
self.cr_client.clone(),
Expand Down Expand Up @@ -567,6 +582,156 @@ impl TaskQueue {
Ok(TaskRunState::Completed)
}

fn is_task_marked_inactive(&self, task_id: &str) -> bool {
match self.inventory.get_task_status(task_id) {
Ok(Some(TaskStatus::Cancelled)) => true,
Ok(Some(status)) => !status.is_active(),
Err(err) => {
warn!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task_id,
error = %err,
"Failed to check task status while waiting for delayed upload"
);
false
}
_ => false,
}
}

fn capture_file_signature(path: &Path) -> Result<Option<FileChangeSignature>> {
let metadata = match std::fs::metadata(path) {
Ok(metadata) => metadata,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
Err(err) => {
return Err(err)
.with_context(|| format!("failed to read metadata for {}", path.display()));
}
};
if !metadata.is_file() {
return Ok(None);
}

let modified = metadata
.modified()
.with_context(|| format!("failed to read modified time for {}", path.display()))?;

Ok(Some(FileChangeSignature {
size: metadata.len(),
modified,
}))
}

async fn wait_for_upload_stability(&self, task: &QueuedTask) -> Result<TaskRunState> {
let delay_seconds = ConfigManager::try_get()
.map(|config| config.sync_delay_seconds())
.unwrap_or(0);

if delay_seconds == 0 {
return Ok(TaskRunState::Completed);
}

let local_path = &task.payload.local_path;
let mut signature = match Self::capture_file_signature(local_path.as_path()) {
Ok(Some(signature)) => signature,
Ok(None) => return Ok(TaskRunState::Completed),
Err(err) => {
warn!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
error = %err,
"Failed to capture initial file signature, skipping delayed upload wait"
);
return Ok(TaskRunState::Completed);
}
};

info!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
delay_seconds = delay_seconds,
"Waiting for file to stop changing before upload"
);

let required_stable = Duration::from_secs(delay_seconds);
let poll_interval = if delay_seconds <= 30 {
Duration::from_secs(1)
} else if delay_seconds <= 120 {
Duration::from_secs(2)
} else {
Duration::from_secs(3)
};
let status_check_interval = Duration::from_secs(5);
let mut unchanged_since = Instant::now();
let mut next_status_check = Instant::now();

while unchanged_since.elapsed() < required_stable {
if self.cancel_requested.load(Ordering::SeqCst) {
debug!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
"Task cancelled while waiting for upload delay"
);
return Ok(TaskRunState::Cancelled);
}

if Instant::now() >= next_status_check {
if self.is_task_marked_inactive(&task.task_id) {
debug!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
"Task became inactive while waiting for upload delay"
);
return Ok(TaskRunState::Cancelled);
}
next_status_check = Instant::now() + status_check_interval;
}

sleep(poll_interval).await;

let current_signature = match Self::capture_file_signature(local_path.as_path()) {
Ok(Some(signature)) => signature,
Ok(None) => return Ok(TaskRunState::Completed),
Err(err) => {
warn!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
error = %err,
"Failed to capture file signature while waiting, continuing upload"
);
return Ok(TaskRunState::Completed);
}
};

if current_signature != signature {
signature = current_signature;
unchanged_since = Instant::now();
}
}

debug!(
target: "tasks::queue",
drive = %self.drive_id,
task_id = %task.task_id,
path = %local_path.display(),
delay_seconds = delay_seconds,
"File remained unchanged for configured delay, proceeding with upload"
);

Ok(TaskRunState::Completed)
}

#[allow(dead_code)]
async fn wait_for_idle(&self) {
while self.inflight.load(Ordering::SeqCst) > 0 {
Expand Down
10 changes: 10 additions & 0 deletions src-tauri/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ pub async fn get_general_settings() -> CommandResult<GeneralSettings> {
log_to_file: config.log_to_file,
log_level: config.log_level.as_str().to_string(),
log_max_files: config.log_max_files,
sync_delay_seconds: config.sync_delay_seconds,
log_dir: ConfigManager::get_log_dir().display().to_string(),
language: config.language,
})
Expand All @@ -569,6 +570,7 @@ pub struct GeneralSettings {
pub log_to_file: bool,
pub log_level: String,
pub log_max_files: usize,
pub sync_delay_seconds: u64,
pub log_dir: String,
pub language: Option<String>,
}
Expand Down Expand Up @@ -600,6 +602,14 @@ pub async fn set_log_max_files(max_files: usize) -> CommandResult<()> {
.map_err(|e| e.to_string())
}

/// Set sync delay in seconds
#[tauri::command]
pub async fn set_sync_delay_seconds(seconds: u64) -> CommandResult<()> {
ConfigManager::get()
.set_sync_delay_seconds(seconds)
.map_err(|e| e.to_string())
}

/// Set language setting and update rust_i18n locale
#[tauri::command]
pub async fn set_language(app: AppHandle, language: Option<String>) -> CommandResult<()> {
Expand Down
1 change: 1 addition & 0 deletions src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ pub fn run() {
commands::set_log_to_file,
commands::set_log_level,
commands::set_log_max_files,
commands::set_sync_delay_seconds,
commands::set_language,
commands::open_log_folder,
])
Expand Down
3 changes: 3 additions & 0 deletions ui/public/locales/en-US/common.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
"logLevelDescription": "Set the verbosity of log output (restart required)",
"logMaxFiles": "Max log files",
"logMaxFilesDescription": "Number of log files to keep (restart required)",
"experimentalFeatures": "Experimental features",
"delaySync": "Delay sync",
"delaySyncDescription": "Sync task wait until file stop changing",
"storage": "Storage",
"openFolder": "Open folder",
"openSite": "Open site",
Expand Down
3 changes: 3 additions & 0 deletions ui/public/locales/pl/common.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
"logLevelDescription": "Ustaw szczegółowość dziennika (wymaga restartu)",
"logMaxFiles": "Maksymalna liczba plików dziennika",
"logMaxFilesDescription": "Liczba plików dziennika do przechowywania (wymaga restartu)",
"experimentalFeatures": "Funkcje eksperymentalne",
"delaySync": "Opóźnij synchronizację",
"delaySyncDescription": "Zadanie synchronizacji czeka, aż plik przestanie się zmieniać",
"storage": "Pamięć",
"openFolder": "Otwórz folder",
"openSite": "Otwórz witrynę",
Expand Down
Loading