-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathsync_stats.py
More file actions
208 lines (168 loc) · 6.13 KB
/
sync_stats.py
File metadata and controls
208 lines (168 loc) · 6.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# -*- coding: utf-8 -*-
# This file is part of the CloudBlue Connect connect-cli.
# Copyright (c) 2019-2025 CloudBlue. All Rights Reserved.
from collections import defaultdict
from connect.cli.core.terminal import console
class SynchronizerStats(dict):
"""
Track stats of synchronizer and print results when finished.
This groups the stats in different modules.
To increment stats of a module:
stats['module name'].updated() # +1 updated
stats['module name'].deleted(3) # +3 deleted
To track errors:
stats['module name'].error('error message') # add error not related to any row
stats['module name'].error('first error', 7) # add an error in row #7
stats['module name'].error(['second error', 'third error'], 7) # add more errors in row #7
stats['module name'].error('the error', range(1, 11)) # add an error in rows #1 to #10
"""
COLUMNS = (
'Module',
('right', 'Processed'),
('right', 'Created'),
('right', 'Updated'),
('right', 'Deleted'),
('right', 'Skipped'),
('right', 'Errors'),
)
def __init__(self, *args, operation='Sync', header='Results of synchronization'):
self._initial_modules = args
self.operation = operation
self.header = header
self.reset()
def __str__(self):
return '\n'.join(f'{k} - {v}' for k, v in self.items())
def __getitem__(self, key):
if key not in self:
self[key] = _SynchronizerStatsModule(key)
return super().__getitem__(key)
def reset(self):
"""
reset to initial modules, resetting all their stats.
"""
self.clear()
for module_name in self._initial_modules:
self[module_name]
def print(self):
console.header(self.header)
self.print_results()
self.print_errors()
def print_results(self):
console.table(
columns=self.COLUMNS,
rows=[
(module_stats.name, *module_stats.get_counts_as_tuple())
for module_stats in self.values()
],
expand=True,
)
def print_errors(self): # noqa: CCR001
total_error_count = sum(
len(module_stats._errors) + len(module_stats._row_errors)
for module_stats in self.values()
)
if total_error_count == 0:
return
console.confirm(
f'\n{self.operation} operation had {total_error_count} errors, do you want to see them?',
abort=True,
)
console.echo('')
for module_stats in filter(lambda ms: ms._errors or ms._row_errors, self.values()):
console.secho(f'Module {module_stats.name}:\n', fg='magenta')
if module_stats._errors:
console.echo(' Errors')
console.echo('\n'.join(f' - {msg}' for msg in module_stats._errors))
console.echo('')
# group rows with same error message to avoid long prints in bulk errors
first_row = None
current_error = None
for row_idx, messages in module_stats._row_errors.items():
error = '\n'.join(f' - {msg}' for msg in messages)
if not first_row:
first_row = row_idx
if not current_error:
current_error = error
if error != current_error:
self._print_error(current_error, first_row, row_idx - 1)
current_error = error
first_row = row_idx
if current_error:
self._print_error(current_error, first_row, row_idx)
def _print_error(self, error, row, last_row=None):
if last_row is None or row == last_row:
console.echo(f' Errors at row #{row}')
else:
console.echo(f' Errors at rows #{row} to #{last_row}')
console.echo(error)
console.echo('')
class _SynchronizerStatsModule:
def __init__(self, name):
self.name = name
self.reset()
def __str__(self):
return ', '.join(f'{k}: {v}' for k, v in self.get_counts_as_dict().items())
def reset(self):
self._updated = 0
self._created = 0
self._deleted = 0
self._skipped = 0
self._errors = []
self._row_errors = defaultdict(list)
def updated(self, count=1):
self._updated += count
def created(self, count=1):
self._created += count
def deleted(self, count=1):
self._deleted += count
def skipped(self, count=1):
self._skipped += count
def error(self, err, row=None):
if not isinstance(err, (list, tuple)):
err = [err]
if row is None:
self._errors.extend(err)
elif hasattr(row, '__iter__'):
for r in row:
self._row_errors[r].extend(err)
else:
self._row_errors[row].extend(err)
def get_processed_count(self):
return (
self._updated
+ self._created
+ self._deleted
+ self._skipped
+ len(self._errors)
+ len(self._row_errors)
)
def get_counts_as_dict(self):
return {
'processed': self.get_processed_count(),
'created': self._created,
'updated': self._updated,
'deleted': self._deleted,
'skipped': self._skipped,
'errors': len(self._errors) + len(self._row_errors),
}
def get_counts_as_tuple(self):
return (
self.get_processed_count(),
self._created,
self._updated,
self._deleted,
self._skipped,
len(self._errors) + len(self._row_errors),
)
class SynchronizerStatsSingleModule(_SynchronizerStatsModule):
"""
Track the stats of a single module.
"""
def __init__(self, module_name):
super().__init__(module_name)
self.__stats = SynchronizerStats()
self.__stats[module_name] = self
def __str__(self):
return f'{self.name} - {super().__str__()}'
def print(self):
self.__stats.print()