-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgrpc-scan.py
More file actions
executable file
·276 lines (237 loc) · 10.7 KB
/
grpc-scan.py
File metadata and controls
executable file
·276 lines (237 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import argparse
import socket
import time
import grpc
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from grpc_reflection.v1alpha import reflection_pb2, reflection_pb2_grpc
def quick_port_check(host, port, timeout=0.001):
    """Return True if a TCP connection to ``host:port`` can be opened.

    This is a cheap pre-filter used before the (much slower) gRPC probe.

    Args:
        host: Host name or IPv4 address to probe (the socket is AF_INET).
        port: TCP port number.
        timeout: Socket timeout in seconds. Defaults to 1 ms, which is only
            realistic for local or very low-latency targets.

    Returns:
        True when ``connect_ex`` reports success (0); False on any connection
        failure, timeout, resolution error or invalid address/port.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            return probe.connect_ex((host, port)) == 0
    except (OSError, OverflowError, ValueError):
        # OSError covers resolution failures and socket errors; OverflowError
        # is raised for ports outside 0-65535; ValueError for malformed host
        # strings. KeyboardInterrupt/SystemExit are deliberately not caught.
        return False
def scan_port(host, port, verbose=False, rpc_timeout=1.0):
    """Probe ``host:port`` and describe the gRPC server found there, if any.

    The port is first checked with ``quick_port_check``. If it is open, an
    insecure gRPC channel is created and the server reflection service is
    asked for its list of services.

    Args:
        host: Host name or address to probe.
        port: TCP port number.
        verbose: When True, print diagnostics for timeouts and errors.
        rpc_timeout: Deadline in seconds for the reflection RPC, so that an
            unresponsive peer cannot block the worker thread indefinitely.

    Returns:
        None when the port is closed, the channel does not become ready
        within 100 ms, the RPC fails with UNAVAILABLE, or an unexpected error
        occurs while connecting. Otherwise a dict with the keys ``host``,
        ``port``, ``services`` and ``reflection``; when reflection failed the
        dict also carries ``status`` and, for some failures, ``details``.
    """
    # Cheap TCP pre-filter so closed ports never pay for a gRPC channel.
    if not quick_port_check(host, port):
        return None

    # Use very short timeouts for gRPC.
    options = [
        ('grpc.connect_timeout_ms', 100),  # 100ms connect timeout
        ('grpc.keepalive_timeout_ms', 100),
    ]

    try:
        # The context manager closes the channel on every exit path, which
        # also cancels the still-open reflection stream.
        with grpc.insecure_channel(f"{host}:{port}", options=options) as channel:
            try:
                grpc.channel_ready_future(channel).result(timeout=0.1)  # 100ms timeout
            except grpc.FutureTimeoutError:
                if verbose:
                    print(f"Timeout connecting to gRPC server at {host}:{port}")
                return None

            stub = reflection_pb2_grpc.ServerReflectionStub(channel)
            try:
                request = reflection_pb2.ServerReflectionRequest(list_services="")
                responses = stub.ServerReflectionInfo(
                    iter([request]), timeout=rpc_timeout
                )
                services = []
                # Only the first streamed response is inspected.
                for response in responses:
                    if response.HasField("list_services_response"):
                        services = [
                            service.name
                            for service in response.list_services_response.service
                        ]
                    break
                return {
                    "host": host,
                    "port": port,
                    "services": services,
                    "reflection": True,
                }
            except grpc.RpcError as e:
                status_code = e.code()
                details = e.details()
                # details() may be None; normalise before the substring test
                # so a real gRPC server is not misreported as "nothing found".
                details_text = details or ""
                if (
                    status_code == grpc.StatusCode.UNIMPLEMENTED
                    and "Method not found" in details_text
                ):
                    # This is likely a gRPC server without reflection support.
                    return {
                        "host": host,
                        "port": port,
                        "services": [],
                        "reflection": False,
                        "status": "GRPC_NO_REFLECTION",
                    }
                if status_code == grpc.StatusCode.UNAVAILABLE:
                    # Not a gRPC server or connection issue.
                    if verbose:
                        print(f"Connection issue with {host}:{port} - {details}")
                    return None
                # Other gRPC error, but still might be a gRPC server.
                if verbose:
                    print(f"gRPC error on {host}:{port} - {status_code}: {details}")
                return {
                    "host": host,
                    "port": port,
                    "services": [],
                    "reflection": False,
                    "status": f"GRPC_ERROR_{status_code}",
                    "details": details,
                }
            except Exception as e:
                if verbose:
                    print(f"Error listing services on {host}:{port} - {str(e)}")
                # The channel became ready, so the endpoint is still reported
                # as a gRPC service for the purpose of stop-on-first.
                return {
                    "host": host,
                    "port": port,
                    "services": [],
                    "reflection": False,
                    "status": "UNKNOWN_ERROR",
                    "details": str(e),
                }
    except Exception as e:
        if verbose:
            print(f"Error connecting to gRPC server at {host}:{port} - {str(e)}")
        return None
def batch_scan(host_ports, verbose=False, max_workers=100, stop_on_first=False):
    """Run ``scan_port`` over every (host, port) pair using a thread pool.

    Args:
        host_ports: Iterable of ``(host, port)`` tuples to probe.
        verbose: Forwarded to ``scan_port``.
        max_workers: Upper bound on worker threads in the pool.
        stop_on_first: When True, stop collecting after the first truthy
            result and cancel every future that has not finished yet.

    Returns:
        List of the truthy results returned by ``scan_port``, in completion
        order.
    """
    hits = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue every probe up front; the pool throttles actual execution.
        jobs = [
            pool.submit(scan_port, target_host, target_port, verbose)
            for target_host, target_port in host_ports
        ]
        for finished in as_completed(jobs):
            outcome = finished.result()
            if not outcome:
                continue
            hits.append(outcome)
            if not stop_on_first:
                continue
            # First hit found: drop whatever has not completed yet and stop.
            for job in jobs:
                if not job.done():
                    job.cancel()
            break
    return hits
def _print_found_services(found_services):
    """Print a human-readable report for each result dict from ``scan_port``."""
    print("\nFound gRPC services:")
    for service in found_services:
        host = service["host"]
        port = service["port"]
        print(f"\n{host}:{port}")
        if service.get("reflection", False):
            if service["services"]:
                print("  Services available via reflection:")
                for svc in service["services"]:
                    print(f"    - {svc}")
            else:
                print("  No services found via reflection")
            continue
        status = service.get("status", "UNKNOWN")
        if status == "GRPC_NO_REFLECTION":
            print("  ✓ gRPC server detected")
            print("  ✗ Reflection not supported")
            print("  ℹ To list services, you need the .proto files or the server needs to support reflection")
        elif status.startswith("GRPC_ERROR_"):
            print("  ✓ gRPC server detected")
            print(f"  ✗ Error: {service.get('details', 'Unknown error')}")
        else:
            print("  ✓ gRPC server detected")
            print(f"  ✗ Unable to list services: {service.get('details', 'Unknown error')}")


def main():
    """Parse command-line arguments and run one or more scan cycles.

    Each cycle probes every (host, port) combination in batches via
    ``batch_scan``, prints a progress line after each batch and a report at
    the end. With ``--continuous`` the cycle repeats until interrupted, or
    until a service is found when ``--stop-on-first`` is also given.

    Invalid arguments (empty host list, port range outside 0-65535 or with
    start > end, non-positive concurrency or batch size) are rejected through
    ``parser.error``, which exits with status 2.
    """
    parser = argparse.ArgumentParser(description="Ultra-fast scanner for gRPC services")
    parser.add_argument("--hosts", "-H", default="localhost", help="Comma-separated list of hosts to scan")
    parser.add_argument("--start", "-s", type=int, default=50000, help="Start port")
    parser.add_argument("--end", "-e", type=int, default=50100, help="End port")
    parser.add_argument("--concurrency", "-c", type=int, default=500, help="Max concurrent scans")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--continuous", action="store_true", help="Continuous scanning")
    parser.add_argument("--rate", "-r", type=float, default=0,
                        help="Scan rate in samples per second (0 for maximum speed)")
    parser.add_argument("--batch-size", "-b", type=int, default=1000,
                        help="Batch size for scanning")
    parser.add_argument("--stop-on-first", "-f", action="store_true",
                        help="Stop scanning after finding the first gRPC service")
    args = parser.parse_args()

    # Drop empty entries so input such as "a,,b" does not probe a host named "".
    hosts = [host.strip() for host in args.hosts.split(",") if host.strip()]

    # Fail fast on values that would otherwise crash later (range() step of 0,
    # ThreadPoolExecutor with 0 workers) or silently scan nothing.
    if not hosts:
        parser.error("--hosts must contain at least one host")
    if not 0 <= args.start <= args.end <= 65535:
        parser.error("ports must satisfy 0 <= start <= end <= 65535")
    if args.concurrency < 1:
        parser.error("--concurrency must be at least 1")
    if args.batch_size < 1:
        parser.error("--batch-size must be at least 1")

    port_range = range(args.start, args.end + 1)
    # The target list is identical for every cycle, so build it once.
    all_host_ports = [(host, port) for host in hosts for port in port_range]
    total_ports = len(all_host_ports)

    print(f"Scanning ports {args.start}-{args.end} on hosts: {', '.join(hosts)}")
    print(f"Concurrency: {args.concurrency}")
    if args.rate > 0:
        print(f"Scan rate: {args.rate} samples per second")
    else:
        print("Scan rate: Maximum speed")
    if args.stop_on_first:
        print("Will stop after finding the first gRPC service")
    if args.continuous and args.stop_on_first:
        print("Continuous scanning until a gRPC service is found")

    scan_count = 0
    while True:
        scan_count += 1
        if args.continuous:
            print(f"\n--- Scan cycle #{scan_count} ---")
        found_services = []
        start_time = time.time()
        scanned = 0

        # Process in batches to avoid creating too many futures at once.
        for offset in range(0, total_ports, args.batch_size):
            batch = all_host_ports[offset:offset + args.batch_size]

            # Rate limiting works at batch granularity: wait until the ports
            # scanned so far are "paid for" before starting the next batch.
            if args.rate > 0:
                expected_time = scanned / args.rate
                elapsed = time.time() - start_time
                if elapsed < expected_time:
                    time.sleep(expected_time - elapsed)

            batch_results = batch_scan(batch, args.verbose, args.concurrency, args.stop_on_first)
            found_services.extend(batch_results)
            scanned += len(batch)

            # Progress update; flushed because the line has no trailing newline.
            elapsed = time.time() - start_time
            if elapsed > 0:
                rate = scanned / elapsed
                print(f"\rScanned {scanned}/{total_ports} ports ({rate:.2f} ports/sec)",
                      end="", flush=True)

            # Stop scanning if we found a service and stop_on_first is enabled.
            if args.stop_on_first and found_services:
                break
        print()  # New line after progress

        if found_services:
            _print_found_services(found_services)
            # With stop-on-first the run ends here, before the timing summary.
            if args.stop_on_first:
                break
        else:
            print("\nNo gRPC services found.")

        scan_time = time.time() - start_time
        print(f"\nScan completed in {scan_time:.2f} seconds")
        if scan_time > 0:
            print(f"Average scan rate: {total_ports/scan_time:.2f} ports/second")

        # If not continuous, we're done.
        if not args.continuous:
            break

        print("\nStarting next scan cycle...")
        if args.rate > 0:
            # Pay off the rate budget of the final batch before the next cycle.
            expected_time = scanned / args.rate
            elapsed = time.time() - start_time
            if elapsed < expected_time:
                time.sleep(expected_time - elapsed)
        else:
            time.sleep(1)  # Sleep for 1 second to avoid overwhelming the network
# Run the scanner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()