-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathsampled_split.py
More file actions
103 lines (81 loc) · 3.24 KB
/
sampled_split.py
File metadata and controls
103 lines (81 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict, List
from pyroaring import BitMap
from pypaimon.read.split import Split
class SampledSplit(Split):
    """
    A Split wrapper that contains sampled row indexes for each file.

    This class wraps a data split and maintains a mapping from file names to
    bitmaps of sampled row indexes. It is used for random sampling scenarios
    where only specific rows from each file need to be read.

    All properties except ``row_count`` are delegated unchanged to the wrapped
    split; ``row_count`` reflects the number of sampled rows instead.

    Attributes:
        _data_split: The underlying data split being wrapped.
        _sampled_file_idx_map: A dictionary mapping file names to a BitMap of
            sampled row indexes within each file.
    """

    def __init__(
        self,
        data_split: 'Split',
        sampled_file_idx_map: Dict[str, BitMap]
    ):
        self._data_split = data_split
        self._sampled_file_idx_map = sampled_file_idx_map

    def data_split(self) -> 'Split':
        """Return the wrapped split (a method, not a property)."""
        return self._data_split

    def sampled_file_idx_map(self) -> Dict[str, BitMap]:
        """Return the file-name -> sampled-row-index mapping (a method)."""
        return self._sampled_file_idx_map

    @property
    def files(self) -> List['DataFileMeta']:
        """Files of the wrapped split."""
        return self._data_split.files

    @property
    def partition(self) -> 'GenericRow':
        """Partition of the wrapped split."""
        return self._data_split.partition

    @property
    def bucket(self) -> int:
        """Bucket of the wrapped split."""
        return self._data_split.bucket

    @property
    def row_count(self) -> int:
        """
        Number of rows this split yields.

        With an empty (or otherwise falsy) sampling map the wrapped split's
        own ``row_count`` is returned. Otherwise the sampled positions of
        every file in the wrapped split are counted.

        Raises:
            KeyError: If the sampling map is non-empty but has no entry for
                one of the wrapped split's files.
        """
        if not self._sampled_file_idx_map:
            return self._data_split.row_count
        return sum(
            len(self._sampled_file_idx_map[file.file_name])
            for file in self._data_split.files
        )

    @property
    def file_paths(self):
        """File paths of the wrapped split."""
        return self._data_split.file_paths

    @property
    def file_size(self):
        """File size of the wrapped split."""
        return self._data_split.file_size

    @property
    def raw_convertible(self):
        """``raw_convertible`` flag of the wrapped split."""
        return self._data_split.raw_convertible

    @property
    def data_deletion_files(self):
        """Deletion files of the wrapped split."""
        return self._data_split.data_deletion_files

    def __eq__(self, other):
        """Equal when both the wrapped splits and the sampling maps are equal."""
        if not isinstance(other, SampledSplit):
            return False
        return (self._data_split == other._data_split and
                self._sampled_file_idx_map == other._sampled_file_idx_map)

    def __hash__(self):
        """
        Hash that is consistent with ``__eq__``.

        The hash is built from the sorted (file name, sampled-row count)
        pairs only. Equal sampling maps always share these pairs, so equal
        SampledSplit objects always share a hash.
        """
        # The wrapped split is deliberately left out: hashing it by id() would
        # give two equal SampledSplit objects different hashes, and nothing
        # here guarantees that the wrapped split itself is hashable.
        # The bitmaps are reduced to their length because a mutable BitMap
        # cannot be relied on to be hashable.
        fingerprint = tuple(sorted(
            (file_name, len(positions))
            for file_name, positions in (self._sampled_file_idx_map or {}).items()
        ))
        return hash(fingerprint)

    def __repr__(self):
        return (f"SampledSplit(data_split={self._data_split}, "
                f"sampled_file_idx_map={self._sampled_file_idx_map})")