-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathrun.py
More file actions
113 lines (95 loc) · 5.06 KB
/
run.py
File metadata and controls
113 lines (95 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
from consensus_decentralization.aggregate import aggregate
from consensus_decentralization.map import apply_mapping
from consensus_decentralization.analyze import analyze
from consensus_decentralization.parse import parse
from consensus_decentralization.plot import plot
import consensus_decentralization.helper as hlp
# Configure the root logger once at import time: every message is prefixed with a
# timestamp (12-hour clock with AM/PM) and messages of level INFO or higher are emitted.
logging.basicConfig(
    format='[%(asctime)s] %(message)s',
    datefmt='%Y/%m/%d %I:%M:%S %p',
    level=logging.INFO,
)
def process_data(force_map, ledger_dir, ledger, output_dir):
    """
    Parses and maps the raw data of a ledger, unless a mapped data file already exists and mapping is not forced.
    :param force_map: bool. If True, parsing and mapping are performed without checking for an existing mapped
        data file.
    :param ledger_dir: path object of the directory in which the ledger's mapped data file is looked up
    :param ledger: string that corresponds to the ledger whose data should be processed
    :param output_dir: directory that is handed to apply_mapping as its output directory
    :returns: whatever apply_mapping returns if the mapping was performed, otherwise None
    """
    mapped_filename = hlp.get_mapped_data_filename(hlp.get_clustering_flag())

    # Nothing to do when the mapped data are already on disk and re-mapping was not requested.
    # The check short-circuits on force_map, so the file system is not queried when mapping is forced.
    if not force_map and (ledger_dir / mapped_filename).is_file():
        return None

    parsed = parse(ledger=ledger, input_dirs=hlp.get_input_directories())
    return apply_mapping(ledger, parsed_data=parsed, output_dir=output_dir)
def main(ledgers, timeframe, estimation_window, frequency, population_windows,
         force_map, interim_dir=hlp.INTERIM_DIR, results_dir=hlp.RESULTS_DIR):
    """
    Runs the whole pipeline (parsing, mapping, aggregating, analyzing and, if enabled, plotting) for the given
    ledgers and timeframe.
    Note that ledgers whose processing raises a FileNotFoundError are logged and removed from the given list
    in-place; the remaining steps only consider the ledgers that are left.
    :param ledgers: list of strings that correspond to the ledgers whose data should be analyzed
    :param timeframe: tuple of (start_date, end_date) where each date is a datetime.date object.
    :param estimation_window: int or None. The number of days to consider for the estimation of the power of an
        entity (i.e. counting all the blocks produced by the entity within estimation_window days). If None, the
        entire timeframe will be considered.
    :param frequency: int or None. The number of days between each data point considered in the analysis. If None,
        only one data point will be considered, spanning the entire timeframe (i.e. it needs to be combined with
        None estimation_window).
    :param population_windows: int. The number of windows to look backwards and forwards to determine the
        population of active block producers for a given time period.
    :param force_map: bool. If True, then the mapping will be performed, regardless of whether mapped data for the
        project already exist.
    :param interim_dir: pathlib.PosixPath object of the directory where the output data will be saved
    :param results_dir: pathlib.PosixPath object of the directory where the results will be saved
    """
    logging.info(f"The ledgers that will be analyzed are: {','.join(ledgers)}")

    # Walk over a snapshot of the ledgers, because failing ledgers are removed from the list while looping
    for ledger in tuple(ledgers):
        ledger_interim_dir = interim_dir / ledger
        ledger_interim_dir.mkdir(parents=True, exist_ok=True)  # make sure the ledger's output directory exists
        try:
            mapped_data = process_data(force_map, ledger_interim_dir, ledger, interim_dir)
        except FileNotFoundError as err:
            # Skip this ledger but keep going with the rest
            logging.error(repr(err))
            ledgers.remove(ledger)
        else:
            aggregate(
                ledger,
                interim_dir,
                timeframe,
                estimation_window,
                frequency,
                force_map,
                mapped_data=mapped_data
            )

    # No ledger could be processed, so there is nothing to analyze or plot
    if not ledgers:
        return

    blocks_per_entity_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
    metrics_output_dir = results_dir / 'metrics'
    metrics_output_dir.mkdir(parents=True, exist_ok=True)
    computed_metrics = analyze(
        projects=ledgers,
        aggregated_data_filename=blocks_per_entity_filename,
        population_windows=population_windows,
        input_dir=interim_dir,
        output_dir=metrics_output_dir
    )

    if not hlp.get_plot_flag():
        return

    figures_output_dir = results_dir / 'figures'
    figures_output_dir.mkdir(parents=True, exist_ok=True)
    animate_plots = hlp.get_plot_config_data()['animated']
    plot(
        ledgers=ledgers,
        metrics=computed_metrics,
        aggregated_data_filename=blocks_per_entity_filename,
        animated=animate_plots,
        metrics_dir=metrics_output_dir,
        figures_dir=figures_output_dir
    )
if __name__ == '__main__':
    # Gather the run parameters through the helper module
    selected_ledgers = hlp.get_ledgers()
    window_days, frequency_days = hlp.get_estimation_window_and_frequency()
    n_population_windows = hlp.get_population_windows()
    should_force_map = hlp.get_force_map_flag()

    # The results directory depends on the analysis parameters; create it up front
    output_results_dir = hlp.get_results_dir(window_days, frequency_days, n_population_windows)
    output_results_dir.mkdir(parents=True, exist_ok=True)

    # Resolve the requested dates into the boundaries of the timeframe and reject an inverted range
    first_date, last_date = hlp.get_start_end_dates()
    range_start = hlp.get_timeframe_beginning(first_date)
    range_end = hlp.get_timeframe_end(last_date)
    if timeframe_is_inverted := range_end < range_start:
        raise ValueError(
            'Invalid --timeframe values. Please note that if providing a second date, '
            'it must occur after the first date.'
        )

    main(
        ledgers=selected_ledgers,
        timeframe=(range_start, range_end),
        estimation_window=window_days,
        frequency=frequency_days,
        population_windows=n_population_windows,
        force_map=should_force_map,
        results_dir=output_results_dir,
    )

    logging.info('Done. Please check the output directory for results.')