Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions chats/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,43 @@
User = get_user_model()


def get_lookup_kwarg(request, view, *names):
view_kwargs = getattr(view, "kwargs", {}) or {}
parser_kwargs = (getattr(request, "parser_context", None) or {}).get("kwargs", {})

for name in names:
if name in view_kwargs:
return view_kwargs[name]
if name in parser_kwargs:
return parser_kwargs[name]
return None


def is_project_member(user, project) -> bool:
if not getattr(user, "is_authenticated", False):
return False

if project.leader_id == user.id:
return True

return project.collaborator_set.filter(user_id=user.id).exists()


class IsProjectChatMember(BasePermission):
def has_permission(self, request, view) -> bool:
project_id = get_lookup_kwarg(request, view, "id", "pk")
if project_id is None:
return True

try:
project = Project.objects.get(pk=view.kwargs["id"])
except Project.DoesNotExist:
project = Project.objects.only("id", "leader_id").get(pk=project_id)
except (Project.DoesNotExist, TypeError, ValueError):
return False
if request.user in project.get_collaborators_user_list():
return True
return True

return is_project_member(request.user, project)

def has_object_permission(self, request, view, obj):
if request.user in obj.project.get_collaborators_user_list():
return True
return False
return is_project_member(request.user, obj.project)


class IsChatMember(BasePermission):
Expand All @@ -28,8 +51,12 @@ class IsChatMember(BasePermission):
"""

def has_permission(self, request, view) -> bool:
kwargs = request.parser_context.get("kwargs")
chat_id = get_lookup_kwarg(request, view, "id")
if chat_id is None:
return True

chat_id: str = kwargs["id"]
user_id = getattr(request.user, "id", None)
if user_id is None:
return False

return str(request.user.id) in chat_id.split("_")
return str(user_id) in str(chat_id).split("_")
73 changes: 73 additions & 0 deletions chats/tests/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from django.contrib.auth import get_user_model
from django.test import TestCase, override_settings
from rest_framework.test import APIRequestFactory, force_authenticate

from chats.models import ProjectChat
from chats.tests.constants import TEST_USER1, TEST_USER2, TEST_USER3
from chats.views import ProjectChatDetail
from projects.models import Collaborator, Project


@override_settings(ALLOWED_HOSTS=["testserver", "dev.procollab.ru", "127.0.0.1"])
class ChatPermissionsTests(TestCase):
def setUp(self):
super().setUp()
self.factory = APIRequestFactory()
self.leader = get_user_model().objects.create(**TEST_USER1)
self.collaborator = get_user_model().objects.create(**TEST_USER2)
self.outsider = get_user_model().objects.create(**TEST_USER3)
self.project = Project.objects.create(leader=self.leader)
self.chat = ProjectChat.objects.create(project=self.project)
Collaborator.objects.create(
user=self.collaborator,
project=self.project,
role="User",
)

self.staff = get_user_model().objects.create(
email="swagger-staff@test.test",
password="very_strong_password",
first_name="Swagger",
last_name="Staff",
birthday="2000-01-01",
is_staff=True,
is_superuser=True,
is_active=True,
)
self.staff.set_password("very_strong_password")
self.staff.save()

def test_swagger_schema_is_available_for_staff(self):
self.client.force_login(self.staff)

response = self.client.get(
"/swagger/?format=openapi",
secure=True,
HTTP_HOST="dev.procollab.ru",
)

self.assertEqual(response.status_code, 200)

def test_project_chat_detail_is_available_for_leader(self):
request = self.factory.get(f"/chats/projects/{self.chat.id}/")
force_authenticate(request, user=self.leader)

response = ProjectChatDetail.as_view()(request, pk=self.chat.id)

self.assertEqual(response.status_code, 200)

def test_project_chat_detail_is_available_for_collaborator(self):
request = self.factory.get(f"/chats/projects/{self.chat.id}/")
force_authenticate(request, user=self.collaborator)

response = ProjectChatDetail.as_view()(request, pk=self.chat.id)

self.assertEqual(response.status_code, 200)

def test_project_chat_detail_is_forbidden_for_outsider(self):
request = self.factory.get(f"/chats/projects/{self.chat.id}/")
force_authenticate(request, user=self.outsider)

response = ProjectChatDetail.as_view()(request, pk=self.chat.id)

self.assertEqual(response.status_code, 403)
Loading