Skip to content

Commit 455de3d

Browse files
committed
Add async materialized view refresh jobs
Adds refresh jobs for existing materialized views and invokes them: - Refreshes the published variants materialized view whenever a new score set is published. (We cannot delete a published score set, nor edit it's variants, so publication time is the only moment this view becomes outdated). - Refreshes all materialized views at 0300. Adds tests for newly placed publication view refresh. Clarifies connection -> inspector flow during mat view refresh. Opens #405.
1 parent 79ec917 commit 455de3d

5 files changed

Lines changed: 374 additions & 123 deletions

File tree

src/mavedb/db/view.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def _drop_view(element: DropView, compiler, **kw):
5050
return "DROP %s %s" % ("MATERIALIZED VIEW" if element.materialized else "VIEW", element.name)
5151

5252

53-
def view_exists(ddl: CreateView, target, connection: Session, materialized: bool, **kw):
53+
def view_exists(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
5454
inspector = sa.inspect(connection)
5555
if inspector is None:
5656
return False
@@ -59,7 +59,7 @@ def view_exists(ddl: CreateView, target, connection: Session, materialized: bool
5959
return ddl.name in view_names
6060

6161

62-
def view_doesnt_exist(ddl: CreateView, target, connection: Session, materialized: bool, **kw):
62+
def view_doesnt_exist(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
6363
return not view_exists(ddl, target, connection, materialized, **kw)
6464

6565

@@ -121,7 +121,7 @@ class MyView(Base):
121121
return t
122122

123123

124-
def refresh_mat_view(session, name, concurrently=True):
124+
def refresh_mat_view(session: Session, name: str, concurrently=True):
125125
"""
126126
Refreshes a single materialized view, given by `name`.
127127
"""
@@ -132,12 +132,15 @@ def refresh_mat_view(session, name, concurrently=True):
132132
session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name))
133133

134134

135-
# TODO: untested.
136-
def refresh_all_mat_views(session, concurrently=True):
135+
def refresh_all_mat_views(session: Session, concurrently=True):
137136
"""
138137
Refreshes all materialized views. Views are refreshed in non-deterministic order,
139138
so view definitions can't depend on each other.
140139
"""
141-
mat_views = session.inspect(session.engine).get_view_names()
142-
for v in mat_views:
143-
refresh_mat_view(session, v, concurrently)
140+
inspector = sa.inspect(session.connection())
141+
142+
if not inspector:
143+
return
144+
145+
for mv in inspector.get_materialized_view_names():
146+
refresh_mat_view(session, mv, concurrently)

src/mavedb/routers/score_sets.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,6 @@ async def upload_score_set_variant_data(
656656
except UnicodeDecodeError as e:
657657
raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")
658658

659-
660659
if scores_file:
661660
# Although this is also updated within the variant creation job, update it here
662661
# as well so that we can display the proper UI components (queue invocation delay
@@ -1014,11 +1013,12 @@ async def delete_score_set(
10141013
response_model=score_set.ScoreSet,
10151014
response_model_exclude_none=True,
10161015
)
1017-
def publish_score_set(
1016+
async def publish_score_set(
10181017
*,
10191018
urn: str,
10201019
db: Session = Depends(deps.get_db),
10211020
user_data: UserData = Depends(require_current_user),
1021+
worker: ArqRedis = Depends(deps.get_worker),
10221022
) -> Any:
10231023
"""
10241024
Publish a score set.
@@ -1095,4 +1095,18 @@ def publish_score_set(
10951095
db.commit()
10961096
db.refresh(item)
10971097

1098+
# await the insertion of this job into the worker queue, not the job itself.
1099+
job = await worker.enqueue_job(
1100+
"refresh_published_variants_view",
1101+
correlation_id_for_context(),
1102+
user_data.user.id,
1103+
)
1104+
if job is not None:
1105+
save_to_logging_context({"worker_job_id": job.job_id})
1106+
logger.info(msg="Enqueud published variant materialized view refresh job.", extra=logging_context())
1107+
else:
1108+
logger.warning(
1109+
msg="Failed to enqueue published variant materialized view refresh job.", extra=logging_context()
1110+
)
1111+
10981112
return item

src/mavedb/worker/jobs.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sqlalchemy.orm import Session
1515

1616
from mavedb.data_providers.services import vrs_mapper
17+
from mavedb.db.view import refresh_all_mat_views
1718
from mavedb.lib.exceptions import MappingEnqueueError, NonexistentMappingReferenceError, NonexistentMappingResultsError
1819
from mavedb.lib.logging.context import format_raised_exception_info_as_dict
1920
from mavedb.lib.score_sets import (
@@ -29,6 +30,7 @@
2930
from mavedb.models.enums.mapping_state import MappingState
3031
from mavedb.models.enums.processing_state import ProcessingState
3132
from mavedb.models.mapped_variant import MappedVariant
33+
from mavedb.models.published_variant import PublishedVariantsMV
3234
from mavedb.models.score_set import ScoreSet
3335
from mavedb.models.user import User
3436
from mavedb.models.variant import Variant
@@ -50,7 +52,9 @@ async def mapping_in_execution(redis: ArqRedis, job_id: str):
5052
await redis.set(MAPPING_CURRENT_ID_NAME, "")
5153

5254

53-
def setup_job_state(ctx, invoker: int, resource: Optional[str], correlation_id: str):
55+
def setup_job_state(
56+
ctx, invoker: Optional[int], resource: Optional[str], correlation_id: Optional[str]
57+
) -> dict[str, Any]:
5458
ctx["state"][ctx["job_id"]] = {
5559
"application": "mavedb-worker",
5660
"user": invoker,
@@ -653,3 +657,20 @@ async def variant_mapper_manager(ctx: dict, correlation_id: str, updater_id: int
653657
db.commit()
654658

655659
return {"success": False, "enqueued_job": new_job_id}
660+
661+
662+
# TODO#405: Refresh materialized views within an executor.
663+
async def refresh_materialized_views(ctx: dict):
664+
logging_context = setup_job_state(ctx, None, None, None)
665+
logger.debug(msg="Began refresh materialized views.", extra=logging_context)
666+
refresh_all_mat_views(ctx["db"])
667+
logger.debug(msg="Done refreshing materialized views.", extra=logging_context)
668+
return {"success": True}
669+
670+
671+
async def refresh_published_variants_view(ctx: dict, correlation_id: str):
672+
logging_context = setup_job_state(ctx, None, None, correlation_id)
673+
logger.debug(msg="Began refresh of published variants materialized view.", extra=logging_context)
674+
PublishedVariantsMV.refresh(ctx["db"])
675+
logger.debug(msg="Done refreshing of published variants materialized view.", extra=logging_context)
676+
return {"success": True}

src/mavedb/worker/settings.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,29 @@
33
from typing import Callable
44

55
from arq.connections import RedisSettings
6-
from arq.cron import CronJob
6+
from arq.cron import CronJob, cron
77

88
from mavedb.data_providers.services import cdot_rest
99
from mavedb.db.session import SessionLocal
1010
from mavedb.lib.logging.canonical import log_job
11-
from mavedb.worker.jobs import create_variants_for_score_set, map_variants_for_score_set, variant_mapper_manager
11+
from mavedb.worker.jobs import (
12+
create_variants_for_score_set,
13+
map_variants_for_score_set,
14+
variant_mapper_manager,
15+
refresh_materialized_views,
16+
refresh_published_variants_view,
17+
)
1218

1319
# ARQ requires at least one task on startup.
1420
BACKGROUND_FUNCTIONS: list[Callable] = [
1521
create_variants_for_score_set,
1622
variant_mapper_manager,
1723
map_variants_for_score_set,
24+
refresh_published_variants_view,
25+
]
26+
BACKGROUND_CRONJOBS: list[CronJob] = [
27+
cron(refresh_materialized_views, name="refresh_all_materialized_views", hour=3, minute=0)
1828
]
19-
BACKGROUND_CRONJOBS: list[CronJob] = []
2029

2130
REDIS_IP = os.getenv("REDIS_IP") or "localhost"
2231
REDIS_PORT = int(os.getenv("REDIS_PORT") or 6379)

0 commit comments

Comments
 (0)