-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathabc_unit.py
More file actions
42 lines (32 loc) · 1.16 KB
/
abc_unit.py
File metadata and controls
42 lines (32 loc) · 1.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Iterable
from sdp.data_units.data_entry import DataEntry
class DataSource(ABC):
    """Abstract base for objects that read and write data entries.

    On top of the abstract read/write interface the class keeps running
    statistics that ``update_metrics`` maintains: the number of entries seen,
    the sum of their ``"duration"`` values and the collected per-entry metrics.
    """

    def __init__(self, source: Any):
        """Store *source* and reset the running statistics.

        Args:
            source: Handle of the underlying data. It is stored as-is; this
                base class does not interpret it.
        """
        self.source = source
        self.number_of_entries = 0
        self.total_duration = 0.0
        self.metrics = []

    @abstractmethod
    def read_entry(self) -> Dict:
        """Read a single entry; must be implemented by subclasses."""
        pass

    @abstractmethod
    def read_entries(self, in_memory_chunksize: int = None) -> List[Dict]:
        """Read multiple entries; must be implemented by subclasses.

        Args:
            in_memory_chunksize: Optional chunk size. Its exact meaning is
                defined by the implementing subclass.
        """
        pass

    @abstractmethod
    def write_entry(self, data_entry: "DataEntry"):
        """Write a single entry; must be implemented by subclasses."""
        pass

    @abstractmethod
    def write_entries(self, data_entries: List["DataEntry"]):
        """Write several entries; must be implemented by subclasses."""
        pass

    def update_metrics(self, data_entry: "DataEntry"):
        """Fold one entry into the running statistics.

        Appends ``data_entry.metrics`` to ``self.metrics`` when it is not
        ``None``, adds the entry's ``"duration"`` (0 when the key is absent)
        to ``self.total_duration`` when ``data_entry.data`` is a dictionary,
        and always increments ``self.number_of_entries``.

        Args:
            data_entry: Object exposing ``metrics`` and ``data`` attributes.
        """
        if data_entry.metrics is not None:
            self.metrics.append(data_entry.metrics)

        # ``isinstance`` instead of ``is dict``: the identity test compared
        # the payload with the ``dict`` type object itself, so it never
        # matched a real dictionary and the duration was never accumulated.
        if isinstance(data_entry.data, dict):
            self.total_duration += data_entry.data.get("duration", 0)

        self.number_of_entries += 1
class DataSetter(ABC):
    """Base class that holds a list of processor configurations."""

    def __init__(self, processors_cfgs: List[Dict]):
        """Remember the processor configurations.

        Args:
            processors_cfgs: Processor configuration dictionaries. The list
                is stored by reference, not copied.
        """
        self.processors_cfgs: List[Dict] = processors_cfgs
def get_resolvable_link(*args):
    """Build a ``${.a.b.c}``-style reference string from the given parts.

    Args:
        *args: Path components; each one is converted with ``str``.

    Returns:
        ``"${."`` followed by the dot-joined components and a closing
        ``"}"``. With no arguments the result is ``"${.}"``.
    """
    # NOTE(review): there is no ``self`` parameter, so if this is invoked on
    # an instance, the instance itself becomes the first path component.
    # Confirm how callers use it.
    dotted_path = ".".join(str(part) for part in args)
    return f"${{.{dotted_path}}}"