This repository was archived by the owner on Dec 1, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcore.py
More file actions
889 lines (762 loc) · 37 KB
/
core.py
File metadata and controls
889 lines (762 loc) · 37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
from __future__ import annotations
import os
from collections.abc import Callable, Mapping
from typing import Any, Literal
import dask.dataframe as dd
import dask.dataframe.dask_expr as dx
import nested_pandas as npd
import numpy as np
import pandas as pd
import pyarrow as pa
from dask.dataframe.dask_expr._collection import new_collection
from dask.dataframe.dask_expr._expr import no_default as dsk_no_default
from nested_pandas.series.dtype import NestedDtype
from nested_pandas.series.packer import pack, pack_flat, pack_lists
from pandas._libs import lib
from pandas._typing import Axis, IndexLabel
from pandas.api.extensions import no_default
# need this for the base _Frame class
# mypy: disable-error-code="misc"
class _Frame(dx.FrameBase):  # type: ignore
    """Common base for the Dask DataFrame extension classes defined here."""

    # Type of the objects held in each partition of the collection.
    _partition_type = npd.NestedFrame

    def __init__(self, expr):
        super().__init__(expr)

    @property
    def _args(self):
        # Forwarded explicitly so the Dask extension can be pickled correctly.
        # See https://github.com/geopandas/dask-geopandas/issues/237
        return super()._args

    def optimize(self, fuse: bool = True):
        """Return a new collection wrapping the optimized expression."""
        optimized_expr = self.expr.optimize(fuse=fuse)
        return new_collection(optimized_expr)

    def __dask_postpersist__(self):
        # Route the parent's rebuild callable through ``_rebuild``.
        parent_func, parent_args = super().__dask_postpersist__()
        return self._rebuild, (parent_func, parent_args)

    def _rebuild(self, graph, func, args):  # type: ignore
        """Rebuild the collection by calling ``func`` on the persisted graph."""
        return func(graph, *args)
def _nested_meta_from_flat(flat, name):
    """Build the meta (an empty, nested-dtyped series) for packing ``flat``.

    Parameters
    ----------
    flat :
        A flat dataframe whose ``dtypes`` describe the fields to be packed.
    name :
        The name given to the returned series.

    Returns
    -------
    pd.Series
        An empty series named ``name`` whose dtype is a ``NestedDtype`` with
        one field per column of ``flat``.
    """
    # Use the underlying pyarrow type where the pandas dtype exposes one,
    # otherwise convert from the numpy dtype.
    arrow_fields = {
        column: (
            col_dtype.pyarrow_dtype if hasattr(col_dtype, "pyarrow_dtype") else pa.from_numpy_dtype(col_dtype)
        )
        for column, col_dtype in flat.dtypes.to_dict().items()
    }
    nested_dtype = NestedDtype.from_fields(arrow_fields)
    return pd.Series(name=name, dtype=nested_dtype)
class NestedFrame(
    _Frame, dd.DataFrame
):  # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0)
    """An extension for a Dask Dataframe that has Nested-Pandas functionality.

    Columns with a ``NestedDtype`` are treated as nested layers; their
    sub-columns can be addressed with ``"<nested column>.<sub-column>"``
    names in item access and assignment.

    Examples
    --------
    >>> import nested_dask as nd
    >>> base = nd.NestedFrame(base_data)
    >>> layer = nd.NestedFrame(layer_data)
    >>> base.add_nested(layer, "layer")
    """

    _partition_type = npd.NestedFrame  # Tracks the underlying data type
def __getitem__(self, item):
    """Select columns, additionally resolving ``"nested.field"`` names.

    Parameters
    ----------
    item :
        Anything accepted by the parent ``__getitem__``, or a string of the
        form ``"<nested column>.<sub-column>"`` naming a known sub-column.

    Returns
    -------
    For a known hierarchical name, the flat series of that sub-column
    (obtained per partition with ``nest.get_flat_series``); otherwise
    whatever the parent ``__getitem__`` returns.
    """
    if not (isinstance(item, str) and self._is_known_hierarchical_column(item)):
        return super().__getitem__(item)

    # Split on the first dot only: the sub-column name itself may contain
    # dots, and a bare two-way unpack of split(".") would raise for it.
    nested, col = item.split(".", 1)
    meta = pd.Series(name=col, dtype=pd.ArrowDtype(self.dtypes[nested].fields[col]))
    return self.map_partitions(lambda x: x[nested].nest.get_flat_series(col), meta=meta)
