-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathabipdb.py
More file actions
108 lines (99 loc) · 3.13 KB
/
abipdb.py
File metadata and controls
108 lines (99 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import requests
import json
import os
import time
import fnmatch
cwd = os.getcwd()
path_to_abipCache = cwd + "/DataDumps/abuseIP/"
def abip_url_getData(ip, abip_key):
url = 'https://api.abuseipdb.com/api/v2/check'
querystring = {
'ipAddress': ip,
'maxAgeInDays': '90',
'verbose': 'true'
}
headers = {
'Accept': 'application/json',
'Key': abip_key
}
return requests.get(url, headers=headers, params=querystring)
def abipFileGetData(fileName):
with open(path_to_abipCache + fileName, 'r') as fr:
data = json.load(fr)
return data
def getFileName(ip):
filterOn = ip + "*.json"
dirList = os.listdir(path_to_abipCache)
findFile = fnmatch.filter(dirList, filterOn)
return findFile
def getTimeDif(ip, holding_days):
fileName = getFileName(ip)
if not fileName:
return (0, None, None)
removeIP = fileName[0].split('_')
removeJson = removeIP[1].split('.json')
fileTime = int(removeJson[0])
currentTime = int(time.time())
getDif = (currentTime - fileTime) / 86400
if holding_days > getDif:
return (1, fileName[0], getDif)
else:
return (0, fileName[0], getDif)
def checkIfFileExists(ip):
fileName = getFileName(ip)
if len(fileName) == 1:
if os.path.exists(path_to_abipCache + fileName[0]):
return (1, fileName[0], "valid path")
else:
return (0, fileName[0], "no path")
else:
if len(fileName) == 0:
return (0, fileName, "No file")
elif len(fileName) >= 2:
return (0, fileName, "too many of same file")
else:
return (0, fileName, "error")
def createFile(ip, abip_key):
if checkIfFileExists(ip)[0] == 0:
abip_api = abip_url_getData(ip, abip_key)
if abip_api.status_code == 200:
fx = open(path_to_abipCache + ip + '_' + str(int(time.time())) + ".json", 'x')
fx.write(abip_api.text)
fx.close()
isfile = checkIfFileExists(ip)
if isfile[0] == 1:
return (1, "File Creation Successful", isfile)
else:
return (0, "File Creation Unsuccessful", isfile)
elif abip_api.status_code == 400:
return (0, f"ABIP :: Not Found or Missing Value for Value :: {ip} :: Status Code :: {abip_api.status_code}")
elif abip_api.status_code == 404:
return (0, f"ABIP :: No Response for Value :: {ip} :: Status Code :: {abip_api.status_code}")
else:
return (0, f"ABIP :: Server Error :: {ip} :: Status Code :: {abip_api.status_code} :: Or API Keys are missing")
else:
return (0, checkIfFileExists(ip))
def deleteFile(ip):
os.remove(str(path_to_abipCache + getFileName(ip)[0]))
if checkIfFileExists(ip)[0] == 0:
return 1
else:
return 0
def abip_xr_data(ip, abip_key, holding_days):
if checkIfFileExists(ip)[0] == 1:
if getTimeDif(ip, holding_days)[0] == 1:
return abipFileGetData(getFileName(ip)[0])
else:
delete = deleteFile(ip)
makeFile = createFile(ip, abip_key)
if delete == 1:
if makeFile[0] == 1:
return abipFileGetData(getFileName(ip)[0])
else:
return makeFile
else:
makeFile = createFile(ip, abip_key)
if makeFile[0] == 1:
return abipFileGetData(getFileName(ip)[0])
else:
return makeFile