diff --git a/cms/djangoapps/contentstore/tests/test_course_listing.py b/cms/djangoapps/contentstore/tests/test_course_listing.py index d2fc51411482..69894ec75db8 100644 --- a/cms/djangoapps/contentstore/tests/test_course_listing.py +++ b/cms/djangoapps/contentstore/tests/test_course_listing.py @@ -10,6 +10,8 @@ from ccx_keys.locator import CCXLocator from django.test import RequestFactory from opaque_keys.edx.locations import CourseLocator +from openedx_authz.api.users import assign_role_to_user_in_scope +from openedx_authz.constants.roles import COURSE_DATA_RESEARCHER, COURSE_EDITOR, COURSE_STAFF from cms.djangoapps.contentstore.tests.utils import AjaxEnabledTestClient from cms.djangoapps.contentstore.utils import delete_course @@ -36,6 +38,8 @@ from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory from openedx.core.djangoapps.waffle_utils.testutils import WAFFLE_TABLES from openedx.core.djangolib.testing.utils import AUTHZ_TABLES +from openedx.core.djangoapps.authz.tests.mixins import AuthzTestMixin +from openedx.core import toggles as core_toggles from xmodule.modulestore import ModuleStoreEnum # lint-amnesty, pylint: disable=wrong-import-order from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase # lint-amnesty, pylint: disable=wrong-import-order from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order @@ -405,3 +409,273 @@ def _set_of_course_keys(course_list, key_attribute_name='id'): self.assertSetEqual( _set_of_course_keys(courses_in_progress), _set_of_course_keys(unsucceeded_course_actions, 'course_key') ) + + +@ddt.ddt +class TestCourseListingAuthz(AuthzTestMixin, ModuleStoreTestCase): + """ + Tests course listing using the new AuthZ authorization framework. + """ + + def setUp(self): + super().setUp() + + self.factory = RequestFactory() + + def _create_course(self, course_key): + """Helper method to create a course and its overview.""" + course = CourseFactory.create( + org=course_key.org, + number=course_key.course, + run=course_key.run, + ) + + return CourseOverviewFactory.create(id=course.id, org=course_key.org) + + def test_course_listing_with_course_staff_authz_permission(self): + # Create courses and assign access to only some of them to the user. + # Verify that only those courses are returned in the course listing. + # Using COURSE_STAFF role here. + + course_key_1 = CourseLocator("Org1", "Course1", "Run1") + course1 = self._create_course(course_key_1) + + course_key_2 = CourseLocator("Org1", "Course2", "Run1") + course2 = self._create_course(course_key_2) + + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_STAFF.external_key, + str(course_key_1), + ) + + request = self.factory.get("/course") + request.user = self.authorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + courses = list(courses_list) + + self.assertEqual(len(courses), 1) + self.assertEqual(courses[0].id, course1.id) + self.assertEqual(course2.id, course_key_2) + + def test_course_listing_with_course_editor_authz_permission(self): + # Create courses and assign access to only some of them to the user. + # Verify that only those courses are returned in the course listing. + # Using COURSE_EDITOR role here. + + course_key_1 = CourseLocator("Org1", "Course1", "Run1") + course1 = self._create_course(course_key_1) + + course_key_2 = CourseLocator("Org1", "Course2", "Run1") + course2 = self._create_course(course_key_2) + + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_EDITOR.external_key, + str(course_key_1), + ) + + request = self.factory.get("/course") + request.user = self.authorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + courses = list(courses_list) + + self.assertEqual(len(courses), 1) + self.assertEqual(courses[0].id, course1.id) + self.assertEqual(course2.id, course_key_2) + + def test_course_listing_without_permissions(self): + # Create a course but do not assign access to the user. + # Verify that no courses are returned in the course listing. + + course_key = CourseLocator("Org1", "Course1", "Run1") + + self._create_course(course_key) + + request = self.factory.get("/course") + request.user = self.unauthorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + self.assertEqual(len(list(courses_list)), 0) + + def test_non_staff_user_cannot_access(self): + """ + Create a course and assign a non-staff role to the user. + Verify that the course is not returned in the course listing. + """ + non_staff_user = UserFactory() + course_key = CourseLocator("Org1", "Course1", "Run1") + self._create_course(course_key) + self.add_user_to_role(non_staff_user, COURSE_DATA_RESEARCHER.external_key, course_key) + + request = self.factory.get("/course") + request.user = non_staff_user + + courses_list, _ = get_courses_accessible_to_user(request) + + self.assertEqual(len(list(courses_list)), 0) + + def _mock_authz_toggle(self, enabled_keys): + def _is_enabled(course_key=None, **_): + return str(course_key) in enabled_keys + return _is_enabled + + def _make_request(self, user): + request = self.factory.get("/course") + request.user = user + return request + + def _create_courses(self): + """Helper method to create multiple courses for testing.""" + authz_keys = [ + CourseLocator("Org1", "Course1", "AuthzRun"), + CourseLocator("Org1", "Course2", "AuthzRun"), + CourseLocator("Org1", "Course3", "AuthzRun"), + ] + + legacy_keys = [ + CourseLocator("Org1", "Course1", "LegacyRun"), + CourseLocator("Org1", "Course2", "LegacyRun"), + CourseLocator("Org1", "Course3", "LegacyRun"), + ] + + authz_courses = [self._create_course(k) for k in authz_keys] + legacy_courses = [self._create_course(k) for k in legacy_keys] + + return authz_keys, legacy_keys, authz_courses, legacy_courses + + @ddt.data( + { + "name": "authz_and_legacy_basic", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [0, 2], # toggle ON for Course1, Course3 + + "authz_roles": [ + {"index": 0, "role": COURSE_STAFF}, # valid (toggle ON) + {"index": 1, "role": COURSE_EDITOR}, # ignored (toggle OFF) + ], + "legacy_roles": [0], + + "expected": [("authz", 0), ("legacy", 0)], + }, + { + "name": "authz_role_but_toggle_off", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [2], # only Course3 enabled + + "authz_roles": [ + {"index": 1, "role": COURSE_EDITOR}, # should NOT appear + ], + "legacy_roles": [], + + "expected": [], + }, + { + "name": "multiple_roles_mixed", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [0, 1, 2], + + "authz_roles": [ + {"index": 0, "role": COURSE_STAFF}, + {"index": 1, "role": COURSE_EDITOR}, + ], + "legacy_roles": [2], + + "expected": [("authz", 0), ("authz", 1), ("legacy", 2)], + }, + { + "name": "staff_gets_all", + "is_staff": True, + "is_superuser": False, + "authz_enabled": [], + + "authz_roles": [], + "legacy_roles": [], + + "expected": [ + ("authz", 0), ("authz", 1), ("authz", 2), + ("legacy", 0), ("legacy", 1), ("legacy", 2), + ], + }, + { + "name": "superuser_gets_all", + "is_staff": False, + "is_superuser": True, + "authz_enabled": [], + + "authz_roles": [], + "legacy_roles": [], + + "expected": [ + ("authz", 0), ("authz", 1), ("authz", 2), + ("legacy", 0), ("legacy", 1), ("legacy", 2), + ], + }, + ) + @ddt.unpack + def test_course_access_matrix( + self, + name, + is_staff, + is_superuser, + authz_enabled, + authz_roles, + legacy_roles, + expected, + ): + # --- Setup toggle --- + enabled_keys = set() + + authz_keys, legacy_keys, authz_courses, legacy_courses = self._create_courses() + + for i in authz_enabled: + enabled_keys.add(str(authz_keys[i])) + + with patch.object( + core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, + "is_enabled", + side_effect=self._mock_authz_toggle(enabled_keys), + ): + # --- Create user --- + user = UserFactory(is_superuser=is_superuser) + + if is_staff: + GlobalStaff().add_users(user) + + # --- Assign AUTHZ roles --- + for r in authz_roles: + assign_role_to_user_in_scope( + user.username, + r["role"].external_key, + str(authz_keys[r["index"]]), + ) + + # --- Assign LEGACY roles --- + for i in legacy_roles: + CourseInstructorRole(legacy_keys[i]).add_users(user) + + # --- Execute --- + request = self._make_request(user) + courses, _ = get_courses_accessible_to_user(request) + + # --- Expected set --- + expected_ids = { + (authz_courses[i].id if group == "authz" else legacy_courses[i].id) + for group, i in expected + } + + result_ids = {course.id for course in courses} + + self.assertEqual( + result_ids, + expected_ids, + msg=f"Failed scenario: {name}", + ) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index bf2cd9cb83d6..98f3ce2a3a11 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -10,7 +10,6 @@ from typing import Dict import django.utils -from ccx_keys.locator import CCXLocator from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.decorators import login_required @@ -32,11 +31,16 @@ from opaque_keys import InvalidKeyError from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import BlockUsageLocator +from openedx.core.djangoapps.authz.decorators import LegacyAuthoringPermission, user_has_course_permission from organizations.api import add_organization_course, ensure_organization from organizations.exceptions import InvalidOrganizationException from rest_framework.exceptions import ValidationError from rest_framework.decorators import api_view +from openedx_authz.api import get_scopes_for_user_and_permission +from openedx_authz.constants.permissions import COURSES_VIEW_COURSE +from openedx_authz.api.data import CourseOverviewData from openedx.core.lib.api.view_utils import view_auth_classes +from ccx_keys.locator import CCXLocator from cms.djangoapps.contentstore.xblock_storage_handlers.view_handlers import create_xblock_info from cms.djangoapps.course_creators.views import add_user_with_status_unrequested, get_course_creator_status @@ -385,7 +389,9 @@ def get_in_process_course_actions(request): exclude_args={'state': CourseRerunUIStateManager.State.SUCCEEDED}, should_display=True, ) - if has_studio_read_access(request.user, course.course_key) + if user_has_course_permission( + request.user, COURSES_VIEW_COURSE.identifier, course.course_key, LegacyAuthoringPermission.READ + ) ] @@ -750,26 +756,193 @@ def course_index(request, course_key): return redirect(get_course_outline_url(course_key, block_to_show)) +def _has_authz_access(course_key, is_staff_user, authz_scopes): + """Returns True if the user has access to the course based on authz scopes, False otherwise.""" + if is_staff_user: + return True + + for access in authz_scopes: + if access.course_id == course_key: + return True + + return False + + +def _has_legacy_access(course_key, is_staff_user, legacy_accesses): + """Returns True if the user has access to the course based on legacy accesses, False otherwise.""" + if is_staff_user: + return True + + for access in legacy_accesses: + if access.course_id and access.course_id == course_key: + return True + elif access.org and access.course_id is None and access.org == course_key.org: + return True + + return False + + +def _apply_query_filters(request, courses): + """Applies all query filters to the given courses queryset. + This includes filtering by active/archived status, search query, ordering + and any special filters (e.g. CCX courses, template courses). The filters are applied in the following order: + 1. Active/archived status + 2. Search query + 3. Ordering + 4. Special filters (e.g. CCX courses, template courses) + The first 3 filters are applied using queryset methods, while the last filter is applied using a Python filter + function since it involves checking the course type (i.e. if it's a CCX course or a template course). + """ + + def filter_course(course): + """ + Special filters + """ + # CCXs cannot be edited in Studio (aka cms) and should not be shown in this dashboard. + include_course = not isinstance(course.id, CCXLocator) + + # TODO remove this condition when templates purged from db + include_course = include_course and course.location.course != 'templates' + + return include_course + + search_query, order, active_only, archived_only = get_query_params_if_present(request) + + filtered_courses = get_filtered_and_ordered_courses( + courses, + active_only, + archived_only, + search_query, + order, + ) + return filter(filter_course, filtered_courses) + + +def _get_candidate_course_keys(request): + """ + Return candidate course keys the user may have access to, combining: + + - Authz scopes: + Collects course keys from the user's scopes for the + `COURSES_VIEW_COURSE` permission. + + - Legacy access: + Collects course keys based on Django group roles + (`CourseInstructorRole`, `CourseStaffRole`) and org-level access. + If the user has org-level access, all courses within those orgs + are included. + """ + user = request.user + + # Authz start -------------------------------------- + # Recolecting all course keys from authz scopes + authz_scopes = get_scopes_for_user_and_permission( + user.username, + COURSES_VIEW_COURSE.identifier + ) + + authz_keys = { + access.course_key + for access in authz_scopes + if isinstance(access, CourseOverviewData) and access.course_key + } + # Authz end ----------------------------------------- + + # Legacy start -------------------------------------- + # Recolecting all course keys from django groups + instructor_courses = UserBasedRole(user, CourseInstructorRole.ROLE).courses_with_role() + + with strict_role_checking(): + staff_courses = UserBasedRole(user, CourseStaffRole.ROLE).courses_with_role() + + group_keys = set() + org_accesses = set() + legacy_accesses = instructor_courses | staff_courses + + for access in legacy_accesses: + if access.course_id is not None: + course_key = access.course_id + if not isinstance(course_key, CourseKey): + # CCX courses cannot be edited in Studio and should not be shown in this dashboard. + course_key = CourseKey.from_string(str(course_key)) + group_keys.add(course_key) + elif access.org: + org_accesses.add(access.org) + else: + # No course_id or org is associated with this access. + pass + + if org_accesses: + # Getting courses from user global orgs + all_courses_give_an_org = CourseOverview.get_all_courses(orgs=org_accesses).values_list("id", flat=True) + group_keys.update(all_courses_give_an_org) + # Legacy end ---------------------------------------- + + return authz_keys | group_keys + + @function_trace('get_courses_accessible_to_user') def get_courses_accessible_to_user(request): """ - Try to get all courses by first reversing django groups and fallback to old method if it fails - Note: overhead of pymongo reads will increase if getting courses from django groups fails + Return courses accessible to the user using a hybrid AuthZ + legacy approach. - Arguments: - request: the request object + Flow: + 1. Determine candidate course keys: + - Staff: all courses (full scan). + - Non-staff: derived from AuthZ scopes and legacy access. + + 2. Single-pass access evaluation: + - Use AuthZ or legacy checks per course (based on feature flags). + - Collect only accessible course keys. + + 3. Batch fetch courses: + - Retrieve all valid courses in one query (ordered by creation date). + + 4. Apply request-based filters. + + Returns: + tuple: + - list[CourseOverview]: Accessible courses. + - list: In-process course actions (staff only). """ - if GlobalStaff().has_user(request.user): - # user has global access so no need to get courses from django groups - courses, in_process_course_actions = _accessible_courses_summary_iter(request) + user = request.user + is_staff_user = GlobalStaff().has_user(user) or user.is_superuser + in_process_actions = [] + + # Step 1: Determine candidate keys + if is_staff_user: + # Unavoidable full scan + # however, we only fetch the course keys here for the access check, + # and defer fetching the full course objects until after filtering by access + candidate_keys = CourseOverview.get_all_courses().values_list("id", flat=True) + # Compute actions once for staff users since they have access to all courses + in_process_actions = get_in_process_course_actions(request) else: - try: - courses, in_process_course_actions = _accessible_courses_list_from_groups(request) - except AccessListFallback: - # user have some old groups or there was some error getting courses from django groups - # so fallback to iterating through all courses - courses, in_process_course_actions = _accessible_courses_summary_iter(request) - return courses, in_process_course_actions + candidate_keys = _get_candidate_course_keys(request) + + # Step 2: Single-pass decision → collect valid keys + valid_course_keys = set() + for course_key in candidate_keys: + if is_staff_user or user_has_course_permission( + user, + COURSES_VIEW_COURSE.identifier, + course_key, + LegacyAuthoringPermission.READ, + ): + valid_course_keys.add(course_key) + + if not valid_course_keys: + return [], in_process_actions + + # Step 3: Batch fetch valid courses with a single query, ordered by creation date + courses = CourseOverview.get_all_courses( + filter_={'id__in': list(valid_course_keys)} + ).order_by('created') # default ordering is by created date + + # Step 4: Apply filters (e.g. search, active/archived status, ordering) + courses = _apply_query_filters(request, courses) + + return courses, in_process_actions def _process_courses_list(courses_iter, in_process_course_actions, split_archived=False): diff --git a/openedx/core/djangoapps/authz/tests/mixins.py b/openedx/core/djangoapps/authz/tests/mixins.py index c2dc32277b2a..12baf7d1369b 100644 --- a/openedx/core/djangoapps/authz/tests/mixins.py +++ b/openedx/core/djangoapps/authz/tests/mixins.py @@ -14,6 +14,75 @@ from common.djangoapps.student.tests.factories import UserFactory +class AuthzTestMixin: + """ + Minimal reusable mixin for AuthZ-enabled tests. + """ + + @classmethod + def setUpClass(cls): + cls.toggle_patcher = patch.object( + core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, + "is_enabled", + return_value=True, + ) + cls.toggle_patcher.start() + super().setUpClass() + + @classmethod + def tearDownClass(cls): + cls.toggle_patcher.stop() + super().tearDownClass() + + def setUp(self): + super().setUp() + + self._seed_policies() + + self.authorized_user = UserFactory() + self.unauthorized_user = UserFactory() + + self.authorized_client = APIClient() + self.authorized_client.force_authenticate(user=self.authorized_user) + + self.unauthorized_client = APIClient() + self.unauthorized_client.force_authenticate(user=self.unauthorized_user) + + def tearDown(self): + super().tearDown() + AuthzEnforcer.get_enforcer().clear_policy() + + def add_user_to_role(self, user, role, course_key): + """Helper method to add a user to a role for the course.""" + assign_role_to_user_in_scope( + user.username, + role, + str(course_key) + ) + AuthzEnforcer.get_enforcer().load_policy() + + @classmethod + def _seed_policies(cls): + """Seed the database with AuthZ policies.""" + global_enforcer = AuthzEnforcer.get_enforcer() + global_enforcer.load_policy() + + model_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/model.conf", + ) + + policy_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/authz.policy", + ) + + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer(model_path, policy_path), + target_enforcer=global_enforcer, + ) + + class CourseAuthzTestMixin: """ Reusable mixin for testing course-scoped AuthZ endpoints.