From d458983dfb3ac2f356ef22f1626c56a83ec7a956 Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Tue, 17 Mar 2026 12:33:49 -0600 Subject: [PATCH 1/7] feat: add authz permission for the course authoring list --- .../contentstore/tests/test_course_listing.py | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/cms/djangoapps/contentstore/tests/test_course_listing.py b/cms/djangoapps/contentstore/tests/test_course_listing.py index d2fc51411482..75a893316751 100644 --- a/cms/djangoapps/contentstore/tests/test_course_listing.py +++ b/cms/djangoapps/contentstore/tests/test_course_listing.py @@ -6,6 +6,8 @@ import random from unittest.mock import Mock, patch +import casbin +import pkg_resources import ddt from ccx_keys.locator import CCXLocator from django.test import RequestFactory @@ -45,6 +47,81 @@ QUERY_COUNT_TABLE_IGNORELIST = WAFFLE_TABLES + AUTHZ_TABLES +from rest_framework.test import APIClient +from openedx_authz.api.users import assign_role_to_user_in_scope +from openedx_authz.constants.roles import COURSE_STAFF, COURSE_EDITOR, COURSE_DATA_RESEARCHER +from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.engine.utils import migrate_policy_between_enforcers + +from openedx.core import toggles as core_toggles + + +class AuthzTestMixin: + """ + Minimal reusable mixin for AuthZ-enabled tests. + """ + + @classmethod + def setUpClass(cls): + cls.toggle_patcher = patch.object( + core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, + "is_enabled", + return_value=True, + ) + cls.toggle_patcher.start() + super().setUpClass() + + @classmethod + def tearDownClass(cls): + cls.toggle_patcher.stop() + super().tearDownClass() + + def setUp(self): + super().setUp() + + self._seed_policies() + + self.authorized_user = UserFactory() + self.unauthorized_user = UserFactory() + + self.authorized_client = APIClient() + self.authorized_client.force_authenticate(user=self.authorized_user) + + self.unauthorized_client = APIClient() + self.unauthorized_client.force_authenticate(user=self.unauthorized_user) + + def tearDown(self): + super().tearDown() + AuthzEnforcer.get_enforcer().clear_policy() + + def add_user_to_role(self, user, role, course_key): + """Helper method to add a user to a role for the course.""" + assign_role_to_user_in_scope( + user.username, + role, + str(course_key) + ) + AuthzEnforcer.get_enforcer().load_policy() + + @classmethod + def _seed_policies(cls): + global_enforcer = AuthzEnforcer.get_enforcer() + global_enforcer.load_policy() + + model_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/model.conf", + ) + + policy_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/authz.policy", + ) + + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer(model_path, policy_path), + target_enforcer=global_enforcer, + ) @ddt.ddt class TestCourseListing(ModuleStoreTestCase): @@ -405,3 +482,112 @@ def _set_of_course_keys(course_list, key_attribute_name='id'): self.assertSetEqual( _set_of_course_keys(courses_in_progress), _set_of_course_keys(unsucceeded_course_actions, 'course_key') ) + + + +class TestCourseListingAuthz(AuthzTestMixin, ModuleStoreTestCase): + """ + Tests course listing using the new AuthZ authorization framework. + """ + + def setUp(self): + super().setUp() + + self.factory = RequestFactory() + + def _create_course(self, course_key): + course = CourseFactory.create( + org=course_key.org, + number=course_key.course, + run=course_key.run, + ) + + return CourseOverviewFactory.create(id=course.id, org=course_key.org) + + def test_course_listing_with_course_staff_authz_permission(self): + # Create courses and assign access to only some of them to the user. + # Verify that only those courses are returned in the course listing. + # Using COURSE_STAFF role here. + + course_key_1 = CourseLocator("Org1", "Course1", "Run1") + course1 = self._create_course(course_key_1) + + course_key_2 = CourseLocator("Org1", "Course2", "Run1") + course2 = self._create_course(course_key_2) + + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_STAFF.external_key, + str(course_key_1), + ) + + request = self.factory.get("/course") + request.user = self.authorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + courses = list(courses_list) + + self.assertEqual(len(courses), 1) + self.assertEqual(courses[0].id, course1.id) + self.assertEqual(course2.id, course_key_2) + + def test_course_listing_with_course_editor_authz_permission(self): + # Create courses and assign access to only some of them to the user. + # Verify that only those courses are returned in the course listing. + # Using COURSE_EDITOR role here. + + course_key_1 = CourseLocator("Org1", "Course1", "Run1") + course1 = self._create_course(course_key_1) + + course_key_2 = CourseLocator("Org1", "Course2", "Run1") + course2 = self._create_course(course_key_2) + + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_EDITOR.external_key, + str(course_key_1), + ) + + request = self.factory.get("/course") + request.user = self.authorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + courses = list(courses_list) + + self.assertEqual(len(courses), 1) + self.assertEqual(courses[0].id, course1.id) + self.assertEqual(course2.id, course_key_2) + + def test_course_listing_without_permissions(self): + # Create a course but do not assign access to the user. + # Verify that no courses are returned in the course listing. + + course_key = CourseLocator("Org1", "Course1", "Run1") + + self._create_course(course_key) + + request = self.factory.get("/course") + request.user = self.unauthorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + + self.assertEqual(len(list(courses_list)), 0) + + def test_non_staff_user_cannot_access(self): + """ + Create a course and assign a non-staff role to the user. + Verify that the course is not returned in the course listing. + """ + non_staff_user = UserFactory() + course_key = CourseLocator("Org1", "Course1", "Run1") + self._create_course(course_key) + self.add_user_to_role(non_staff_user, COURSE_DATA_RESEARCHER.external_key, course_key) + + request = self.factory.get("/course") + request.user = non_staff_user + + courses_list, _ = get_courses_accessible_to_user(request) + + self.assertEqual(len(list(courses_list)), 0) From 7ded7f651131ee7423a4e5a5152ac5737d802b0b Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Fri, 20 Mar 2026 16:22:16 -0600 Subject: [PATCH 2/7] fixup! feat: add authz permission for the course authoring list --- .../contentstore/tests/test_course_listing.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/cms/djangoapps/contentstore/tests/test_course_listing.py b/cms/djangoapps/contentstore/tests/test_course_listing.py index 75a893316751..75f5119ee7db 100644 --- a/cms/djangoapps/contentstore/tests/test_course_listing.py +++ b/cms/djangoapps/contentstore/tests/test_course_listing.py @@ -34,6 +34,7 @@ UserBasedRole ) from common.djangoapps.student.tests.factories import UserFactory +from common.djangoapps.util import course from openedx.core.djangoapps.content.course_overviews.models import CourseOverview from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory from openedx.core.djangoapps.waffle_utils.testutils import WAFFLE_TABLES @@ -591,3 +592,91 @@ def test_non_staff_user_cannot_access(self): courses_list, _ = get_courses_accessible_to_user(request) self.assertEqual(len(list(courses_list)), 0) + + def test_course_listing_with_mix_authz_and_legacy_permissions(self): + authz_courses_keys = { + "course-v1:Org1+Course1+AuthzRun", + "course-v1:Org1+Course2+AuthzRun", + "course-v1:Org1+Course3+AuthzRun", + } + + authz_courses = [] + legacy_courses = [] + + def mock_is_enabled(*args, **kwargs): + # Extract course key from context (depends on how it's passed) + course_key = kwargs.get("context") or (args[0] if args else None) + return str(course_key) in authz_courses_keys + + with patch.object( + core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, + "is_enabled", + side_effect=mock_is_enabled, + ): + # Courses with authz flag enabled + authz_enable_course_key_1 = CourseLocator("Org1", "Course1", "AuthzRun") + authz_enable_course_1 = self._create_course(authz_enable_course_key_1) + + authz_enable_course_key_2 = CourseLocator("Org1", "Course2", "AuthzRun") + authz_enable_course_2 = self._create_course(authz_enable_course_key_2) + + authz_enable_course_key_3 = CourseLocator("Org1", "Course3", "AuthzRun") + authz_enable_course_3 = self._create_course(authz_enable_course_key_3) + + authz_courses.extend([authz_enable_course_1, authz_enable_course_2, authz_enable_course_3]) + + # Courses without authz flag enabled (legacy courses) + legacy_course_key_1 = CourseLocator("Org1", "Course1", "LegacyRun") + legacy_course_1 = self._create_course(legacy_course_key_1) + + legacy_course_key_2 = CourseLocator("Org1", "Course2", "LegacyRun") + legacy_course_2 = self._create_course(legacy_course_key_2) + + legacy_course_key_3 = CourseLocator("Org1", "Course3", "LegacyRun") + legacy_course_3 = self._create_course(legacy_course_key_3) + + legacy_courses.extend([legacy_course_1, legacy_course_2, legacy_course_3]) + + # Assign roles + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_STAFF.external_key, + str(authz_enable_course_key_1), + ) + + assign_role_to_user_in_scope( + self.authorized_user.username, + COURSE_STAFF.external_key, + str(authz_enable_course_key_2), + ) + + CourseInstructorRole(legacy_course_key_1).add_users(self.authorized_user) + + request = self.factory.get("/course") + request.user = self.authorized_user + + courses_list, _ = get_courses_accessible_to_user(request) + courses = list(courses_list) + + # Case 1: authorized user is added as a CourseStaff role is assigned for authz_enable_course_1, + # authz_enable_course_2 and CourseInstructor role is assigned for legacy_course_1, + # so these three courses should be returned in the course listing even if the toggle is enabled + # for authz_enable_course_3 but no role is assigned for it. + self.assertEqual(len(courses), 3) + self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, legacy_course_1.id] for course in courses)) + + # Case 2: Staff user should have access to all courses regardless of authz permissions. + # Pending authz_course_3 to check as staff + GlobalStaff().add_users(self.authorized_user) + courses_list, _ = get_courses_accessible_to_user(request) + courses = list(courses_list) + self.assertEqual(len(courses), 6) + self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] for course in courses)) + + # Case 3: Superuser should have access to all courses regardless of authz permissions. + superuser = UserFactory(is_superuser=True) + request.user = superuser + courses_list, _ = get_courses_accessible_to_user(request) + courses = list(courses_list) + self.assertEqual(len(courses), 6) + self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] for course in courses)) From ec400f062bb9d6e25fc7ab282c558811c01bf7aa Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Fri, 20 Mar 2026 17:07:44 -0600 Subject: [PATCH 3/7] fixup! feat: add authz permission for the course authoring list --- cms/djangoapps/contentstore/views/course.py | 179 ++++++++++++++++++-- 1 file changed, 163 insertions(+), 16 deletions(-) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index bf2cd9cb83d6..6ea2d7b7fc89 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -32,11 +32,16 @@ from opaque_keys import InvalidKeyError from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import BlockUsageLocator +from openedx.core.djangoapps.authz.decorators import LegacyAuthoringPermission, user_has_course_permission from organizations.api import add_organization_course, ensure_organization from organizations.exceptions import InvalidOrganizationException from rest_framework.exceptions import ValidationError from rest_framework.decorators import api_view +from openedx_authz.api import get_scopes_for_user_and_permission +from openedx_authz.constants.permissions import COURSES_VIEW_COURSE +from openedx_authz.api.data import CourseOverviewData from openedx.core.lib.api.view_utils import view_auth_classes +from openedx.core import toggles as core_toggles from cms.djangoapps.contentstore.xblock_storage_handlers.view_handlers import create_xblock_info from cms.djangoapps.course_creators.views import add_user_with_status_unrequested, get_course_creator_status @@ -385,7 +390,9 @@ def get_in_process_course_actions(request): exclude_args={'state': CourseRerunUIStateManager.State.SUCCEEDED}, should_display=True, ) - if has_studio_read_access(request.user, course.course_key) + if user_has_course_permission( + request.user, COURSES_VIEW_COURSE.identifier, course.course_key, LegacyAuthoringPermission.READ + ) ] @@ -750,26 +757,166 @@ def course_index(request, course_key): return redirect(get_course_outline_url(course_key, block_to_show)) +def _has_authz_access(course_key, is_staff_user, authz_scopes): + """Returns True if the user has access to the course based on authz scopes, False otherwise.""" + if is_staff_user: + return True + + for access in authz_scopes: + if access.course_id == course_key: + return True + + return False + + +def _has_legacy_access(course_key, is_staff_user, legacy_accesses): + """Returns True if the user has access to the course based on legacy accesses, False otherwise.""" + if is_staff_user: + return True + + for access in legacy_accesses: + if access.course_id and access.course_id == course_key: + return True + elif access.org and access.course_id is None and access.org == course_key.org: + return True + + return False + + +def _apply_query_filters(request, courses): + """Applies all query filters to the given courses queryset. + This includes filtering by active/archived status, search query, ordering + and any special filters (e.g. CCX courses, template courses). The filters are applied in the following order: + 1. Special filters (e.g. CCX courses, template courses) + 2. Active/archived status + 3. Search query + 4. Ordering + """ + + def filter_course(course): + """ + Special filters + """ + # CCXs cannot be edited in Studio (aka cms) and should not be shown in this dashboard. + include_course = not isinstance(course.id, CCXLocator) + + # TODO remove this condition when templates purged from db + include_course = include_course and course.location.course != 'templates' + + return include_course + + filtered_courses = filter(filter_course, courses) + + search_query, order, active_only, archived_only = get_query_params_if_present(request) + + return get_filtered_and_ordered_courses( + filtered_courses, + active_only, + archived_only, + search_query, + order, + ) + + +def _get_candidate_course_keys(request): + """Returns a list of candidate course keys that the user may have access to, + based on both authz scopes and legacy group-based access. + Also returns the authz scopes and legacy accesses for further processing.""" + user = request.user + + # Authz start -------------------------------------- + # Recolecting all course keys from authz scopes + authz_scopes = get_scopes_for_user_and_permission( + user.username, + COURSES_VIEW_COURSE.identifier + ) + + authz_keys = { + access.course_id + for access in authz_scopes + if access.course_id is not None and isinstance(access, CourseOverviewData) + } + # Authz end ----------------------------------------- + + # Legacy start -------------------------------------- + # Recolecting all course keys from django groups + instructor_courses = UserBasedRole(user, CourseInstructorRole.ROLE).courses_with_role() + + with strict_role_checking(): + staff_courses = UserBasedRole(user, CourseStaffRole.ROLE).courses_with_role() + + group_keys = set() + org_accesses = set() + legacy_accesses = instructor_courses | staff_courses + + for access in legacy_accesses: + if access.course_id is not None: + group_keys.add(access.course_id) + elif access.org: + org_accesses.add(access.org) + else: + # No course_id or org is associated with this access. + pass + + if org_accesses: + # Getting courses from user global orgs + all_courses_give_an_org = CourseOverview.get_all_courses(orgs=list(org_accesses)) + org_course_keys = {overview.id for overview in all_courses_give_an_org} + group_keys.update(org_course_keys) + # Legacy end ---------------------------------------- + + return list(authz_keys | group_keys), authz_scopes, legacy_accesses + + @function_trace('get_courses_accessible_to_user') def get_courses_accessible_to_user(request): """ - Try to get all courses by first reversing django groups and fallback to old method if it fails - Note: overhead of pymongo reads will increase if getting courses from django groups fails - - Arguments: - request: the request object + Hybrid approach: + - Single-pass decision per course (authz vs legacy) + - Batch DB fetch for performance """ - if GlobalStaff().has_user(request.user): - # user has global access so no need to get courses from django groups - courses, in_process_course_actions = _accessible_courses_summary_iter(request) + user = request.user + is_staff_user = GlobalStaff().has_user(user) or user.is_superuser + authz_scopes = None + legacy_accesses = None + + # Step 1: Determine candidate keys + if is_staff_user: + # unavoidable full scan + candidate_courses = CourseOverview.get_all_courses() + candidate_keys = [c.id for c in candidate_courses] else: - try: - courses, in_process_course_actions = _accessible_courses_list_from_groups(request) - except AccessListFallback: - # user have some old groups or there was some error getting courses from django groups - # so fallback to iterating through all courses - courses, in_process_course_actions = _accessible_courses_summary_iter(request) - return courses, in_process_course_actions + candidate_keys, authz_scopes, legacy_accesses = _get_candidate_course_keys(request) + + if not candidate_keys: + return [], [] + + # Step 2: Single-pass decision → collect valid keys + valid_course_keys = set() + + for course_key in candidate_keys: + if core_toggles.enable_authz_course_authoring(course_key): + if _has_authz_access(course_key, is_staff_user, authz_scopes): + valid_course_keys.add(course_key) + else: + if _has_legacy_access(course_key, is_staff_user, legacy_accesses): + valid_course_keys.add(course_key) + + if not valid_course_keys: + return [], [] + + # Step 3: Batch fetch (key optimization) + courses = CourseOverview.get_all_courses( + filter_={'id__in': list(valid_course_keys)} + ) + + # Step 4: Apply filters once + courses = _apply_query_filters(request, courses) + + # Step 5: Compute actions once + in_process_actions = get_in_process_course_actions(request) + + return list(courses), in_process_actions def _process_courses_list(courses_iter, in_process_course_actions, split_archived=False): From 96b100ab8d2d7573f600946cea70ee92e57325dc Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Fri, 20 Mar 2026 18:03:32 -0600 Subject: [PATCH 4/7] fixup! feat: add authz permission for the course authoring list --- .../contentstore/tests/test_course_listing.py | 107 +++++------------- cms/djangoapps/contentstore/views/course.py | 17 +-- openedx/core/djangoapps/authz/tests/mixins.py | 69 +++++++++++ 3 files changed, 104 insertions(+), 89 deletions(-) diff --git a/cms/djangoapps/contentstore/tests/test_course_listing.py b/cms/djangoapps/contentstore/tests/test_course_listing.py index 75f5119ee7db..c2a0751293db 100644 --- a/cms/djangoapps/contentstore/tests/test_course_listing.py +++ b/cms/djangoapps/contentstore/tests/test_course_listing.py @@ -6,12 +6,12 @@ import random from unittest.mock import Mock, patch -import casbin -import pkg_resources import ddt from ccx_keys.locator import CCXLocator from django.test import RequestFactory from opaque_keys.edx.locations import CourseLocator +from openedx_authz.api.users import assign_role_to_user_in_scope +from openedx_authz.constants.roles import COURSE_DATA_RESEARCHER, COURSE_EDITOR, COURSE_STAFF from cms.djangoapps.contentstore.tests.utils import AjaxEnabledTestClient from cms.djangoapps.contentstore.utils import delete_course @@ -34,11 +34,12 @@ UserBasedRole ) from common.djangoapps.student.tests.factories import UserFactory -from common.djangoapps.util import course from openedx.core.djangoapps.content.course_overviews.models import CourseOverview from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory from openedx.core.djangoapps.waffle_utils.testutils import WAFFLE_TABLES from openedx.core.djangolib.testing.utils import AUTHZ_TABLES +from openedx.core.djangoapps.authz.tests.mixins import AuthzTestMixin +from openedx.core import toggles as core_toggles from xmodule.modulestore import ModuleStoreEnum # lint-amnesty, pylint: disable=wrong-import-order from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase # lint-amnesty, pylint: disable=wrong-import-order from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order @@ -48,81 +49,6 @@ QUERY_COUNT_TABLE_IGNORELIST = WAFFLE_TABLES + AUTHZ_TABLES -from rest_framework.test import APIClient -from openedx_authz.api.users import assign_role_to_user_in_scope -from openedx_authz.constants.roles import COURSE_STAFF, COURSE_EDITOR, COURSE_DATA_RESEARCHER -from openedx_authz.engine.enforcer import AuthzEnforcer -from openedx_authz.engine.utils import migrate_policy_between_enforcers - -from openedx.core import toggles as core_toggles - - -class AuthzTestMixin: - """ - Minimal reusable mixin for AuthZ-enabled tests. - """ - - @classmethod - def setUpClass(cls): - cls.toggle_patcher = patch.object( - core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, - "is_enabled", - return_value=True, - ) - cls.toggle_patcher.start() - super().setUpClass() - - @classmethod - def tearDownClass(cls): - cls.toggle_patcher.stop() - super().tearDownClass() - - def setUp(self): - super().setUp() - - self._seed_policies() - - self.authorized_user = UserFactory() - self.unauthorized_user = UserFactory() - - self.authorized_client = APIClient() - self.authorized_client.force_authenticate(user=self.authorized_user) - - self.unauthorized_client = APIClient() - self.unauthorized_client.force_authenticate(user=self.unauthorized_user) - - def tearDown(self): - super().tearDown() - AuthzEnforcer.get_enforcer().clear_policy() - - def add_user_to_role(self, user, role, course_key): - """Helper method to add a user to a role for the course.""" - assign_role_to_user_in_scope( - user.username, - role, - str(course_key) - ) - AuthzEnforcer.get_enforcer().load_policy() - - @classmethod - def _seed_policies(cls): - global_enforcer = AuthzEnforcer.get_enforcer() - global_enforcer.load_policy() - - model_path = pkg_resources.resource_filename( - "openedx_authz.engine", - "config/model.conf", - ) - - policy_path = pkg_resources.resource_filename( - "openedx_authz.engine", - "config/authz.policy", - ) - - migrate_policy_between_enforcers( - source_enforcer=casbin.Enforcer(model_path, policy_path), - target_enforcer=global_enforcer, - ) @ddt.ddt class TestCourseListing(ModuleStoreTestCase): @@ -497,6 +423,7 @@ def setUp(self): self.factory = RequestFactory() def _create_course(self, course_key): + """Helper method to create a course and its overview.""" course = CourseFactory.create( org=course_key.org, number=course_key.course, @@ -663,7 +590,11 @@ def mock_is_enabled(*args, **kwargs): # so these three courses should be returned in the course listing even if the toggle is enabled # for authz_enable_course_3 but no role is assigned for it. self.assertEqual(len(courses), 3) - self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, legacy_course_1.id] for course in courses)) + self.assertTrue( + all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, legacy_course_1.id] + for course in courses + ) + ) # Case 2: Staff user should have access to all courses regardless of authz permissions. # Pending authz_course_3 to check as staff @@ -671,7 +602,14 @@ def mock_is_enabled(*args, **kwargs): courses_list, _ = get_courses_accessible_to_user(request) courses = list(courses_list) self.assertEqual(len(courses), 6) - self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] for course in courses)) + self.assertTrue( + all( + course.id in + [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, + legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] + for course in courses + ) + ) # Case 3: Superuser should have access to all courses regardless of authz permissions. superuser = UserFactory(is_superuser=True) @@ -679,4 +617,11 @@ def mock_is_enabled(*args, **kwargs): courses_list, _ = get_courses_accessible_to_user(request) courses = list(courses_list) self.assertEqual(len(courses), 6) - self.assertTrue(all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] for course in courses)) + self.assertTrue( + all( + course.id in + [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, + legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] + for course in courses + ) + ) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index 6ea2d7b7fc89..bf24cc8a9250 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -787,10 +787,12 @@ def _apply_query_filters(request, courses): """Applies all query filters to the given courses queryset. This includes filtering by active/archived status, search query, ordering and any special filters (e.g. CCX courses, template courses). The filters are applied in the following order: - 1. Special filters (e.g. CCX courses, template courses) - 2. Active/archived status - 3. Search query - 4. Ordering + 1. Active/archived status + 2. Search query + 3. Ordering + 4. Special filters (e.g. CCX courses, template courses) + The first 3 filters are applied using queryset methods, while the last filter is applied using a Python filter + function since it involves checking the course type (i.e. if it's a CCX course or a template course). """ def filter_course(course): @@ -805,17 +807,16 @@ def filter_course(course): return include_course - filtered_courses = filter(filter_course, courses) - search_query, order, active_only, archived_only = get_query_params_if_present(request) - return get_filtered_and_ordered_courses( - filtered_courses, + filtered_courses = get_filtered_and_ordered_courses( + courses, active_only, archived_only, search_query, order, ) + return filter(filter_course, filtered_courses) def _get_candidate_course_keys(request): diff --git a/openedx/core/djangoapps/authz/tests/mixins.py b/openedx/core/djangoapps/authz/tests/mixins.py index c2dc32277b2a..12baf7d1369b 100644 --- a/openedx/core/djangoapps/authz/tests/mixins.py +++ b/openedx/core/djangoapps/authz/tests/mixins.py @@ -14,6 +14,75 @@ from common.djangoapps.student.tests.factories import UserFactory +class AuthzTestMixin: + """ + Minimal reusable mixin for AuthZ-enabled tests. + """ + + @classmethod + def setUpClass(cls): + cls.toggle_patcher = patch.object( + core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, + "is_enabled", + return_value=True, + ) + cls.toggle_patcher.start() + super().setUpClass() + + @classmethod + def tearDownClass(cls): + cls.toggle_patcher.stop() + super().tearDownClass() + + def setUp(self): + super().setUp() + + self._seed_policies() + + self.authorized_user = UserFactory() + self.unauthorized_user = UserFactory() + + self.authorized_client = APIClient() + self.authorized_client.force_authenticate(user=self.authorized_user) + + self.unauthorized_client = APIClient() + self.unauthorized_client.force_authenticate(user=self.unauthorized_user) + + def tearDown(self): + super().tearDown() + AuthzEnforcer.get_enforcer().clear_policy() + + def add_user_to_role(self, user, role, course_key): + """Helper method to add a user to a role for the course.""" + assign_role_to_user_in_scope( + user.username, + role, + str(course_key) + ) + AuthzEnforcer.get_enforcer().load_policy() + + @classmethod + def _seed_policies(cls): + """Seed the database with AuthZ policies.""" + global_enforcer = AuthzEnforcer.get_enforcer() + global_enforcer.load_policy() + + model_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/model.conf", + ) + + policy_path = pkg_resources.resource_filename( + "openedx_authz.engine", + "config/authz.policy", + ) + + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer(model_path, policy_path), + target_enforcer=global_enforcer, + ) + + class CourseAuthzTestMixin: """ Reusable mixin for testing course-scoped AuthZ endpoints. From 91ee8c1ecf2b7715c13eb8d6c22fa6f328c75090 Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Sat, 21 Mar 2026 12:18:11 -0600 Subject: [PATCH 5/7] fixup! feat: add authz permission for the course authoring list --- cms/djangoapps/contentstore/views/course.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index bf24cc8a9250..b03ab300091b 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -909,7 +909,7 @@ def get_courses_accessible_to_user(request): # Step 3: Batch fetch (key optimization) courses = CourseOverview.get_all_courses( filter_={'id__in': list(valid_course_keys)} - ) + ).order_by('created') # default ordering is by last modified date, descending # Step 4: Apply filters once courses = _apply_query_filters(request, courses) From 128901f704f68b1c8e89c17fd02e43dc2f9f4d41 Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Sat, 21 Mar 2026 12:32:55 -0600 Subject: [PATCH 6/7] fixup! feat: add authz permission for the course authoring list --- cms/djangoapps/contentstore/views/course.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index b03ab300091b..bc02172e3876 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -880,17 +880,20 @@ def get_courses_accessible_to_user(request): is_staff_user = GlobalStaff().has_user(user) or user.is_superuser authz_scopes = None legacy_accesses = None + in_process_actions = [] # Step 1: Determine candidate keys if is_staff_user: # unavoidable full scan candidate_courses = CourseOverview.get_all_courses() candidate_keys = [c.id for c in candidate_courses] + # Compute actions once for staff users since they have access to all courses + in_process_actions = get_in_process_course_actions(request) else: candidate_keys, authz_scopes, legacy_accesses = _get_candidate_course_keys(request) if not candidate_keys: - return [], [] + return [], in_process_actions # Step 2: Single-pass decision → collect valid keys valid_course_keys = set() @@ -904,7 +907,7 @@ def get_courses_accessible_to_user(request): valid_course_keys.add(course_key) if not valid_course_keys: - return [], [] + return [], in_process_actions # Step 3: Batch fetch (key optimization) courses = CourseOverview.get_all_courses( @@ -914,9 +917,6 @@ def get_courses_accessible_to_user(request): # Step 4: Apply filters once courses = _apply_query_filters(request, courses) - # Step 5: Compute actions once - in_process_actions = get_in_process_course_actions(request) - return list(courses), in_process_actions From 78bf7857920855ac7d64cb2c92c52d74bad02463 Mon Sep 17 00:00:00 2001 From: Daniel Wong Date: Sat, 21 Mar 2026 16:43:56 -0600 Subject: [PATCH 7/7] fixup! feat: add authz permission for the course authoring list --- .../contentstore/tests/test_course_listing.py | 240 +++++++++++------- cms/djangoapps/contentstore/views/course.py | 95 ++++--- 2 files changed, 207 insertions(+), 128 deletions(-) diff --git a/cms/djangoapps/contentstore/tests/test_course_listing.py b/cms/djangoapps/contentstore/tests/test_course_listing.py index c2a0751293db..69894ec75db8 100644 --- a/cms/djangoapps/contentstore/tests/test_course_listing.py +++ b/cms/djangoapps/contentstore/tests/test_course_listing.py @@ -411,7 +411,7 @@ def _set_of_course_keys(course_list, key_attribute_name='id'): ) - +@ddt.ddt class TestCourseListingAuthz(AuthzTestMixin, ModuleStoreTestCase): """ Tests course listing using the new AuthZ authorization framework. @@ -520,108 +520,162 @@ def test_non_staff_user_cannot_access(self): self.assertEqual(len(list(courses_list)), 0) - def test_course_listing_with_mix_authz_and_legacy_permissions(self): - authz_courses_keys = { - "course-v1:Org1+Course1+AuthzRun", - "course-v1:Org1+Course2+AuthzRun", - "course-v1:Org1+Course3+AuthzRun", - } + def _mock_authz_toggle(self, enabled_keys): + def _is_enabled(course_key=None, **_): + return str(course_key) in enabled_keys + return _is_enabled + + def _make_request(self, user): + request = self.factory.get("/course") + request.user = user + return request + + def _create_courses(self): + """Helper method to create multiple courses for testing.""" + authz_keys = [ + CourseLocator("Org1", "Course1", "AuthzRun"), + CourseLocator("Org1", "Course2", "AuthzRun"), + CourseLocator("Org1", "Course3", "AuthzRun"), + ] - authz_courses = [] - legacy_courses = [] + legacy_keys = [ + CourseLocator("Org1", "Course1", "LegacyRun"), + CourseLocator("Org1", "Course2", "LegacyRun"), + CourseLocator("Org1", "Course3", "LegacyRun"), + ] - def mock_is_enabled(*args, **kwargs): - # Extract course key from context (depends on how it's passed) - course_key = kwargs.get("context") or (args[0] if args else None) - return str(course_key) in authz_courses_keys + authz_courses = [self._create_course(k) for k in authz_keys] + legacy_courses = [self._create_course(k) for k in legacy_keys] + + return authz_keys, legacy_keys, authz_courses, legacy_courses + + @ddt.data( + { + "name": "authz_and_legacy_basic", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [0, 2], # toggle ON for Course1, Course3 + + "authz_roles": [ + {"index": 0, "role": COURSE_STAFF}, # valid (toggle ON) + {"index": 1, "role": COURSE_EDITOR}, # ignored (toggle OFF) + ], + "legacy_roles": [0], + + "expected": [("authz", 0), ("legacy", 0)], + }, + { + "name": "authz_role_but_toggle_off", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [2], # only Course3 enabled + + "authz_roles": [ + {"index": 1, "role": COURSE_EDITOR}, # should NOT appear + ], + "legacy_roles": [], + + "expected": [], + }, + { + "name": "multiple_roles_mixed", + "is_staff": False, + "is_superuser": False, + "authz_enabled": [0, 1, 2], + + "authz_roles": [ + {"index": 0, "role": COURSE_STAFF}, + {"index": 1, "role": COURSE_EDITOR}, + ], + "legacy_roles": [2], + + "expected": [("authz", 0), ("authz", 1), ("legacy", 2)], + }, + { + "name": "staff_gets_all", + "is_staff": True, + "is_superuser": False, + "authz_enabled": [], + + "authz_roles": [], + "legacy_roles": [], + + "expected": [ + ("authz", 0), ("authz", 1), ("authz", 2), + ("legacy", 0), ("legacy", 1), ("legacy", 2), + ], + }, + { + "name": "superuser_gets_all", + "is_staff": False, + "is_superuser": True, + "authz_enabled": [], + + "authz_roles": [], + "legacy_roles": [], + + "expected": [ + ("authz", 0), ("authz", 1), ("authz", 2), + ("legacy", 0), ("legacy", 1), ("legacy", 2), + ], + }, + ) + @ddt.unpack + def test_course_access_matrix( + self, + name, + is_staff, + is_superuser, + authz_enabled, + authz_roles, + legacy_roles, + expected, + ): + # --- Setup toggle --- + enabled_keys = set() + + authz_keys, legacy_keys, authz_courses, legacy_courses = self._create_courses() + + for i in authz_enabled: + enabled_keys.add(str(authz_keys[i])) with patch.object( core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, "is_enabled", - side_effect=mock_is_enabled, + side_effect=self._mock_authz_toggle(enabled_keys), ): - # Courses with authz flag enabled - authz_enable_course_key_1 = CourseLocator("Org1", "Course1", "AuthzRun") - authz_enable_course_1 = self._create_course(authz_enable_course_key_1) - - authz_enable_course_key_2 = CourseLocator("Org1", "Course2", "AuthzRun") - authz_enable_course_2 = self._create_course(authz_enable_course_key_2) - - authz_enable_course_key_3 = CourseLocator("Org1", "Course3", "AuthzRun") - authz_enable_course_3 = self._create_course(authz_enable_course_key_3) - - authz_courses.extend([authz_enable_course_1, authz_enable_course_2, authz_enable_course_3]) - - # Courses without authz flag enabled (legacy courses) - legacy_course_key_1 = CourseLocator("Org1", "Course1", "LegacyRun") - legacy_course_1 = self._create_course(legacy_course_key_1) - - legacy_course_key_2 = CourseLocator("Org1", "Course2", "LegacyRun") - legacy_course_2 = self._create_course(legacy_course_key_2) - - legacy_course_key_3 = CourseLocator("Org1", "Course3", "LegacyRun") - legacy_course_3 = self._create_course(legacy_course_key_3) - - legacy_courses.extend([legacy_course_1, legacy_course_2, legacy_course_3]) - - # Assign roles - assign_role_to_user_in_scope( - self.authorized_user.username, - COURSE_STAFF.external_key, - str(authz_enable_course_key_1), - ) - - assign_role_to_user_in_scope( - self.authorized_user.username, - COURSE_STAFF.external_key, - str(authz_enable_course_key_2), - ) - - CourseInstructorRole(legacy_course_key_1).add_users(self.authorized_user) + # --- Create user --- + user = UserFactory(is_superuser=is_superuser) + + if is_staff: + GlobalStaff().add_users(user) + + # --- Assign AUTHZ roles --- + for r in authz_roles: + assign_role_to_user_in_scope( + user.username, + r["role"].external_key, + str(authz_keys[r["index"]]), + ) - request = self.factory.get("/course") - request.user = self.authorized_user + # --- Assign LEGACY roles --- + for i in legacy_roles: + CourseInstructorRole(legacy_keys[i]).add_users(user) - courses_list, _ = get_courses_accessible_to_user(request) - courses = list(courses_list) + # --- Execute --- + request = self._make_request(user) + courses, _ = get_courses_accessible_to_user(request) - # Case 1: authorized user is added as a CourseStaff role is assigned for authz_enable_course_1, - # authz_enable_course_2 and CourseInstructor role is assigned for legacy_course_1, - # so these three courses should be returned in the course listing even if the toggle is enabled - # for authz_enable_course_3 but no role is assigned for it. - self.assertEqual(len(courses), 3) - self.assertTrue( - all(course.id in [authz_enable_course_1.id, authz_enable_course_2.id, legacy_course_1.id] - for course in courses - ) - ) + # --- Expected set --- + expected_ids = { + (authz_courses[i].id if group == "authz" else legacy_courses[i].id) + for group, i in expected + } - # Case 2: Staff user should have access to all courses regardless of authz permissions. - # Pending authz_course_3 to check as staff - GlobalStaff().add_users(self.authorized_user) - courses_list, _ = get_courses_accessible_to_user(request) - courses = list(courses_list) - self.assertEqual(len(courses), 6) - self.assertTrue( - all( - course.id in - [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, - legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] - for course in courses - ) - ) + result_ids = {course.id for course in courses} - # Case 3: Superuser should have access to all courses regardless of authz permissions. - superuser = UserFactory(is_superuser=True) - request.user = superuser - courses_list, _ = get_courses_accessible_to_user(request) - courses = list(courses_list) - self.assertEqual(len(courses), 6) - self.assertTrue( - all( - course.id in - [authz_enable_course_1.id, authz_enable_course_2.id, authz_enable_course_3.id, - legacy_course_1.id, legacy_course_2.id, legacy_course_3.id] - for course in courses - ) + self.assertEqual( + result_ids, + expected_ids, + msg=f"Failed scenario: {name}", ) diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index bc02172e3876..98f3ce2a3a11 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -10,7 +10,6 @@ from typing import Dict import django.utils -from ccx_keys.locator import CCXLocator from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.decorators import login_required @@ -41,7 +40,7 @@ from openedx_authz.constants.permissions import COURSES_VIEW_COURSE from openedx_authz.api.data import CourseOverviewData from openedx.core.lib.api.view_utils import view_auth_classes -from openedx.core import toggles as core_toggles +from ccx_keys.locator import CCXLocator from cms.djangoapps.contentstore.xblock_storage_handlers.view_handlers import create_xblock_info from cms.djangoapps.course_creators.views import add_user_with_status_unrequested, get_course_creator_status @@ -820,9 +819,19 @@ def filter_course(course): def _get_candidate_course_keys(request): - """Returns a list of candidate course keys that the user may have access to, - based on both authz scopes and legacy group-based access. - Also returns the authz scopes and legacy accesses for further processing.""" + """ + Return candidate course keys the user may have access to, combining: + + - Authz scopes: + Collects course keys from the user's scopes for the + `COURSES_VIEW_COURSE` permission. + + - Legacy access: + Collects course keys based on Django group roles + (`CourseInstructorRole`, `CourseStaffRole`) and org-level access. + If the user has org-level access, all courses within those orgs + are included. + """ user = request.user # Authz start -------------------------------------- @@ -833,9 +842,9 @@ def _get_candidate_course_keys(request): ) authz_keys = { - access.course_id + access.course_key for access in authz_scopes - if access.course_id is not None and isinstance(access, CourseOverviewData) + if isinstance(access, CourseOverviewData) and access.course_key } # Authz end ----------------------------------------- @@ -852,7 +861,11 @@ def _get_candidate_course_keys(request): for access in legacy_accesses: if access.course_id is not None: - group_keys.add(access.course_id) + course_key = access.course_id + if not isinstance(course_key, CourseKey): + # CCX courses cannot be edited in Studio and should not be shown in this dashboard. + course_key = CourseKey.from_string(str(course_key)) + group_keys.add(course_key) elif access.org: org_accesses.add(access.org) else: @@ -861,63 +874,75 @@ def _get_candidate_course_keys(request): if org_accesses: # Getting courses from user global orgs - all_courses_give_an_org = CourseOverview.get_all_courses(orgs=list(org_accesses)) - org_course_keys = {overview.id for overview in all_courses_give_an_org} - group_keys.update(org_course_keys) + all_courses_give_an_org = CourseOverview.get_all_courses(orgs=org_accesses).values_list("id", flat=True) + group_keys.update(all_courses_give_an_org) # Legacy end ---------------------------------------- - return list(authz_keys | group_keys), authz_scopes, legacy_accesses + return authz_keys | group_keys @function_trace('get_courses_accessible_to_user') def get_courses_accessible_to_user(request): """ - Hybrid approach: - - Single-pass decision per course (authz vs legacy) - - Batch DB fetch for performance + Return courses accessible to the user using a hybrid AuthZ + legacy approach. + + Flow: + 1. Determine candidate course keys: + - Staff: all courses (full scan). + - Non-staff: derived from AuthZ scopes and legacy access. + + 2. Single-pass access evaluation: + - Use AuthZ or legacy checks per course (based on feature flags). + - Collect only accessible course keys. + + 3. Batch fetch courses: + - Retrieve all valid courses in one query (ordered by creation date). + + 4. Apply request-based filters. + + Returns: + tuple: + - list[CourseOverview]: Accessible courses. + - list: In-process course actions (staff only). """ user = request.user is_staff_user = GlobalStaff().has_user(user) or user.is_superuser - authz_scopes = None - legacy_accesses = None in_process_actions = [] # Step 1: Determine candidate keys if is_staff_user: - # unavoidable full scan - candidate_courses = CourseOverview.get_all_courses() - candidate_keys = [c.id for c in candidate_courses] + # Unavoidable full scan + # however, we only fetch the course keys here for the access check, + # and defer fetching the full course objects until after filtering by access + candidate_keys = CourseOverview.get_all_courses().values_list("id", flat=True) # Compute actions once for staff users since they have access to all courses in_process_actions = get_in_process_course_actions(request) else: - candidate_keys, authz_scopes, legacy_accesses = _get_candidate_course_keys(request) - - if not candidate_keys: - return [], in_process_actions + candidate_keys = _get_candidate_course_keys(request) # Step 2: Single-pass decision → collect valid keys valid_course_keys = set() - for course_key in candidate_keys: - if core_toggles.enable_authz_course_authoring(course_key): - if _has_authz_access(course_key, is_staff_user, authz_scopes): - valid_course_keys.add(course_key) - else: - if _has_legacy_access(course_key, is_staff_user, legacy_accesses): - valid_course_keys.add(course_key) + if is_staff_user or user_has_course_permission( + user, + COURSES_VIEW_COURSE.identifier, + course_key, + LegacyAuthoringPermission.READ, + ): + valid_course_keys.add(course_key) if not valid_course_keys: return [], in_process_actions - # Step 3: Batch fetch (key optimization) + # Step 3: Batch fetch valid courses with a single query, ordered by creation date courses = CourseOverview.get_all_courses( filter_={'id__in': list(valid_course_keys)} - ).order_by('created') # default ordering is by last modified date, descending + ).order_by('created') # default ordering is by created date - # Step 4: Apply filters once + # Step 4: Apply filters (e.g. search, active/archived status, ordering) courses = _apply_query_filters(request, courses) - return list(courses), in_process_actions + return courses, in_process_actions def _process_courses_list(courses_iter, in_process_course_actions, split_archived=False):