@@ -73,14 +73,14 @@ fn generate_run_start<'a>(
7373
7474 let run_start_args = RunStartArgs {
7575 start_time : start_time as u64 ,
76- stop_time : 0 , // TODO check this
77- run_name : Some ( fbb. create_string ( run_name. as_str ( ) ) ) ,
76+ stop_time : 0 , // TODO check this - it's optional so not necessarily 0
77+ run_name : Some ( fbb. create_string ( & run_name) ) ,
7878 instrument_name : Some ( fbb. create_string ( "saluki-howl" ) ) ,
79- nexus_structure : Some ( fbb. create_string ( nexus_structure. to_string ( ) . as_str ( ) ) ) , // TODO
80- job_id : Some ( fbb. create_string ( job_id. as_str ( ) ) ) ,
79+ nexus_structure : Some ( fbb. create_string ( & nexus_structure. to_string ( ) ) ) ,
80+ job_id : Some ( fbb. create_string ( & job_id) ) ,
8181 broker : None ,
8282 service_id : None ,
83- filename : Some ( fbb. create_string ( file_name. as_str ( ) ) ) ,
83+ filename : Some ( fbb. create_string ( & file_name) ) ,
8484 n_periods : 1 ,
8585 detector_spectrum_map : Some ( det_spec_map_buf) ,
8686 metadata : None ,
@@ -114,6 +114,7 @@ fn generate_run_stop<'a>(fbb: &'a mut FlatBufferBuilder<'_>, job_id: String) ->
114114fn produce_messages (
115115 producer : & BaseProducer ,
116116 fbb : & mut FlatBufferBuilder ,
117+ frame : i64 ,
117118 topic_prefix : String ,
118119 events_per_message : i32 ,
119120 messages_per_frame : u32 ,
@@ -125,12 +126,19 @@ fn produce_messages(
125126 current_job_id : String ,
126127) -> String {
127128 // get currnet time
129+ let now = SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . as_millis ( ) as f32 ;
130+
131+ for _ in 0 ..messages_per_frame {
132+ match producer. send ( BaseRecord :: to ( format ! ( "{topic_prefix}_rawEvents" ) . as_str ( ) )
133+ . key ( "" )
134+ . payload ( generate_fake_events ( fbb, frame, events_per_message, tof_peak, tof_sigma, det_min, det_max, now) ) ,
135+ ) {
136+ Ok ( _) => { }
137+ Err ( err) => { warn ! ( "Failed to send messages: {}" , err. 0 . to_string( ) ) ; }
138+ }
139+ }
128140
129- // generate fake events
130-
131- // for x in range(messages_per_frame)
132-
133- // poll producer
141+ producer. poll ( Duration :: from_secs ( 0 ) ) ;
134142
135143 // create run stop + start if frames_per_run > 0 and frame % frames_per_run == 0
136144
@@ -236,37 +244,40 @@ pub fn howl(
236244 . payload ( generate_run_start (
237245 & mut fbb,
238246 det_max,
239- topic_prefix,
240- current_job_id,
247+ topic_prefix. clone ( ) ,
248+ current_job_id. clone ( ) ,
241249 ) ) ,
242250 )
243251 . expect ( "Failed to enqueue run start message" ) ;
244252
245- let target_frame_time = Duration :: from_secs ( ( 1 / frames_per_second) as u64 ) ;
253+ let target_frame_time = Duration :: from_secs_f64 ( 1.0 / frames_per_second as f64 ) ;
254+ debug ! ( "Target frame time: {target_frame_time:?}" ) ;
246255
247- let mut frames: u64 = 0 ;
256+ let mut frames: i64 = 0 ;
248257
249258 let mut target_time = SystemTime :: now ( )
250259 . duration_since ( SystemTime :: UNIX_EPOCH )
251260 . unwrap ( ) ;
261+ debug ! ( "Target time: {target_time:?}" ) ;
252262 loop {
253263 target_time += target_frame_time;
264+ debug ! ( "New target: {target_time:?}" ) ;
254265 frames += 1 ;
255266
256267 current_job_id = produce_messages (
257268 & producer,
258269 & mut fbb,
259- topic_prefix,
270+ frames,
271+ topic_prefix. clone ( ) ,
260272 events_per_message,
261273 messages_per_frame,
262274 frames_per_run,
263275 tof_peak,
264276 tof_sigma,
265277 det_min,
266278 det_max,
267- current_job_id,
279+ current_job_id. clone ( ) ,
268280 ) ;
269-
270281 let sleep_time = target_time
271282 - SystemTime :: now ( )
272283 . duration_since ( SystemTime :: UNIX_EPOCH )
0 commit comments