def __setitem__(self, key, value):
    """Assign a column, with support for ``"nested.field"`` style keys.

    Three cases are handled:

    * ``key`` is ``"<nested>.<col>"`` and ``<nested>`` is an existing nested
      column: ``<col>`` is added to (or replaced in) that nested column,
      e.g. ``ndf["nested.t"] = ndf["nested.t"] - 5``. This rebuilds the
      whole nested column, which has a performance cost.
    * ``key`` contains a dot but its prefix is not an existing nested
      column: a new nested column named after the prefix is created from
      ``value``, e.g. ``ndf["new_nested.t"] = ndf["nested.t"] - 5``.
    * Any other key is handed to the parent ``__setitem__`` unchanged.

    Parameters
    ----------
    key :
        The column label to assign to.
    value :
        The value to assign.
    """
    # Only string keys can use the dotted syntax. Other keys (e.g. integer
    # labels) do not support ``"." in key`` and belong to plain Dask.
    if not isinstance(key, str) or "." not in key:
        return super().__setitem__(key, value)

    # Split on the first dot only so a multi-dot key does not break the
    # two-way unpacking.
    nested, col = key.split(".", 1)

    if nested in self.nested_columns:
        # View the nested column as a flat df and set the sub-column there
        new_flat = self[nested].nest.to_flat()
        new_flat[col] = value

        # Handle strings specially
        if isinstance(value, str):
            new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())})

        # pack the modified df back into a nested column
        meta = _nested_meta_from_flat(new_flat, nested)
        packed = new_flat.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
        return super().__setitem__(nested, packed)

    # Adding a new nested structure from a column
    if isinstance(value, dd.Series):
        # Name the column through to_frame rather than assigning
        # ``value.name``, so the caller's series is left untouched.
        value = value.to_frame(name=col)
    meta = _nested_meta_from_flat(value, nested)
    packed = value.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
    return super().__setitem__(nested, packed)
def _repr_html_(self):
    """Return the parent's HTML repr with the title changed to name this class."""
    # following dask-geopandas lead
    return super()._repr_html_().replace("Dask DataFrame Structure", "Nested-Dask NestedFrame Structure")
@classmethod
def from_pandas(cls, data, npartitions=None, chunksize=None, sort=True) -> NestedFrame:
    """Build a Nested-Dask NestedFrame from in-memory data.

    Parameters
    ----------
    data : `NestedFrame` or `DataFrame`
        A Nested-Pandas NestedFrame or Pandas DataFrame holding the data.
    npartitions : `int`, optional
        The number of partitions of the index to create. Depending on the
        size and index of the dataframe, the output may have fewer
        partitions than requested.
    chunksize : `int`, optional
        The desired number of rows per index partition. Depending on the
        size and index of the dataframe, actual partition sizes may vary.
    sort : `bool`, optional
        Whether to sort the frame by a default index.

    Returns
    -------
    result : `NestedFrame`
        The constructed Dask-Nested NestedFrame object.
    """
    dask_frame = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort)
    return NestedFrame.from_dask_dataframe(dask_frame)
@classmethod
def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame:
    """Convert a Dask Dataframe into a Dask-Nested NestedFrame.

    Each partition is wrapped in ``npd.NestedFrame``; the meta is a
    ``npd.NestedFrame`` built from a copy of ``df``'s meta.

    Parameters
    ----------
    df :
        A Dask Dataframe to convert

    Returns
    -------
    `nested_dask.NestedFrame`
    """
    nested_meta = npd.NestedFrame(df._meta.copy())
    return df.map_partitions(npd.NestedFrame, meta=nested_meta)
