@@ -55,21 +55,21 @@ pub fn consume(
5555 "offset ({offset:?}) must be between high ({high_watermark}) and low({low_watermark}) watermarks"
5656 ) ;
5757 start = Some ( Offset :: Offset ( offset. unwrap ( ) ) ) ;
58- } else if last . is_some ( ) {
58+ } else if let Some ( last_num_messages ) = last {
5959 assert ! (
60- last . unwrap ( ) <= num_messages_on_topic,
61- "Cannot consume {last :?} messages from a topic which only has {num_messages_on_topic} messages"
60+ last_num_messages <= num_messages_on_topic,
61+ "Cannot consume {last_num_messages :?} messages from a topic which only has {num_messages_on_topic} messages"
6262 ) ;
63- start = Some ( Offset :: Offset ( high_watermark - last . unwrap ( ) ) ) ;
63+ start = Some ( Offset :: Offset ( high_watermark - last_num_messages ) ) ;
6464 } else {
6565 start = None ;
6666 }
6767
68- if start . is_some ( ) {
69- info ! ( "Starting at {start :?}" ) ;
68+ if let Some ( _start ) = start {
69+ info ! ( "Starting at {_start :?}" ) ;
7070 let mut tpl = TopicPartitionList :: new ( ) ;
71- tpl. add_partition_offset ( & topic. topic , partition. unwrap_or ( 0 ) , start . unwrap ( ) )
72- . unwrap_or_else ( |_| panic ! ( "Failed to set partition offset to {:?}" , start . unwrap ( ) ) ) ;
71+ tpl. add_partition_offset ( & topic. topic , partition. unwrap_or ( 0 ) , _start )
72+ . unwrap_or_else ( |_| panic ! ( "Failed to set partition offset to {:?}" , _start ) ) ;
7373 consumer. assign ( & tpl) . expect ( "Failed to assign to topic" ) ;
7474 }
7575
@@ -89,9 +89,10 @@ pub fn consume(
8989 Some ( p) => {
9090 if let Some ( f) = filter {
9191 if let Some ( schema_id) = get_schema_id ( p)
92- && schema_id != f. to_bytes ( ) {
93- continue ;
94- }
92+ && schema_id != f. to_bytes ( )
93+ {
94+ continue ;
95+ }
9596 debug ! ( "Message has no schema id, ignoring filter" )
9697 }
9798
0 commit comments