-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathchat.py
More file actions
275 lines (233 loc) · 10.8 KB
/
chat.py
File metadata and controls
275 lines (233 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import datetime
import json
from json import JSONDecodeError
from typing import Optional
from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from chats.exceptions import ChatException
from chats.models import BaseChat
from chats.utils import get_user_channel_cache_key
from chats.websockets_settings import (
Event,
EventType,
EventGroupType,
ChatType,
)
from core.constants import ONE_DAY_IN_SECONDS, ONE_WEEK_IN_SECONDS
from core.utils import get_user_online_cache_key, get_users_online_cache_key
from projects.models import Collaborator
from users.models import CustomUser
from chats.consumers.event_types import DirectEvent, ProjectEvent
from chats.utils import get_chat_and_user_ids_from_content
from chats.models import DirectChat
@database_sync_to_async
def get_user_project_ids(user_id: int) -> list[int]:
return list(
Collaborator.objects.filter(user_id=user_id).values_list("project", flat=True)
)
class ChatConsumer(AsyncJsonWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.room_name: str = ""
self.user: Optional[CustomUser] = None
self.chat_type = None
self.chat: Optional[BaseChat] = None
self.event = None
self.joined_rooms: set[str] = set()
async def connect(self):
"""User connected to websocket"""
if self.scope["user"].is_anonymous:
# not authenticated
return await self.close(403)
self.user = self.scope["user"]
cache.set(
get_user_channel_cache_key(self.user), self.channel_name, ONE_WEEK_IN_SECONDS
)
if not settings.RUNNING_TESTS:
# get all projects that user is a member of
project_ids_list = await get_user_project_ids(self.user.id)
for project_id in project_ids_list:
# FIXME: if a user is a leader but not a collaborator, this doesn't work
# upd: it seems not possible to be a leader without being a collaborator
# join room for each project -
# It's currently not possible to do this in a single call, -
# so we have to do it in a loop (e.g. that's O(N) calls to layer backend, redis cache that would be) -
room_name = f"{EventGroupType.CHATS_RELATED}_{project_id}"
await self.channel_layer.group_add(room_name, self.channel_name)
self.joined_rooms.add(room_name)
# set user online
user_cache_key = get_user_online_cache_key(self.user)
cache.set(user_cache_key, True, ONE_DAY_IN_SECONDS)
online_users = cache.get(get_users_online_cache_key(), set())
online_users.add(self.user.id)
cache.set(get_users_online_cache_key(), online_users)
# notify everyone that this user is online
await self.channel_layer.group_send(
EventGroupType.GENERAL_EVENTS,
{"type": EventType.SET_ONLINE, "content": {"user_id": self.user.id}},
)
# add to group to listen for general events, like online/offline
await self.channel_layer.group_add(
EventGroupType.GENERAL_EVENTS, self.channel_name
)
# Confirm selected subprotocol so browser clients finish handshake.
subprotocol = None
if (
self.scope.get("subprotocols")
and len(self.scope["subprotocols"]) >= 1
):
subprotocol = self.scope["subprotocols"][0]
await self.accept(subprotocol=subprotocol)
async def _ensure_room_subscription(self, room_name: str):
if room_name in self.joined_rooms:
return
await self.channel_layer.group_add(room_name, self.channel_name)
self.joined_rooms.add(room_name)
async def disconnect(self, close_code):
"""User disconnected from websocket"""
if not self.user or self.user.is_anonymous:
# don't need to proccess logic for disconnect
# if we are not logged in!
return
online_users = cache.get(get_users_online_cache_key(), set())
online_users.discard(self.user.id)
cache.set(get_users_online_cache_key(), online_users)
cache.delete(get_user_online_cache_key(self.user))
room_name = EventGroupType.GENERAL_EVENTS
# TODO: add a User extra-small serializer for this?
await self.channel_layer.group_send(
room_name,
{"type": EventType.SET_OFFLINE, "content": {"user_id": self.user.id}},
)
async def receive_json(self, content, **kwargs):
"""Receive message from WebSocket in JSON format"""
# todo reply_to key is not required
event = Event(type=content["type"], content=content.get("content"))
# two event types - related to group chat and related to leave/connect
if event.type in [
EventType.NEW_MESSAGE,
EventType.TYPING,
EventType.READ_MESSAGE,
EventType.DELETE_MESSAGE,
EventType.EDIT_MESSAGE,
]:
if event.content["chat_type"] == ChatType.DIRECT:
self.event = DirectEvent(self.user, self.channel_layer, self.channel_name)
elif event.content["chat_type"] == ChatType.PROJECT:
self.event = ProjectEvent(
self.user, self.channel_layer, self.channel_name
)
room_name = f"{EventGroupType.CHATS_RELATED}_{event.content.get('chat_id')}"
if event.content["chat_type"] == ChatType.PROJECT:
await self._ensure_room_subscription(room_name)
try:
await self.__process_chat_related_event(event, room_name)
except ChatException as e:
await self.send_json({"error": str(e.get_error())})
except KeyError as e:
await self.send_json(
{
"error": f"Missing key (might be backend's fault,"
f" but most likely you are missing this field): {e}"
}
)
elif event.type in [EventType.SET_ONLINE, EventType.SET_OFFLINE]:
room_name = EventGroupType.GENERAL_EVENTS
await self.__process_general_event(event, room_name)
else:
return self.disconnect(400)
async def __process_chat_related_event(self, event, room_name):
self.event: DirectEvent | ProjectEvent
if event.type == EventType.NEW_MESSAGE:
await self.event.process_new_message_event(event, room_name)
elif event.type == EventType.TYPING:
await self.__process_typing_event(event, room_name)
elif event.type == EventType.READ_MESSAGE:
await self.event.process_read_message_event(event, room_name)
elif event.type == EventType.DELETE_MESSAGE:
await self.event.process_delete_message_event(event, room_name)
elif event.type == EventType.EDIT_MESSAGE:
await self.event.process_edit_message_event(event, room_name)
async def __process_typing_event(self, event: Event, room_name: str):
"""Send typing event to room group."""
event_data = {
"type": EventType.TYPING,
"content": {
"chat_id": event.content["chat_id"],
"chat_type": event.content["chat_type"],
"user_id": self.user.id,
"end_time": (timezone.now() + datetime.timedelta(seconds=5)).isoformat(),
},
}
if event.content["chat_type"] == ChatType.DIRECT:
# fixme: need to move this to func
chat_id, other_user = await get_chat_and_user_ids_from_content(
event.content, self.user
)
# if chat_id == 17_7, then chat_id will be == 7_17
chat_id = DirectChat.get_chat_id_from_users(self.user, other_user)
# check if chat exists
try:
await sync_to_async(DirectChat.objects.get)(pk=chat_id)
except DirectChat.DoesNotExist:
# if not, create such chat
await sync_to_async(DirectChat.create_from_two_users)(
self.user, other_user
)
# send message to user's channel
other_user_channel = cache.get(get_user_channel_cache_key(other_user), None)
if other_user_channel:
await self.channel_layer.send(other_user_channel, event_data)
await self.channel_layer.group_send(
room_name,
event_data,
)
async def message_read(self, event: Event):
await self.send(json.dumps(event))
async def user_typing(self, event: Event):
await self.send(json.dumps(event))
async def new_message(self, event: Event):
await self.send(json.dumps(event))
async def delete_message(self, event: Event):
await self.send(json.dumps(event))
async def set_online(self, event: Event):
await self.send(json.dumps(event))
async def set_offline(self, event: Event):
await self.send(json.dumps(event))
async def edit_message(self, event: Event):
await self.send(json.dumps(event))
async def __process_general_event(self, event: Event, room_name: str):
cache_key = get_user_online_cache_key(self.user)
users_online_list_key = get_users_online_cache_key()
if event.type == EventType.SET_ONLINE:
cache.set(cache_key, True, ONE_DAY_IN_SECONDS)
users_online_list = cache.get_or_set(users_online_list_key, set())
users_online_list.add(self.user.pk)
cache.set(users_online_list_key, users_online_list, ONE_DAY_IN_SECONDS)
# sent everyone online event that user X is online
await self.channel_layer.group_send(
room_name, {"type": EventType.SET_ONLINE, "user_id": self.user.pk}
)
elif event.type == EventType.SET_OFFLINE:
cache.delete(cache_key)
users_online_list = cache.get_or_set(users_online_list_key, set())
users_online_list.remove(self.user.pk)
cache.set(users_online_list_key, users_online_list, ONE_DAY_IN_SECONDS)
# sent everyone online event that user X is offline
await self.channel_layer.group_send(
room_name, {"type": EventType.SET_OFFLINE, "user_id": self.user.pk}
)
# TODO: close connection here?
# await self.close(200)
else:
raise ValueError("Unknown event type")
async def decode_json(self, text_data) -> dict:
try:
return json.loads(text_data)
except JSONDecodeError as error:
await self.disconnect(400)
raise error