-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy path__init__.py
More file actions
232 lines (188 loc) · 8.2 KB
/
__init__.py
File metadata and controls
232 lines (188 loc) · 8.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from collections import Counter, OrderedDict
from collections.abc import Iterable, Iterator
from . import interfaces
from .interfaces import FieldPosition
# Version string exposed as this module's ``__version__`` attribute.
__version__ = "0.2.0"
class Field(interfaces.Field):
    """A name-value pair representing a single field in an HTTP Request or Response.

    The kind will dictate metadata placement within an HTTP message.

    All field names are case insensitive and case-variance must be treated as
    equivalent. Names may be normalized but should be preserved for accuracy during
    transmission.
    """

    def __init__(
        self,
        *,
        name: str,
        values: Iterable[str] | None = None,
        kind: FieldPosition = FieldPosition.HEADER,
    ):
        """Create a field.

        :param name: The field name. It is stored exactly as given.
        :param values: Initial values. They are copied into a new list; ``None``
            results in an empty list.
        :param kind: The position of the field within the message. Defaults to
            ``FieldPosition.HEADER``.
        """
        self.name = name
        self.values: list[str] = list(values) if values is not None else []
        self.kind = kind

    def add(self, value: str) -> None:
        """Append a value to a field."""
        self.values.append(value)

    def set(self, values: list[str]) -> None:
        """Overwrite existing field values.

        The supplied values are copied, as the constructor does, so that later
        changes to the caller's list do not silently change this field.
        """
        self.values = list(values)

    def remove(self, value: str) -> None:
        """Remove all matching entries from list.

        Nothing happens (and no error is raised) if ``value`` is not present.
        """
        # Filter in a single pass. Assigning through a slice keeps the same list
        # object, so anything holding a reference to ``self.values`` sees the change.
        self.values[:] = [val for val in self.values if val != value]

    def as_string(self, delimiter: str = ",") -> str:
        """Get delimited string of all values. A comma is used by default.

        If the ``Field`` has zero values, the empty string is returned. If the ``Field``
        has exactly one value, the value is returned unmodified.

        For ``Field``s with more than one value, the values are joined by
        ``delimiter`` exactly as given; no whitespace is added around it. For such
        multi-valued ``Field``s, any values that already contain commas or double
        quotes will be surrounded by double quotes. Within any values that get
        quoted, pre-existing double quotes and backslashes are escaped with a
        backslash. Quoting is triggered by commas and double quotes only, whatever
        ``delimiter`` is.

        :param delimiter: The separator placed between values.
        :returns: All values combined into a single string.
        """
        if not self.values:
            return ""
        if len(self.values) == 1:
            # A lone value is never quoted or escaped.
            return self.values[0]
        return delimiter.join(quote_and_escape_field_value(val) for val in self.values)

    def as_tuples(self) -> list[tuple[str, str]]:
        """Get list of ``name``, ``value`` tuples where each tuple represents one
        value."""
        return [(self.name, val) for val in self.values]

    def __eq__(self, other: object) -> bool:
        """Name, values, and kind must match.

        Values order must match. The name comparison is an exact string
        comparison, so names differing only by case are not equal here. Anything
        that is not a ``Field`` compares unequal.
        """
        if not isinstance(other, Field):
            return False
        return (
            self.name == other.name
            and self.kind is other.kind
            and self.values == other.values
        )

    def __repr__(self) -> str:
        return f"Field(name={self.name!r}, value={self.values!r}, kind={self.kind!r})"
