@@ -2,8 +2,19 @@ use std::sync::Arc;
22use std:: sync:: atomic:: Ordering ;
33
44use crate :: AppState ;
5+ use opencode_mem_service:: maintenance:: { MaintenanceServices , run_maintenance_tick} ;
56
67pub async fn start_cron_scheduler ( state : Arc < AppState > ) {
8+ let services = MaintenanceServices {
9+ observation_service : Arc :: clone ( & state. observation_service ) ,
10+ session_service : Arc :: clone ( & state. session_service ) ,
11+ knowledge_service : Arc :: clone ( & state. knowledge_service ) ,
12+ search_service : Arc :: clone ( & state. search_service ) ,
13+ queue_service : Arc :: clone ( & state. queue_service ) ,
14+ infinite_mem : state. infinite_mem . clone ( ) ,
15+ config : Arc :: clone ( & state. config ) ,
16+ } ;
17+
718 let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
819 let mut shutdown_rx = state. shutdown_tx . subscribe ( ) ;
920 let mut loop_count: u64 = 0 ;
@@ -22,126 +33,6 @@ pub async fn start_cron_scheduler(state: Arc<AppState>) {
2233
2334 loop_count = loop_count. wrapping_add ( 1 ) ;
2435
25- if loop_count. is_multiple_of ( 60 )
26- && let Some ( ref infinite_mem) = state. infinite_mem
27- {
28- tracing:: debug!( "Cron: running infinite memory compression..." ) ;
29- let mem = Arc :: clone ( infinite_mem) ;
30- state. background_tasks . lock ( ) . await . spawn ( async move {
31- match mem. run_full_compression ( ) . await {
32- Ok ( ( five_min, hour, day) ) => {
33- if five_min > 0 || hour > 0 || day > 0 {
34- tracing:: info!(
35- "Cron: created {} 5min, {} hour, {} day summaries" ,
36- five_min,
37- hour,
38- day,
39- ) ;
40- }
41- }
42- Err ( e) => tracing:: warn!( "Cron: infinite memory error: {e:?}" ) ,
43- }
44- } ) ;
45- }
46-
47- if loop_count. is_multiple_of ( 180 ) {
48- tracing:: debug!( "Cron: running embedding backfill..." ) ;
49- let state_clone = Arc :: clone ( & state) ;
50- state. background_tasks . lock ( ) . await . spawn ( async move {
51- match state_clone. search_service . run_embedding_backfill ( 100 ) . await {
52- Ok ( generated) if generated > 0 => {
53- tracing:: info!( "Cron: generated {} embeddings" , generated) ;
54- }
55- Ok ( _) => { }
56- Err ( e) => tracing:: warn!( "Cron: embedding backfill failed: {}" , e) ,
57- }
58- } ) ;
59- }
60-
61- if loop_count. is_multiple_of ( 360 ) {
62- let state_clone = Arc :: clone ( & state) ;
63- state. background_tasks . lock ( ) . await . spawn ( async move {
64- match state_clone. observation_service . run_dedup_sweep ( ) . await {
65- Ok ( merged) if merged > 0 => {
66- tracing:: info!( merged, "Cron: dedup sweep completed" ) ;
67- }
68- Ok ( _) => { }
69- Err ( e) => tracing:: warn!( error = %e, "Cron: dedup sweep failed" ) ,
70- }
71- } ) ;
72- }
73-
74- if loop_count. is_multiple_of ( 720 ) {
75- let state_clone = Arc :: clone ( & state) ;
76- state. background_tasks . lock ( ) . await . spawn ( async move {
77- if let Err ( e) = state_clone
78- . observation_service
79- . cleanup_old_injections ( )
80- . await
81- {
82- tracing:: warn!( error = %e, "Cron: injection cleanup failed" ) ;
83- }
84- } ) ;
85- }
86-
87- if loop_count. is_multiple_of ( 17280 ) {
88- let ttl_secs = state. config . dlq_ttl_secs ( ) ;
89- let state_clone = Arc :: clone ( & state) ;
90- state. background_tasks . lock ( ) . await . spawn ( async move {
91- match state_clone
92- . queue_service
93- . clear_stale_failed_messages ( ttl_secs)
94- . await
95- {
96- Ok ( deleted) if deleted > 0 => {
97- tracing:: info!( deleted, "Cron: DLQ garbage collection completed" ) ;
98- }
99- Ok ( _) => { }
100- Err ( e) => tracing:: warn!( error = %e, "Cron: DLQ garbage collection failed" ) ,
101- }
102- } ) ;
103- }
104-
105- if loop_count. is_multiple_of ( 2160 ) {
106- let state_clone = Arc :: clone ( & state) ;
107- state. background_tasks . lock ( ) . await . spawn ( async move {
108- match state_clone
109- . knowledge_service
110- . run_confidence_lifecycle ( )
111- . await
112- {
113- Ok ( ( decayed, archived) ) if decayed > 0 || archived > 0 => {
114- tracing:: info!(
115- decayed,
116- archived,
117- "Cron: knowledge confidence lifecycle completed"
118- ) ;
119- }
120- Ok ( _) => { }
121- Err ( e) => {
122- tracing:: warn!( error = %e, "Cron: knowledge confidence lifecycle failed" ) ;
123- }
124- }
125- } ) ;
126- }
127-
128- if loop_count. is_multiple_of ( 360 ) {
129- let state_clone = Arc :: clone ( & state) ;
130- state. background_tasks . lock ( ) . await . spawn ( async move {
131- match state_clone
132- . session_service
133- . generate_pending_summaries ( 10 )
134- . await
135- {
136- Ok ( n) if n > 0 => {
137- tracing:: info!( generated = n, "Cron: session summary generation completed" ) ;
138- }
139- Ok ( _) => { }
140- Err ( e) => {
141- tracing:: warn!( error = %e, "Cron: session summary generation failed" ) ;
142- }
143- }
144- } ) ;
145- }
36+ run_maintenance_tick ( & services, loop_count) . await ;
14637 }
14738}
0 commit comments