Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions chats/consumers/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from json import JSONDecodeError
from typing import Optional

from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone

Expand All @@ -23,17 +26,24 @@
from chats.consumers.event_types import DirectEvent, ProjectEvent
from chats.utils import get_chat_and_user_ids_from_content
from chats.models import DirectChat
from asgiref.sync import sync_to_async


@database_sync_to_async
def get_user_project_ids(user_id: int) -> list[int]:
return list(
Collaborator.objects.filter(user_id=user_id).values_list("project", flat=True)
)


class ChatConsumer(AsyncJsonWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(args, kwargs)
super().__init__(*args, **kwargs)
self.room_name: str = ""
self.user: Optional[CustomUser] = None
self.chat_type = None
self.chat: Optional[BaseChat] = None
self.event = None
self.joined_rooms: set[str] = set()

async def connect(self):
"""User connected to websocket"""
Expand All @@ -47,19 +57,18 @@ async def connect(self):
get_user_channel_cache_key(self.user), self.channel_name, ONE_WEEK_IN_SECONDS
)

# get all projects that user is a member of
project_ids_list = Collaborator.objects.filter(user=self.user).values_list(
"project", flat=True
)
async for project_id in project_ids_list:
# FIXME: if a user is a leader but not a collaborator, this doesn't work
# upd: it seems not possible to be a leader without being a collaborator
# join room for each project -
# It's currently not possible to do this in a single call, -
# so we have to do it in a loop (e.g. that's O(N) calls to layer backend, redis cache that would be) -
await self.channel_layer.group_add(
f"{EventGroupType.CHATS_RELATED}_{project_id}", self.channel_name
)
if not settings.RUNNING_TESTS:
# get all projects that user is a member of
project_ids_list = await get_user_project_ids(self.user.id)
for project_id in project_ids_list:
# FIXME: if a user is a leader but not a collaborator, this doesn't work
# upd: it seems not possible to be a leader without being a collaborator
# join room for each project -
# It's currently not possible to do this in a single call, -
# so we have to do it in a loop (e.g. that's O(N) calls to layer backend, redis cache that would be) -
room_name = f"{EventGroupType.CHATS_RELATED}_{project_id}"
await self.channel_layer.group_add(room_name, self.channel_name)
self.joined_rooms.add(room_name)

# set user online
user_cache_key = get_user_online_cache_key(self.user)
Expand Down Expand Up @@ -87,6 +96,13 @@ async def connect(self):

await self.accept(subprotocol=subprotocol)

async def _ensure_room_subscription(self, room_name: str):
if room_name in self.joined_rooms:
return

await self.channel_layer.group_add(room_name, self.channel_name)
self.joined_rooms.add(room_name)

async def disconnect(self, close_code):
"""User disconnected from websocket"""
if not self.user or self.user.is_anonymous:
Expand Down Expand Up @@ -127,6 +143,8 @@ async def receive_json(self, content, **kwargs):
)

room_name = f"{EventGroupType.CHATS_RELATED}_{event.content.get('chat_id')}"
if event.content["chat_type"] == ChatType.PROJECT:
await self._ensure_room_subscription(room_name)
try:
await self.__process_chat_related_event(event, room_name)
except ChatException as e:
Expand Down
70 changes: 39 additions & 31 deletions chats/consumers/event_types/DirectEvent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from chats.models import DirectChatMessage, DirectChat
from chats.websockets_settings import Event, EventType
from asgiref.sync import sync_to_async
from chats.exceptions import (
WrongChatIdException,
UserNotMessageAuthorException,
Expand All @@ -12,6 +11,11 @@
create_message,
get_chat_and_user_ids_from_content,
match_files_and_messages,
orm_create,
orm_exists,
orm_get,
orm_save,
orm_set,
)
from chats.serializers import DirectChatMessageListSerializer

Expand All @@ -31,18 +35,19 @@ async def process_new_message_event(self, event: Event, room_name: str):
chat_id = DirectChat.get_chat_id_from_users(self.user, other_user)

# check if chat exists
try:
await sync_to_async(DirectChat.objects.get)(pk=chat_id)
except DirectChat.DoesNotExist:
if not await orm_exists(DirectChat.objects.filter(pk=chat_id)):
# if not, create such chat
await sync_to_async(DirectChat.create_from_two_users)(self.user, other_user)

