-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path classes.py
More file actions
131 lines (103 loc) · 4.6 KB
/
classes.py
File metadata and controls
131 lines (103 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Encyclopedia of Life (EoL) dataset renaming module.
This module provides components for renaming images in the EoL dataset by merging
source identifiers from the original batch data. It includes filter, scheduler,
and runner classes for the EoL rename operation within the TreeOfLife toolbox.
"""
import os
from typing import List
import pandas as pd
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import PythonFilterToolBase, FilterRegister
from TreeOfLife_toolbox.main.runners import MPIRunnerTool, RunnerRegister
from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister
@FilterRegister("eol_rename")
class EoLRenameFilter(PythonFilterToolBase):
    """
    Registers the 'eol_rename' filter with the filtering system.

    All filtering behavior is inherited unchanged from
    ``PythonFilterToolBase``; the subclass exists only to bind the
    ``eol_rename`` name for registry lookup.

    Attributes:
        filter_name: Identifier under which this filter is registered.
    """

    def __init__(self, cfg: Config):
        # Delegate all setup to the base class, then tag this instance
        # with the registry name used throughout the eol_rename pipeline.
        super().__init__(cfg)
        self.filter_name: str = "eol_rename"
@SchedulerRegister("eol_rename")
class EoLRenameScheduleCreation(DefaultScheduler):
    """
    Builds the execution schedule for the EoL rename operation.

    Scheduling behavior is inherited unchanged from ``DefaultScheduler``;
    the subclass exists only to bind the ``eol_rename`` name for
    registry lookup.

    Attributes:
        filter_name: Identifier under which this scheduler is registered.
    """

    def __init__(self, cfg: Config):
        # Delegate all setup to the base class, then tag this instance
        # with the registry name used throughout the eol_rename pipeline.
        super().__init__(cfg)
        self.filter_name: str = "eol_rename"
@RunnerRegister("eol_rename")
class EoLRenameRunner(MPIRunnerTool):
    """
    Runner class for executing the EoL rename operations.

    This class handles the actual processing of the EoL dataset images,
    adding source identifiers by merging information from batch data
    with downloaded image data.

    Attributes:
        filter_name: Name of the filter used for registration and identification.
        data_scheme: List of fields used to partition the dataset.
        verification_scheme: List of fields used for verification.
        total_time: Maximum allowed execution time in seconds.
    """

    def __init__(self, cfg: Config):
        super().__init__(cfg)
        self.filter_name: str = "eol_rename"
        # Partitions are addressed by (server_name, partition_id) in both
        # the data layout and the verification records.
        self.data_scheme: List[str] = ["server_name", "partition_id"]
        self.verification_scheme: List[str] = ["server_name", "partition_id"]
        # Time budget (seconds) consulted by is_enough_time() before/within
        # apply_filter; presumably a per-partition allowance — TODO confirm
        # against MPIRunnerTool semantics.
        self.total_time: int = 150

    def apply_filter(
        self, filtering_df: pd.DataFrame, server_name: str, partition_id: int
    ) -> int:
        """
        Apply the EoL rename filter to a specific partition of data.

        This method adds source identifiers to the downloaded images data by
        merging 'EOL content ID' and 'EOL page ID' from the original batch data.
        It concatenates these IDs to create a 'source_id' field and saves the
        updated data back to the original successes.parquet file.

        Args:
            filtering_df: DataFrame containing the filter data.
            server_name: Name of the server containing the data.
            partition_id: Partition ID within the server.

        Returns:
            int: Number of records processed.

        Notes:
            - Checks for time constraints during operation.
            - Skips processing if the parquet path doesn't exist.
            - Overwrites successes.parquet in place; the merged ID columns
              ('EOL content ID', 'EOL page ID') are persisted alongside
              'source_id'.
        """
        # Abort early (via the base-class time check) rather than risk a
        # partial rewrite of the parquet file below.
        self.is_enough_time()

        # Hive-style partition path of the downloaded-images output for
        # this (server, partition) pair.
        parquet_path = os.path.join(
            self.downloaded_images_path,
            f"server_name={server_name}",
            f"partition_id={partition_id}",
            "successes.parquet",
        )
        # Matching partition directory in the original URL batch data;
        # read as a parquet dataset (directory, not a single file).
        server_batch_path = os.path.join(
            self.config.get_folder("urls_folder"),
            f"server_name={server_name}",
            f"partition_id={partition_id}",
        )

        # Missing partition output is treated as "nothing to do", not an error.
        if not os.path.exists(parquet_path):
            self.logger.info(f"Path doesn't exists: {parquet_path}")
            return 0

        parquet = pd.read_parquet(parquet_path)
        # Only the two EOL identifiers plus the join key are needed.
        server_batch = pd.read_parquet(
            server_batch_path, columns=["EOL content ID", "EOL page ID", "uuid"]
        )

        # Re-check the budget after the (potentially slow) reads, before
        # committing to the merge + overwrite.
        self.is_enough_time()

        # validate="1:1" raises if uuid is not unique on either side; with
        # how="left", uuids absent from the batch yield NaN ID columns.
        parquet = parquet.merge(server_batch, on="uuid", how="left", validate="1:1")
        # NOTE(review): '+' concatenation assumes both ID columns are
        # string-typed in the batch parquet (numeric dtypes would raise,
        # and NaN from unmatched rows propagates to a NaN source_id) —
        # TODO confirm upstream schema.
        parquet["source_id"] = parquet["EOL content ID"] + "_" + parquet["EOL page ID"]

        # Overwrite the partition file in place with the enriched frame.
        parquet.to_parquet(
            parquet_path, index=False, compression="zstd", compression_level=3
        )

        return len(parquet)