-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
664 lines (565 loc) · 28.5 KB
/
Copy pathapp.py
File metadata and controls
664 lines (565 loc) · 28.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
import argparse
import concurrent.futures
import json
import re
import socket
import sys
import time

from scapy.all import ARP, Ether, srp, IP, ICMP, UDP, sr1, sniff, conf
from scapy.layers.dhcp import DHCP
from zeroconf import Zeroconf, ServiceBrowser
try:
from rich.console import Console
from rich.table import Table
except ImportError:
print("🚨 'rich' library is missing. Run 'pip install rich' to install it.")
sys.exit(1)
try:
from manuf import manuf as _manuf
mac_lookup = _manuf.MacParser()
except ImportError:
print("🚨 'manuf' library is missing. Run 'pip install manuf' to install it.")
sys.exit(1)
console = Console()
# PROBES: payloads sent to specific ports right after connecting, in order to
# trigger a service banner that can then be captured.
# A value of None means no payload is sent and the scanner only reads whatever
# the service sends by itself. Ports missing from this table are probed with a
# bare b"\r\n" by scan_single_port_and_grab_banner.
PROBES = {
    21: b"HELP\r\n",                     # FTP
    22: None,                            # SSH — sends its banner on its own
    25: b"EHLO probe\r\n",               # SMTP
    80: b"HEAD / HTTP/1.0\r\n\r\n",
    110: b"CAPA\r\n",                    # POP3
    143: b"A001 CAPABILITY\r\n",         # IMAP
    443: b"HEAD / HTTP/1.0\r\n\r\n",
    3306: None,                          # MySQL — sends a greeting on connect
    5432: None,                          # PostgreSQL — original note: sends a greeting on connect (NOTE(review): confirm)
    6379: b"INFO\r\n",                   # Redis
    8080: b"HEAD / HTTP/1.0\r\n\r\n",
    8443: b"HEAD / HTTP/1.0\r\n\r\n",
    11211: b"stats\r\n",                 # Memcached
    27017: None,                         # MongoDB — original note: sends a greeting on connect (NOTE(review): confirm)
}
# Ports scanned when the user does not pass a custom list with -p/--ports.
DEFAULT_PORTS = [
    21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3389, 8080, 8443,  # Common defaults
    3306, 5432, 27017, 6379, 1433,  # Databases (MySQL, Postgres, Mongo, Redis, MSSQL)
    5900, 2222,  # VNC, alternative SSH
    3000, 5000, 8000,  # Typical dev servers (React, Flask, Node, ...)
    548, 62078, 631,  # AFP/macOS, iOS lockdown, CUPS/Linux
]
# ─────────────────────────────────────────────
# PORT-BASED OS SIGNATURES
# ─────────────────────────────────────────────
# Certain ports being open is a strong clue about the operating system.
# Each rule is a tuple: (port_set, os_label, confidence)
# Confidence: "strong" → sufficient on its own,
#             "hint"   → weigh together with other clues.
# A rule matches when every port in port_set is among the host's open ports.
# Rules are evaluated in list order by guess_os.
PORT_OS_SIGNATURES = [
    # Windows-specific
    ({445, 3389}, "Windows", "strong"),  # SMB + RDP = definitely Windows
    ({445, 135}, "Windows", "strong"),  # SMB + RPC (135 is not in DEFAULT_PORTS, so this needs a custom port list)
    ({445}, "Windows", "hint"),  # SMB alone (could also be Samba)
    ({3389}, "Windows", "hint"),  # RDP (could be xrdp, but that is rare)
    ({1433}, "Windows", "hint"),  # MSSQL usually runs on Windows
    # macOS / Apple
    ({548, 62078}, "macOS/iOS", "strong"),  # AFP + lockdown = Apple device
    ({548}, "macOS", "hint"),  # AFP
    ({62078}, "iOS", "strong"),  # iPhone/iPad lockdown service
    # Linux hints
    ({631}, "Linux", "hint"),  # CUPS print server
]
# Mapping from keywords found in service banners to an OS label.
# Each entry is a tuple: (keyword, os_label)
# Keywords are lowercase; guess_os compares them against lower-cased banners.
BANNER_OS_KEYWORDS = [
    # Linux distros
    ("ubuntu", "Linux (Ubuntu)"),
    ("debian", "Linux (Debian)"),
    ("centos", "Linux (CentOS)"),
    ("fedora", "Linux (Fedora)"),
    ("red hat", "Linux (RHEL)"),
    ("alpine", "Linux (Alpine)"),
    ("arch", "Linux (Arch)"),
    ("raspbian", "Linux (Raspbian)"),
    # Generic Linux signals
    ("openssh", "Linux"),  # OpenSSH is usually Linux (could also be macOS)
    ("nginx", "Linux"),  # nginx runs on Linux in the vast majority of cases
    ("apache", "Linux"),  # Apache is also usually Linux
    # Windows signals
    ("microsoft", "Windows"),
    ("windows", "Windows"),
    ("iis", "Windows"),  # IIS = definitely Windows
    # macOS
    ("macos", "macOS"),
    ("darwin", "macOS"),
    # Network devices
    ("cisco", "Cisco/Network"),
    ("mikrotik", "MikroTik/Network"),
    ("routeros", "MikroTik/Network"),
    ("ubnt", "Ubiquiti/Network"),
    ("unifi", "Ubiquiti/Network"),
    ("fortinet", "Fortinet/Network"),
    ("pfsense", "pfSense/Network"),
    # Embedded / IoT
    ("lighttpd", "Embedded/IoT"),
    ("mini-httpd", "Embedded/IoT"),
    ("boa", "Embedded/IoT"),
]
def get_vendor(mac_address):
    """
    Look up the hardware vendor for a MAC address in the local OUI database.

    Falls back to the string 'Unknown' when the lookup yields nothing or
    raises for any reason, so callers always receive a printable value.
    """
    fallback = "Unknown"
    try:
        vendor_name = mac_lookup.get_manuf_long(mac_address)
    except Exception:
        # Best-effort enrichment: a failed lookup must never abort a scan.
        return fallback
    return vendor_name if vendor_name else fallback
