Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions code-rs/tui/src/bottom_pane/chat_composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,14 @@ impl ChatComposer {
}
}

impl Drop for ChatComposer {
    fn drop(&mut self) {
        // Tell the background animation worker (if one was started) to wind
        // down so it stops polling once the composer itself is gone.
        let running = self.animation_running.take();
        if let Some(flag) = running {
            flag.store(false, Ordering::Relaxed);
        }
    }
}

impl WidgetRef for ChatComposer {
fn render_ref(&self, area: Rect, buf: &mut Buffer) {
if self.render_mode == ComposerRenderMode::FooterOnly {
Expand Down Expand Up @@ -3074,8 +3082,56 @@ fn lerp_gradient_color(gradient: BorderGradient, ratio: f32) -> Color {
mod tests {
use super::*;
use crate::app_event::AppEvent;
use crate::thread_spawner;
use ratatui::buffer::Buffer;
use ratatui::layout::Rect;
use std::thread;
use std::time::{Duration, Instant};

fn wait_for_background_threads_at_most(limit: usize, timeout: Duration) {
let deadline = Instant::now() + timeout;
loop {
if thread_spawner::active_thread_count() <= limit {
return;
}
assert!(
Instant::now() < deadline,
"background thread count stayed above {limit}: {}",
thread_spawner::active_thread_count()
);
thread::sleep(Duration::from_millis(10));
}
}

fn wait_for_background_threads_at_least(limit: usize, timeout: Duration) {
let deadline = Instant::now() + timeout;
loop {
if thread_spawner::active_thread_count() >= limit {
return;
}
assert!(
Instant::now() < deadline,
"background thread count never reached {limit}: {}",
thread_spawner::active_thread_count()
);
thread::sleep(Duration::from_millis(10));
}
}

#[test]
fn drop_stops_running_animation_thread() {
    // Snapshot how many lightweight threads exist before the composer
    // spins up its animation worker.
    let threads_before = thread_spawner::active_thread_count();
    let (sender, _receiver) = std::sync::mpsc::channel::<AppEvent>();
    let event_tx = AppEventSender::new(sender);

    let mut composer = ChatComposer::new(true, event_tx, true, false);
    composer.set_task_running(true);
    // The animation worker should appear shortly after a task starts.
    wait_for_background_threads_at_least(threads_before + 1, Duration::from_millis(250));

    // Dropping the composer must signal that worker to exit.
    drop(composer);
    wait_for_background_threads_at_most(threads_before, Duration::from_secs(2));
}

#[test]
fn auto_review_status_stays_left_with_auto_drive_footer() {
Expand Down
187 changes: 155 additions & 32 deletions code-rs/tui/src/chatwidget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,7 @@ pub(crate) struct ChatWidget<'a> {
rate_limit_secondary_next_reset_at: Option<DateTime<Utc>>,
rate_limit_refresh_scheduled_for: Option<DateTime<Utc>>,
rate_limit_refresh_schedule_id: Arc<AtomicU64>,
rate_limit_refresh_task: Option<task::JoinHandle<()>>,
content_buffer: String,
// Buffer for streaming assistant answer text; we do not surface partial
// We wait for the final AgentMessage event and then emit the full text
Expand Down Expand Up @@ -6689,6 +6690,7 @@ impl ChatWidget<'_> {
rate_limit_secondary_next_reset_at: None,
rate_limit_refresh_scheduled_for: None,
rate_limit_refresh_schedule_id: Arc::new(AtomicU64::new(0)),
rate_limit_refresh_task: None,
content_buffer: String::new(),
last_assistant_message: None,
last_answer_stream_id_in_turn: None,
Expand Down Expand Up @@ -7058,6 +7060,7 @@ impl ChatWidget<'_> {
rate_limit_secondary_next_reset_at: None,
rate_limit_refresh_scheduled_for: None,
rate_limit_refresh_schedule_id: Arc::new(AtomicU64::new(0)),
rate_limit_refresh_task: None,
content_buffer: String::new(),
last_assistant_message: None,
last_answer_stream_id_in_turn: None,
Expand Down Expand Up @@ -16464,17 +16467,25 @@ impl ChatWidget<'_> {
self.maybe_schedule_rate_limit_refresh();
}

fn cancel_rate_limit_refresh_task(&mut self) {
    // Abort any refresh future still in flight; `take()` also clears the
    // slot so a later schedule starts from a clean state.
    let pending = self.rate_limit_refresh_task.take();
    if let Some(task) = pending {
        task.abort();
    }
}

fn maybe_schedule_rate_limit_refresh(&mut self) {
let Some(reset_at) = self.rate_limit_secondary_next_reset_at else {
self.rate_limit_refresh_scheduled_for = None;
self.rate_limit_refresh_schedule_id.fetch_add(1, Ordering::SeqCst);
self.cancel_rate_limit_refresh_task();
return;
};

if self.rate_limit_refresh_scheduled_for == Some(reset_at) {
return;
}

self.cancel_rate_limit_refresh_task();
self.rate_limit_refresh_scheduled_for = Some(reset_at);
let schedule_id = self
.rate_limit_refresh_schedule_id
Expand All @@ -16494,47 +16505,54 @@ impl ChatWidget<'_> {
return;
}

if thread_spawner::spawn_lightweight("rate-reset-refresh", move || {
let now = Utc::now();
let delay = reset_at.signed_duration_since(now) + ChronoDuration::seconds(1);
if let Ok(delay) = delay.to_std() {
if !delay.is_zero() {
std::thread::sleep(delay);
}
}

if schedule_token.load(Ordering::SeqCst) != schedule_id {
return;
}
let Some(account) = account else {
return;
};

let Some(account) = account else {
return;
};
let now = Utc::now();
let delay = reset_at.signed_duration_since(now) + ChronoDuration::seconds(1);
let delay = delay.to_std().ok();

let plan = account
.tokens
.as_ref()
.and_then(|tokens| tokens.id_token.get_chatgpt_plan_type());
let should_refresh = account_usage::mark_rate_limit_refresh_attempt_if_due(
&config.code_home,
&account.id,
plan.as_deref(),
Some(reset_at),
Utc::now(),
account_usage::rate_limit_refresh_stale_interval(),
)
.unwrap_or(false);
if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
self.rate_limit_refresh_task = Some(runtime_handle.spawn(async move {
if let Some(delay) = delay {
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
}

if should_refresh {
start_rate_limit_refresh_for_account(
maybe_run_rate_limit_refresh(
schedule_token,
schedule_id,
reset_at,
app_event_tx,
config,
debug_enabled,
account,
true,
false,
);
}));
return;
}

tracing::warn!(
"rate reset refresh scheduled without Tokio runtime; falling back to lightweight thread"
);
if thread_spawner::spawn_lightweight("rate-reset-refresh", move || {
if let Some(delay) = delay {
if !delay.is_zero() {
std::thread::sleep(delay);
}
}

maybe_run_rate_limit_refresh(
schedule_token,
schedule_id,
reset_at,
app_event_tx,
config,
debug_enabled,
account,
);
})
.is_none()
{
Expand Down Expand Up @@ -30357,6 +30375,51 @@ fn release_background_lock(agent_id: &Option<String>) {
}
}

/// Runs one deferred rate-limit refresh attempt, but only when this
/// schedule is still the newest one and the shared usage bookkeeping says a
/// refresh is actually due.
fn maybe_run_rate_limit_refresh(
    schedule_token: Arc<AtomicU64>,
    schedule_id: u64,
    reset_at: DateTime<Utc>,
    app_event_tx: AppEventSender,
    config: Config,
    debug_enabled: bool,
    account: StoredAccount,
) {
    // A newer schedule bumps the shared token; bail out if this run was
    // superseded while it waited.
    if schedule_token.load(Ordering::SeqCst) != schedule_id {
        return;
    }

    let plan = account
        .tokens
        .as_ref()
        .and_then(|tokens| tokens.id_token.get_chatgpt_plan_type());

    // NOTE(review): judging by its name, this helper also records the
    // attempt timestamp, not just reads it — confirm in account_usage.
    // Errors are treated as "not due".
    let refresh_due = account_usage::mark_rate_limit_refresh_attempt_if_due(
        &config.code_home,
        &account.id,
        plan.as_deref(),
        Some(reset_at),
        Utc::now(),
        account_usage::rate_limit_refresh_stale_interval(),
    )
    .unwrap_or(false);
    if !refresh_due {
        return;
    }

    start_rate_limit_refresh_for_account(
        app_event_tx,
        config,
        debug_enabled,
        account,
        true,
        false,
    );
}

// Abort the scheduled rate-limit refresh task when the widget is torn down,
// so the spawned future cannot outlive the UI that scheduled it.
impl Drop for ChatWidget<'_> {
    fn drop(&mut self) {
        self.cancel_rate_limit_refresh_task();
    }
}

// Test-only hook: holds an optional closure that tests can install —
// presumably invoked in place of the real auto-review kickoff so tests can
// observe the trigger. TODO(review): confirm at the call site, which is
// outside this chunk.
#[cfg(test)]
static AUTO_REVIEW_STUB: once_cell::sync::Lazy<std::sync::Mutex<Option<Box<dyn FnMut() + Send>>>> =
    once_cell::sync::Lazy::new(|| std::sync::Mutex::new(None));
Expand Down Expand Up @@ -30867,6 +30930,66 @@ use code_core::protocol::OrderMeta;
assert_eq!(reloaded.service_tier, None);
}

#[test]
fn repeated_rate_limit_reschedules_do_not_consume_lightweight_threads() {
    // Provides a tokio runtime context for the duration of the test —
    // presumably so scheduling takes the tracked-async-task path instead of
    // the lightweight-thread fallback.
    let _runtime_guard = enter_test_runtime_guard();
    let code_home = tempdir().expect("temp code home");
    // An active account must exist before scheduling; without one the
    // refresh path has nothing to refresh.
    auth_accounts::upsert_api_key_account(
        code_home.path(),
        "sk-test".to_string(),
        Some("Test Account".to_string()),
        true,
    )
    .expect("active account");

    let mut config = Config::load_from_base_config_with_overrides(
        ConfigToml::default(),
        ConfigOverrides::default(),
        code_home.path().to_path_buf(),
    )
    .expect("config");
    // Point the widget's account storage at the temp dir seeded above.
    config.code_home = code_home.path().to_path_buf();

    let (tx_raw, _rx) = std::sync::mpsc::channel::<AppEvent>();
    let app_event_tx = crate::app_event_sender::AppEventSender::new(tx_raw);
    let terminal_info = crate::tui::TerminalInfo {
        picker: None,
        font_size: (8, 16),
    };
    let mut chat = ChatWidget::new(
        config,
        app_event_tx,
        None,
        Vec::new(),
        false,
        terminal_info,
        false,
        None,
    );

    // Reschedule several times with distinct reset timestamps; each call
    // should replace the previous async task rather than spawn a thread.
    let baseline = crate::thread_spawner::active_thread_count();
    for offset_minutes in 10..18 {
        chat.rate_limit_secondary_next_reset_at =
            Some(Utc::now() + ChronoDuration::minutes(offset_minutes));
        chat.maybe_schedule_rate_limit_refresh();
    }

    // Brief grace period: give any (incorrectly) spawned thread a chance to
    // register before we count.
    std::thread::sleep(std::time::Duration::from_millis(20));

    assert!(
        chat.rate_limit_refresh_task.is_some(),
        "expected rate-limit refresh to use a tracked async task"
    );
    assert_eq!(
        crate::thread_spawner::active_thread_count(),
        baseline,
        "rate-limit refresh reschedules should not consume lightweight threads"
    );

    // Clearing the reset time cancels the pending task before the runtime
    // guard is dropped.
    chat.rate_limit_secondary_next_reset_at = None;
    chat.maybe_schedule_rate_limit_refresh();
}

#[test]
fn apply_context_mode_selection_persists_disabled_override() {
let _runtime_guard = enter_test_runtime_guard();
Expand Down
5 changes: 5 additions & 0 deletions code-rs/tui/src/thread_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,8 @@ where
}
}
}

/// Test-only snapshot of the number of currently live lightweight threads,
/// read from the `ACTIVE_THREADS` counter (maintained elsewhere in this
/// module by the spawn/exit paths).
#[cfg(test)]
pub(crate) fn active_thread_count() -> usize {
    ACTIVE_THREADS.load(Ordering::SeqCst)
}