Skip to content

Commit 35f7365

Browse files
authored
Use JSON Patch (RFC 6902) instead of JSON Merge (#32)
1 parent 03d5f1a commit 35f7365

3 files changed

Lines changed: 112 additions & 41 deletions

File tree

src/models/config/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::HashMap;
21
use serde::{Serialize, Deserialize};
32
use twilight_model::id::Id;
43
use twilight_model::id::marker::{ApplicationMarker, GuildMarker};

src/server/guild/editing.rs

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use std::collections::{BTreeSet, HashMap};
1+
use std::collections::HashMap;
22
use std::sync::Arc;
3+
use json_patch::Patch;
34
use mongodb::bson::doc;
45
use mongodb::bson::oid::ObjectId;
5-
use serde_json::{Map, Value};
6+
use serde::Serialize;
67
use tokio::sync::{Mutex, RwLock};
78
use tracing::{error, warn};
89
use twilight_model::id::Id;
@@ -11,18 +12,22 @@ use crate::context::Context;
1112
use crate::models::config::GuildConfig;
1213
use crate::server::guild::ws::{Connection, OutboundAction, OutboundMessage};
1314

15+
#[derive(Clone, Debug, Serialize)]
16+
pub struct Change {
17+
pub author_id: Id<UserMarker>,
18+
pub changes: Patch
19+
}
20+
1421
struct GuildEditingState {
1522
pub connections: Vec<Arc<Connection>>,
16-
pub changes: Value,
17-
pub edited_by: BTreeSet<Id<UserMarker>>
23+
pub changes: Vec<Change>,
1824
}
1925

2026
impl Default for GuildEditingState {
2127
fn default() -> Self {
2228
Self {
2329
connections: vec![],
24-
changes: Value::Object(Map::new()),
25-
edited_by: Default::default(),
30+
changes: vec![],
2631
}
2732
}
2833
}
@@ -59,14 +64,16 @@ impl GuildsEditing {
5964

6065
pub async fn marge_changes(
6166
&self,
62-
author: Id<UserMarker>,
67+
author_id: Id<UserMarker>,
6368
guild_id: Id<GuildMarker>,
64-
changes: Value
69+
changes: Patch
6570
) -> Option<()> {
6671
let guild = self.get_guild(guild_id).await?;
6772
let mut guild_lock = guild.lock().await;
68-
json_patch::merge(&mut guild_lock.changes, &changes);
69-
guild_lock.edited_by.insert(author);
73+
guild_lock.changes.push(Change {
74+
author_id,
75+
changes
76+
});
7077
Some(())
7178
}
7279

@@ -75,7 +82,55 @@ impl GuildsEditing {
7582
list_lock.get(&guild_id).cloned()
7683
}
7784

78-
pub async fn broadcast_changes(&self, context: &Arc<Context>, guild_id: Id<GuildMarker>) -> Option<()> {
85+
pub async fn broadcast_users(&self, guild_id: Id<GuildMarker>) -> Option<()> {
86+
let guild = self.get_guild(guild_id).await?;
87+
let guild_lock = guild.lock().await;
88+
89+
let users = guild_lock.connections
90+
.iter().map(|connection| connection.user_id)
91+
.collect::<Vec<Id<UserMarker>>>();
92+
93+
for connection in &guild_lock.connections {
94+
let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::OverwriteUsers(users.to_owned())));
95+
}
96+
97+
Some(())
98+
}
99+
100+
pub async fn broadcast_change(
101+
&self, guild_id: Id<GuildMarker>, author_id: Id<UserMarker>, changes: Patch
102+
) -> Option<()> {
103+
let guild = self.get_guild(guild_id).await?;
104+
let guild_lock = guild.lock().await;
105+
106+
for connection in &guild_lock.connections {
107+
let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::PushChange(Change {
108+
author_id,
109+
changes: changes.to_owned()
110+
})));
111+
}
112+
113+
Some(())
114+
}
115+
116+
pub async fn get_initialization_data(&self, context: &Arc<Context>, guild_id: Id<GuildMarker>)
117+
-> Option<(GuildConfig, Vec<Change>, Vec<Id<UserMarker>>)> {
118+
let config = context.mongodb
119+
.get_config(guild_id)
120+
.await
121+
.inspect_err(|error| error!(name: "mongodb error", ?error))
122+
.ok()?;
123+
124+
let guild = self.get_guild(guild_id).await?;
125+
let guild_lock = guild.lock().await;
126+
let users = guild_lock.connections
127+
.iter().map(|connection| connection.user_id)
128+
.collect::<Vec<Id<UserMarker>>>();
129+
130+
Some((config.to_owned(), guild_lock.changes.to_owned(), users))
131+
}
132+
133+
pub async fn broadcast_config_overwrite(&self, context: &Arc<Context>, guild_id: Id<GuildMarker>) -> Option<()> {
79134
let config = context.mongodb
80135
.get_config(guild_id)
81136
.await
@@ -89,10 +144,9 @@ impl GuildsEditing {
89144
.collect::<Vec<Id<UserMarker>>>();
90145

91146
for connection in &guild_lock.connections {
92-
let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::UpdateConfigurationData {
147+
let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::OverwriteConfigurationData {
93148
saved_config: config.to_owned(),
94149
changes: guild_lock.changes.to_owned(),
95-
users: users.to_owned(),
96150
}));
97151
}
98152

@@ -112,9 +166,14 @@ impl GuildsEditing {
112166
let mut new_config = serde_json::to_value(config)
113167
.inspect_err(|error| error!(name: "cannot convert guild config to value", ?error))
114168
.ok()?;
115-
json_patch::merge(&mut new_config, &guild_lock.changes);
169+
for patch in &guild_lock.changes {
170+
json_patch::patch(&mut new_config, &patch.changes)
171+
.inspect_err(|error| error!(name: "error applying patch to guild config", ?patch, ?error))
172+
.ok()?;
173+
}
174+
116175
let new_config: GuildConfig = serde_json::from_value(new_config)
117-
.inspect_err(|error| error!(name: "cannot marge edits with guild config", ?error))
176+
.inspect_err(|error| error!(name: "cannot serialize config after applying patches", ?error))
118177
.ok()?;
119178

120179
if new_config.guild_id != guild_id {
@@ -138,8 +197,7 @@ impl GuildsEditing {
138197
.ok()?;
139198
context.mongodb.configs_cache.remove(&guild_id);
140199

141-
guild_lock.changes = Value::Object(Map::new());
142-
guild_lock.edited_by.clear();
200+
guild_lock.changes = vec![];
143201

144202
Some(())
145203
}

src/server/guild/ws.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use std::borrow::Cow;
22
use std::sync::Arc;
33
use futures_util::{SinkExt, StreamExt};
4+
use json_patch::Patch;
45
use mongodb::bson::oid::ObjectId;
56
use serde::{Deserialize, Serialize};
6-
use serde_json::Value;
77
use tokio::sync::mpsc::UnboundedSender;
88
use tokio_stream::wrappers::UnboundedReceiverStream;
9-
use tracing::{error, info};
9+
use tracing::{error, info, warn};
1010
use twilight_model::id::Id;
1111
use twilight_model::id::marker::UserMarker;
1212
use twilight_model::user::CurrentUserGuild;
@@ -15,7 +15,7 @@ use crate::context::Context;
1515
use crate::database::redis::PartialGuild;
1616
use crate::models::config::GuildConfig;
1717
use crate::ok_or_return;
18-
use crate::server::guild::editing::GuildsEditing;
18+
use crate::server::guild::editing::{Change, GuildsEditing};
1919
use crate::server::session::AuthorizationInformation;
2020

2121
macro_rules! close {
@@ -28,14 +28,17 @@ macro_rules! unwrap_or_close_and_return {
2828
($target: expr, $tx: expr, $reason: expr) => {
2929
match $target {
3030
Ok(value) => value,
31-
Err(_) => {
32-
close!($tx, $reason);
31+
Err(err) => {
32+
let reason = $reason;
33+
tracing::warn!(name: "connection closed due to error", ?err, ?reason);
34+
close!($tx, reason);
3335
return
3436
}
3537
}
3638
};
3739
}
3840

41+
#[derive(Debug)]
3942
pub enum CloseReason {
4043
MessageIsNotString,
4144
CannotParseJSON,
@@ -97,26 +100,33 @@ pub async fn handle_connection(
97100
Message::close_with(reason.code(), reason.text())
98101
).await;
99102
guilds_editing.remove_connection(guild_id, session_id).await;
100-
guilds_editing.broadcast_changes(&context, guild_id).await;
103+
guilds_editing.broadcast_users(guild_id).await;
101104
}
102105
}
103106
}
104107
let _ = ws_tx.close().await;
105108
});
106109

110+
111+
guilds_editing.broadcast_users(guild_id).await;
112+
107113
guilds_editing.add_connection(guild_id, Connection {
108114
user_id: info.user.id,
109115
session_id,
110116
tx: tx.to_owned(),
111117
}).await;
112118

113-
let _ = tx.send(OutboundAction::Message(OutboundMessage::Initialization {
114-
cached: ok_or_return!(context.redis.get_guild(guild.id).await, Ok),
115-
oauth2: guild.to_owned(),
116-
session_id
117-
}));
118-
119-
guilds_editing.broadcast_changes(&context, guild_id).await;
119+
if let Some((saved_config, changes, users)) =
120+
guilds_editing.get_initialization_data(&context, guild_id).await {
121+
let _ = tx.send(OutboundAction::Message(OutboundMessage::Initialization {
122+
cached: ok_or_return!(context.redis.get_guild(guild.id).await, Ok),
123+
oauth2: guild.to_owned(),
124+
saved_config,
125+
changes,
126+
users,
127+
session_id
128+
}));
129+
}
120130

121131
while let Some(result) = ws_rx.next().await {
122132
let message = match result {
@@ -136,12 +146,12 @@ pub async fn handle_connection(
136146
}
137147

138148
guilds_editing.remove_connection(guild_id, session_id).await;
139-
guilds_editing.broadcast_changes(&context, guild_id).await;
149+
guilds_editing.broadcast_users(guild_id).await;
140150
}
141151
#[derive(Debug, Deserialize)]
142152
#[serde(tag = "action", content = "data")]
143153
enum InboundMessage {
144-
GuildConfigUpdate(Value),
154+
GuildConfigUpdate(Patch),
145155
ApplyChanges
146156
}
147157

@@ -151,13 +161,17 @@ pub enum OutboundMessage {
151161
Initialization {
152162
oauth2: CurrentUserGuild,
153163
cached: PartialGuild,
154-
session_id: ObjectId
155-
},
156-
UpdateConfigurationData {
164+
session_id: ObjectId,
157165
saved_config: GuildConfig,
158-
changes: Value,
166+
changes: Vec<Change>,
159167
users: Vec<Id<UserMarker>>
160-
}
168+
},
169+
OverwriteConfigurationData {
170+
saved_config: GuildConfig,
171+
changes: Vec<Change>
172+
},
173+
OverwriteUsers(Vec<Id<UserMarker>>),
174+
PushChange(Change)
161175
}
162176

163177
pub enum OutboundAction {
@@ -183,8 +197,8 @@ async fn on_message(
183197

184198
match message {
185199
InboundMessage::GuildConfigUpdate(changes) => {
186-
let _ = guilds_editing.marge_changes(info.user.id, guild.id, changes).await;
187-
let _ = guilds_editing.broadcast_changes(&context, guild.id).await;
200+
let _ = guilds_editing.marge_changes(info.user.id, guild.id, changes.to_owned()).await;
201+
let _ = guilds_editing.broadcast_change(guild.id, info.user.id, changes).await;
188202
}
189203
InboundMessage::ApplyChanges => {
190204
info!(
@@ -193,7 +207,7 @@ async fn on_message(
193207
guild_id = %guild.id
194208
);
195209
guilds_editing.apply_changes(&context, guild.id).await;
196-
let _ = guilds_editing.broadcast_changes(&context, guild.id).await;
210+
let _ = guilds_editing.broadcast_config_overwrite(&context, guild.id).await;
197211
let _ = context.redis.announce_config_update(guild.id).await
198212
.inspect_err(|error| {
199213
error!(name: "error sending guild_id to redis update announcer", ?error, %guild.id)

0 commit comments

Comments
 (0)