+def query_yes_no(question: str, default: Literal["yes", "no"] | None = "yes") -> bool:
diff --git a/openedx/core/djangoapps/content/search/api.py b/openedx/core/djangoapps/content/search/api.py index b36beeba4710..d424d6756b2b 100644 --- a/openedx/core/djangoapps/content/search/api.py +++ b/openedx/core/djangoapps/content/search/api.py @@ -73,42 +73,12 @@ _MEILI_CLIENT = None _MEILI_API_KEY_UID = None -LOCK_EXPIRE = 24 * 60 * 60 # Lock expires in 24 hours - MAX_ACCESS_IDS_IN_FILTER = 1_000 MAX_ORGS_IN_FILTER = 1_000 EXCLUDED_XBLOCK_TYPES = ['course', 'course_info'] -@contextmanager -def _index_rebuild_lock() -> Generator[str, None, None]: - """ - Lock to prevent that more than one rebuild is running at the same time - """ - lock_id = f"lock-meilisearch-index-{STUDIO_INDEX_NAME}" - new_index_name = STUDIO_INDEX_NAME + "_new" - - status = cache.add(lock_id, new_index_name, LOCK_EXPIRE) - - if not status: - # Lock already acquired - raise RuntimeError("Rebuild already in progress") - - # Lock acquired - try: - yield new_index_name - finally: - # Release the lock - cache.delete(lock_id) - - -def _get_running_rebuild_index_name() -> str | None: - lock_id = f"lock-meilisearch-index-{STUDIO_INDEX_NAME}" - - return cache.get(lock_id) - - def _get_meilisearch_client(): """ Get the Meiliesearch client @@ -190,51 +160,6 @@ def _index_exists(index_name: str) -> bool: return True -@contextmanager -def _using_temp_index(status_cb: Callable[[str], None] | None = None) -> Generator[str, None, None]: - """ - Create a new temporary Meilisearch index, populate it, then swap it to - become the active index. - - Args: - status_cb (Callable): A callback function to report status messages - """ - if status_cb is None: - status_cb = log.info - - client = _get_meilisearch_client() - status_cb("Checking index...") - with _index_rebuild_lock() as temp_index_name: - if _index_exists(temp_index_name): - status_cb("Temporary index already exists. 
+def reset_incremental_index_state(status_cb: Callable[[str], None] | None = None) -> None:
+    """Clear incremental indexing progress so the next rebuild starts from scratch."""
+    (status_cb or log.info)("Resetting incremental indexing state...")
Please run ./manage.py cms reindex_studio" - " [--incremental]" ) return status_cb("Index already exists and is configured.") @@ -432,20 +377,24 @@ def add_with_children(block): return docs -def rebuild_index(status_cb: Callable[[str], None] | None = None, incremental=False) -> None: # lint-amnesty, pylint: disable=too-many-statements +# TODO: Figure something out to avoid this being run multiple times simultaneously. +def rebuild_index(status_cb: Callable[[str], None] | None = None, warn_cb: Callable[[str], None] | None = None) -> None: # lint-amnesty, pylint: disable=too-many-statements """ Rebuild the Meilisearch index from scratch """ if status_cb is None: status_cb = log.info + if warn_cb is None: + warn_cb = log.warning + + # Ensure the index exists and is configured before rebuilding + init_index(status_cb, warn_cb) client = _get_meilisearch_client() # Get the lists of libraries status_cb("Counting libraries...") - keys_indexed = [] - if incremental: - keys_indexed = list(IncrementalIndexCompleted.objects.values_list("context_key", flat=True)) + keys_indexed = list(IncrementalIndexCompleted.objects.values_list("context_key", flat=True)) lib_keys = [ lib.library_key for lib in lib_api.ContentLibrary.objects.select_related("org").only("org", "slug").order_by("-id") @@ -464,149 +413,139 @@ def rebuild_index(status_cb: Callable[[str], None] | None = None, incremental=Fa num_blocks_done = 0 # How many individual components/XBlocks we've indexed status_cb(f"Found {num_courses} courses, {num_libraries} libraries.") - with _using_temp_index(status_cb) if not incremental else nullcontext(STUDIO_INDEX_NAME) as index_name: - ############## Configure the index ############## + index_name = STUDIO_INDEX_NAME - # The index settings are best changed on an empty index. - # Changing them on a populated index will "re-index all documents in the index", which can take some time - # and use more RAM. 
Instead, we configure an empty index then populate it one course/library at a time. - if not incremental: - _configure_index(index_name) + ############## Libraries ############## + status_cb("Indexing libraries...") - ############## Libraries ############## - status_cb("Indexing libraries...") - - def index_library(lib_key: LibraryLocatorV2) -> list: - docs = [] - for component in lib_api.get_library_components(lib_key): - try: - metadata = lib_api.LibraryXBlockMetadata.from_component(lib_key, component) - doc = {} - doc.update(searchable_doc_for_library_block(metadata)) - doc.update(searchable_doc_tags(metadata.usage_key)) - doc.update(searchable_doc_collections(metadata.usage_key)) - doc.update(searchable_doc_containers(metadata.usage_key, "units")) - docs.append(doc) - except Exception as err: # pylint: disable=broad-except - status_cb(f"Error indexing library component {component}: {err}") - if docs: - try: - # Add all the docs in this library at once (usually faster than adding one at a time): - _wait_for_meili_task(client.index(index_name).add_documents(docs)) - except (TypeError, KeyError, MeilisearchError) as err: - status_cb(f"Error indexing library {lib_key}: {err}") - return docs - - ############## Collections ############## - def index_collection_batch(batch, num_done, library_key) -> int: - docs = [] - for collection in batch: - try: - collection_key = lib_api.library_collection_locator(library_key, collection.key) - doc = searchable_doc_for_collection(collection_key, collection=collection) - doc.update(searchable_doc_tags(collection_key)) - docs.append(doc) - except Exception as err: # pylint: disable=broad-except - status_cb(f"Error indexing collection {collection}: {err}") - num_done += 1 - - if docs: - try: - # Add docs in batch of 100 at once (usually faster than adding one at a time): - _wait_for_meili_task(client.index(index_name).add_documents(docs)) - except (TypeError, KeyError, MeilisearchError) as err: - status_cb(f"Error indexing 
collection batch {p}: {err}") - return num_done - - ############## Containers ############## - def index_container_batch(batch, num_done, library_key) -> int: - docs = [] - for container in batch: - try: - container_key = lib_api.library_container_locator( - library_key, - container, - ) - doc = searchable_doc_for_container(container_key) - doc.update(searchable_doc_tags(container_key)) - doc.update(searchable_doc_collections(container_key)) - container_type = lib_api.ContainerType(container_key.container_type) - match container_type: - case lib_api.ContainerType.Unit: - doc.update(searchable_doc_containers(container_key, "subsections")) - case lib_api.ContainerType.Subsection: - doc.update(searchable_doc_containers(container_key, "sections")) - docs.append(doc) - except Exception as err: # pylint: disable=broad-except - status_cb(f"Error indexing container {container.key}: {err}") - num_done += 1 - - if docs: - try: - # Add docs in batch of 100 at once (usually faster than adding one at a time): - _wait_for_meili_task(client.index(index_name).add_documents(docs)) - except (TypeError, KeyError, MeilisearchError) as err: - status_cb(f"Error indexing container batch {p}: {err}") - return num_done - - for lib_key in lib_keys: - status_cb(f"{num_contexts_done + 1}/{num_contexts}. Now indexing blocks in library {lib_key}") - lib_docs = index_library(lib_key) - num_blocks_done += len(lib_docs) - - # To reduce memory usage on large instances, split up the Collections into pages of 100 collections: - library = lib_api.get_library(lib_key) - collections = content_api.get_collections(library.learning_package_id, enabled=True) - num_collections = collections.count() - num_collections_done = 0 - status_cb(f"{num_collections_done}/{num_collections}. 
Now indexing collections in library {lib_key}") - paginator = Paginator(collections, 100) - for p in paginator.page_range: - num_collections_done = index_collection_batch( - paginator.page(p).object_list, - num_collections_done, - lib_key, - ) - if incremental: - IncrementalIndexCompleted.objects.get_or_create(context_key=lib_key) - status_cb(f"{num_collections_done}/{num_collections} collections indexed for library {lib_key}") - - # Similarly, batch process Containers (units, sections, etc) in pages of 100 - containers = content_api.get_containers(library.learning_package_id) - num_containers = containers.count() - num_containers_done = 0 - status_cb(f"{num_containers_done}/{num_containers}. Now indexing containers in library {lib_key}") - paginator = Paginator(containers, 100) - for p in paginator.page_range: - num_containers_done = index_container_batch( - paginator.page(p).object_list, - num_containers_done, - lib_key, + def index_library(lib_key: LibraryLocatorV2) -> list: + docs = [] + for component in lib_api.get_library_components(lib_key): + try: + metadata = lib_api.LibraryXBlockMetadata.from_component(lib_key, component) + doc = {} + doc.update(searchable_doc_for_library_block(metadata)) + doc.update(searchable_doc_tags(metadata.usage_key)) + doc.update(searchable_doc_collections(metadata.usage_key)) + doc.update(searchable_doc_containers(metadata.usage_key, "units")) + docs.append(doc) + except Exception as err: # pylint: disable=broad-except + status_cb(f"Error indexing library component {component}: {err}") + if docs: + try: + # Add all the docs in this library at once (usually faster than adding one at a time): + _wait_for_meili_task(client.index(index_name).add_documents(docs)) + except (TypeError, KeyError, MeilisearchError) as err: + status_cb(f"Error indexing library {lib_key}: {err}") + return docs + + ############## Collections ############## + def index_collection_batch(batch, num_done, library_key) -> int: + docs = [] + for collection in 
batch: + try: + collection_key = lib_api.library_collection_locator(library_key, collection.key) + doc = searchable_doc_for_collection(collection_key, collection=collection) + doc.update(searchable_doc_tags(collection_key)) + docs.append(doc) + except Exception as err: # pylint: disable=broad-except + status_cb(f"Error indexing collection {collection}: {err}") + num_done += 1 + + if docs: + try: + # Add docs in batch of 100 at once (usually faster than adding one at a time): + _wait_for_meili_task(client.index(index_name).add_documents(docs)) + except (TypeError, KeyError, MeilisearchError) as err: + status_cb(f"Error indexing collection batch {p}: {err}") + return num_done + + ############## Containers ############## + def index_container_batch(batch, num_done, library_key) -> int: + docs = [] + for container in batch: + try: + container_key = lib_api.library_container_locator( + library_key, + container, ) - status_cb(f"{num_containers_done}/{num_containers} containers indexed for library {lib_key}") - if incremental: - IncrementalIndexCompleted.objects.get_or_create(context_key=lib_key) + doc = searchable_doc_for_container(container_key) + doc.update(searchable_doc_tags(container_key)) + doc.update(searchable_doc_collections(container_key)) + container_type = lib_api.ContainerType(container_key.container_type) + match container_type: + case lib_api.ContainerType.Unit: + doc.update(searchable_doc_containers(container_key, "subsections")) + case lib_api.ContainerType.Subsection: + doc.update(searchable_doc_containers(container_key, "sections")) + docs.append(doc) + except Exception as err: # pylint: disable=broad-except + status_cb(f"Error indexing container {container.key}: {err}") + num_done += 1 - num_contexts_done += 1 + if docs: + try: + # Add docs in batch of 100 at once (usually faster than adding one at a time): + _wait_for_meili_task(client.index(index_name).add_documents(docs)) + except (TypeError, KeyError, MeilisearchError) as err: + status_cb(f"Error 
indexing container batch {p}: {err}") + return num_done + + for lib_key in lib_keys: + status_cb(f"{num_contexts_done + 1}/{num_contexts}. Now indexing blocks in library {lib_key}") + lib_docs = index_library(lib_key) + num_blocks_done += len(lib_docs) + + # To reduce memory usage on large instances, split up the Collections into pages of 100 collections: + library = lib_api.get_library(lib_key) + collections = content_api.get_collections(library.learning_package_id, enabled=True) + num_collections = collections.count() + num_collections_done = 0 + status_cb(f"{num_collections_done}/{num_collections}. Now indexing collections in library {lib_key}") + paginator = Paginator(collections, 100) + for p in paginator.page_range: + num_collections_done = index_collection_batch( + paginator.page(p).object_list, + num_collections_done, + lib_key, + ) + IncrementalIndexCompleted.objects.get_or_create(context_key=lib_key) + status_cb(f"{num_collections_done}/{num_collections} collections indexed for library {lib_key}") + + # Similarly, batch process Containers (units, sections, etc) in pages of 100 + containers = content_api.get_containers(library.learning_package_id) + num_containers = containers.count() + num_containers_done = 0 + status_cb(f"{num_containers_done}/{num_containers}. 
Now indexing containers in library {lib_key}") + paginator = Paginator(containers, 100) + for p in paginator.page_range: + num_containers_done = index_container_batch( + paginator.page(p).object_list, + num_containers_done, + lib_key, + ) + status_cb(f"{num_containers_done}/{num_containers} containers indexed for library {lib_key}") + IncrementalIndexCompleted.objects.get_or_create(context_key=lib_key) - ############## Courses ############## - status_cb("Indexing courses...") - # To reduce memory usage on large instances, split up the CourseOverviews into pages of 1,000 courses: + num_contexts_done += 1 - paginator = Paginator(CourseOverview.objects.only('id', 'display_name'), 1000) - for p in paginator.page_range: - for course in paginator.page(p).object_list: - status_cb( - f"{num_contexts_done + 1}/{num_contexts}. Now indexing course {course.display_name} ({course.id})" - ) - if course.id in keys_indexed: - num_contexts_done += 1 - continue - course_docs = index_course(course.id, index_name) - if incremental: - IncrementalIndexCompleted.objects.get_or_create(context_key=course.id) + ############## Courses ############## + status_cb("Indexing courses...") + # To reduce memory usage on large instances, split up the CourseOverviews into pages of 1,000 courses: + + paginator = Paginator(CourseOverview.objects.only('id', 'display_name'), 1000) + for p in paginator.page_range: + for course in paginator.page(p).object_list: + status_cb( + f"{num_contexts_done + 1}/{num_contexts}. Now indexing course {course.display_name} ({course.id})" + ) + if course.id in keys_indexed: num_contexts_done += 1 - num_blocks_done += len(course_docs) + continue + course_docs = index_course(course.id, index_name) + IncrementalIndexCompleted.objects.get_or_create(context_key=course.id) + num_contexts_done += 1 + num_blocks_done += len(course_docs) IncrementalIndexCompleted.objects.all().delete() status_cb(f"Done! 
+Command to configure (or reset) the Meilisearch search index for courses (in Studio, i.e. Draft
+mode).
+
+See also ./reindex_studio.py
+
+See also cms/djangoapps/contentstore/management/commands/reindex_course.py which
+indexes LMS (published) courses in ElasticSearch.
+"""
+
+from django.core.management import BaseCommand, CommandError
+
+from cms.djangoapps.contentstore.management.commands.prompt import query_yes_no
+from ... import api
+
+
+class Command(BaseCommand):
+    """
+    Configure (or reset) the Meilisearch search index for courses and libraries in Studio.
+Command to incrementally index content in the search index for courses (in Studio, i.e. Draft
+        if options["reset_progress"]: