-
Notifications
You must be signed in to change notification settings - Fork 101
Expand file tree
/
Copy pathapi_request_sender.py
More file actions
94 lines (75 loc) · 2.73 KB
/
api_request_sender.py
File metadata and controls
94 lines (75 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import requests
from requests import RequestException
import traceback
from viberbot.api.consts import BOT_API_ENDPOINT
import json
class ApiRequestSender(object):
def __init__(self, logger, viber_bot_api_url, bot_configuration, viber_bot_user_agent):
self._logger = logger
self._viber_bot_api_url = viber_bot_api_url
self._bot_configuration = bot_configuration
self._user_agent = viber_bot_user_agent
def set_webhook(self, url, webhook_events=None, is_inline=False):
payload = {
'url': url,
'is_inline': is_inline
}
if webhook_events is not None:
if not isinstance(webhook_events, list):
webhook_events = [webhook_events]
payload['event_types'] = webhook_events
result = self.post_request(
endpoint=BOT_API_ENDPOINT.SET_WEBHOOK,
payload=json.dumps(payload))
if not result['status'] == 0:
raise Exception(u"failed with status: {0}, message: {1}".format(result['status'], result['status_message']))
return result['event_types']
def get_account_info(self):
return self.post_request(
endpoint=BOT_API_ENDPOINT.GET_ACCOUNT_INFO,
payload=json.dumps(None)
)
def post_request(self, endpoint, payload):
try:
headers = requests.utils.default_headers()
headers.update({
'User-Agent': self._user_agent,
'X-Viber-Auth-Token': self._bot_configuration.auth_token
})
response = requests.post(self._viber_bot_api_url + '/' + endpoint, data=payload, headers=headers)
response.raise_for_status()
return json.loads(response.text)
except RequestException as e:
self._logger.error(
u"failed to post request to endpoint={0}, with payload={1}. error is: {2}"
.format(endpoint, payload, traceback.format_exc()))
raise e
except Exception as ex:
self._logger.error(
u"unexpected Exception while trying to post request. error is: {0}"
.format(traceback.format_exc()))
raise ex
def get_online_status(self, ids=[]):
if ids is None or not isinstance(ids, list) or len(ids) == 0:
raise Exception(u"missing parameter ids, should be a list of viber memberIds")
payload = {
'ids': ids
}
result = self.post_request(
endpoint=BOT_API_ENDPOINT.GET_ONLINE,
payload=json.dumps(payload))
if not result['status'] == 0:
raise Exception(u"failed with status: {0}, message: {1}".format(result['status'], result['status_message']))
return result['users']
def get_user_details(self, user_id):
if user_id is None:
raise Exception(u"missing parameter id")
payload = {
'id': user_id
}
result = self.post_request(
endpoint=BOT_API_ENDPOINT.GET_USER_DETAILS,
payload=json.dumps(payload))
if not result['status'] == 0:
raise Exception(u"failed with status: {0}, message: {1}".format(result['status'], result['status_message']))
return result['user']