@classmethod
def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True):
    """Create a Nested-Dask NestedFrame from many Dask Delayed objects.

    All arguments are forwarded to `dask.dataframe.from_delayed` (from which
    the parameter descriptions below are taken); the result is then converted
    with `NestedFrame.from_dask_dataframe`.

    Parameters
    ----------
    dfs :
        A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable
        of either, e.g. as returned by ``client.submit``. These make up the
        individual partitions of the resulting dataframe. A single
        (non-iterable) object yields a dataframe with one partition.
    meta :
        An empty NestedFrame, pd.DataFrame, or pd.Series matching the dtypes
        and column names of the output. Alternatively a dict of
        {name: dtype} or an iterable of (name, dtype) in column order can
        stand in for a DataFrame, and a (name, dtype) tuple for a series.
        If omitted, dask will try to infer the metadata, which may lead to
        unexpected results, so providing meta is recommended. See
        dask.dataframe.utils.make_meta.
    divisions :
        Partition boundaries along the index.
        For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values. Assumes that the indexes are mutually sorted.
        If None, then won't use index information
    prefix :
        Prefix to prepend to the keys.
    verify_meta :
        If True check that the partitions have consistent metadata,
        defaults to True.
    """
    dask_frame = dd.from_delayed(
        dfs=dfs,
        meta=meta,
        divisions=divisions,
        prefix=prefix,
        verify_meta=verify_meta,
    )
    return NestedFrame.from_dask_dataframe(dask_frame)
@classmethod
def from_map(
    cls,
    func,
    *iterables,
    args=None,
    meta=None,
    divisions=None,
    label=None,
    enforce_metadata=True,
    **kwargs,
):
    """Create a NestedFrame collection from a custom function map.

    All arguments are forwarded to ``dd.from_map``; the result is then
    converted with `NestedFrame.from_dask_dataframe`.

    WARNING: The ``from_map`` API is experimental, and stability is not
    yet guaranteed. Use at your own risk!

    Parameters
    ----------
    func : callable
        Function used to create each partition. If ``func`` satisfies the
        ``DataFrameIOFunction`` protocol, column projection will be enabled.
    *iterables : Iterable objects
        Iterable objects to map to each output partition. All iterables must
        have the same length, which determines the number of partitions in
        the output collection (one element of each iterable is passed to
        ``func`` per partition).
    args : list or tuple, optional
        Positional arguments to broadcast to each output partition. These
        are always passed to ``func`` after the ``iterables`` positional
        arguments.
    meta :
        An empty NestedFrame, pd.DataFrame, or pd.Series matching the dtypes
        and column names of the output. Alternatively a dict of
        {name: dtype} or an iterable of (name, dtype) in column order can
        stand in for a DataFrame, and a (name, dtype) tuple for a series.
        If omitted, dask will try to infer the metadata, which may lead to
        unexpected results, so providing meta is recommended. See
        dask.dataframe.utils.make_meta.
    divisions : tuple, str, optional
        Partition boundaries along the index.
        For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values. Assumes that the indexes are mutually sorted.
        If None, then won't use index information
    label : str, optional
        String to use as the function-name label in the output
        collection-key names.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This renames and reorders columns for each partition and raises an
        error if that does not work, but it does not raise on dtype
        mismatches.
    **kwargs :
        Keyword arguments to broadcast to each output partition; the same
        arguments are passed to ``func`` for every output partition.
    """
    dask_frame = dd.from_map(
        func,
        *iterables,
        args=args,
        meta=meta,
        divisions=divisions,
        label=label,
        enforce_metadata=enforce_metadata,
        **kwargs,
    )
    return NestedFrame.from_dask_dataframe(dask_frame)
@classmethod
def from_flat(cls, df, base_columns, nested_columns=None, on=None, name="nested"):
    """Create a NestedFrame with base and nested columns from a flat dataframe.

    The conversion itself is done per partition by
    ``npd.NestedFrame.from_flat``; this method only resolves the default
    for `nested_columns` and assembles the meta.

    Parameters
    ----------
    df : dd.DataFrame or nd.NestedFrame
        A flat dataframe.
    base_columns : list-like
        The columns that should be used as base (flat) columns in the
        output dataframe.
    nested_columns : list-like, or None
        The columns that should be packed into a nested column. All columns
        in the list will attempt to be packed into a single nested column
        with the name provided in `name`. If None, defaults to every column
        that is neither in `base_columns` nor equal to `on`.
    on : str or None
        The name of a column to use as the new index. Typically, the index
        should have a unique value per row for base columns, and should
        repeat for nested columns. For example, a dataframe with two
        columns; a=[1,1,1,2,2,2] and b=[5,10,15,20,25,30] would want an
        index like [0,0,0,1,1,1] if a is chosen as a base column. If not
        provided the current index will be used.
    name :
        The name of the output column the `nested_columns` are packed into.

    Returns
    -------
    NestedFrame
        A NestedFrame with the specified nesting structure.
    """
    if nested_columns is None:
        nested_columns = [col for col in df.columns if not (col in base_columns or col == on)]

    # Meta: the base columns, plus the packed nested column when there is
    # at least one column to pack.
    meta = npd.NestedFrame(df[base_columns]._meta)
    if len(nested_columns) > 0:
        meta = meta.join(pack(df[nested_columns]._meta, name))

    return df.map_partitions(
        lambda x: npd.NestedFrame.from_flat(
            df=x, base_columns=base_columns, nested_columns=nested_columns, on=on, name=name
        ),
        meta=meta,
    )
