@@ -35,6 +35,20 @@ use river_core::room_state::{ChatRoomParametersV1, ChatRoomStateV1, ChatRoomStat
3535use std:: collections:: HashMap ;
3636use std:: sync:: Arc ;
3737
38+ fn compute_update_data (
39+ state : & ChatRoomStateV1 ,
40+ baseline : Option < & ChatRoomStateV1 > ,
41+ params : & ChatRoomParametersV1 ,
42+ ) -> Option < UpdateData < ' static > > {
43+ if let Some ( baseline) = baseline {
44+ let summary = baseline. summarize ( baseline, params) ;
45+ let delta = state. delta ( baseline, params, & summary) ?;
46+ Some ( UpdateData :: Delta ( to_cbor_vec ( & delta) . into ( ) ) )
47+ } else {
48+ Some ( UpdateData :: State ( to_cbor_vec ( state) . into ( ) ) )
49+ }
50+ }
51+
3852/// Identifies contracts that have changed in order to send state updates to Freene
3953#[ derive( Clone ) ]
4054pub struct RoomSynchronizer {
@@ -644,7 +658,7 @@ impl RoomSynchronizer {
644658 rooms_to_sync. len( )
645659 ) ;
646660
647- for ( room_vk, mut state) in rooms_to_sync {
661+ for ( room_vk, ( mut state, last_synced_state ) ) in rooms_to_sync {
648662 info ! ( "Processing room: {:?}" , MemberId :: from( room_vk) ) ;
649663
650664 // Sanitize: remove any messages with invalid signatures before
@@ -705,9 +719,33 @@ impl RoomSynchronizer {
705719
706720 let contract_key = owner_vk_to_contract_key ( & room_vk) ;
707721
722+ let update_data =
723+ match compute_update_data ( & state, last_synced_state. as_ref ( ) , & params) {
724+ Some ( data) => {
725+ match & data {
726+ UpdateData :: Delta ( d) => info ! (
727+ "Room {:?}: sending delta ({} bytes)" ,
728+ MemberId :: from( room_vk) ,
729+ d. as_ref( ) . len( ) ,
730+ ) ,
731+ _ => info ! (
732+ "Room {:?}: no baseline, sending full state" ,
733+ MemberId :: from( room_vk) ,
734+ ) ,
735+ }
736+ data
737+ }
738+ None => {
739+ SYNC_INFO . with_mut ( |sync_info| {
740+ sync_info. state_updated ( & room_vk, state) ;
741+ } ) ;
742+ continue ;
743+ }
744+ } ;
745+
708746 let update_request = ContractRequest :: Update {
709747 key : contract_key,
710- data : UpdateData :: State ( to_cbor_vec ( & state ) . into ( ) ) ,
748+ data : update_data ,
711749 } ;
712750
713751 let client_request = ClientRequest :: ContractOp ( update_request) ;
@@ -1152,3 +1190,87 @@ impl RoomSynchronizer {
11521190pub struct ContractSyncInfo {
11531191 pub owner_vk : VerifyingKey ,
11541192}
1193+
1194+ #[ cfg( test) ]
1195+ mod tests {
1196+ use super :: * ;
1197+ use ed25519_dalek:: SigningKey ;
1198+ use river_core:: room_state:: message:: { AuthorizedMessageV1 , MessageV1 , RoomMessageBody } ;
1199+ use std:: time:: SystemTime ;
1200+
1201+ fn create_test_room ( ) -> ( ChatRoomStateV1 , ChatRoomParametersV1 , SigningKey ) {
1202+ let owner_sk = SigningKey :: generate ( & mut rand:: thread_rng ( ) ) ;
1203+ let owner_vk = owner_sk. verifying_key ( ) ;
1204+ let params = ChatRoomParametersV1 { owner : owner_vk } ;
1205+ let state = ChatRoomStateV1 :: default ( ) ;
1206+ ( state, params, owner_sk)
1207+ }
1208+
1209+ fn add_message ( state : & mut ChatRoomStateV1 , author_sk : & SigningKey , content : & str ) {
1210+ let msg = MessageV1 {
1211+ room_owner : state. configuration . configuration . owner_member_id ,
1212+ author : MemberId :: from ( & author_sk. verifying_key ( ) ) ,
1213+ content : RoomMessageBody :: public ( content. to_string ( ) ) ,
1214+ time : SystemTime :: now ( ) ,
1215+ } ;
1216+ let authorized = AuthorizedMessageV1 :: new ( msg, author_sk) ;
1217+ state. recent_messages . messages . push ( authorized) ;
1218+ }
1219+
1220+ #[ test]
1221+ fn no_baseline_returns_full_state ( ) {
1222+ let ( state, params, _) = create_test_room ( ) ;
1223+ let result = compute_update_data ( & state, None , & params) ;
1224+ assert ! ( matches!( result, Some ( UpdateData :: State ( _) ) ) ) ;
1225+ }
1226+
1227+ #[ test]
1228+ fn identical_states_returns_none ( ) {
1229+ let ( state, params, _) = create_test_room ( ) ;
1230+ let result = compute_update_data ( & state, Some ( & state) , & params) ;
1231+ assert ! ( result. is_none( ) ) ;
1232+ }
1233+
1234+ #[ test]
1235+ fn changed_state_returns_delta ( ) {
1236+ let ( state, params, owner_sk) = create_test_room ( ) ;
1237+ let baseline = state. clone ( ) ;
1238+
1239+ let mut current = state;
1240+ add_message ( & mut current, & owner_sk, "hello" ) ;
1241+
1242+ let result = compute_update_data ( & current, Some ( & baseline) , & params) ;
1243+ assert ! ( matches!( result, Some ( UpdateData :: Delta ( _) ) ) ) ;
1244+ }
1245+
1246+ #[ test]
1247+ fn delta_is_smaller_than_full_state ( ) {
1248+ let ( mut state, params, owner_sk) = create_test_room ( ) ;
1249+ for i in 0 ..10 {
1250+ add_message ( & mut state, & owner_sk, & format ! ( "message {}" , i) ) ;
1251+ }
1252+ let baseline = state. clone ( ) ;
1253+
1254+ let mut current = state;
1255+ add_message ( & mut current, & owner_sk, "new message" ) ;
1256+
1257+ let delta = compute_update_data ( & current, Some ( & baseline) , & params) . unwrap ( ) ;
1258+ let full = compute_update_data ( & current, None , & params) . unwrap ( ) ;
1259+
1260+ let delta_size = match & delta {
1261+ UpdateData :: Delta ( d) => d. as_ref ( ) . len ( ) ,
1262+ _ => panic ! ( "expected delta" ) ,
1263+ } ;
1264+ let full_size = match & full {
1265+ UpdateData :: State ( s) => s. as_ref ( ) . len ( ) ,
1266+ _ => panic ! ( "expected state" ) ,
1267+ } ;
1268+
1269+ assert ! (
1270+ delta_size < full_size,
1271+ "delta ({} bytes) should be smaller than full state ({} bytes)" ,
1272+ delta_size,
1273+ full_size
1274+ ) ;
1275+ }
1276+ }
0 commit comments