@@ -171,8 +171,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
171171 let handler = format_ident ! ( "handle_{}" , ident) ;
172172 quote ! {
173173 if let Some ( port) = fut. broker. #ident. enabled( ) {
174+ any_handler_enabled |= true ;
174175 loop {
175176 match :: futures:: StreamExt :: poll_next_unpin( port, cx) {
177+ // check if message is newer than fence
176178 :: std:: task:: Poll :: Ready ( Some ( output) ) if output. source_ts > fence => {
177179 match fut. broker. #handler( fut. plan, output) {
178180 :: std:: result:: Result :: Ok ( :: agentwire:: BrokerFlow :: Break ) => {
@@ -194,9 +196,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
194196 }
195197 }
196198 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: Some ( _) ) => {
197- continue ;
199+ continue ; // skip message because its older than `fence`
198200 }
199201 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: None ) => {
202+ // channel sender is dropped, which means agent terminated
200203 return :: std:: task:: Poll :: Ready (
201204 :: std:: result:: Result :: Err (
202205 :: agentwire:: BrokerError :: AgentTerminated (
@@ -206,7 +209,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
206209 ) ;
207210 }
208211 :: std:: task:: Poll :: Pending => {
209- break ;
212+ break ; // No more messages to process
210213 }
211214 }
212215 }
@@ -231,7 +234,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
231234 }
232235 } ) ;
233236 let run = quote ! {
234- # [ allow ( missing_docs ) ]
237+ /// Future for [`#ident::run`].
235238 pub struct #run_fut_name<' a> {
236239 broker: & ' a mut #ident,
237240 plan: & ' a mut dyn #broker_plan,
@@ -247,20 +250,32 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
247250 ) -> :: std:: task:: Poll <Self :: Output > {
248251 let fence = self . fence;
249252 let fut = self . as_mut( ) . get_mut( ) ;
253+ let mut any_handler_enabled = false ;
250254 ' outer: loop {
255+ :: tracing:: info!( "looping" ) ;
251256 #( #run_handlers) *
252257 #poll_extra
258+ if !any_handler_enabled {
259+ // Prevent infinite loop in edge case where no handlers are
260+ // enabled.
261+ return :: std:: task:: Poll :: Pending ;
262+ }
253263 }
264+
254265 }
255266 }
256267
257268 impl #ident {
258- # [ allow ( missing_docs ) ]
269+ /// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
259270 pub fn run<' a>( & ' a mut self , plan: & ' a mut dyn #broker_plan) -> #run_fut_name<' a> {
260271 Self :: run_with_fence( self , plan, :: std:: time:: Instant :: now( ) )
261272 }
262273
263- #[ allow( missing_docs) ]
274+ /// Runs the broker, filtering any events to only those with a timestamp
275+ /// newer than `fence`.
276+ ///
277+ /// Events are fed the broker's `handle_*` functions, and `plan` is passed
278+ /// there as an argument.
264279 pub fn run_with_fence<' a>(
265280 & ' a mut self ,
266281 plan: & ' a mut dyn #broker_plan,
0 commit comments