-
Notifications
You must be signed in to change notification settings - Fork 288
Expand file tree
/
Copy pathasync_fail.py
More file actions
45 lines (34 loc) · 1.35 KB
/
async_fail.py
File metadata and controls
45 lines (34 loc) · 1.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from typing import Optional
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.web.async_slack_response import AsyncSlackResponse
class AsyncFail:
    """Awaitable callable that reports a custom function execution as failed.

    Calling an instance awaits ``client.functions_completeError`` for the bound
    ``function_execution_id`` and records that a failure report was attempted.
    """

    client: AsyncWebClient
    function_execution_id: Optional[str]
    _called: bool

    def __init__(
        self,
        client: AsyncWebClient,
        function_execution_id: Optional[str],
    ):
        """Bind the client and the function execution this instance reports on.

        Args:
            client: Client whose ``functions_completeError`` coroutine is
                awaited when this object is called.
            function_execution_id: ID of the function execution to fail, or
                ``None`` when no ID is available (calling then raises).
        """
        self.client = client
        self.function_execution_id = function_execution_id
        self._called = False

    async def __call__(self, error: str) -> AsyncSlackResponse:
        """Signal that the custom function failed to complete.

        Args:
            error: Error message to return to Slack.

        Returns:
            AsyncSlackResponse: The response object returned from Slack.

        Raises:
            ValueError: If there is no ``function_execution_id`` to report
                the failure against.
        """
        if self.function_execution_id is None:
            raise ValueError("fail is unsupported here as there is no function_execution_id")

        # The flag is set before awaiting the request, so has_been_called()
        # reports True even if the request itself raises.
        self._called = True
        return await self.client.functions_completeError(
            function_execution_id=self.function_execution_id,
            error=error,
        )

    def has_been_called(self) -> bool:
        """Check if this fail function has been called.

        Returns:
            bool: True if the fail function has been called, False otherwise.
        """
        return self._called