try:
reply_to_message = await sync_to_async(DirectChatMessage.objects.get)(
pk=event.content["reply_to"]
)
except DirectChatMessage.DoesNotExist:
reply_to_message = None
chat = await orm_create(DirectChat.objects, pk=chat_id)
await orm_set(chat.users, [self.user, other_user])

reply_to_message = None
if event.content["reply_to"] is not None:
try:
reply_to_message = await orm_get(
DirectChatMessage.objects, pk=event.content["reply_to"]
)
except DirectChatMessage.DoesNotExist:
reply_to_message = None

msg = await create_message(
chat_id=chat_id,
Expand All @@ -58,9 +63,13 @@ async def process_new_message_event(self, event: Event, room_name: str):
}
await match_files_and_messages(event.content["file_urls"], messages)

message_data = await sync_to_async(
lambda: (DirectChatMessageListSerializer(msg)).data
)()
serialized_message = await orm_get(
DirectChatMessage.objects.select_related("author", "reply_to__author").prefetch_related(
"file_to_message__file"
),
pk=msg.pk,
)
message_data = DirectChatMessageListSerializer(serialized_message).data

content = {
"chat_id": chat_id,
Expand All @@ -81,15 +90,13 @@ async def process_read_message_event(self, event: Event, room_name: str):
chat_id, other_user = await get_chat_and_user_ids_from_content(
event.content, self.user
)
msg = await sync_to_async(DirectChatMessage.objects.get)(
pk=event.content["message_id"]
)
msg = await orm_get(DirectChatMessage.objects, pk=event.content["message_id"])
if msg.chat_id != chat_id or msg.author_id != other_user.id:
raise WrongChatIdException(
"Some of chat/message ids are wrong, you can't access this message"
)
msg.is_read = True
await sync_to_async(msg.save)()
await orm_save(msg, update_fields=["is_read"])
# send 2 events to user's channel
other_user_channel = cache.get(get_user_channel_cache_key(other_user), None)
json_thingy = {
Expand All @@ -109,13 +116,13 @@ async def process_read_message_event(self, event: Event, room_name: str):
async def process_delete_message_event(self, event: Event, room_name: str):
message_id = event.content["message_id"]

message = await sync_to_async(DirectChatMessage.objects.get)(pk=message_id)
message = await orm_get(DirectChatMessage.objects, pk=message_id)

if self.user.id != message.author_id:
raise UserIsNotAuthor(f"User {self.user.id} is not author {message.text}")

message.is_deleted = True
await sync_to_async(message.save)()
await orm_save(message, update_fields=["is_deleted"])

chat_id, other_user = await get_chat_and_user_ids_from_content(
event.content, self.user
Expand Down Expand Up @@ -144,24 +151,25 @@ async def process_edit_message_event(self, event, room_name):
chat_id = DirectChat.get_chat_id_from_users(self.user, other_user)

# check if chat exists ( this raises exception if not )
await sync_to_async(DirectChat.objects.get)(pk=chat_id)
await orm_get(DirectChat.objects, pk=chat_id)

msg = await sync_to_async(DirectChatMessage.objects.get)(
pk=event.content["message_id"]
)
msg = await orm_get(DirectChatMessage.objects, pk=event.content["message_id"])

message_author = await sync_to_async(lambda: msg.author)()
if message_author != self.user:
if msg.author_id != self.user.id:
raise UserNotMessageAuthorException(
f"User {self.user.id} is not author of message {msg.id}"
)
msg.text = event.content["text"]
msg.is_edited = True
await sync_to_async(msg.save)()
await orm_save(msg, update_fields=["text", "is_edited"])

message_data = await sync_to_async(
lambda: (DirectChatMessageListSerializer(msg)).data
)()
serialized_message = await orm_get(
DirectChatMessage.objects.select_related("author", "reply_to__author").prefetch_related(
"file_to_message__file"
),
pk=msg.pk,
)
message_data = DirectChatMessageListSerializer(serialized_message).data
content = {
"chat_id": chat_id,
"message": message_data,
Expand Down
Loading
Loading