-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathtranslation_attr_sync.py
More file actions
151 lines (131 loc) · 5.59 KB
/
translation_attr_sync.py
File metadata and controls
151 lines (131 loc) · 5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# -*- coding: utf-8 -*-
# This file is part of the CloudBlue Connect connect-cli.
# Copyright (c) 2019-2025 CloudBlue. All Rights Reserved.
import math
from collections import namedtuple
from zipfile import BadZipFile
import click
from connect.client import ClientError
from openpyxl import load_workbook
from openpyxl.utils.exceptions import InvalidFileException
from connect.cli.plugins.shared.constants import ATTRIBUTES_SHEET_COLUMNS
from connect.cli.plugins.shared.exceptions import SheetNotFoundError
from connect.cli.plugins.shared.sync_stats import SynchronizerStats
class TranslationAttributesSynchronizer:
    """
    Synchronize the attributes of a translation from excel file.
    """

    # The localization bulk endpoint accepts at most this many items per call.
    _MAX_BATCH_SIZE = 10

    def __init__(self, client, progress, stats=None):
        """Keep the API client and progress renderer; workbook handles are set by open()."""
        self._client = client
        self._progress = progress
        self._wb = None
        self._ws = None
        # When no shared stats object is supplied, fall back to a private one.
        self._mstats = (SynchronizerStats() if stats is None else stats)[
            'Translations Attributes'
        ]

    @property
    def max_batch_size(self):
        """Maximum number of attributes sent per bulk_update call."""
        return self._MAX_BATCH_SIZE

    def open(self, input_file, worksheet):
        """Load *input_file* and select *worksheet*, validating its header row.

        Raises SheetNotFoundError when the worksheet is absent and
        click.ClickException when the header row is malformed.
        """
        self._open_workbook(input_file)
        if worksheet not in self._wb.sheetnames:
            raise SheetNotFoundError(
                f"File does not contain worksheet '{worksheet}' to synchronize, skipping",
            )
        selected = self._wb[worksheet]
        self._validate_attributes_worksheet(selected)
        self._ws = selected

    def save(self, output_file):
        """Persist the (possibly modified) workbook to *output_file*."""
        self._wb.save(output_file)

    def sync(self, translation, is_clone=False):
        """Push the worksheet's 'update' rows to the translation's attributes."""
        target_id = self._get_translation_id(translation)
        pending = self._collect_attributes_to_update(self._ws, translation, is_clone)
        if pending:
            self._update_attributes(target_id, pending, self._ws)

    def _open_workbook(self, input_file):
        """Open *input_file* as an xlsx workbook, translating parse errors to CLI errors."""
        try:
            self._wb = load_workbook(input_file, data_only=True)
        except InvalidFileException as exc:
            raise click.ClickException(str(exc))
        except BadZipFile:
            raise click.ClickException(f'{input_file} is not a valid xlsx file.')

    @staticmethod
    def _validate_attributes_worksheet(ws):
        """Check that row 1 matches ATTRIBUTES_SHEET_COLUMNS (except 'original value')."""
        for position, expected in enumerate(ATTRIBUTES_SHEET_COLUMNS, start=1):
            # The 'original value' column is informational and not enforced.
            if expected == 'original value':
                continue
            header_cell = ws.cell(1, position)
            if header_cell.value != expected:
                raise click.ClickException(
                    f"Column '{header_cell.coordinate}' must be '{expected}', but it is '{header_cell.value}'",
                )

    def _collect_attributes_to_update(self, ws, translation, is_clone):
        """Return {sheet_row_index: attribute payload} for rows marked 'update'.

        When cloning, rows whose value/comment already match the remote
        attributes (or whose translation has auto-sync enabled) are skipped,
        and the remote attribute key is substituted for the sheet key.
        """
        field_names = [col.replace(' ', '_').lower() for col in ATTRIBUTES_SHEET_COLUMNS]
        AttributeRow = namedtuple('AttributeRow', field_names)
        total_rows = ws.max_row - 1
        task = self._progress.add_task('Process attribute', total=total_rows)

        remote_attrs = None
        if is_clone and translation is not None:
            resource = self._client.ns('localization').translations[translation['id']]
            remote_attrs = list(resource.attributes.all())

        to_update = {}
        # Sheet data starts at row 2 (row 1 holds the headers).
        for row_number, raw_row in enumerate(ws.iter_rows(min_row=2, values_only=True), 2):
            record = AttributeRow(*raw_row)
            self._progress.update(
                task,
                description=f'Process attribute {record.key}',
                advance=1,
            )
            if record.action != 'update':
                self._mstats.skipped()
                continue
            payload = {'key': record.key, 'value': record.value, 'comment': record.comment}
            to_update[row_number] = payload
            if remote_attrs:
                # Remote attributes are positional: sheet row N maps to index N-2.
                remote_idx = row_number - 2
                unchanged = self._is_equal_attribute(remote_idx, remote_attrs, record)
                if unchanged or translation['auto']['enabled']:
                    del to_update[row_number]
                    self._mstats.skipped()
                    continue
                payload['key'] = remote_attrs[remote_idx]['key']

        self._progress.update(task, completed=total_rows)
        return to_update

    def _update_attributes(self, translation_id, attributes, ws):
        """Bulk-update *attributes* on the translation and mark the sheet rows done."""
        batch_size = self.max_batch_size
        try:
            resource = self._client.ns('localization').translations[translation_id]
            values = list(attributes.values())
            # bulk update only support 10 items at a time
            for start in range(0, len(attributes), batch_size):
                resource.attributes.bulk_update(values[start : start + batch_size])
            self._mstats.updated(len(attributes))
            for row_number in attributes:
                self._update_attributes_sheet_row(ws, row_number)
        except ClientError as e:
            # NOTE(review): reports positions 1..N rather than the actual sheet
            # row indices in `attributes` — confirm against SynchronizerStats usage.
            self._mstats.error(
                f'Error while updating attributes: {str(e)}',
                range(1, len(attributes) + 1),
            )

    @staticmethod
    def _is_equal_attribute(row_idx, attributes, row):
        """Return True when the remote attribute's value and comment match the sheet row."""
        candidate = attributes[row_idx]
        same_value = candidate.get('value', None) == row.value
        same_comment = candidate.get('comment', None) == row.comment
        return same_value and same_comment

    @staticmethod
    def _update_attributes_sheet_row(ws, row_idx):
        # Reset the action column (column 3) to '-' once the row is synchronized.
        ws.cell(row_idx, 3, value='-')

    @staticmethod
    def _get_translation_id(translation):
        # EAFP: dict-like translations expose 'id'; a plain string IS the id
        # (subscripting a str with 'id' raises TypeError).
        try:
            return translation['id']
        except TypeError:
            return translation