-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathio.py
More file actions
150 lines (129 loc) · 4.88 KB
/
io.py
File metadata and controls
150 lines (129 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import io
import csv
import builtins
import pickle
from pathlib import Path
from pprint import pformat
import daiquiri
from conference_scheduler import converter
from ruamel.yaml import YAML
from slugify import slugify
# Module-wide logger for this io module.
logger = daiquiri.getLogger(__name__)
# 'safe' ruamel loader: refuses to construct arbitrary Python objects
# from YAML input; block style output (no inline flow collections).
yaml = YAML(typ='safe')
yaml.default_flow_style = False
# Allow-list of builtin names that RestrictedUnpickler may resolve while
# unpickling; any other global lookup is rejected as potentially unsafe.
safe_builtins = {
'range',
'complex',
'set',
'frozenset',
'slice',
}
class RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that only resolves an allow-list of builtin names.

    Any global lookup outside ``safe_builtins`` raises
    ``pickle.UnpicklingError``, limiting what untrusted pickle data can
    construct.
    """

    def find_class(self, module, name):
        """Return the allow-listed builtin, or refuse the lookup.

        Raises:
            pickle.UnpicklingError: for any global outside
                ``builtins``/``safe_builtins``.
        """
        if module == "builtins" and name in safe_builtins:
            return getattr(builtins, name)
        # Forbid everything else.  (This was previously a bare string
        # literal — a no-op statement — now a real comment.)
        raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
                                     (module, name))
def restricted_loads(s):
    """Deserialize *s* like ``pickle.loads()``, but via RestrictedUnpickler."""
    buffer = io.BytesIO(s)
    unpickler = RestrictedUnpickler(buffer)
    return unpickler.load()
def import_yaml(input_folder):
    """Import all .yml files in *input_folder* into a single dict.

    Args:
        input_folder: Path-like directory to scan (non-recursive; only
            files with the '.yml' suffix are read).

    Returns:
        dict mapping each file's stem to its parsed YAML content.
    """
    yaml_resources = {}
    yaml_files = [
        path for path in input_folder.iterdir()
        if path.suffix == '.yml']
    for path in yaml_files:
        with path.open('r') as file:
            yaml_resources[path.stem] = yaml.load(file)
    # Fixed typo in the debug label ('reources' -> 'resources').
    logger.debug(f'\nresources:\n{pformat(yaml_resources)}')
    return yaml_resources
def import_proposals(resources, input_folder):
    """Import the proposals data from a .csv file.

    Args:
        resources: resources dict (not referenced in this function; kept
            for a consistent importer signature).
        input_folder: folder containing 'proposals.csv'.

    Returns:
        list of dicts, one per proposal row, with 'duration' coerced to
        int, 'demand' to float (0 when the column is absent), and the
        author's name slugified into 'person'.
    """
    proposals = []
    with Path(input_folder, 'proposals.csv').open('r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            event_type = row['session_type']
            proposals.append({
                'title': row['title'],
                'duration': int(row['duration']),
                'demand': float(row.get('demand', 0)),
                'person': slugify(row['name']),
                'name': row['name'],
                'tags': [row['tag']] if row['tag'] != '' else [],
                'subtitle': row['subtitle'],
                'description': row['description'],
                'event_type': event_type})
    # Fixed debug label: it said 'reources' (typo) while actually
    # logging the proposals list.
    logger.debug(f'\nproposals:\n{pformat(proposals)}')
    return proposals
def import_solution(solution_folder):
    """Import a previously computed schedule from a .csv file.

    Args:
        solution_folder: folder containing 'schedule.csv'.

    Returns:
        list of (event_index, slot_index) int tuples, one per row.
    """
    csv_file = Path(solution_folder, 'schedule.csv')
    logger.info(f'Importing schedule from {csv_file}')
    solution = []
    # csv_file is already a Path — no need to re-wrap it in Path().
    with csv_file.open('r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            solution.append((
                int(row['event_index']),
                int(row['slot_index'])))
    # Fixed debug label: it said 'reources' (typo) while logging the
    # solution list.
    logger.debug(f'\nsolution:\n{pformat(solution)}')
    return solution
def import_schedule_definition(solution_folder):
    """Import a previously pickled schedule bundle.

    Args:
        solution_folder: folder containing 'scheduler.pickle'.

    Returns:
        The unpickled bundle, as written by
        pickle_solution_and_definition.

    Raises:
        pickle.UnpicklingError: if the pickle references globals outside
            the safe_builtins allow-list.
    """
    pickle_file = Path(solution_folder, 'scheduler.pickle')
    logger.info(
        f'Importing resources, events, slots and schedule from {pickle_file}')
    with pickle_file.open('rb') as f:
        # BUG FIX: the original discarded the restricted_loads() result
        # and then called pickle.load(f) on the already-exhausted stream
        # (raising EOFError) — which also bypassed the restricted
        # unpickler entirely.  Load once, through the safe path.
        # NOTE(review): safe_builtins may need to allow the project's
        # event/slot classes for real bundles to load — confirm.
        bundle = restricted_loads(f.read())
    return bundle
def pickle_solution_and_definition(
    resources, events, slots, allocations, solution, solution_folder
):
    """Persist the computed solution together with its definition.

    Writes a single dict bundling the resources dict, the events and
    slots lists, the allocations and the solution to 'scheduler.pickle'
    inside *solution_folder*, using the highest pickle protocol.
    """
    target = Path(solution_folder, 'scheduler.pickle')
    logger.info(
        f'Pickling resources, events, slots and schedule to {target}')
    keys = ('resources', 'events', 'slots', 'allocations', 'solution')
    values = (resources, events, slots, allocations, solution)
    bundle = dict(zip(keys, values))
    with target.open('wb') as handle:
        pickle.dump(bundle, handle, pickle.HIGHEST_PROTOCOL)
def export_schedule(solution, events, slots, solution_folder):
    """Write a human readable .csv file of the computed solution."""
    csv_file = Path(solution_folder, 'schedule.csv')
    logger.info(f'Exporting schedule to {csv_file}')
    schedule = converter.solution_to_schedule(solution, events, slots)
    fieldnames = ['event_index', 'event', 'slot_index', 'slot']
    # One row per scheduled item: the indexes locate the event/slot in
    # the pickled definition; the text columns are for human readers.
    rows = []
    for entry in schedule:
        rows.append({
            'event_index': events.index(entry.event),
            'event': f'{entry.event.name}',
            'slot_index': slots.index(entry.slot),
            'slot': f'{entry.slot.starts_at} {entry.slot.venue}',
        })
    with csv_file.open('w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
def export_solution_and_definition(
    resources, events, slots, allocations, solution, solution_folder
):
    """Persist the full solution: pickle bundle plus human-readable CSV.

    Ensures *solution_folder* exists, then writes 'scheduler.pickle'
    and 'schedule.csv' into it.
    """
    # Folder must exist before either writer runs.
    solution_folder.mkdir(exist_ok=True)
    pickle_solution_and_definition(
        resources, events, slots, allocations, solution, solution_folder)
    export_schedule(solution, events, slots, solution_folder)