-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathclient_v2.py
More file actions
151 lines (125 loc) · 5.05 KB
/
client_v2.py
File metadata and controls
151 lines (125 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from time import sleep
from typing import Optional, Union
from mindee.client_mixin import ClientMixin
from mindee.error.mindee_error import MindeeError
from mindee.error.mindee_http_error_v2 import handle_error_v2
from mindee.input.inference_predict_options import InferencePredictOptions
from mindee.input.local_response import LocalResponse
from mindee.input.polling_options import PollingOptions
from mindee.input.sources.local_input_source import LocalInputSource
from mindee.logger import logger
from mindee.mindee_http.mindee_api_v2 import MindeeApiV2
from mindee.mindee_http.response_validation_v2 import (
is_valid_get_response,
is_valid_post_response,
)
from mindee.parsing.v2.inference_response import InferenceResponse
from mindee.parsing.v2.job_response import JobResponse
class ClientV2(ClientMixin):
    """
    Mindee API Client.

    Wraps a ``MindeeApiV2`` instance to enqueue documents, poll the queue
    and load inference responses.

    See: https://developers.mindee.com/docs/
    """

    # API key handed to the underlying ``MindeeApiV2`` instance.
    api_key: Optional[str]
    # HTTP layer used for every request made by this client.
    mindee_api: MindeeApiV2

    def __init__(self, api_key: Optional[str] = None) -> None:
        """
        Mindee API Client.

        :param api_key: Your API key for all endpoints
        """
        self.api_key = api_key
        self.mindee_api = MindeeApiV2(api_key)

    def enqueue(
        self, input_source: LocalInputSource, options: InferencePredictOptions
    ) -> JobResponse:
        """
        Enqueues a document to a given model.

        :param input_source: The document/source file to use.
            Has to be created beforehand.
        :param options: Options for the prediction.
        :return: The job response built from the API's reply.
        """
        logger.debug("Enqueuing document to '%s'", options.model_id)

        # Page operations only apply to PDFs; other inputs are sent untouched.
        if options.page_options and input_source.is_pdf():
            input_source.process_pdf(
                options.page_options.operation,
                options.page_options.on_min_pages,
                options.page_options.page_indexes,
            )

        response = self.mindee_api.predict_async_req_post(
            input_source=input_source, options=options
        )
        dict_response = response.json()

        # Invalid responses are delegated to handle_error_v2
        # (expected to raise -- TODO confirm against its definition).
        if not is_valid_post_response(response):
            handle_error_v2(dict_response)
        return JobResponse(dict_response)

    def parse_queued(
        self,
        queue_id: str,
    ) -> Union[InferenceResponse, JobResponse]:
        """
        Parses a queued document.

        :param queue_id: queue_id received from the API.
        :return: A job response when the payload contains a ``"job"`` key,
            an inference response otherwise.
        """
        logger.debug("Fetching from queue '%s'.", queue_id)

        response = self.mindee_api.get_inference_from_queue(queue_id)
        if not is_valid_get_response(response):
            handle_error_v2(response.json())

        dict_response = response.json()
        # A "job" key means the server answered with job information rather
        # than an inference payload.
        if "job" in dict_response:
            return JobResponse(dict_response)
        return InferenceResponse(dict_response)

    def enqueue_and_parse(
        self, input_source: LocalInputSource, options: InferencePredictOptions
    ) -> InferenceResponse:
        """
        Enqueues to an asynchronous endpoint and automatically polls for a response.

        :param input_source: The document/source file to use.
            Has to be created beforehand.
        :param options: Options for the prediction.
            If ``options.polling_options`` is unset, it is replaced in place
            by a default ``PollingOptions()``.
        :return: A valid inference response.
        :raises MindeeError: If the job reports a ``"Failed"`` status on any
            poll, or if no inference was retrieved within ``max_retries`` polls.
        """
        # Kept as an in-place assignment (as before) so that `options` carries
        # the polling settings that were actually used.
        if not options.polling_options:
            options.polling_options = PollingOptions()
        polling = options.polling_options

        # Not defined in this class; presumably provided by ClientMixin.
        self._validate_async_params(
            polling.initial_delay_sec,
            polling.delay_sec,
            polling.max_retries,
        )

        queue_result = self.enqueue(input_source, options)
        job_id = queue_result.job.id
        logger.debug("Successfully enqueued document with job id: %s", job_id)

        sleep(polling.initial_delay_sec)
        retry_counter = 1
        poll_results = self.parse_queued(job_id)

        # Keep polling while the server answers with job information.
        while isinstance(poll_results, JobResponse):
            # Check the status after EVERY poll, including the last allowed
            # one, so a failed job is never reported as a plain timeout.
            if poll_results.job.status == "Failed":
                raise MindeeError(f"Parsing failed for job {poll_results.job.id}")
            if retry_counter >= polling.max_retries:
                break

            logger.debug(
                "Polling server for parsing result with job id: %s",
                job_id,
            )
            retry_counter += 1
            sleep(polling.delay_sec)
            poll_results = self.parse_queued(job_id)

        if not isinstance(poll_results, InferenceResponse):
            raise MindeeError(
                f"Couldn't retrieve document after {retry_counter} tries."
            )
        return poll_results

    @staticmethod
    def load_inference(local_response: LocalResponse) -> InferenceResponse:
        """
        Load a prediction from the V2 API.

        :param local_response: Local response to load.
        :return: A valid prediction.
        :raises MindeeError: If building the inference response raises a
            ``KeyError``.
        """
        try:
            return InferenceResponse(local_response.as_dict)
        except KeyError as exc:
            raise MindeeError("No prediction found in local response.") from exc