-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathdataframe.py
More file actions
439 lines (385 loc) · 18.8 KB
/
dataframe.py
File metadata and controls
439 lines (385 loc) · 18.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# (C) 2021 GoodData Corporation
from __future__ import annotations
from typing import Callable, Optional, Union
import pandas
from gooddata_api_client import models
from gooddata_sdk import (
Attribute,
BareExecutionResponse,
Execution,
ExecutionDefinition,
Filter,
GoodDataSdk,
ResultCacheMetadata,
ResultSizeDimensions,
)
from gooddata_pandas.data_access import compute_and_extract
from gooddata_pandas.result_convertor import (
_DEFAULT_PAGE_SIZE,
DataFrameMetadata,
LabelOverrides,
convert_execution_response_to_dataframe,
)
from gooddata_pandas.utils import (
ColumnsDef,
DefaultVisualizationColumnNaming,
IndexDef,
LabelItemDef,
_to_item,
make_pandas_index,
)
class DataFrameFactory:
"""
Factory to create pandas.DataFrame instances.
Methods:
- indexed(self, index_by: IndexDef, columns:ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None)
-> pandas.DataFrame:
- not_indexed(self, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None)
-> pandas.DataFrame:
- for_items(self, items: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None,
auto_index: bool = True) -> pandas.DataFrame:
- for_visualization(self, visualization_id: str, auto_index: bool = True)
-> pandas.DataFrame:
- result_cache_metadata_for_exec_result_id(self, result_id: str)
-> ResultCacheMetadata:
- for_exec_def(self, exec_def: ExecutionDefinition, label_overrides: Optional[LabelOverrides] = None,
result_size_dimensions_limits: ResultSizeDimensions = (), result_size_bytes_limit: Optional[int] = None,
page_size: int = _DEFAULT_PAGE_SIZE,) -> Tuple[pandas.DataFrame, DataFrameMetadata]:
- for_exec_result_id(self, result_id: str, label_overrides: Optional[LabelOverrides] = None,
result_cache_metadata: Optional[ResultCacheMetadata] = None,
result_size_dimensions_limits: ResultSizeDimensions = (),
result_size_bytes_limit: Optional[int] = None,
use_local_ids_in_headers: bool = False, page_size: int = _DEFAULT_PAGE_SIZE,)
-> Tuple[pandas.DataFrame, DataFrameMetadata]:
"""
def __init__(self, sdk: GoodDataSdk, workspace_id: str) -> None:
"""
Args:
sdk (GoodDataSdk): GoodData SDK instance.
workspace_id (str): Workspace identifier.
"""
self._sdk = sdk
self._workspace_id = workspace_id
def indexed(
self,
index_by: IndexDef,
columns: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
is_cancellable: bool = False,
result_page_len: Optional[int] = None,
) -> pandas.DataFrame:
"""
Creates a data frame indexed by values of the label. The data frame columns will be created from either
metrics or other label values.
Note that depending on the composition of the labels, the DataFrame's index may or may not be unique.
Args:
index_by (IndexDef): One or more labels to index by.
columns (ColumnsDef): Dictionary mapping column name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]):
Optional filters to apply during computation on the server.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
is_cancellable (bool, optional): Whether the execution should be cancelled when the connection is interrupted.
result_page_len (Optional[int]): Optional page size for result pagination.
Defaults to 1000. Larger values can improve performance for large result sets.
Returns:
pandas.DataFrame: A DataFrame instance.
"""
data, index = compute_and_extract(
self._sdk,
self._workspace_id,
columns=columns,
index_by=index_by,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
is_cancellable=is_cancellable,
result_page_len=result_page_len,
)
_idx = make_pandas_index(index)
return pandas.DataFrame(data=data, index=_idx)
def not_indexed(
self,
columns: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
is_cancellable: bool = False,
result_page_len: Optional[int] = None,
) -> pandas.DataFrame:
"""
Creates a data frame with columns created from metrics and or labels.
Args:
columns (ColumnsDef): Dictionary mapping column name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during
computation on the server.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
is_cancellable (bool, optional): Whether the execution should be cancelled when the connection is interrupted.
result_page_len (Optional[int]): Optional page size for result pagination.
Defaults to 1000. Larger values can improve performance for large result sets.
Returns:
pandas.DataFrame: A DataFrame instance.
"""
data, _ = compute_and_extract(
self._sdk,
self._workspace_id,
columns=columns,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
is_cancellable=is_cancellable,
result_page_len=result_page_len,
)
return pandas.DataFrame(data=data)
def for_items(
self,
items: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
auto_index: bool = True,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
is_cancellable: bool = False,
result_page_len: Optional[int] = None,
) -> pandas.DataFrame:
"""
Creates a data frame for named items. This is a convenience method that will create DataFrame with or
without an index based on the context of the items that you pass.
Args:
items (ColumnsDef): Dictionary mapping item name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during computation
on the server.
auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents
of the items.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
is_cancellable (bool, optional): Whether the execution should be cancelled when the connection is interrupted.
result_page_len (Optional[int]): Optional page size for result pagination.
Defaults to 1000. Larger values can improve performance for large result sets.
Returns:
pandas.DataFrame: A DataFrame instance.
"""
resolved_attr_cols: dict[str, LabelItemDef] = dict()
resolved_measure_cols: ColumnsDef = dict()
has_attributes = False
has_measures = False
for col_name, col_def in items.items():
item = _to_item(col_def, local_id=col_name)
if isinstance(item, Attribute):
has_attributes = True
resolved_attr_cols[col_name] = item
else:
has_measures = True
resolved_measure_cols[col_name] = item
if not auto_index or not has_measures or not has_attributes:
columns: ColumnsDef = {**resolved_attr_cols, **resolved_measure_cols}
return self.not_indexed(
columns=columns,
filter_by=filter_by,
result_page_len=result_page_len,
)
return self.indexed(
index_by=resolved_attr_cols,
columns=resolved_measure_cols,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
is_cancellable=is_cancellable,
result_page_len=result_page_len,
)
def for_visualization(
self,
visualization_id: str,
auto_index: bool = True,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
is_cancellable: bool = False,
result_page_len: Optional[int] = None,
) -> pandas.DataFrame:
"""
Creates a data frame with columns based on the content of the visualization with the provided identifier.
Args:
visualization_id (str): Visualization identifier.
auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents
of the visualization.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
is_cancellable (bool, optional): Whether the execution should be cancelled when the connection is interrupted.
result_page_len (Optional[int]): Optional page size for result pagination.
Defaults to 1000. Larger values can improve performance for large result sets.
Returns:
pandas.DataFrame: A DataFrame instance.
"""
naming = DefaultVisualizationColumnNaming()
visualization = self._sdk.visualizations.get_visualization(
workspace_id=self._workspace_id, visualization_id=visualization_id
)
filter_by = [f.as_computable() for f in visualization.filters]
columns: ColumnsDef = {
**{naming.col_name_for_attribute(a): a.as_computable() for a in visualization.attributes},
**{naming.col_name_for_metric(m): m.as_computable() for m in visualization.metrics},
}
return self.for_items(
columns,
filter_by=filter_by,
auto_index=auto_index,
on_execution_submitted=on_execution_submitted,
is_cancellable=is_cancellable,
result_page_len=result_page_len,
)
def for_created_visualization(
self,
created_visualizations_response: dict,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
is_cancellable: bool = False,
optimized: bool = False,
) -> tuple[pandas.DataFrame, DataFrameMetadata]:
"""
Creates a data frame using a created visualization.
Args:
created_visualizations_response (dict): Created visualization response.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
is_cancellable (bool, optional): Whether the execution should be cancelled when the connection is interrupted.
optimized (bool, default=False): Use memory optimized accumulator if True; by default, the accumulator stores
headers in memory as lists of dicts, which can consume a lot of memory for large results.
Optimized accumulator stores only unique values and story only reference to them in the list,
which can significantly reduce memory usage.
Returns:
pandas.DataFrame: A DataFrame instance.
"""
execution_definition = self._sdk.compute.build_exec_def_from_chat_result(
created_visualizations_response, is_cancellable=is_cancellable
)
return self.for_exec_def(
exec_def=execution_definition,
on_execution_submitted=on_execution_submitted,
optimized=optimized,
)
def result_cache_metadata_for_exec_result_id(self, result_id: str) -> ResultCacheMetadata:
"""
Retrieves result cache metadata for given :result_id:
Args:
result_id (str): ID of execution result to retrieve the metadata for.
Returns:
ResultCacheMetadata: Corresponding result cache metadata.
"""
return self._sdk.compute.retrieve_result_cache_metadata(workspace_id=self._workspace_id, result_id=result_id)
def for_exec_def(
self,
exec_def: ExecutionDefinition,
label_overrides: Optional[LabelOverrides] = None,
result_size_dimensions_limits: ResultSizeDimensions = (),
result_size_bytes_limit: Optional[int] = None,
page_size: int = _DEFAULT_PAGE_SIZE,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
optimized: bool = False,
) -> tuple[pandas.DataFrame, DataFrameMetadata]:
"""
Creates a data frame using an execution definition.
Each dimension may be sliced by multiple labels. The factory will create MultiIndex for the dataframe's
row index and the columns.
Example of label_overrides structure:
.. code-block:: python
{
"labels": {
"local_attribute_id": {
"title": "My new attribute label"
,...
},
"metrics": {
"local_metric_id": {
"title": "My new metric label"
},...
}
}
Args:
exec_def (ExecutionDefinition): Execution definition.
label_overrides (Optional[LabelOverrides]): Label overrides for metrics and attributes.
result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions.
result_size_bytes_limit (Optional[int]): Maximum size of result in bytes.
page_size (int): Number of records per page.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.
optimized (bool, default=False): Use memory optimized accumulator if True; by default, the accumulator stores
headers in memory as lists of dicts, which can consume a lot of memory for large results.
Optimized accumulator stores only unique values and story only reference to them in the list,
which can significantly reduce memory usage.
Returns:
Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata.
"""
if label_overrides is None:
label_overrides = {}
execution = self._sdk.compute.for_exec_def(workspace_id=self._workspace_id, exec_def=exec_def)
result_cache_metadata = self.result_cache_metadata_for_exec_result_id(execution.result_id)
if on_execution_submitted is not None:
on_execution_submitted(execution)
return convert_execution_response_to_dataframe(
execution_response=execution.bare_exec_response,
result_cache_metadata=result_cache_metadata,
label_overrides=label_overrides,
result_size_dimensions_limits=result_size_dimensions_limits,
result_size_bytes_limit=result_size_bytes_limit,
page_size=page_size,
optimized=optimized,
)
def for_exec_result_id(
self,
result_id: str,
label_overrides: Optional[LabelOverrides] = None,
result_cache_metadata: Optional[ResultCacheMetadata] = None,
result_size_dimensions_limits: ResultSizeDimensions = (),
result_size_bytes_limit: Optional[int] = None,
use_local_ids_in_headers: bool = False,
use_primary_labels_in_attributes: bool = False,
page_size: int = _DEFAULT_PAGE_SIZE,
optimized: bool = False,
) -> tuple[pandas.DataFrame, DataFrameMetadata]:
"""
Retrieves a DataFrame and DataFrame metadata for a given execution result identifier.
Example of label_overrides structure:
.. code-block:: python
{
"labels": {
"local_attribute_id": {
"title": "My new attribute label"
,...
},
"metrics": {
"local_metric_id": {
"title": "My new metric label"
},...
}
}
Args:
result_id (str): Execution result identifier.
label_overrides (Optional[LabelOverrides]): Label overrides for metrics and attributes.
result_cache_metadata (Optional[ResultCacheMetadata]): Cache metadata for the execution result.
result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions.
result_size_bytes_limit (Optional[int]): Maximum size of the result in bytes.
use_local_ids_in_headers (bool): Use local identifier in headers.
use_primary_labels_in_attributes (bool): Use primary labels in attributes.
page_size (int): Number of records per page.
optimized (bool, default=False): Use memory optimized accumulator if True; by default, the accumulator stores
headers in memory as lists of dicts, which can consume a lot of memory for large results.
Optimized accumulator stores only unique values and story only reference to them in the list,
which can significantly reduce memory usage.
Returns:
Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata.
"""
if label_overrides is None:
label_overrides = {}
if result_cache_metadata is None:
result_cache_metadata = self.result_cache_metadata_for_exec_result_id(result_id=result_id)
return convert_execution_response_to_dataframe(
execution_response=BareExecutionResponse(
api_client=self._sdk.client,
workspace_id=self._workspace_id,
execution_response=models.AfmExecutionResponse(
result_cache_metadata.execution_response, _check_type=False
),
),
result_cache_metadata=result_cache_metadata,
label_overrides=label_overrides,
result_size_dimensions_limits=result_size_dimensions_limits,
result_size_bytes_limit=result_size_bytes_limit,
use_local_ids_in_headers=use_local_ids_in_headers,
use_primary_labels_in_attributes=use_primary_labels_in_attributes,
page_size=page_size,
optimized=optimized,
)