@@ -222,32 +222,33 @@ def apply(
222222 values = tuple (group [facet ].unique ())
223223 supplementary_facets [facet ] += values
224224
225- supplementary_group = data_catalog
226- for facet , values in supplementary_facets .items ():
227- mask = supplementary_group [facet ].isin (values )
228- supplementary_group = supplementary_group [mask ]
225+ mask = data_catalog [list (supplementary_facets )].isin (supplementary_facets ).all (axis = "columns" )
226+ supplementary_group = data_catalog [mask ]
229227 if not supplementary_group .empty :
230228 matching_facets = list (self .matching_facets )
231229 facets = matching_facets + list (self .optional_matching_facets )
232230 datasets = group [facets ].drop_duplicates ()
233- indices = set ( )
231+ select = pd . Series ( False , index = supplementary_group . index )
234232 for i in range (len (datasets )):
235233 dataset = datasets .iloc [i ]
236234 # Restrict the supplementary datasets to those that match the main dataset.
237235 supplementaries = supplementary_group [
238- (supplementary_group [matching_facets ] == dataset [matching_facets ]).all (1 )
236+ (supplementary_group [matching_facets ] == dataset [matching_facets ]).all (axis = "columns" )
239237 ]
240238 if not supplementaries .empty :
241239 # Select the best matching supplementary dataset based on the optional matching facets.
242- scores = (supplementaries [facets ] == dataset ).sum (axis = 1 )
243- matches = supplementaries [scores == scores .max ()]
244- if "version" in facets :
240+ scores = (supplementaries [facets ] == dataset ).sum (axis = "columns" )
241+ supplementaries = supplementaries [scores == scores .max ()]
242+ if "version" in supplementaries . columns :
245243 # Select the latest version if there are multiple matches
246- matches = matches [matches ["version" ] == matches ["version" ].max ()]
247- # Select one match per dataset
248- indices .add (matches .index [0 ])
244+ supplementaries = supplementaries [
245+ supplementaries ["version" ] == supplementaries ["version" ].max ()
246+ ]
247+ # Select only the first group if there are still multiple matches
248+ first_supplementary_dataset = supplementaries [facets ].drop_duplicates ().iloc [0 ]
249+ select |= (supplementaries [facets ] == first_supplementary_dataset ).all (axis = "columns" )
249250
250- supplementary_group = supplementary_group . loc [ list ( indices )]. drop_duplicates ()
251+ supplementary_group = supplementary_group [ select ]
251252
252253 return pd .concat ([group , supplementary_group ])
253254
@@ -525,15 +526,81 @@ def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame
525526
526527
@frozen
class AddParentDataset:
    """
    Include a dataset's parent in the selection.

    For each dataset in the group, the parent dataset is looked up in the
    data catalog via :attr:`parent_facet_map`; children without a matching
    parent are dropped from the selection.
    """

    parent_facet_map: dict[str, str]
    """
    Mapping from child to parent facets.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Include a dataset's parent in the selection.

        Parameters
        ----------
        group
            The currently selected group of datasets.
        data_catalog
            The full data catalog to look parent datasets up in.

        Returns
        -------
        :
            The rows of ``data_catalog`` for the child datasets that have a
            parent, plus (the latest version of) each matching parent dataset.
            Children missing any parent facet, or whose parent cannot be
            found, are excluded.
        """
        all_parent_facets = sorted({*self.parent_facet_map.keys(), *self.parent_facet_map.values()})

        # Remove datasets that do not have all parent facets set.
        valid_group = group[all_parent_facets].dropna(axis="index")

        # Add the parent datasets from the data catalog.
        select = pd.Series(False, index=data_catalog.index)
        select.loc[valid_group.index] = True
        for _, child_dataset in valid_group.groupby(all_parent_facets):
            # Within a groupby group each facet is single-valued, so these
            # lists each contain exactly one entry.
            child_facets = {facet: child_dataset[facet].unique().tolist() for facet in all_parent_facets}
            parent_facets = {
                parent_facet: child_facets[child_facet]
                for parent_facet, child_facet in self.parent_facet_map.items()
            }
            parent_dataset = data_catalog[
                data_catalog[list(parent_facets)].isin(parent_facets).all(axis="columns")
            ]
            if parent_dataset.empty:
                # Drop the child dataset if no parent dataset is found.
                logger.debug(
                    f"Constraint {self} not satisfied because no parent dataset found for "
                    f"{', '.join(f'{k}={v}' for k, v in child_facets.items())}"
                )
                select.loc[child_dataset.index] = False
            else:
                # Guard the version lookup so catalogs without a "version"
                # column do not raise a KeyError (consistent with the
                # supplementary-dataset selection, which checks
                # `"version" in ...columns` before filtering).
                if "version" in parent_dataset.columns:
                    # Add the latest version of the dataset to the selection.
                    parent_dataset = parent_dataset[
                        parent_dataset["version"] == parent_dataset["version"].max()
                    ]
                select.loc[parent_dataset.index] = True

        return data_catalog[select]

    @classmethod
    def from_defaults(
        cls,
        source_type: SourceDatasetType,
    ) -> Self:
        """
        Include a dataset's parent in the selection.

        The constraint is created using the defaults for the source_type.

        Parameters
        ----------
        source_type:
            The source_type of the variable to add.

        Returns
        -------
        :
            A constraint to include a dataset's parent in the selection.

        Raises
        ------
        KeyError
            If no default parent facet mapping is defined for ``source_type``.
        """
        parent_facet_options = {
            SourceDatasetType.CMIP6: {
                "source_id": "parent_source_id",
                "experiment_id": "parent_experiment_id",
                "variant_label": "parent_variant_label",
                # These facets must match between child and parent exactly.
                "table_id": "table_id",
                "variable_id": "variable_id",
                "grid_label": "grid_label",
            },
        }
        return cls(parent_facet_map=parent_facet_options[source_type])
0 commit comments