@classmethod
def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"):
    """Create a NestedFrame with base and nested columns from a dataframe
    that has list-valued columns.

    The conversion itself is done per partition by
    ``npd.NestedFrame.from_lists``; this method resolves the column
    defaults, validates the list column dtypes and assembles the meta.

    Parameters
    ----------
    df : dd.DataFrame or nd.NestedFrame
        A dataframe with list columns.
    base_columns : list-like, or None
        Any columns that have non-list values in the input df. These will
        simply be kept as identical columns in the result. If None and
        `list_columns` is given, defaults to all columns not in
        `list_columns`; if both are None, there are no base columns.
    list_columns : list-like, or None
        The list-value columns that should be packed into a nested column.
        All columns in the list will attempt to be packed into a single
        nested column with the name provided in `name`. All columns in
        `list_columns` must have pyarrow list dtypes, otherwise the
        operation will fail. If None, defaults to all columns not in
        `base_columns` (or to every column when `base_columns` is None too).
    name :
        The name of the output column the `list_columns` are packed into.

    Returns
    -------
    NestedFrame
        A NestedFrame with the specified nesting structure.

    Raises
    ------
    ValueError
        If no columns end up assigned as list columns.
    TypeError
        If any list column does not have a pyarrow list dtype.

    Note
    ----
    As noted above, all columns in `list_columns` must have a pyarrow
    ListType dtype. This is needed for proper meta propagation. To convert
    a list column to this dtype, you can use this command structure:

    `nf= nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})`

    Where pa.int64 above should be replaced with the correct dtype of the
    underlying data accordingly.

    Additionally, it's a known issue in Dask
    (https://github.com/dask/dask/issues/10139) that columns with list
    values will by default be converted to the string type. This will
    interfere with the ability to recast these to pyarrow lists. We
    recommend setting the following dask config setting to prevent this:

    `dask.config.set({"dataframe.convert-string":False})`
    """
    # Resolve inputs for meta
    if list_columns is None:
        if base_columns is None:
            # with no inputs, assume all columns are list-valued
            list_columns = list(df.columns)
        else:
            # with defined base_columns, assume everything else is list
            list_columns = [col for col in df.columns if col not in base_columns]
    elif base_columns is None:
        # if list_columns are defined, assume everything else is base
        base_columns = [col for col in df.columns if col not in list_columns]

    # from_lists should have at least one list column defined
    if len(list_columns) == 0:
        raise ValueError("No columns were assigned as list columns.")

    # reject any list columns that are not pyarrow list dtyped
    for col in list_columns:
        col_dtype = df[col].dtype
        if not (hasattr(col_dtype, "pyarrow_dtype") and pa.types.is_list(col_dtype.pyarrow_dtype)):
            raise TypeError(
                f"List column '{col}' dtype ({col_dtype}) is not a pyarrow list dtype.\n"
                "Refer to the docstring for guidance on dtype requirements and assignment."
            )

    nested_meta = pack_lists(df[list_columns]._meta, name)
    if base_columns is None:
        # No base columns at all: indexing df with None is not a valid
        # column selection, so the meta is just the packed nested column.
        meta = npd.NestedFrame(nested_meta.to_frame())
    else:
        meta = npd.NestedFrame(df[base_columns]._meta).join(nested_meta)

    return df.map_partitions(
        lambda x: npd.NestedFrame.from_lists(
            df=x, base_columns=base_columns, list_columns=list_columns, name=name
        ),
        meta=meta,
    )