class Fields(interfaces.Fields):
    """Collection of header and trailer entries mapped by name.

    Entries live in ``entries``, keyed by the lowercased field name, in insertion
    order. Lookups, assignments and deletions all lowercase the supplied name
    first, so access is case insensitive.
    """

    def __init__(
        self,
        initial: Iterable[interfaces.Field] | None = None,
        *,
        encoding: str = "utf-8",
    ):
        """Collection of header and trailer entries mapped by name.

        :param initial: Initial list of ``Field`` objects. ``Field``s can also be added
            and later removed.
        :param encoding: The string encoding to be used when converting the ``Field``
            name and value from ``str`` to ``bytes`` for transmission. This class
            only stores the value and compares it in ``__eq__``.
        :raises ValueError: If two or more of the initial fields have the same
            normalized name.
        """
        init_fields = list(initial) if initial is not None else []
        init_field_names = [self._normalize_field_name(fld.name) for fld in init_fields]
        # Any normalized name seen more than once is a conflict; report all of them.
        non_unique_names = [
            name for name, num in Counter(init_field_names).items() if num > 1
        ]
        if non_unique_names:
            raise ValueError(
                "Field names of the initial list of fields must be unique. The "
                "following normalized field names appear more than once: "
                f"{', '.join(non_unique_names)}."
            )
        self.entries: dict[str, interfaces.Field] = OrderedDict(
            zip(init_field_names, init_fields)
        )
        self.encoding: str = encoding

    def set_field(self, field: interfaces.Field) -> None:
        """Alias for __setitem__ to utilize the field.name for the entry key."""
        self[field.name] = field

    def __setitem__(self, name: str, field: interfaces.Field) -> None:
        """Set or override entry for a Field name.

        :raises ValueError: If ``name`` and ``field.name`` differ after
            normalization.
        """
        normalized_name = self._normalize_field_name(name)
        normalized_field_name = self._normalize_field_name(field.name)
        if normalized_name != normalized_field_name:
            raise ValueError(
                f"Supplied key {name} does not match Field.name "
                f"provided: {normalized_field_name}"
            )
        self.entries[normalized_name] = field

    def get(
        self, key: str, default: interfaces.Field | None = None
    ) -> interfaces.Field | None:
        """Retrieve the entry for ``key``, or ``default`` if there is none."""
        return self[key] if key in self else default

    def __getitem__(self, name: str) -> interfaces.Field:
        """Retrieve Field entry.

        :raises KeyError: If no entry exists for the normalized name.
        """
        return self.entries[self._normalize_field_name(name)]

    def __delitem__(self, name: str) -> None:
        """Delete entry from collection.

        :raises KeyError: If no entry exists for the normalized name.
        """
        del self.entries[self._normalize_field_name(name)]

    def get_by_type(self, kind: FieldPosition) -> list[interfaces.Field]:
        """Helper function for retrieving specific types of fields.

        Used to grab all headers or all trailers.
        """
        return [entry for entry in self.entries.values() if entry.kind is kind]

    def extend(self, other: interfaces.Fields) -> None:
        """Merges ``entries`` of ``other`` into the current ``entries``.

        For every `Field` in the ``entries`` of ``other``: If the normalized name
        already exists in the current ``entries``, the values from ``other`` are
        appended. Otherwise, the ``Field`` is added to the list of ``entries``.

        A ``Field`` added this way is the same object that ``other`` holds; it is
        not copied.
        """
        for other_field in other:
            # Only the lookup is guarded, so a KeyError raised while appending
            # values is never mistaken for a missing name.
            try:
                cur_field = self[other_field.name]
            except KeyError:
                self[other_field.name] = other_field
                continue
            # Iterate over a snapshot: ``cur_field`` may be the very same object
            # as ``other_field`` (e.g. ``fields.extend(fields)``), in which case
            # appending while iterating the live list would never terminate.
            for other_value in list(other_field.values):
                cur_field.add(other_value)

    def _normalize_field_name(self, name: str) -> str:
        """Normalize field names.

        For use as key in ``entries``.
        """
        return name.lower()

    def __eq__(self, other: object) -> bool:
        """Encoding must match.

        Entries must match in values and order. Anything that is not a ``Fields``
        compares unequal.
        """
        if not isinstance(other, Fields):
            return False
        return self.encoding == other.encoding and self.entries == other.entries

    def __iter__(self) -> Iterator[interfaces.Field]:
        """Iterate over the stored ``Field`` objects in insertion order."""
        yield from self.entries.values()

    def __len__(self) -> int:
        """Return the number of entries."""
        return len(self.entries)

    def __repr__(self) -> str:
        return f"Fields({self.entries})"

    def __contains__(self, key: str) -> bool:
        """Return whether an entry exists for the normalized ``key``."""
        return self._normalize_field_name(key) in self.entries
def quote_and_escape_field_value(value: str) -> str:
    """Quote and escape one :class:`Field` value when it needs it.

    A value containing a comma or a double quote is wrapped in double quotes, with
    every backslash and double quote inside it preceded by a backslash. Any other
    value is returned unchanged. See :func:`Field.as_string` for where this is
    applied.
    """
    needs_quoting = "," in value or '"' in value
    if not needs_quoting:
        return value
    # Backslashes are doubled first so that the backslashes introduced for the
    # double quotes in the next step are not themselves doubled.
    body = value.replace("\\", "\\\\")
    body = body.replace('"', '\\"')
    return '"' + body + '"'
def tuples_to_fields(
    tuples: Iterable[tuple[str, str]], *, kind: FieldPosition | None = None
) -> Fields:
    """Convert ``name``, ``value`` tuples to ``Fields`` object. Each tuple represents
    one Field value.

    Tuples whose names are equal after ``Fields`` normalizes them are collected
    into a single ``Field``, which keeps the name of the first such tuple.

    :param tuples: The ``name``, ``value`` pairs to convert.
    :param kind: The Field kind to define for all tuples. ``None`` selects
        ``FieldPosition.HEADER``.
    :returns: A new ``Fields`` collection holding one ``Field`` per distinct name.
    """
    # Resolve the default once, and test against None explicitly so that a
    # FieldPosition member that happens to be falsy is not replaced by HEADER.
    field_kind = FieldPosition.HEADER if kind is None else kind
    fields = Fields()
    for name, value in tuples:
        try:
            fields[name].add(value)
        except KeyError:
            # First value seen for this name: start a new Field for it.
            fields[name] = Field(name=name, values=[value], kind=field_kind)
    return fields