# Maximum number of banner characters shown in the results table.
_BANNER_DISPLAY_WIDTH = 25


def _extract_banner(data):
    """Reduce a raw service response to one descriptive line.

    HTTP responses are special-cased: the value of the first ``Server:``
    header is returned instead of the status line (e.g. "nginx/1.23.1"
    instead of "HTTP/1.1 200 OK"). Everything else (SSH, FTP, ...) yields
    its first line. The result is stripped, so a trailing "\\r" from CRLF
    line endings never leaks into the banner.
    """
    text = data.decode("utf-8", errors="ignore").strip()
    lines = text.split("\n")
    if text.upper().startswith("HTTP/"):
        for line in lines:
            if line.lower().startswith("server:"):
                server = line.split(":", 1)[1].strip()
                if server:
                    return server
                break
    return lines[0].strip()


def _shorten_banner(banner):
    """Truncate a banner for table display, marking the cut with '..'."""
    if len(banner) > _BANNER_DISPLAY_WIDTH:
        return banner[:_BANNER_DISPLAY_WIDTH] + ".."
    return banner


def scan_single_port_and_grab_banner(ip, port):
    """
    TCP connect scan of a single port with optional banner grab.

    Returns:
        (display_str, banner, port)      — port is open. ``banner`` is the
                                           full, untruncated banner line ("" if
                                           the service sent nothing); only
                                           ``display_str`` is shortened.
        (f"{port} [Filtered]", None, port) — the connect timed out (typically a
                                           firewall silently dropping packets).
                                           ``None`` in the banner slot is what
                                           distinguishes filtered from open.
        None                             — port is closed (connection refused)
                                           or another OS-level error occurred.

    Closed ports intentionally return None so they do not appear in the output
    table — listing every closed port per device would be noise.

    ``connect()`` is used rather than ``connect_ex()``: with a socket timeout
    set, ``connect_ex()`` reports a timeout as an EWOULDBLOCK/EAGAIN return
    code instead of raising, which made timeouts indistinguishable from
    closed ports.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.7)
            s.connect((ip, port))
            banner = ""
            try:
                # Unknown ports get a bare CRLF; None means "just listen".
                probe = PROBES.get(port, b"\r\n")
                if probe:
                    s.sendall(probe)
                data = s.recv(1024)
                if data:
                    banner = _extract_banner(data)
            except OSError:
                # Includes socket.timeout: the port is open, it just did not
                # produce a banner in time. Still report it as open.
                pass
    except socket.timeout:
        return (f"{port} [Filtered]", None, port)
    except OSError:
        # ConnectionRefusedError, host unreachable, etc. → treat as closed.
        return None

    shown = _shorten_banner(banner)
    display = f"{port} [{shown}]" if shown else str(port)
    return (display, banner, port)


def get_open_ports(ip, target_ports):
    """
    Scan a list of target ports on one IP in parallel.

    Returns a tuple ``(display_string, open_port_numbers, raw_banners)``:
      - display_string: comma-separated port info for the table, or "Closed"
        when nothing was open or filtered
      - open_port_numbers: set of ports that accepted a connection
        (used for OS fingerprinting; filtered ports are NOT included)
      - raw_banners: list of non-empty banner strings (for OS fingerprinting)

    An empty ``target_ports`` yields ("Closed", set(), []) without starting
    a thread pool.
    """
    target_ports = list(target_ports)
    open_ports_display = []
    open_port_numbers = set()
    raw_banners = []

    results = []
    if target_ports:
        # ThreadPoolExecutor rejects max_workers=0, hence the guard above.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=min(50, len(target_ports))
        ) as executor:
            results = list(
                executor.map(lambda p: scan_single_port_and_grab_banner(ip, p), target_ports)
            )

    for result in results:
        if result is None:
            continue
        display, banner, port_num = result
        open_ports_display.append(display)
        if banner is None:
            # Filtered: shown to the user, but not evidence of an open port.
            continue
        open_port_numbers.add(port_num)
        if banner:
            raw_banners.append(banner)

    display_str = ", ".join(open_ports_display) if open_ports_display else "Closed"
    return (display_str, open_port_numbers, raw_banners)
def _banner_mentions(keyword, banner_lower):
    """Return True if ``keyword`` occurs in ``banner_lower`` as a whole word.

    "Whole word" means not directly preceded or followed by another ASCII
    letter, so "arch" no longer matches "search" and "boa" no longer matches
    "dashboard", while "openssh_8.2", "nginx/1.18" and "4ubuntu0.1" still match.
    """
    pattern = r"(?<![a-z])" + re.escape(keyword) + r"(?![a-z])"
    return re.search(pattern, banner_lower) is not None


def guess_os(ip, open_ports, banners):
    """
    Multi-layered OS fingerprinting. Priority order:
      1. Banner keyword matching — OS-specific keywords in service banners
      2. Port signature matching — port combinations typical of an OS
      3. TTL fallback — ICMP echo TTL, used only when nothing else matched

    Args:
        ip: target IPv4 address (only used for the TTL fallback probe).
        open_ports: collection of open port numbers on the target.
        banners: iterable of banner strings grabbed from the target.

    Returns:
        (os_label, method_used) where method_used is one of
        "banner", "ports", "ports-hint", "ttl" or "none".
    """
    # Layer 1: banner analysis.
    # Several banners may point at different OSes (e.g. nginx next to a
    # Windows banner), so every keyword hit counts as one vote.
    os_votes = {}
    for banner in banners:
        banner_lower = banner.lower()
        for keyword, os_label in BANNER_OS_KEYWORDS:
            if _banner_mentions(keyword, banner_lower):
                os_votes[os_label] = os_votes.get(os_label, 0) + 1
    if os_votes:
        # Most votes wins; ties resolve to the label that was seen first.
        best_os = max(os_votes, key=os_votes.get)
        return (best_os, "banner")

    # Layer 2: port signature matching.
    # A single "strong" match is decisive, so check those first.
    for port_set, os_label, confidence in PORT_OS_SIGNATURES:
        if confidence == "strong" and port_set.issubset(open_ports):
            return (os_label, "ports")

    # No strong match: let the "hint" rules vote.
    hint_votes = {}
    for port_set, os_label, confidence in PORT_OS_SIGNATURES:
        if confidence == "hint" and port_set.issubset(open_ports):
            hint_votes[os_label] = hint_votes.get(os_label, 0) + 1
    if hint_votes:
        best_hint = max(hint_votes, key=hint_votes.get)
        return (f"{best_hint} (?)", "ports-hint")

    # Layer 3: TTL fallback.
    # With no port or banner clue, look at the TTL of an ICMP echo reply.
    try:
        pkt = IP(dst=ip) / ICMP()
        reply = sr1(pkt, timeout=0.5, verbose=False)
        if reply is not None:
            ttl = reply.ttl
            if ttl <= 64:
                return ("Linux/Mac/Mobile", "ttl")
            elif ttl <= 128:
                return ("Windows", "ttl")
            elif ttl <= 255:
                return ("Cisco/Network Device", "ttl")
    except OSError:
        # Includes PermissionError (raw sockets need elevated privileges);
        # the TTL probe is optional, so fall through to "Unknown".
        pass
    return ("Unknown", "none")
def passive_sniff(iface, timeout):
    """
    Listen on ``iface`` for up to ``timeout`` seconds and collect devices
    from three kinds of traffic, without transmitting anything:
      - ARP packets whose sender IP is not 0.0.0.0
      - DHCP packets carrying message-type 3 (Request)
      - UDP packets addressed to port 5353 (mDNS)

    Returns a dict mapping source MAC → IP address. Devices seen only via
    DHCP are recorded with the placeholder "DHCP-pending".
    """
    seen = {}

    def on_packet(packet):
        # ARP: sender hardware address → sender protocol address.
        if packet.haslayer(ARP) and packet[ARP].psrc != "0.0.0.0":
            seen[packet[ARP].hwsrc] = packet[ARP].psrc
            return

        # DHCP: only Request messages count; keep any IP already known.
        if packet.haslayer(DHCP):
            is_request = False
            for option in packet[DHCP].options:
                if isinstance(option, tuple) and option == ("message-type", 3):
                    is_request = True
                    break
            if is_request and packet.haslayer(Ether):
                seen.setdefault(packet[Ether].src, "DHCP-pending")
            return

        # mDNS: needs both an Ethernet and an IP layer to be useful.
        if packet.haslayer(UDP) and packet[UDP].dport == 5353:
            if packet.haslayer(IP) and packet.haslayer(Ether):
                seen[packet[Ether].src] = packet[IP].src

    capture_filter = "arp or (udp and (port 67 or port 68 or port 5353))"
    try:
        sniff(iface=iface, filter=capture_filter, prn=on_packet, timeout=timeout, store=False)
    except OSError:
        # Sniffing is optional; return whatever was collected (possibly nothing).
        pass
    return seen
def detect_arp_spoofing(devices):
    """
    Flag two kinds of ARP anomaly among the scanned devices:
      1. One MAC answering for several IPs (classic cache poisoning / MITM).
      2. One IP claimed by several MACs (an existing address being hijacked).

    MAC addresses are compared case-insensitively and repeated (ip, mac)
    pairs are counted once.

    Returns a dict in which a suspicious MAC maps to its list of IPs and a
    suspicious IP maps to its list of MACs; empty when nothing looks wrong.
    """
    pairs_seen = set()
    ips_by_mac = {}
    macs_by_ip = {}
    for entry in devices:
        ip = entry["ip"]
        mac = entry["mac"].lower()
        if (ip, mac) in pairs_seen:
            continue
        pairs_seen.add((ip, mac))
        ips_by_mac.setdefault(mac, []).append(ip)
        macs_by_ip.setdefault(ip, []).append(mac)

    # MAC-keyed findings first, then IP-keyed ones.
    suspicious = {mac: ips for mac, ips in ips_by_mac.items() if len(ips) > 1}
    suspicious.update({ip: macs for ip, macs in macs_by_ip.items() if len(macs) > 1})
    return suspicious
# mDNS/DNS-SD service types that query_mdns browses for.
MDNS_SERVICES = [
    "_apple-mobdev2._tcp.local.",  # iOS devices advertise this service; very valuable for spotting iPhones/iPads
    "_companion-link._tcp.local.",  # Apple TV and some macOS services show up under this name
    "_airplay._tcp.local.",  # AirPlay-capable devices (Apple TV, some speakers)
    "_raop._tcp.local.",  # AirPlay audio-only (RAOP) devices; especially useful for spotting speakers
    "_googlecast._tcp.local.",  # Chromecast and other Google Cast devices
    "_http._tcp.local.",  # Generic web servers; common on embedded devices
    "_printer._tcp.local.",  # Printers, especially network printers
    "_smb._tcp.local.",  # Devices offering SMB (Windows machines, NAS boxes)
]
def query_mdns(timeout=3):
    """
    Browse the LAN for the mDNS (Bonjour/Zeroconf) services listed in
    MDNS_SERVICES and collect what is discovered within ``timeout`` seconds.

    Returns two dicts:
      names     — {ip: display_name}  e.g. "iPhone (2)"    → for the mdns_name column
      hostnames — {ip: hostname}      e.g. "Ileris-iPhone" → for hostname resolution

    The Zeroconf instance is always closed, even if creating a browser or
    the wait itself raises.
    """
    names = {}
    hostnames = {}

    class Listener:
        """Service-browser callback sink; only additions are of interest."""

        def add_service(self, zc, type_, name):
            info = zc.get_service_info(type_, name)
            if not info or not info.addresses:
                return
            try:
                ip = socket.inet_ntoa(info.addresses[0])
                # The instance name is the first label of the service name.
                names[ip] = name.split(".")[0]
                if info.server:
                    # Drop the trailing dot and the domain suffix.
                    h = info.server.rstrip(".").split(".")[0]
                    if h:
                        hostnames[ip] = h
            except OSError:
                # inet_ntoa rejects addresses that are not 4 packed bytes.
                pass

        def remove_service(self, *_):
            pass

        def update_service(self, *_):
            pass

    zc = Zeroconf()
    try:
        listener = Listener()
        # Keep references so the browsers stay alive while we wait.
        _browsers = [ServiceBrowser(zc, svc, listener) for svc in MDNS_SERVICES]
        time.sleep(timeout)
    finally:
        zc.close()
    return names, hostnames
# NBNS Node Status Request packet — encoded wildcard "*" name, NBSTAT type.
# Sent as-is over UDP to port 137 by query_nbns; the transaction ID is fixed.
_NBNS_REQUEST = (
    b'\x00\x01'  # Transaction ID
    b'\x00\x00'  # Flags: query
    b'\x00\x01'  # Questions: 1
    b'\x00\x00'  # Answer RRs: 0
    b'\x00\x00'  # Authority RRs: 0
    b'\x00\x00'  # Additional RRs: 0
    b'\x20'  # Name length: 32
    b'CKAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'  # Encoded "*" wildcard (32 bytes)
    b'\x00'  # End of name
    b'\x00\x21'  # Type: NBSTAT
    b'\x00\x01'  # Class: IN
)
def _parse_nbns_hostname(data):
    """Extract the workstation name from an NBNS Node Status response.

    Response layout (no question echo): 12 header + 34 RR name + 10 RR fields
    = 56 bytes, then rdata: 1 byte num_names followed by 18-byte entries
    (15 name + 1 type + 2 flags).

    A response may or may not echo the question section, so instead of a
    fixed offset the first byte in [56..120) that looks like a plausible
    num_names (1–50) and whose name table fits in the packet is taken as the
    start of rdata.

    Returns the first unique (non-group) name of type 0x00, or None.
    """
    names_start = None
    for start in range(56, min(len(data) - 1, 120)):
        num = data[start]
        if 1 <= num <= 50 and start + 1 + num * 18 <= len(data):
            names_start = start
            break
    if names_start is None:
        return None

    num_names = data[names_start]
    for i in range(num_names):
        offset = names_start + 1 + i * 18
        if offset + 18 > len(data):
            break
        name = data[offset:offset + 15].decode("ascii", errors="ignore").strip()
        name_type = data[offset + 15]
        flags = int.from_bytes(data[offset + 16:offset + 18], "big")
        # Type 0x00 = workstation/server; bit 0x8000 set means a group name.
        if name_type == 0x00 and not (flags & 0x8000) and name:
            return name
    return None


def query_nbns(ips, timeout=1):
    """
    Send an NBNS Node Status Request to each IP in parallel.

    Args:
        ips: iterable of IPv4 address strings.
        timeout: per-host socket timeout in seconds.

    Returns:
        {ip: hostname} for the hosts that answered with a usable name.
        Hosts that do not answer, or answer with something unparsable, are
        simply omitted; this function never raises for network errors.
    """
    def query_single(ip):
        try:
            # The context manager closes the socket on timeouts and errors
            # too, not only on the success path.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.settimeout(timeout)
                sock.sendto(_NBNS_REQUEST, (ip, 137))
                data, _ = sock.recvfrom(1024)
        except (OSError, ValueError):
            # Timeout, unreachable host, malformed address: best-effort only.
            return ip, None
        return ip, _parse_nbns_hostname(data)

    ips = list(ips)
    results = {}
    if not ips:
        return results
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(20, len(ips))) as executor:
        for ip, name in executor.map(query_single, ips):
            if name:
                results[ip] = name
    return results
def resolve_hostname(ip, mdns_hostnames, nbns_map):
    """
    Look ``ip`` up in the two fast name sources, mDNS first and NBNS second.

    Returns the first entry found, or None when neither source knows the
    address, leaving the slow reverse-DNS fallback to the caller.
    """
    for source in (mdns_hostnames, nbns_map):
        if ip in source:
            return source[ip]
    return None
def scan_network(ip_range, timeout_val, ports_to_scan):
    """
    Main orchestration function for network discovery.
      1. ARP scan → active hosts
      2. Port scan + banner grabbing (parallel, one worker per host)
      3. OS fingerprinting from banners → ports → TTL (priority order)
      4. Hostname enrichment: mDNS → NBNS → reverse DNS

    Args:
        ip_range: target range understood by scapy's ARP ``pdst``.
        timeout_val: ARP scan timeout in seconds.
        ports_to_scan: list of TCP ports to probe on every host.

    Returns:
        A list of device dicts with the keys "ip", "mac", "hostname",
        "vendor", "os", "os_method", "open_ports" and "mdns_name".
        The list is empty when no host answered the ARP scan.

    Exits the process with status 1 on PermissionError (raw sockets need
    elevated privileges) or on any other unexpected error.
    """
    try:
        arp_request = ARP(pdst=ip_range)
        broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
        arp_request_broadcast = broadcast / arp_request
        answered_list = srp(arp_request_broadcast, timeout=timeout_val, retry=3, verbose=False)[0]

        # Nothing answered: skip the enrichment phase, which would otherwise
        # spend several seconds waiting on mDNS for no devices.
        if len(answered_list) == 0:
            return []

        def scan_device(element):
            ip_addr = element[1].psrc
            mac_addr = element[1].hwsrc
            vendor = get_vendor(mac_addr)
            # Port scan → banner grab
            ports_display, open_port_numbers, raw_banners = get_open_ports(ip_addr, ports_to_scan)
            # OS fingerprint: banner → ports → TTL
            os_guess, os_method = guess_os(ip_addr, open_port_numbers, raw_banners)
            return {
                "ip": ip_addr, "mac": mac_addr, "hostname": "-",
                "vendor": vendor, "os": os_guess, "os_method": os_method,
                "open_ports": ports_display, "mdns_name": "-"
            }

        ips = [el[1].psrc for el in answered_list]
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(20, len(answered_list))) as executor:
            devices_list = list(executor.map(scan_device, answered_list))

        # NBNS and mDNS run concurrently after port scans complete
        console.print("[dim]Resolving hostnames (NBNS + mDNS)...[/dim]")
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            nbns_future = executor.submit(query_nbns, ips)
            mdns_future = executor.submit(query_mdns)
            nbns_map = nbns_future.result()
            mdns_names, mdns_hostnames = mdns_future.result()

        for dev in devices_list:
            dev["hostname"] = resolve_hostname(dev["ip"], mdns_hostnames, nbns_map) or "-"
            if dev["ip"] in mdns_names:
                dev["mdns_name"] = mdns_names[dev["ip"]]

        # Parallel reverse-DNS for anything still unresolved (rarely populated on home LANs,
        # but cheap when run concurrently rather than one blocking call per device)
        unresolved = [dev for dev in devices_list if dev["hostname"] == "-"]
        if unresolved:
            def _rdns(dev):
                try:
                    return dev["ip"], socket.gethostbyaddr(dev["ip"])[0]
                except OSError:
                    # herror/gaierror are OSError subclasses; catching the base
                    # class keeps one failed lookup from aborting the whole scan.
                    return dev["ip"], "-"

            ip_to_dev = {dev["ip"]: dev for dev in unresolved}
            with concurrent.futures.ThreadPoolExecutor(max_workers=min(20, len(unresolved))) as executor:
                for ip, name in executor.map(_rdns, unresolved):
                    if name != "-":
                        ip_to_dev[ip]["hostname"] = name

        return devices_list
    except PermissionError:
        console.print("[bold red]🚨 Error: You need to run this as Administrator![/bold red]")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary for the scan: report and stop rather than
        # continue with partial, possibly inconsistent results.
        console.print(f"[bold red]🚨 Unexpected error: {e}[/bold red]")
        sys.exit(1)
def print_table(results, title_text, is_alert=False):
    """
    Render the scanned devices as a Rich table on the shared console.

    ``results`` is a list of device dicts, ``title_text`` the table title.
    With ``is_alert`` set the title is drawn in red instead of magenta.
    """
    accent = "red" if is_alert else "magenta"
    table = Table(title=title_text, title_style=f"bold {accent}")

    column_specs = (
        ("IP Address", {"style": "cyan", "justify": "center"}),
        ("MAC Address", {"style": "green", "justify": "center"}),
        ("Hostname", {"style": "yellow"}),
        ("Vendor / mDNS", {"style": "blue"}),
        ("OS (method)", {"style": "bright_yellow"}),
        ("Open Ports & Banner", {"style": "red"}),
        ("Status", {"justify": "center"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)

    for dev in results:
        # Vendor cell: append the mDNS name on a second, dimmed line if known.
        mdns = dev.get("mdns_name", "-")
        if mdns != "-":
            vendor_cell = f"{dev['vendor']}\n[dim]{mdns}[/dim]"
        else:
            vendor_cell = dev["vendor"]

        # OS cell: also show which fingerprinting layer produced the guess.
        os_method = dev.get("os_method", "")
        os_cell = dev["os"]
        if os_method and os_method != "none":
            os_cell = f"{os_cell}\n[dim]via {os_method}[/dim]"

        if dev.get("arp_spoof"):
            status = "[bold red]⚠ ARP SPOOF[/bold red]"
        else:
            status = "[green]OK[/green]"

        table.add_row(
            dev["ip"], dev["mac"], dev["hostname"],
            vendor_cell, os_cell, dev["open_ports"], status,
        )

    console.print(table)
# ─────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────
def _parse_port_list(spec):
    """Parse a comma-separated port string such as "22,80,443".

    Raises ValueError for non-numeric entries and for ports outside 1–65535,
    so invalid ports are rejected up front instead of failing inside the
    scanner's worker threads.
    """
    ports = [int(p.strip()) for p in spec.split(",")]
    for port in ports:
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
    return ports


def _report_arp_spoofing(results, trailing_blank_line=False):
    """Print any ARP anomalies and tag every device with an "arp_spoof" flag.

    detect_arp_spoofing returns both MAC-keyed entries (one MAC on several
    IPs) and IP-keyed entries (one IP with several MACs); each is labelled
    accordingly instead of calling every key a MAC.
    """
    spoof_map = detect_arp_spoofing(results)
    if spoof_map:
        known_macs = {dev["mac"].lower() for dev in results}
        console.print("\n[bold red]⚠ ARP SPOOFING DETECTED! The following addresses have conflicting ARP mappings:[/bold red]")
        for key, values in spoof_map.items():
            if key in known_macs:
                console.print(f" [red]MAC {key} → IPs {', '.join(values)}[/red]")
            else:
                console.print(f" [red]IP {key} → MACs {', '.join(values)}[/red]")
        if trailing_blank_line:
            console.print()
    for dev in results:
        dev["arp_spoof"] = dev["mac"].lower() in spoof_map or dev["ip"] in spoof_map


def _run_watch_mode(args, target_ports):
    """Rescan in a loop, reporting devices that join or leave, until CTRL+C."""
    iface = args.iface or str(conf.iface)
    mode_label = f"active + passive sniffing on [bold]{iface}[/bold]" if args.passive else "active"
    console.print(f"[bold blue]👀 WATCH MODE ACTIVE! Scanning {args.subnet} every {args.interval}s ({mode_label})... Press CTRL+C to stop.[/bold blue]")
    previous_macs = set()
    try:
        while True:
            results = scan_network(args.subnet, args.timeout, target_ports)
            _report_arp_spoofing(results)

            current_macs = {dev["mac"] for dev in results}
            new_macs = current_macs - previous_macs
            dropped_macs = previous_macs - current_macs
            new_devices = [dev for dev in results if dev["mac"] in new_macs]

            if new_devices:
                console.print("\n[bold red blink]🚨 ALERT! NEW DEVICE(S) JOINED THE NETWORK![/bold red blink]")
                print_table(new_devices, f"Newly Detected Devices ({time.strftime('%H:%M:%S')})", is_alert=True)
            if dropped_macs:
                console.print(f"\n[bold yellow]👋 DEVICE(S) LEFT THE NETWORK: {', '.join(sorted(dropped_macs))}[/bold yellow]")
            if not new_devices and not dropped_macs:
                console.print(f"[dim]No new activity detected... ({time.strftime('%H:%M:%S')})[/dim]")
            previous_macs = current_macs

            if args.passive:
                # The passive capture doubles as the wait between rounds.
                console.print(f"[dim]Passive sniffing on {iface} for {args.interval}s...[/dim]")
                sniffed = passive_sniff(iface=iface, timeout=args.interval)
                passive_new = {mac for mac in sniffed if mac not in previous_macs}
                if passive_new:
                    console.print("\n[bold magenta]🔍 PASSIVE CAPTURE! Device(s) detected without active scan:[/bold magenta]")
                    for mac in passive_new:
                        ip = sniffed[mac]
                        vendor = get_vendor(mac)
                        console.print(f" [magenta]IP: {ip} MAC: {mac} Vendor: {vendor}[/magenta]")
                    # Remember them so they are not re-announced next round.
                    previous_macs = previous_macs | passive_new
            else:
                time.sleep(args.interval)
    except KeyboardInterrupt:
        console.print("\n[bold yellow]🛑 Monitoring stopped. Goodbye![/bold yellow]")
        sys.exit(0)


def _run_single_scan(args, target_ports):
    """Run one scan and print the result as a table or as JSON."""
    with console.status(f"[bold green]Scanning {args.subnet}...[/bold green]", spinner="bouncingBar"):
        results = scan_network(args.subnet, args.timeout, target_ports)
    if not results:
        console.print("[bold yellow]No devices found on the network. Are you sure the subnet is correct?[/bold yellow]")
        sys.exit(0)

    _report_arp_spoofing(results, trailing_blank_line=True)

    if args.json:
        print(json.dumps(results, indent=4, ensure_ascii=False))
    else:
        print_table(results, f"Network Scan Results ({args.subnet})")


def main():
    """Command-line entry point: parse arguments and dispatch to a scan mode."""
    parser = argparse.ArgumentParser(description="Local Network Scanner (Network Scanner)")
    parser.add_argument("-s", "--subnet", required=True, help="Network range to scan")
    parser.add_argument("-t", "--timeout", type=int, default=2, help="Scan timeout duration")
    parser.add_argument("-p", "--ports", type=str, help="Custom port list (e.g. 22,80,443,3000)")
    parser.add_argument("--json", action="store_true", help="Output results in JSON format")
    parser.add_argument("--watch", action="store_true", help="Continuously scan and alert on new devices")
    parser.add_argument("--interval", type=int, default=30, help="Watch mode interval in seconds (default: 30)")
    parser.add_argument("--passive", action="store_true", help="Hybrid mode: active scan + passive ARP sniffing between rounds")
    parser.add_argument("--iface", type=str, default=None, help="Network interface to sniff on (default: auto-detect)")
    args = parser.parse_args()

    if args.ports:
        try:
            target_ports = _parse_port_list(args.ports)
        except ValueError:
            console.print("[bold red]🚨 Error: Enter ports as comma-separated numbers between 1 and 65535 (e.g. 80,443)[/bold red]")
            sys.exit(1)
    else:
        target_ports = DEFAULT_PORTS

    if args.watch:
        _run_watch_mode(args, target_ports)
    else:
        _run_single_scan(args, target_ports)


if __name__ == "__main__":
    main()