def compute(self, **kwargs):
    """Compute this Dask collection and return the result wrapped in ``npd.NestedFrame``."""
    computed = super().compute(**kwargs)
    return npd.NestedFrame(computed)
@property
def all_columns(self) -> dict:
    """Return a dictionary of column names for the base and each nested layer.

    The ``"base"`` key maps to ``self.columns``; every column with a
    ``NestedDtype`` gets its own key mapping to the list of its field names.
    """
    columns_by_layer = {"base": self.columns}
    # Read dtypes directly rather than via ``self[column]``: item access
    # goes through the custom ``__getitem__``, which consults the nested
    # column properties again for any name containing a dot and would
    # recurse without end.
    for column, dtype in self.dtypes.items():
        if isinstance(dtype, NestedDtype):
            columns_by_layer[column] = list(dtype.fields.keys())
    return columns_by_layer
@property
def nested_columns(self) -> list:
    """Return the names of all columns whose dtype is a ``NestedDtype``."""
    # Read dtypes directly rather than via ``self[column]``: item access
    # goes through the custom ``__getitem__``, which reads this property
    # again for any name containing a dot and would recurse without end.
    return [column for column, dtype in self.dtypes.items() if isinstance(dtype, NestedDtype)]
def _is_known_hierarchical_column(self, colname) -> bool:
"""Determine whether a string is a known hierarchical column name"""
if "." in colname:
left, right = colname.split(".")
if left in self.nested_columns:
return right in self.all_columns[left]
return False
return False
def add_nested(self, nested, name, how="outer") -> NestedFrame:  # type: ignore[name-defined] # noqa: F821
    """Pack a flat dataframe into a nested column and join it to this frame.

    Each partition of ``nested`` is packed with ``pack_flat``, the packed
    series is renamed to ``name``, and the result is joined onto this frame.

    Parameters
    ----------
    nested :
        A flat dataframe to pack into a nested column
    name :
        The name given to the nested column
    how : {'left', 'right', 'outer', 'inner', 'cross'}, default 'outer'
        How to handle the operation of the two objects; passed to ``join``.

        * left: use calling frame's index (or column if on is specified)
        * right: use other's index.
        * outer: form union of calling frame's index (or column if on is
          specified) with other's index, and sort it lexicographically.
        * inner: form intersection of calling frame's index (or column if
          on is specified) with other's index, preserving the order of the
          calling's one.
        * cross: creates the cartesian product from both frames, preserves
          the order of the left keys.

    Returns
    -------
    `nested_dask.NestedFrame`
    """
    packed = nested.map_partitions(lambda x: pack_flat(npd.NestedFrame(x)))
    return self.join(packed.rename(name), how=how)
def query(self, expr) -> Self:  # type: ignore # noqa: F821:
    """Filter the NestedFrame with a boolean expression.

    The expression is evaluated per partition by the nested-pandas
    ``query`` (from which this description is adapted), so it may target
    nested columns as well as the usual column set. The meta of this frame
    is reused for the result.

    Parameters
    ----------
    expr : str
        The query string to evaluate.

        Access nested columns using `nested_df.nested_col` (where
        `nested_df` refers to a particular nested dataframe and
        `nested_col` is a column of that nested dataframe).

        Variables in the environment can be referenced by prefixing them
        with an '@' character, like ``@a + b``.

        Column names that are not valid Python variable names (containing
        spaces or punctuation other than underscores, or starting with
        digits) must be surrounded by backticks; for example a column named
        "Area (cm^2)" is referenced as ```Area (cm^2)```, and summing a
        column called ``a a`` with ``b`` is written ```a a` + b``. Column
        names which are Python keywords (like "list", "for", "import", etc)
        cannot be used.

    Returns
    -------
    DataFrame
        DataFrame resulting from the provided query expression.

    Notes
    -----
    Queries that target a particular nested structure return a dataframe
    with rows of that particular nested structure filtered. For example,
    querying the NestedFrame "df" with nested structure "mynested" as
    below will return all rows of df, but with mynested filtered by the
    condition:

    >>> df.query("mynested.a > 2")
    """
    result_meta = self._meta
    return self.map_partitions(lambda x: npd.NestedFrame(x).query(expr), meta=result_meta)
