-
-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathviews.py
More file actions
314 lines (258 loc) · 11.5 KB
/
views.py
File metadata and controls
314 lines (258 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
# SPDX-License-Identifier: AGPL-3.0-only
# See https://github.com/aboutcode-org/dejacode for support or download.
# See https://aboutcode.org for more information about AboutCode FOSS projects.
#
import datetime
import io
import json
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.exceptions import FieldDoesNotExist
from django.core.exceptions import ValidationError
from django.forms.formsets import formset_factory
from django.http import Http404
from django.http import HttpResponse
from django.utils.encoding import smart_str
from django.utils.text import normalize_newlines
from django.utils.text import slugify
from django.utils.translation import gettext as _
from django.views.generic import TemplateView
from django.views.generic.detail import SingleObjectMixin
from django.views.generic.list import MultipleObjectMixin
import odfdo
import saneyaml
import xlsxwriter
from dje.utils import get_preserved_filters
from dje.views import AdminLinksDropDownMixin
from dje.views import BootstrapCSSMixin
from dje.views import DataspacedFilterView
from dje.views import DownloadableMixin
from dje.views import HasPermissionMixin
from dje.views import PaginationMixin
from reporting.filters import ReportFilterSet
from reporting.forms import RuntimeFilterBaseFormSet
from reporting.forms import RuntimeFilterForm
from reporting.models import Report
class DetailListHybridView(SingleObjectMixin, MultipleObjectMixin, TemplateView):
def _BaseDetailView_get(self, request, *args, **kwargs):
self.object = self.get_object()
def get_object_list(self):
"""
Correspond to `get_queryset()` of `MultipleObjectMixin`.
It was renamed so that it does not conflict with
`SingleObjectMixin.get_queryset()` which is needed by
`SingleObjectMixin.get_object()`.
"""
raise NotImplementedError
def _BaseListView_get(self, request, *args, **kwargs):
self.object_list = self.get_object_list()
allow_empty = self.get_allow_empty()
if not allow_empty:
# When pagination is enabled and object_list is a queryset,
# it's better to do a cheap query than to load the un-paginated
# queryset in memory.
if self.get_paginate_by(self.object_list) is not None and hasattr(
self.object_list, "exists"
):
is_empty = not self.object_list.exists()
else:
is_empty = len(self.object_list) == 0
if is_empty:
raise Http404(
_(f"Empty list and '{self.__class__.__name__}.allow_empty' is False.")
)
def get(self, request, *args, **kwargs):
self._BaseDetailView_get(request, *args, **kwargs)
self._BaseListView_get(request, *args, **kwargs)
return super().get(request, *args, **kwargs)
class ReportDetailsView(
LoginRequiredMixin,
PaginationMixin,
BootstrapCSSMixin,
DownloadableMixin,
HasPermissionMixin,
DetailListHybridView,
):
model = Report
template_name = "reporting/report_base.html"
paginate_by = 100
slug_field = "uuid"
slug_url_kwarg = "uuid"
def get_object_list(self):
runtime_value_map = None
if self.runtime_filter_formset.is_bound and self.runtime_filter_formset.is_valid():
filters = [x.filter for x in self.runtime_filter_formset]
runtime_value_map = dict(zip(filters, self.runtime_filter_formset.cleaned_data))
object_list = []
self.errors = []
try:
object_list = self.object.query.get_qs(runtime_value_map, user=self.request.user)
except ValidationError as e:
# Using the _non_form_errors to store the validation error, as we
# cannot locate the exact field with the issue at that point.
self.runtime_filter_formset._non_form_errors = e.messages
except FieldDoesNotExist as e:
self.errors.append(e)
except ValueError:
self.runtime_filter_formset._non_form_errors = ["Invalid value for runtime parameter"]
return object_list
def _BaseListView_get(self, request, *args, **kwargs):
self.runtime_filter_formset = self.get_runtime_filter_formset(self.object)
super()._BaseListView_get(request, *args, **kwargs)
def get_root_filename(self):
return slugify(self.object.name)
def get_paginate_by(self, queryset):
if self.format: # Do not paginate if the response is downloaded as a file
return
return self.paginate_by
def get_runtime_editable_filters(self, report):
filters = list(report.query.filters.filter(runtime_parameter=True))
for filter_ in filters:
# '\u2192' is a right arrow character
filter_.field_with_arrows = filter_.field_name.replace("__", " \u2192 ")
return filters
def get_runtime_filter_formset(self, report):
RuntimeFilterFormSet = formset_factory(
form=RuntimeFilterForm,
formset=RuntimeFilterBaseFormSet,
extra=0,
)
return RuntimeFilterFormSet(**self.get_runtime_filter_formset_kwargs(report))
def get_runtime_filter_formset_kwargs(self, report):
filters = self.get_runtime_editable_filters(report)
initial = [{"value": filter_.value} for filter_ in filters]
kwargs = {
"request": self.request,
"filters": filters,
"initial": initial,
}
# Decide whether to bind the formset.
# If there is a query string, bind the formset, unless the number or
# parameter is less than 3, as it's not likely to be formset's
# management.
# This handle the case where "format" and/or "page" are the only query
# string parameter, then do not bind to avoid validation failure.
if self.request.GET and len(self.request.GET) > 3:
kwargs.update({"data": self.request.GET})
return kwargs
def get_query_string_params(self):
"""Return the query string parameters in a known order"""
keys = self.request.GET.keys()
return [(key, self.request.GET[key]) for key in sorted(keys)]
def get_interpolated_report_context(self, request, report):
if not report.report_context:
return
def get_time():
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d %I:%M:%S %p")
replacements = {
"{{dataspace}}": lambda: request.user.dataspace,
"{{date-time}}": get_time,
"{{user}}": lambda: request.user,
}
context = report.report_context
for string, func in replacements.items():
context = context.replace(string, str(func()))
return context
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
request = self.request
report = self.object
model_class = report.column_template.get_model_class()
# Only available in the UI since the link is relative to the current URL
include_view_link = not self.format and hasattr(model_class, "get_absolute_url")
interpolated_report_context = self.get_interpolated_report_context(request, report)
multi_as_list = True if self.format in ["json", "yaml"] else False
output = report.get_output(
queryset=context["object_list"],
user=request.user,
include_view_link=include_view_link,
multi_as_list=multi_as_list,
)
context.update(
{
"opts": self.model._meta,
"preserved_filters": get_preserved_filters(request, self.model),
"headers": report.column_template.as_headers(include_view_link),
"runtime_filter_formset": self.runtime_filter_formset,
"query_string_params": self.get_query_string_params(),
"interpolated_report_context": interpolated_report_context,
"errors": getattr(self, "errors", []),
"output": output,
}
)
return context
def get_dump(self, dumper, **dumper_kwargs):
"""Return the data dump using provided kwargs."""
context = self.get_context_data(**self.kwargs)
data = [dict(zip(context["headers"], values)) for values in context["output"]]
return dumper(data, **dumper_kwargs)
def get_json_response(self, **response_kwargs):
"""Return serialized results as json content response."""
dump = self.get_dump(json.dumps, indent=2, default=smart_str)
return HttpResponse(dump, **response_kwargs)
def get_yaml_response(self, **response_kwargs):
"""Return serialized results as yaml content response."""
dump = self.get_dump(saneyaml.dump)
return HttpResponse(dump, **response_kwargs)
def get_ods_response(self, **response_kwargs):
"""Return the results as ods format."""
context = self.get_context_data(**self.kwargs)
report_data = [context["headers"]] + context["output"]
document = odfdo.Document("spreadsheet")
table = odfdo.Table("Report")
for row_data in report_data:
row = odfdo.Row()
for cell_value in row_data:
row.append(odfdo.Cell(normalize_newlines(cell_value), cell_type="string"))
table.append(row)
document.body.clear()
document.body.append(table)
file_output = io.BytesIO()
document.save(file_output)
return HttpResponse(file_output.getvalue(), **response_kwargs)
def get_xlsx_response(self, **response_kwargs):
"""Return the results as `xlsx` format."""
context = self.get_context_data(**self.kwargs)
report_data = [context["headers"]] + context["output"]
file_output = io.BytesIO()
workbook = xlsxwriter.Workbook(file_output)
worksheet = workbook.add_worksheet()
for row_index, row in enumerate(report_data):
for cell_index, cell in enumerate(row):
worksheet.write_string(row_index, cell_index, normalize_newlines(cell))
workbook.close()
return HttpResponse(file_output.getvalue(), **response_kwargs)
def get_queryset(self):
"""
Scope the QS to the user dataspace, as the entry point for this view is
a UUID and it can be a shared value across dataspace, following a copy for example.
"""
qs = Report.objects.scope(self.request.user.dataspace)
if not self.request.user.is_staff:
return qs.user_availables()
return qs
class ReportListView(
LoginRequiredMixin,
AdminLinksDropDownMixin,
DataspacedFilterView,
):
"""Display a list of Reports."""
model = Report
filterset_class = ReportFilterSet
template_name = "reporting/report_list.html"
template_list_table = "reporting/includes/report_list_table.html"
def get_queryset(self):
qs = super().get_queryset()
if not self.request.user.is_staff:
qs = qs.user_availables()
return qs.select_related("query__content_type").order_by("name", "query__content_type")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if not self.request.GET.get("sort", None):
context["object_list"] = self.object_list.order_by("group", "name").group_by("group")
else:
context["object_list"] = {"": context["object_list"]}
return context