@@ -5,7 +5,6 @@ use gas::prelude::*;
55use rand:: Rng ;
66use rivet_api_builder:: ApiCtx ;
77use serde:: { Deserialize , Serialize } ;
8- use std:: collections:: { BTreeSet , HashSet } ;
98use std:: time:: { Duration , Instant } ;
109
1110use crate :: {
@@ -218,8 +217,11 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
218217 . await
219218 . context ( "failed reading config" ) ?;
220219
221- let quorum_members =
222- resolve_quorum_members ( & config, replica_id, input. target_replicas . as_deref ( ) ) ?;
220+ let quorum_members = utils:: resolve_active_quorum_members (
221+ & config,
222+ replica_id,
223+ input. target_replicas . as_deref ( ) ,
224+ ) ?;
223225 tracing:: debug!(
224226 ?quorum_members,
225227 quorum_size = quorum_members. len( ) ,
@@ -1005,123 +1007,12 @@ fn prepare_retry_base_delay_ms(retry_count: usize) -> u64 {
10051007 . min ( PREPARE_RETRY_MAX_DELAY_MS )
10061008}
10071009
1008- /// Returns the quorum members to use for this proposal. This supports scoped proposals because
1009- /// runner configs are often only enabled in a couple of explicitly coupled regions. If
1010- /// `target_replicas` is provided, it validates that the scope is non-empty, includes the local
1011- /// replica, and only contains active replicas. Otherwise it falls back to the full active quorum
1012- /// from the cluster config.
1013- ///
1014- /// Scoped proposals rely on a caller-side invariant: the same key must continue using the same
1015- /// replica scope unless some higher-level reconfiguration step coordinates the membership change.
1016- /// This function does not persist or enforce that per-key scope stability.
1017- fn resolve_quorum_members (
1018- config : & protocol:: ClusterConfig ,
1019- replica_id : ReplicaId ,
1020- target_replicas : Option < & [ ReplicaId ] > ,
1021- ) -> Result < Vec < ReplicaId > > {
1022- match target_replicas {
1023- Some ( target_replicas) => {
1024- let active = utils:: get_quorum_members ( config)
1025- . into_iter ( )
1026- . collect :: < HashSet < _ > > ( ) ;
1027- let validated = target_replicas. iter ( ) . copied ( ) . collect :: < BTreeSet < _ > > ( ) ;
1028-
1029- if validated. is_empty ( ) {
1030- bail ! ( "target_replicas cannot be empty" ) ;
1031- }
1032-
1033- if !validated. contains ( & replica_id) {
1034- bail ! ( "target_replicas must include the local replica" ) ;
1035- }
1036-
1037- if !validated. iter ( ) . all ( |replica| active. contains ( replica) ) {
1038- bail ! ( "target_replicas contains an inactive or unknown replica" ) ;
1039- }
1040-
1041- Ok ( validated. into_iter ( ) . collect ( ) )
1042- }
1043- None => {
1044- let replicas = utils:: get_quorum_members ( config) ;
1045- if !replicas. contains ( & replica_id) {
1046- bail ! ( "local replica is not active in the current epoxy config" ) ;
1047- }
1048- Ok ( replicas)
1049- }
1050- }
1051- }
1052-
10531010#[ cfg( test) ]
10541011mod tests {
10551012 use super :: * ;
10561013 use epoxy_protocol:: protocol:: { ClusterConfig , ReplicaConfig , ReplicaStatus } ;
10571014 use rand:: { SeedableRng , rngs:: StdRng } ;
10581015
1059- fn make_config ( replicas : & [ ( ReplicaId , ReplicaStatus ) ] ) -> ClusterConfig {
1060- ClusterConfig {
1061- coordinator_replica_id : replicas[ 0 ] . 0 ,
1062- epoch : 1 ,
1063- replicas : replicas
1064- . iter ( )
1065- . map ( |( replica_id, status) | ReplicaConfig {
1066- replica_id : * replica_id,
1067- status : status. clone ( ) ,
1068- api_peer_url : String :: new ( ) ,
1069- guard_url : String :: new ( ) ,
1070- } )
1071- . collect ( ) ,
1072- }
1073- }
1074-
1075- #[ test]
1076- fn resolve_quorum_members_none_uses_all_active ( ) {
1077- let config = make_config ( & [
1078- ( 1 , ReplicaStatus :: Active ) ,
1079- ( 2 , ReplicaStatus :: Active ) ,
1080- ( 3 , ReplicaStatus :: Joining ) ,
1081- ] ) ;
1082- let result = resolve_quorum_members ( & config, 1 , None ) . unwrap ( ) ;
1083- assert_eq ! ( result, vec![ 1 , 2 ] ) ;
1084- }
1085-
1086- #[ test]
1087- fn resolve_quorum_members_requires_local_replica_to_be_active ( ) {
1088- let config = make_config ( & [ ( 1 , ReplicaStatus :: Learning ) , ( 2 , ReplicaStatus :: Active ) ] ) ;
1089- let result = resolve_quorum_members ( & config, 1 , None ) ;
1090- assert ! ( result. is_err( ) ) ;
1091- }
1092-
1093- #[ test]
1094- fn resolve_quorum_members_scoped_subset ( ) {
1095- let config = make_config ( & [
1096- ( 1 , ReplicaStatus :: Active ) ,
1097- ( 2 , ReplicaStatus :: Active ) ,
1098- ( 3 , ReplicaStatus :: Active ) ,
1099- ] ) ;
1100- let result = resolve_quorum_members ( & config, 1 , Some ( & [ 1 , 2 ] ) ) . unwrap ( ) ;
1101- assert_eq ! ( result, vec![ 1 , 2 ] ) ;
1102- }
1103-
1104- #[ test]
1105- fn resolve_quorum_members_empty_target_errors ( ) {
1106- let config = make_config ( & [ ( 1 , ReplicaStatus :: Active ) ] ) ;
1107- let result = resolve_quorum_members ( & config, 1 , Some ( & [ ] ) ) ;
1108- assert ! ( result. is_err( ) ) ;
1109- }
1110-
1111- #[ test]
1112- fn resolve_quorum_members_missing_local_errors ( ) {
1113- let config = make_config ( & [ ( 1 , ReplicaStatus :: Active ) , ( 2 , ReplicaStatus :: Active ) ] ) ;
1114- let result = resolve_quorum_members ( & config, 1 , Some ( & [ 2 ] ) ) ;
1115- assert ! ( result. is_err( ) ) ;
1116- }
1117-
1118- #[ test]
1119- fn resolve_quorum_members_inactive_replica_errors ( ) {
1120- let config = make_config ( & [ ( 1 , ReplicaStatus :: Active ) , ( 2 , ReplicaStatus :: Learning ) ] ) ;
1121- let result = resolve_quorum_members ( & config, 1 , Some ( & [ 1 , 2 ] ) ) ;
1122- assert ! ( result. is_err( ) ) ;
1123- }
1124-
11251016 #[ test]
11261017 fn parses_set_command_as_set_proposal ( ) {
11271018 let proposal = Proposal {
0 commit comments