def dropna(
    self,
    *,
    axis: Axis = 0,
    how: str | lib.NoDefault = no_default,
    thresh: int | lib.NoDefault = no_default,
    on_nested: bool = False,
    subset: IndexLabel | None = None,
    inplace: bool = False,
    ignore_index: bool = False,
) -> Self:  # type: ignore[name-defined] # noqa: F821:
    """Remove missing values for one layer of the NestedFrame.

    Every argument is forwarded unchanged to the nested-pandas ``dropna``,
    which is applied to each partition. The meta of this frame is reused
    for the result, which assumes a row-based operation.

    Parameters
    ----------
    axis : {0 or 'index', 1 or 'columns'}, default 0
        Determine if rows or columns which contain missing values are
        removed.

        * 0, or 'index' : Drop rows which contain missing values.
        * 1, or 'columns' : Drop columns which contain missing value.

        Only a single axis is allowed.
    how : {'any', 'all'}, default 'any'
        Determine if row or column is removed from DataFrame, when we have
        at least one NA or all NA.

        * 'any' : If any NA values are present, drop that row or column.
        * 'all' : If all values are NA, drop that row or column.
    thresh : int, optional
        Require that many non-NA values. Cannot be combined with how.
    on_nested : str or bool, optional
        If not False, applies the call to the nested dataframe in the
        column with label equal to the provided string. If specified,
        the nested dataframe should align with any columns given in
        `subset`.
    subset : column label or sequence of labels, optional
        Labels along other axis to consider, e.g. if you are dropping rows
        these would be a list of columns to include.

        Access nested columns using `nested_df.nested_col` (where
        `nested_df` refers to a particular nested dataframe and
        `nested_col` is a column of that nested dataframe).
    inplace : bool, default False
        Whether to modify the DataFrame rather than creating a new one.
    ignore_index : bool, default ``False``
        If ``True``, the resulting axis will be labeled 0, 1, ..., n - 1.

    Returns
    -------
    DataFrame or None
        DataFrame with NA entries dropped from it or None if ``inplace=True``.

    Notes
    -----
    Operations that target a particular nested structure return a dataframe
    with rows of that particular nested structure affected.

    Values for `on_nested` and `subset` should be consistent in pointing
    to a single layer, multi-layer operations are not supported at this
    time.
    """
    dropna_kwargs = {
        "axis": axis,
        "how": how,
        "thresh": thresh,
        "on_nested": on_nested,
        "subset": subset,
        "inplace": inplace,
        "ignore_index": ignore_index,
    }
    # propagate meta, assumes row-based operation
    return self.map_partitions(
        lambda x: npd.NestedFrame(x).dropna(**dropna_kwargs),
        meta=self._meta,
    )
