-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path__init__.py
More file actions
330 lines (250 loc) · 11.4 KB
/
__init__.py
File metadata and controls
330 lines (250 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
from io import BytesIO
from typing import List, Optional, Union, override
# NOTE aliasing the imports with 'as' makes them public in the eyes
# of static code checkers. Thus we avoid listing them with __all__ = ...
from ._internal import ClientOptions as ClientOptions
from ._internal import ListResult as ListResult
from ._internal import ObjectMeta as ObjectMeta
from ._internal import ObjectStore as _ObjectStore
from ._internal import Path as Path
try:
import importlib.metadata as importlib_metadata
except ImportError:
import importlib_metadata # type: ignore
__version__ = importlib_metadata.version("object-store-python")
PathLike = Union[str, List[str], Path]
BytesLike = Union[bytes, BytesIO]
DELIMITER = "/"
def _as_path(raw: PathLike) -> Path:
if isinstance(raw, str):
return Path(raw)
if isinstance(raw, list):
return Path(DELIMITER.join(raw))
if isinstance(raw, Path):
return raw
raise ValueError(f"Cannot convert type '{type(raw)}' to type Path.")
def _as_bytes(raw: BytesLike) -> bytes:
if isinstance(raw, bytes):
return raw
if isinstance(raw, BytesIO):
return raw.read()
raise ValueError(f"Cannot convert type '{type(raw)}' to type bytes.")
class ObjectStore(_ObjectStore):
"""A uniform API for interacting with object storage services and local files.
backed by the Rust object_store crate."""
@override
def head(self, location: PathLike) -> ObjectMeta:
"""Return the metadata for the specified location.
Args:
location (PathLike): path / key to storage location
Returns:
ObjectMeta: metadata for object at location
"""
return super().head(_as_path(location))
@override
async def head_async(self, location: PathLike) -> ObjectMeta:
"""Return the metadata for the specified location.
Args:
location (PathLike): path / key to storage location
Returns:
ObjectMeta: metadata for object at location
"""
return await super().head_async(_as_path(location))
@override
def get(self, location: PathLike) -> bytes:
"""Return the bytes that are stored at the specified location.
Args:
location (PathLike): path / key to storage location
Returns:
bytes: raw data stored in location
"""
return super().get(_as_path(location))
@override
async def get_async(self, location: PathLike) -> bytes:
"""Return the bytes that are stored at the specified location.
Args:
location (PathLike): path / key to storage location
Returns:
bytes: raw data stored in location
"""
return super().get(_as_path(location))
@override
def get_range(self, location: PathLike, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range.
Args:
location (PathLike): path / key to storage location
start (int): zero-based start index
length (int): length of the byte range
Returns:
bytes: raw data range stored in location
"""
return super().get_range(_as_path(location), start, length)
@override
async def get_range_async(self, location: PathLike, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range.
Args:
location (PathLike): path / key to storage location
start (int): zero-based start index
length (int): length of the byte range
Returns:
bytes: raw data range stored in location
"""
return await super().get_range_async(_as_path(location), start, length)
@override
def put(self, location: PathLike, bytes: BytesLike) -> None:
"""Save the provided bytes to the specified location.
Args:
location (PathLike): path / key to storage location
bytes (BytesLike): data to be written to location
"""
return super().put(_as_path(location), _as_bytes(bytes))
@override
async def put_async(self, location: PathLike, bytes: BytesLike) -> None:
"""Save the provided bytes to the specified location.
Args:
location (PathLike): path / key to storage location
bytes (BytesLike): data to be written to location
"""
return await super().put_async(_as_path(location), _as_bytes(bytes))
@override
def delete(self, location: PathLike) -> None:
"""Delete the object at the specified location.
Args:
location (PathLike): path / key to storage location
"""
return super().delete(_as_path(location))
@override
async def delete_async(self, location: PathLike) -> None:
"""Delete the object at the specified location.
Args:
location (PathLike): path / key to storage location
"""
return await super().delete_async(_as_path(location))
@override
def list(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
"""List all the objects with the given prefix.
Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.
Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.
Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return super().list(prefix_)
@override
async def list_async(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
"""List all the objects with the given prefix.
Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.
Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.
Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return await super().list_async(prefix_)
@override
def list_with_delimiter(self, prefix: Optional[PathLike] = None) -> ListResult:
"""List objects with the given prefix and an implementation specific
delimiter. Returns common prefixes (directories) in addition to object
metadata.
Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.
Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.
Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return super().list_with_delimiter(prefix_)
@override
async def list_with_delimiter_async(self, prefix: Optional[PathLike] = None) -> ListResult:
"""List objects with the given prefix and an implementation specific
delimiter. Returns common prefixes (directories) in addition to object
metadata.
Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.
Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.
Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return await super().list_with_delimiter_async(prefix_)
@override
def copy(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another in the same object store.
If there exists an object at the destination, it will be overwritten.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return super().copy(_as_path(src), _as_path(dst))
@override
async def copy_async(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another in the same object store.
If there exists an object at the destination, it will be overwritten.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().copy_async(_as_path(src), _as_path(dst))
@override
def copy_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another, only if destination is empty.
Will return an error if the destination already has an object.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return super().copy_if_not_exists(_as_path(src), _as_path(dst))
@override
async def copy_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another, only if destination is empty.
Will return an error if the destination already has an object.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().copy_if_not_exists_async(_as_path(src), _as_path(dst))
@override
def rename(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.
By default, this is implemented as a copy and then delete source. It may not
check when deleting source that it was the same object that was originally copied.
If there exists an object at the destination, it will be overwritten.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return super().rename(_as_path(src), _as_path(dst))
@override
async def rename_async(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.
By default, this is implemented as a copy and then delete source. It may not
check when deleting source that it was the same object that was originally copied.
If there exists an object at the destination, it will be overwritten.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().rename_async(_as_path(src), _as_path(dst))
@override
def rename_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.
Will return an error if the destination already has an object.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return super().rename_if_not_exists(_as_path(src), _as_path(dst))
@override
async def rename_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.
Will return an error if the destination already has an object.
Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().rename_if_not_exists_async(_as_path(src), _as_path(dst))