@@ -171,8 +171,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
171171 let handler = format_ident ! ( "handle_{}" , ident) ;
172172 quote ! {
173173 if let Some ( port) = fut. broker. #ident. enabled( ) {
174+ any_handler_enabled |= true ;
174175 loop {
175176 match :: futures:: StreamExt :: poll_next_unpin( port, cx) {
177+ // check if message is newer than fence
176178 :: std:: task:: Poll :: Ready ( Some ( output) ) if output. source_ts > fence => {
177179 match fut. broker. #handler( fut. plan, output) {
178180 :: std:: result:: Result :: Ok ( :: agentwire:: BrokerFlow :: Break ) => {
@@ -194,9 +196,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
194196 }
195197 }
196198 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: Some ( _) ) => {
197- continue ;
199+ continue ; // skip message because its older than `fence`
198200 }
199201 :: std:: task:: Poll :: Ready ( :: std:: option:: Option :: None ) => {
202+ // channel sender is dropped, which means agent terminated
200203 return :: std:: task:: Poll :: Ready (
201204 :: std:: result:: Result :: Err (
202205 :: agentwire:: BrokerError :: AgentTerminated (
@@ -206,7 +209,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
206209 ) ;
207210 }
208211 :: std:: task:: Poll :: Pending => {
209- break ;
212+ break ; // No more messages to process
210213 }
211214 }
212215 }
@@ -231,7 +234,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
231234 }
232235 } ) ;
233236 let run = quote ! {
234- # [ allow ( missing_docs ) ]
237+ /// Future for [`#ident::run`].
235238 pub struct #run_fut_name<' a> {
236239 broker: & ' a mut #ident,
237240 plan: & ' a mut dyn #broker_plan,
@@ -247,20 +250,31 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
247250 ) -> :: std:: task:: Poll <Self :: Output > {
248251 let fence = self . fence;
249252 let fut = self . as_mut( ) . get_mut( ) ;
253+ let mut any_handler_enabled = false ;
250254 ' outer: loop {
251255 #( #run_handlers) *
252256 #poll_extra
257+ if !any_handler_enabled {
258+ // Prevent infinite loop in edge case where no handlers are
259+ // enabled.
260+ return :: std:: task:: Poll :: Pending ;
261+ }
253262 }
263+
254264 }
255265 }
256266
257267 impl #ident {
258- # [ allow ( missing_docs ) ]
268+ /// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
259269 pub fn run<' a>( & ' a mut self , plan: & ' a mut dyn #broker_plan) -> #run_fut_name<' a> {
260270 Self :: run_with_fence( self , plan, :: std:: time:: Instant :: now( ) )
261271 }
262272
263- #[ allow( missing_docs) ]
273+ /// Runs the broker, filtering any events to only those with a timestamp
274+ /// newer than `fence`.
275+ ///
276+ /// Events are fed the broker's `handle_*` functions, and `plan` is passed
277+ /// there as an argument.
264278 pub fn run_with_fence<' a>(
265279 & ' a mut self ,
266280 plan: & ' a mut dyn #broker_plan,
0 commit comments