def sort_values(
    self,
    by: str | list[str],
    npartitions: int | None = None,
    ascending: bool | list[bool] = True,
    na_position: Literal["first"] | Literal["last"] = "last",
    partition_size: float = 128e6,
    sort_function: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
    sort_function_kwargs: Mapping[str, Any] | None = None,
    upsample: float = 1.0,
    ignore_index: bool | None = False,
    shuffle_method: str | None = None,
    **options,
) -> Self:  # type: ignore[name-defined] # noqa: F821:
    """Sort the dataset by one or more columns of a single layer.

    When every column in `by` belongs to the base layer, the call is
    passed to Dask's ``sort_values`` with all arguments. Sorting a parallel
    dataset requires expensive shuffles and is generally not recommended;
    see ``set_index`` for implementation details. When the columns belong
    to a nested layer, each partition is sorted through the nested-pandas
    ``sort_values`` and only `by`, `ascending`, `na_position`,
    `ignore_index` and `**options` are forwarded.

    Parameters
    ----------
    by : str or list[str]
        Column(s) to sort by.
    npartitions : int, None, or 'auto'
        The ideal number of output partitions. If None, use the same as the
        input. If 'auto' then decide by memory use. Not used when sorting
        nested layers.
    ascending : bool or list[bool], optional
        Sort ascending vs. descending. Defaults to True. Specify list for
        multiple sort orders. If this is a list of bools, must match the
        length of the by.
    na_position : {'last', 'first'}, optional
        Puts NaNs at the beginning if 'first', puts NaN at the end if
        'last'. Defaults to 'last'.
    partition_size : float, optional
        The desired size of each partition in bytes. Defaults to 128e6
        (128 MB). Not used in nested sorting.
    sort_function : function, optional
        Sorting function to use when sorting underlying partitions. If
        None, defaults to M.sort_values (the partition library's
        implementation of sort_values). Not used when sorting nested
        layers.
    sort_function_kwargs : dict, optional
        Additional keyword arguments to pass to the partition sorting
        function. By default, by, ascending, and na_position are provided.
        Not used in nested sorting.
    upsample : float, optional
        Used to increase the number of samples for quantiles. Not used
        in nested sorting
    ignore_index : bool, optional
        If True, the resulting axis will be labeled 0, 1, ..., n - 1.
        Defaults to False.
    shuffle_method : str, optional
        The method to use for shuffling data. Defaults to None. Not used
        in nested sorting
    **options : keyword arguments, optional
        Additional options to pass to the sorting function.

    Returns
    -------
    DataFrame
        DataFrame with sorted values.

    Raises
    ------
    ValueError
        If the columns in `by` refer to more than one layer.
    """
    if isinstance(by, str):
        by = [by]

    # Resolve the layer each "by" column refers to: the nested column name
    # for hierarchical references, "base" for everything else.
    targets = [col.split(".")[0] if self._is_known_hierarchical_column(col) else "base" for col in by]

    # Ensure one target layer, preventing multi-layer operations
    unq_targets = np.unique(targets).tolist()
    if len(unq_targets) > 1:
        raise ValueError("Queries cannot target multiple structs/layers, write a separate query for each")
    target_layer = unq_targets[0]

    if target_layer != "base":
        # Nested target layer: go through the nested-pandas API via
        # map_partitions. Meta is propagated and divisions are preserved.
        nested_sort_kwargs = {
            "by": by,
            "ascending": ascending,
            "na_position": na_position,
            "ignore_index": ignore_index,
        }
        return self.map_partitions(
            lambda x: npd.NestedFrame(x).sort_values(**nested_sort_kwargs, **options),
            meta=self._meta,
        )

    # Base layer: just use dask's sort_values.
    # Drops divisions, but this is expected behavior of a sorting operation
    return super().sort_values(
        by=by,
        npartitions=npartitions,
        ascending=ascending,
        na_position=na_position,
        partition_size=partition_size,
        sort_function=sort_function,
        sort_function_kwargs=sort_function_kwargs,
        upsample=upsample,
        ignore_index=ignore_index,
        shuffle_method=shuffle_method,
        **options,
    )
