@@ -187,8 +187,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
187187 let handler = format_ident ! ( "handle_{}" , ident) ;
188188 quote ! {
189189 if let Some ( port) = fut. broker. #ident. enabled( ) {
190+ any_handler_enabled |= true ;
190191 loop {
191192 match :: futures:: StreamExt :: poll_next_unpin( port, cx) {
193+ // check if message is newer than fence
192194 :: std:: task:: Poll :: Ready ( Some ( output) ) if output. source_ts > fence => {
193195 match fut. broker. #handler( fut. plan, output) {
194196 :: std:: result:: Result :: Ok ( :: agentwire:: BrokerFlow :: Break ) => {
@@ -210,9 +212,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
210212 }
211213 }
212214 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: Some ( _) ) => {
213- continue ;
215+ continue ; // skip message because its older than `fence`
214216 }
215217 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: None ) => {
218+ // channel sender is dropped, which means agent terminated
216219 return :: std:: task:: Poll :: Ready (
217220 :: std:: result:: Result :: Err (
218221 :: agentwire:: BrokerError :: AgentTerminated (
@@ -222,7 +225,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
222225 ) ;
223226 }
224227 :: std:: task:: Poll :: Pending => {
225- break ;
228+ break ; // No more messages to process
226229 }
227230 }
228231 }
@@ -249,7 +252,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
249252 } ,
250253 ) ;
251254 let run = quote ! {
252- # [ allow ( missing_docs ) ]
255+ /// Future for [`#ident::run`].
253256 pub struct #run_fut_name<' a> {
254257 broker: & ' a mut #ident,
255258 plan: & ' a mut dyn #broker_plan,
@@ -265,20 +268,32 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
265268 ) -> :: std:: task:: Poll <Self :: Output > {
266269 let fence = self . fence;
267270 let fut = self . as_mut( ) . get_mut( ) ;
271+ let mut any_handler_enabled = false ;
268272 ' outer: loop {
269273 #( #run_handlers) *
270274 #poll_extra
275+ #[ allow( unreachable_code) ]
276+ if !any_handler_enabled {
277+ // Prevent infinite loop in edge case where no handlers are
278+ // enabled.
279+ return :: std:: task:: Poll :: Pending ;
280+ }
271281 }
282+
272283 }
273284 }
274285
275286 impl #ident {
276- # [ allow ( missing_docs ) ]
287+ /// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
277288 pub fn run<' a>( & ' a mut self , plan: & ' a mut dyn #broker_plan) -> #run_fut_name<' a> {
278289 Self :: run_with_fence( self , plan, :: std:: time:: Instant :: now( ) )
279290 }
280291
281- #[ allow( missing_docs) ]
292+ /// Runs the broker, filtering any events to only those with a timestamp
293+ /// newer than `fence`.
294+ ///
295+ /// Events are fed the broker's `handle_*` functions, and `plan` is passed
296+ /// there as an argument.
282297 pub fn run_with_fence<' a>(
283298 & ' a mut self ,
284299 plan: & ' a mut dyn #broker_plan,
0 commit comments