-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdata_plane_manager.py
More file actions
151 lines (130 loc) · 6.24 KB
/
data_plane_manager.py
File metadata and controls
151 lines (130 loc) · 6.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import annotations
import random
from typing import TYPE_CHECKING
from pydantic import TypeAdapter, ValidationError
from ...core_definitions import IntersectDataHandler, IntersectMimeType
from ..exceptions import IntersectError
from ..logger import logger
from .minio_utils import (
MinioPayload,
create_minio_store,
delete_minio_object,
get_minio_object,
send_minio_object,
)
if TYPE_CHECKING:
from ...config.shared import DataStoreConfigMap, HierarchyConfig
# Shared pydantic adapter for MinioPayload, built once at import time. The data plane
# manager uses it to validate incoming JSON payloads and to serialize outgoing ones.
MINIO_ADAPTER = TypeAdapter(MinioPayload)
class DataPlaneManager:
    """The DataPlaneManager serves as a common interface to the data plane.

    The API supports extensive plug-and-play for different data providers.
    Currently two data handlers are understood: MESSAGE (the data travels inline
    in the message itself) and MINIO (the message only carries a small JSON
    payload pointing at an object in a MINIO store).
    """

    def __init__(self, hierarchy: HierarchyConfig, data_configs: DataStoreConfigMap) -> None:
        """Create a MINIO store for every MINIO entry in the data configuration.

        Each entry of ``data_configs.minio`` is passed to ``create_minio_store``
        (presumably this is where credentials get verified - confirm in minio_utils).

        Params:
          hierarchy: Hierarchy configuration
          data_configs: data configuration
        """
        self._hierarchy = hierarchy
        self._minio_providers = [create_minio_store(config) for config in data_configs.minio]

        # warn users about missing data plane
        if not self._minio_providers:
            logger.warning('WARNING: This service cannot support any MINIO instances')

    def incoming_message_data_handler(
        self, message: bytes, request_data_handler: IntersectDataHandler
    ) -> bytes:
        """Get data from the request data provider.

        Params:
          message: the message sent externally to this location
          request_data_handler: which data provider the sender used for this message

        Returns:
          the actual data we want to submit to the user function

        Raise:
          IntersectError - if we couldn't get the data (invalid MINIO payload,
            MINIO instance not configured locally, or unknown data handler)
        """
        if request_data_handler == IntersectDataHandler.MESSAGE:
            # the data is inline, nothing to fetch
            return message

        if request_data_handler == IntersectDataHandler.MINIO:
            # TODO - we may want to send additional provider information in the payload
            try:
                payload: MinioPayload = MINIO_ADAPTER.validate_json(message)
            except ValidationError as e:
                logger.warning('Invalid MINIO payload format, dropping message')
                raise IntersectError from e

            # pick the first configured store whose URL matches the one named in the payload
            provider = next(
                (
                    store
                    for store in self._minio_providers
                    if store._base_url._url.geturl() == payload['minio_url']  # noqa: SLF001 (only way to get URL from MINIO API)
                ),
                None,
            )
            if provider is None:
                logger.error(
                    f"You did not configure listening to MINIO instance '{payload['minio_url']}'. You must fix this to handle this data."
                )
                raise IntersectError
            return get_minio_object(provider, payload)

        logger.warning(f'Cannot parse data handler {request_data_handler}')
        raise IntersectError

    def outgoing_message_data_handler(
        self,
        function_response: bytes,
        content_type: IntersectMimeType,
        data_handler: IntersectDataHandler,
    ) -> bytes:
        """Send the user's response to the appropriate data provider.

        Params:
          - function_response - the return value from the user's function
          - content_type - content type of function_response
          - data_handler - where we're going to send the data off to (i.e. the message, MINIO...)

        Returns:
          the payload of the message, this varies based off of the data_handler value
          (the response itself for MESSAGE, a JSON-serialized MINIO payload for MINIO)

        Raise:
          IntersectError - if there was any error in submitting the response
            (no MINIO provider configured, or unsupported data handler)
        """
        # TODO - instead of requiring users to specify the data handler themselves, another idea could be to use
        # sys.getsizeof(function_response) and determine the data handler dynamically
        # users could perhaps specify T1/T2/T3 data types but not the specific implementation
        if data_handler == IntersectDataHandler.MESSAGE:
            return function_response

        if data_handler == IntersectDataHandler.MINIO:
            if not self._minio_providers:
                logger.error(
                    'No MINIO provider configured, so you cannot set response_data_handler on @intersect_message to equal IntersectDataHandler.MINIO .'
                )
                raise IntersectError
            provider = random.choice(self._minio_providers)  # noqa: S311 (TODO choose a MINIO provider better than at random - this may be determined from external message params)
            minio_payload = send_minio_object(
                function_response, provider, content_type, self._hierarchy
            )
            return MINIO_ADAPTER.dump_json(minio_payload)

        logger.error(
            f'No support implemented for code {data_handler}, please upgrade your intersect-sdk version.'
        )
        raise IntersectError

    def remove_remote_data(
        self, message: bytes, request_data_handler: IntersectDataHandler
    ) -> None:
        """Removes data from the request data provider.

        An invalid payload or an unconfigured MINIO instance is only logged, never raised.
        Data handlers other than MINIO have no remote data, so nothing happens for them.
        In general, this should only be called if you can verify an issue in the headers

        Params:
          message: the message sent externally to this location
          request_data_handler: which data provider the sender used for this message

        Returns:
          None
        """
        if request_data_handler != IntersectDataHandler.MINIO:
            return

        # TODO - we may want to send additional provider information in the payload
        try:
            payload: MinioPayload = MINIO_ADAPTER.validate_json(message)
        except ValidationError as e:
            # interpolate the error into the message: passing it as a bare extra positional
            # argument (without a placeholder) breaks log record formatting
            logger.warning(f'remove_remote - invalid params: {e}')
            return

        # pick the first configured store whose URL matches the one named in the payload
        provider = next(
            (
                store
                for store in self._minio_providers
                if store._base_url._url.geturl() == payload['minio_url']  # noqa: SLF001 (only way to get URL from MINIO API)
            ),
            None,
        )
        if provider is None:
            logger.error(
                f"You did not configure listening to MINIO instance '{payload['minio_url']}'. You must fix this to handle this data."
            )
            return
        delete_minio_object(provider, payload)