def reduce(self, func, *args, meta=dsk_no_default, infer_nesting=True, **kwargs) -> NestedFrame:
    """Apply a function to each top-level row of the NestedFrame.

    The work is done per partition by the nested-pandas ``reduce`` (from
    which this description is adapted). The user may specify which columns
    the function is applied to, with columns from the 'base' layer being
    passed to the function as scalars and columns from the nested layers
    being passed as numpy arrays.

    Parameters
    ----------
    func : callable
        Function to apply to each nested dataframe. The first arguments to
        `func` should be which columns to apply the function to. See the
        Notes for recommendations on writing func outputs.
    args : positional arguments
        Positional arguments to pass to the function, the first *args
        should be the names of the columns to apply the function to.
    meta : dataframe or series-like, optional
        The dask meta of the output. If not provided, dask will try to
        infer the metadata. This may lead to unexpected results, so
        providing meta is recommended. A dict of {name: dtype} is turned
        into an empty NestedFrame with those columns, and a two-element
        tuple is treated as a (name, dtype) series meta and turned into a
        one-column NestedFrame.
    infer_nesting : bool, default True
        If True, the function will pack output columns into nested
        structures based on column names adhering to a nested naming
        scheme. E.g. "nested.b" and "nested.c" will be packed into a column
        called "nested" with columns "b" and "c". If False, all outputs
        will be returned as base columns.
    kwargs : keyword arguments, optional
        Keyword arguments to pass to the function.

    Returns
    -------
    `NestedFrame`
        `NestedFrame` with the results of the function applied to the
        columns of the frame.

    Notes
    -----
    By default, `reduce` will produce a `NestedFrame` with enumerated
    column names for each returned value of the function. For more useful
    naming, it's recommended to have `func` return a dictionary where each
    key is an output column of the dataframe returned by `reduce`.

    Example User Function:

    >>> def my_sum(col1, col2):
    >>>    '''reduce will return a NestedFrame with two columns'''
    >>>    return {"sum_col1": sum(col1), "sum_col2": sum(col2)}

    When using nesting inference (infer_nesting=True), the output may
    contain nested columns. In such cases, the meta should be provided with
    the appropriate dtype for these columns. For example, the following
    function, which produces a nested column "lc":

    >>> def complex_output(flux):
    >>>     return {"max_flux": np.max(flux),
    >>>             "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]),
    >>>             "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5]}

    Would require the following meta:

    >>> # create a NestedDtype for the nested column "lc"
    >>> from nested_pandas.series.dtype import NestedDtype
    >>> lc_dtype = NestedDtype(pa.struct([pa.field("flux_quantiles", pa.list_(pa.float64())),
    >>>                                   pa.field("labels", pa.list_(pa.float64()))]))
    >>> # use the lc_dtype in meta creation
    >>> result_meta = npd.NestedFrame({'max_flux':pd.Series([], dtype='float'),
    >>>                                'lc':pd.Series([], dtype=lc_dtype)})
    """
    # Handle meta shorthands so that the output is a NestedFrame.
    if isinstance(meta, dict):
        # route standard dict meta to nestedframe
        meta = npd.NestedFrame({col: pd.Series(dtype=col_dtype) for col, col_dtype in meta.items()})
    elif isinstance(meta, tuple) and len(meta) == 2:  # len 2 to only try on proper series meta
        # reroute series meta to nestedframe, per consistency with nested-pandas
        series_name, series_dtype = meta
        meta = npd.NestedFrame(pd.Series(name=series_name, dtype=series_dtype).to_frame())

    # Apply nested_pandas reduce via map_partitions; the partition is wrapped
    # in an npd.NestedFrame call because of
    # https://github.com/lincc-frameworks/nested-dask/issues/21
    return self.map_partitions(
        lambda x: npd.NestedFrame(x).reduce(func, *args, infer_nesting=infer_nesting, **kwargs),
        meta=meta,
    )
def to_parquet(self, path, by_layer=True, **kwargs) -> None:
    """Write the NestedFrame to parquet, as one dataset or one per layer.

    The data is written either as a single parquet directory where each
    nested dataset is packed into its own column, or as an individual
    parquet directory for each layer. The pyarrow engine is always used.

    Parameters
    ----------
    path : str
        The path to the parquet directory to be written.
    by_layer : bool, default True
        If False, writes the entire NestedFrame to a single parquet
        directory.

        If True, writes each layer to a separate parquet sub-directory
        within the directory specified by path. Each sub-directory is named
        after its layer; for the base layer this is always "base".

        NOTE: by_layer=False will not reliably preserve divisions currently,
        be warned when using it that loading from such a dataset will
        likely require you to reset and set the index to generate divisions
        information.
    kwargs : keyword arguments, optional
        Keyword arguments passed on to each underlying ``to_parquet`` call.

    Returns
    -------
    None
    """
    # code copied from nested-pandas rather than wrapped
    # reason being that a map_partitions call is probably not well-behaved here?
    if not by_layer:
        # Todo: Investigate this more
        # Divisions cannot be generated from a parquet file that stores
        # nested information without a reset_index().set_index() loop. This
        # seems to happen at the to_parquet level rather than in
        # read_parquet: dropping the nested columns from the dataframe
        # before saving does enable divisions to be found, but removing the
        # nested columns from the set of columns to load does not.
        # Divisions are going to be crucial, so it may be best not to
        # support this until it is resolved. However the non-by_layer mode
        # is needed for by_layer, so it may be best to just settle for
        # changing the default and filing a higher-priority bug.
        # raise NotImplementedError

        # Not writing by layer: defer to the parent to_parquet method.
        super().to_parquet(path, engine="pyarrow", **kwargs)
        return None

    # Write the base layer to a parquet file
    base_frame = self.drop(columns=self.nested_columns)
    base_frame.to_parquet(os.path.join(path, "base"), by_layer=False, **kwargs)

    # Write each nested layer to a parquet file
    for layer in self.all_columns:
        if layer == "base":
            continue
        path_layer = os.path.join(path, f"{layer}")
        self[layer].nest.to_flat().to_parquet(path_layer, engine="pyarrow", **kwargs)
    return None