-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathblob.py
More file actions
141 lines (114 loc) · 4.35 KB
/
blob.py
File metadata and controls
141 lines (114 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import io
from typing import Any, Optional, Union
from azure.functions import _abc as azf_abc
from azure.functions import _blob as azf_blob
from . import meta
class InputStream(azf_blob.InputStream):
    """Read-only blob input stream backed by an in-memory buffer.

    Wraps the raw blob payload in an ``io.BytesIO`` and exposes the
    file-like interface declared by ``azure.functions`` for blob inputs,
    together with trigger-supplied attributes (name, uri, length,
    properties, metadata) when they are available.
    """

    def __init__(self, *, data: Union[bytes, meta.Datum],
                 name: Optional[str] = None,
                 uri: Optional[str] = None,
                 length: Optional[int] = None,
                 blob_properties: Optional[dict] = None,
                 metadata: Optional[dict] = None) -> None:
        super().__init__(name=name, length=length, uri=uri)
        self._io = io.BytesIO(data)  # type: ignore
        self._blob_properties = blob_properties
        self._metadata = metadata

    @property
    def name(self) -> Optional[str]:
        """Blob name from trigger metadata, if known."""
        return self._name

    @property
    def length(self) -> Optional[int]:
        """Blob content length in bytes, if known."""
        return self._length

    @property
    def uri(self) -> Optional[str]:
        """Blob URI from trigger metadata, if known."""
        return self._uri

    @property
    def blob_properties(self):
        """Raw blob properties dict from trigger metadata, if any."""
        return self._blob_properties

    @property
    def metadata(self):
        """User-defined blob metadata dict, if any."""
        return self._metadata

    def read(self, size=-1) -> bytes:
        """Read and return up to *size* bytes (all remaining if negative)."""
        return self._io.read(size)

    def read1(self, size=-1) -> bytes:
        """Single-call read; identical semantics to :meth:`read` here."""
        return self._io.read(size)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def writable(self) -> bool:
        return False
class BlobConverter(meta.InConverter,
                    meta.OutConverter,
                    binding='blob',
                    trigger='blobTrigger'):
    """Converts between 'blob'/'blobTrigger' binding data and Python values."""

    @classmethod
    def check_input_type_annotation(cls, pytype: type) -> bool:
        # Blob inputs may be annotated as a stream, raw bytes, or text.
        return issubclass(pytype, (azf_abc.InputStream, bytes, str))

    @classmethod
    def check_output_type_annotation(cls, pytype: type) -> bool:
        # Outputs accept text, binary, streams, or any file-like type
        # exposing a callable ``read``.
        if issubclass(pytype, (str, bytes, bytearray, azf_abc.InputStream)):
            return True
        return callable(getattr(pytype, 'read', None))

    @classmethod
    def encode(cls, obj: Any, *,
               expected_type: Optional[type]) -> meta.Datum:
        """Encode a Python value into a Datum for the blob output binding.

        File-like objects are drained to their payload first; text maps
        to a 'string' Datum, binary to a 'bytes' Datum.
        """
        reader = getattr(obj, 'read', None)
        if callable(reader):
            # file-like object
            obj = reader()

        if isinstance(obj, str):
            return meta.Datum(type='string', value=obj)

        if isinstance(obj, (bytes, bytearray)):
            return meta.Datum(type='bytes', value=bytes(obj))

        raise NotImplementedError

    @classmethod
    def decode(cls, data: meta.Datum, *, trigger_metadata) -> Any:
        """Decode a blob binding Datum into an :class:`InputStream`.

        Returns ``None`` for empty/typeless data; raises ``ValueError``
        on an unexpected Datum type. Trigger metadata, when present,
        supplies name, uri, length, properties and metadata.
        """
        if data is None or data.type is None:
            return None

        data_type = data.type
        if data_type == 'string':
            body = data.value.encode('utf-8')
        elif data_type == 'bytes':
            body = data.value
        else:
            raise ValueError(
                f'unexpected type of data received for the "blob" binding '
                f': {data_type!r}'
            )

        if not trigger_metadata:
            return InputStream(data=body)

        blob_properties = cls._decode_trigger_metadata_field(
            trigger_metadata, 'Properties', python_type=dict)
        if blob_properties:
            # Host versions differ on the key name for the content size.
            raw_length = (blob_properties.get('ContentLength')
                          or blob_properties.get('Length'))
            length = int(raw_length) if raw_length else None
        else:
            blob_properties = None
            length = None

        metadata = None
        try:
            metadata = cls._decode_trigger_metadata_field(
                trigger_metadata, 'Metadata', python_type=dict)
        except (KeyError, ValueError):
            # avoiding any exceptions when fetching Metadata as the
            # metadata type is unclear.
            pass

        return InputStream(
            data=body,
            name=cls._decode_trigger_metadata_field(
                trigger_metadata, 'BlobTrigger', python_type=str),
            length=length,
            uri=cls._decode_trigger_metadata_field(
                trigger_metadata, 'Uri', python_type=str),
            blob_properties=blob_properties,
            metadata=metadata,
        )