-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnet-pos.py
More file actions
139 lines (117 loc) · 5.72 KB
/
net-pos.py
File metadata and controls
139 lines (117 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
import json
import os
import threading
import requests
import time
import pwnagotchi.plugins as plugins
from pwnagotchi.utils import StatusFile
class NetPos(plugins.Plugin):
    """Record nearby access points per handshake and resolve them to a position.

    On every handshake the strongest visible access points are written next
    to the capture as ``<name>.net-pos.json``.  When internet is available,
    each pending ``.net-pos.json`` file is posted to the geolocation API and
    the answer is stored as ``<name>.geo.json``.

    NOTE(review): ``self.options`` is not assigned in this class; it is
    presumably injected by the pwnagotchi plugin loader before ``on_loaded``
    runs -- confirm against the framework.
    """

    __author__ = 'zenzen san, doki'
    __version__ = '2.0.4'
    __license__ = 'GPL3'
    __GitHub__ = "https://github.com/do-ki/pwnagotchi/blob/master/pwnagotchi/plugins/default/net-pos.py"
    __description__ = """Saves a json file with the access points with more signal
whenever a handshake is captured.
When internet is available the files are converted to geo locations
using Google Geolocation API."""

    # ``{api}`` is filled with the configured API key right before each request.
    API_URL = 'https://www.googleapis.com/geolocation/v1/geolocate?key={api}'
    # https://github.com/do-ki/pwnagotchi/blob/master/pwnagotchi/plugins/default/net-pos.py

    # Number of strongest access points stored per handshake.
    MAX_ACCESS_POINTS = 6
    # File-name suffixes for the pending request payload and the resolved position.
    NETPOS_SUFFIX = '.net-pos.json'
    GEO_SUFFIX = '.geo.json'

    def __init__(self):
        """Initialise in-memory state and the default status file."""
        # Files that failed during this process lifetime; they are not retried
        # until the plugin object is recreated.
        self.skip = list()
        # Stays False until on_loaded() has validated the configuration.
        self.ready = False
        # Ensures only one on_internet_available() run is active at a time.
        self.lock = threading.Lock()
        self.report_path = '/root/.net_pos_saved'
        self.report = StatusFile(self.report_path, data_format='json')

    def on_loaded(self):
        """Validate the options and mark the plugin as ready.

        Requires a non-empty ``api_key`` option.  The optional ``api_url``
        and ``status_file`` options override the defaults.
        """
        if 'api_key' not in self.options or not self.options['api_key']:
            logging.error("NET-POS: 'api_key' not set. Can't use Google's Geolocation API.")
            return

        if 'api_url' in self.options:
            self.API_URL = self.options['api_url']

        if 'status_file' in self.options:
            self.report_path = self.options['status_file']
            self.report = StatusFile(self.report_path, data_format='json')

        self.ready = True
        logging.info("NET-POS: Plugin loaded successfully.")
        logging.debug("NET-POS: Using API URL: %s", self.API_URL)

    def on_internet_available(self, agent):
        """Resolve all pending net-pos files to geo positions.

        Returns immediately when the plugin is not ready or when another
        invocation is still running.
        """
        if not self.ready:
            return

        # A non-blocking acquire tests and takes the lock in one step, so two
        # concurrent callers cannot both pass a separate "is it locked?" check.
        if not self.lock.acquire(blocking=False):
            return
        try:
            self._resolve_pending(agent)
        finally:
            self.lock.release()

    def _resolve_pending(self, agent):
        """Find unreported net-pos files in the handshake directory and resolve them."""
        config = agent.config()
        display = agent.view()
        reported = self.report.data_field_or('reported', default=list())
        handshake_dir = config['bettercap']['handshakes']

        try:
            all_files = os.listdir(handshake_dir)
        except OSError as os_e:
            logging.error("NET-POS: %s - OSError: %s", handshake_dir, os_e)
            return

        all_np_files = [os.path.join(handshake_dir, name)
                        for name in all_files if name.endswith(self.NETPOS_SUFFIX)]
        new_np_files = set(all_np_files) - set(reported) - set(self.skip)
        if not new_np_files:
            return

        total = len(new_np_files)
        logging.debug("NET-POS: Found %d new net-pos files. Fetching positions ...", total)
        display.set('status', f"Found {total} new net-pos files. Fetching positions ...")
        display.update(force=True)

        for idx, np_file in enumerate(new_np_files):
            # Swap only the trailing suffix; str.replace() would also rewrite
            # an occurrence of the suffix elsewhere in the path.
            geo_file = np_file[:-len(self.NETPOS_SUFFIX)] + self.GEO_SUFFIX

            if os.path.exists(geo_file):
                # Position already on disk; just remember that it is done.
                reported.append(np_file)
                self.report.update(data={'reported': reported})
                continue

            if not self._fetch_and_store(np_file, geo_file):
                continue

            reported.append(np_file)
            self.report.update(data={'reported': reported})

            display.set('status', f"Fetching positions ({idx + 1}/{total})")
            display.update(force=True)

    def _fetch_and_store(self, np_file, geo_file):
        """Query the API for ``np_file`` and write the answer to ``geo_file``.

        Returns True when ``geo_file`` was written.  On failure the problem is
        logged, the file is skipped for the rest of the session (or deleted if
        it is not valid JSON) and False is returned.
        """
        try:
            geo_data = self._get_geo_data(np_file)
        except requests.exceptions.RequestException as req_e:
            logging.error("NET-POS: %s - RequestException: %s", np_file, req_e)
            self.skip.append(np_file)
            return False
        except json.JSONDecodeError as js_e:
            logging.error("NET-POS: %s - JSONDecodeError: %s, removing it...", np_file, js_e)
            try:
                os.remove(np_file)
            except OSError as os_e:
                # Could not delete it: skip it so it is not re-parsed every run.
                logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
                self.skip.append(np_file)
            return False
        except OSError as os_e:
            logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
            self.skip.append(np_file)
            return False

        try:
            with open(geo_file, 'w+t') as sf:
                json.dump(geo_data, sf)
        except OSError as os_e:
            logging.error("NET-POS: %s - OSError: %s", geo_file, os_e)
            # A partially written geo file would be mistaken for a finished
            # one on the next run, so try to remove it.
            if os.path.exists(geo_file):
                try:
                    os.remove(geo_file)
                except OSError as rm_e:
                    logging.error("NET-POS: %s - OSError: %s", geo_file, rm_e)
            self.skip.append(np_file)
            return False

        return True

    def on_handshake(self, agent, filename, access_point, client_station):
        """Store the strongest visible access points next to the handshake file.

        Nothing is written when no access points are visible.  ``access_point``
        and ``client_station`` are accepted but not used here.
        """
        netpos = self._get_netpos(agent)
        if not netpos['wifiAccessPoints']:
            return

        netpos["ts"] = int(time.time())

        # Replace only the extension.  str.replace('.pcap', ...) returns the
        # path unchanged when '.pcap' is absent, which would make the JSON
        # overwrite the capture file itself.
        root, _ext = os.path.splitext(filename)
        netpos_filename = root + self.NETPOS_SUFFIX
        logging.debug("NET-POS: Saving net-location to %s", netpos_filename)

        try:
            with open(netpos_filename, 'w+t') as net_pos_file:
                json.dump(netpos, net_pos_file)
        except OSError as os_e:
            logging.error("NET-POS: %s", os_e)

    def _get_netpos(self, agent):
        """Return the geolocation request payload for the strongest access points.

        The result is ``{'wifiAccessPoints': [...]}`` with at most
        ``MAX_ACCESS_POINTS`` entries, ordered by descending ``rssi``.
        """
        aps = agent.get_access_points()
        strongest = sorted(aps, key=lambda ap: ap['rssi'], reverse=True)[:self.MAX_ACCESS_POINTS]
        return {
            'wifiAccessPoints': [
                {
                    'macAddress': access_point['mac'],
                    'signalStrength': access_point['rssi'],
                }
                for access_point in strongest
            ]
        }

    def _get_geo_data(self, path, timeout=30):
        """Post the net-pos file at ``path`` to the API and return the decoded reply.

        The ``ts`` value of the input file, when present, is copied into the
        result.

        Raises:
            json.JSONDecodeError: the file at ``path`` is not valid JSON.
            OSError: the file at ``path`` cannot be read.
            requests.exceptions.RequestException: the request failed, the
                server answered with an error status, or the response body
                is not valid JSON.
        """
        geourl = self.API_URL.format(api=self.options['api_key'])

        with open(path, "r") as json_file:
            data = json.load(json_file)

        result = requests.post(geourl, json=data, timeout=timeout)

        # An error reply must not be stored as a position.  The message is
        # built here (rather than via raise_for_status()) so the request URL,
        # which carries the API key, stays out of the log.
        if not result.ok:
            raise requests.exceptions.HTTPError(
                f"HTTP {result.status_code} from geolocation API", response=result)

        try:
            return_geo = result.json()
        except ValueError as err:
            # Report a bad *response* as a request failure, so the caller does
            # not mistake it for a corrupt input file and delete that file.
            raise requests.exceptions.RequestException(
                f"invalid JSON in geolocation response: {err}") from err

        if isinstance(data, dict) and isinstance(return_geo, dict) and data.get("ts"):
            return_geo["ts"] = data["ts"]
        return return_geo