From 6c3e1d90aba8eb7035be18629136f6dda3d39928 Mon Sep 17 00:00:00 2001 From: npuchois Date: Mon, 18 May 2026 16:32:44 +0200 Subject: [PATCH] refactor: mep BC 3.1 - add _key field in list table omop pg --- CHANGELOG.md | 16 ++++ .../cohort/pg/PGCohortCreation.scala | 87 ++++++++++--------- .../testCases/occurences/resource_2.csv | 2 +- .../resource_1.csv | 2 +- .../cohort/pg/PGCohortCreationTest.scala | 29 ++++++- 5 files changed, 89 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03af9dd..a3299bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### 🚀 Features + +- *(health)* Return 503 error if spark context is stopped +- Add local development documentation (#21) + +### 🚜 Refactor + +- Mep BC 3.1 - add _key field in list table omop pg +- Resource pmsi changes (#16) + +### 📚 Documentation + +- Add missing mode documentation + ## [2.10.0] - 2025-07-01 ### 🚀 Features diff --git a/src/main/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreation.scala b/src/main/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreation.scala index 901f957..f4053a2 100644 --- a/src/main/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreation.scala +++ b/src/main/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreation.scala @@ -12,8 +12,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession, functions => F} import org.hl7.fhir.r4.model.ListResource.ListMode /** - * @param pg pgTool obj - */ + * @param pg pgTool obj + */ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { private final val cohort_item_table_rw = AppConfig.get.pg.get.cohortConfig.cohortItemsTableName private final val cohort_table_rw = AppConfig.get.pg.get.cohortConfig.cohortTableName @@ -33,11 +33,12 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { } else { ("", "") } + val key_col = baseCohortId.map(_.toString).getOrElse("-1") val stmt = 
s""" - |insert into ${cohort_table_rw} - |(hash, title, ${note_text_column_name},${indentifier_col} _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size) - |values (-1, ?, ?,${identifier_val} ?, ?, '$cohort_provider_name', 'Practitioner', '${mode.toCode}', '${CohortStatus.RUNNING}', ?, now(), ?) + |insert into $cohort_table_rw + |(hash, title, $note_text_column_name,$indentifier_col _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key) + |values (-1, ?, ?,$identifier_val ?, ?, '$cohort_provider_name', 'Practitioner', '${mode.toCode}', '${CohortStatus.RUNNING}', ?, now(), ?, '$key_col') |returning id |""".stripMargin val result = pg @@ -47,7 +48,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { cohortDefinitionName, cohortDefinitionSyntax, ownerEntityId, - s"Practitioner/${ownerEntityId}", + s"Practitioner/$ownerEntityId", resourceType, size ) @@ -58,8 +59,8 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { } /** - * This loads both a cohort and its definition into postgres and solr - */ + * This loads both a cohort and its definition into postgres and solr + */ override def updateCohort(cohortId: Long, cohort: DataFrame, sourcePopulation: SourcePopulation, @@ -122,11 +123,11 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { private def readCohortDiffEntries(cohortId: Long): DataFrame = { val stmt = s""" - |select date,_itemreferenceid,deleted - |from ${cohort_item_table_rw} - |join ${cohort_table_rw} on ${cohort_table_rw}.id = ${cohort_item_table_rw}._listid - |where ${cohort_table_rw}.identifier___official__value = '$cohortId' - |""".stripMargin + |select date,_itemreferenceid,deleted + |from ${cohort_item_table_rw} + |join ${cohort_table_rw} on ${cohort_table_rw}.id = ${cohort_item_table_rw}._listid + |where ${cohort_table_rw}.identifier___official__value = 
'$cohortId' + |""".stripMargin pg.sqlExecWithResult(stmt) .select(col("date"), col("_itemreferenceid"), col("deleted")) .orderBy(col("date").asc) @@ -140,7 +141,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { for (sc_id <- sourcePopulation.cohortList.get) { val (list_list_id, list_relationship_concept_id) = (List(List(cohortDefinitionId, sc_id), List(sc_id, cohortDefinitionId)), - List(44818821, 44818823)) + List(44818821, 44818823)) for ((list_id, relationship_concept_id) <- list_list_id zip list_relationship_concept_id) { val stmt = s""" @@ -155,24 +156,24 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { } /** - * This loads only a cohort definition into postgres. - * - * @param cohortDefinitionId id of the cohort - * @param count nb of patient of the cohort - */ + * This loads only a cohort definition into postgres. + * + * @param cohortDefinitionId id of the cohort + * @param count nb of patient of the cohort + */ private def uploadCount( - cohortDefinitionId: Long, - count: Long - ): Unit = { + cohortDefinitionId: Long, + count: Long + ): Unit = { setOmopCohortSize(cohortDefinitionId, count) setOmopCohortStatus(cohortDefinitionId, CohortStatus.FINISHED) setOmopCohortActive(cohortDefinitionId, status = true) } private def setOmopCohortSize( - cohortDefinitionId: Long, - count: Long - ): Unit = { + cohortDefinitionId: Long, + count: Long + ): Unit = { val stmt = s""" |update ${cohort_table_rw} @@ -183,9 +184,9 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { } private def setOmopCohortStatus( - cohortDefinitionId: Long, - status: CohortStatus.Value - ): Unit = { + cohortDefinitionId: Long, + status: CohortStatus.Value + ): Unit = { val stmt = s""" |update ${cohort_table_rw} @@ -196,9 +197,9 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { } private def setOmopCohortActive( - cohortDefinitionId: Long, - status: Boolean - ): Unit = { + 
cohortDefinitionId: Long, + status: Boolean + ): Unit = { val mode = if (status) "working" else "snapshot" val stmt = s""" @@ -220,22 +221,22 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging { "cohort dataframe shall have _listid, _provider, _provider and item__reference" ) pg.outputBulk(cohort_item_table_rw, - dfAddHash(df), - Some(4), - primaryKeys = Seq("_listid", "_itemreferenceid", "_provider")) + dfAddHash(df), + Some(4), + primaryKeys = Seq("_listid", "_itemreferenceid", "_provider")) } /** - * Adds a hash column based on several other columns - * - * @param df DataFrame - * @param columnsToExclude List[String] the columns not to be hashed - * @return DataFrame - */ + * Adds a hash column based on several other columns + * + * @param df DataFrame + * @param columnsToExclude List[String] the columns not to be hashed + * @return DataFrame + */ private def dfAddHash( - df: DataFrame, - columnsToExclude: List[String] = Nil - ): DataFrame = { + df: DataFrame, + columnsToExclude: List[String] = Nil + ): DataFrame = { df.withColumn( "hash", hash( diff --git a/src/test/resources/testCases/occurences/resource_2.csv b/src/test/resources/testCases/occurences/resource_2.csv index 16999ea..963f3da 100644 --- a/src/test/resources/testCases/occurences/resource_2.csv +++ b/src/test/resources/testCases/occurences/resource_2.csv @@ -1,4 +1,4 @@ -patient;_ref.encounter.period.start;_ref.encounter.period.end;recordedDate +patient;_ref.encounter.period.start;_ref.encounter.period.end;onsetDateTime 22;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z 22;2016-03-07T12:04:00Z;2016-03-07T12:04:00Z;2016-03-07T12:04:00Z 23;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z diff --git a/src/test/resources/testCases/temporalConstraintDirectChronologicalOrder/resource_1.csv b/src/test/resources/testCases/temporalConstraintDirectChronologicalOrder/resource_1.csv index 47937ac..9f00807 100644 --- 
a/src/test/resources/testCases/temporalConstraintDirectChronologicalOrder/resource_1.csv +++ b/src/test/resources/testCases/temporalConstraintDirectChronologicalOrder/resource_1.csv @@ -1,4 +1,4 @@ -patient;_ref.encounter.period.start;_ref.encounter.period.end;recordedDate +patient;_ref.encounter.period.start;_ref.encounter.period.end;onsetDateTime 22;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z 23;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z 24;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z diff --git a/src/test/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreationTest.scala b/src/test/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreationTest.scala index cf06594..b7c2803 100644 --- a/src/test/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreationTest.scala +++ b/src/test/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreationTest.scala @@ -41,8 +41,8 @@ class PGCohortCreationTest pgTools.sqlExecWithResult( """ |insert into list_cohort360 - |(hash, title, note__text, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size) - |values (-1, ?, ?, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?) 
+ |(hash, title, note__text, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key) + |values (-1, ?, ?, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?, '-1') |returning id |""".stripMargin, List("test", "test", "test", "Practitioner/test", "test", 1) @@ -57,6 +57,31 @@ class PGCohortCreationTest 1) } + test("testCreateCohortWithBaseCohort") { + val pgTools = mock[PGTool] + val pgCohortCreation = new PGCohortCreation(pgTools) + val expectedResult: Dataset[Row] = mock[Dataset[Row]] + when(expectedResult.collect()).thenReturn(Array(Row(1L))) + when( + pgTools.sqlExecWithResult( + """ + |insert into list_cohort360 + |(hash, title, note__text, identifier, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key) + |values (-1, ?, ?, 42, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?, '42') + |returning id + |""".stripMargin, + List("test", "test", "test", "Practitioner/test", "test", 1) + )).thenReturn(expectedResult) + pgCohortCreation.createCohort("test", + Some("test"), + "test", + "test", + "test", + Some(42L), + ListMode.SNAPSHOT, + 1) + } + test("testUpdateCohort") { val pgTools = mock[PGTool]