-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmastosint.py
More file actions
499 lines (419 loc) · 23 KB
/
mastosint.py
File metadata and controls
499 lines (419 loc) · 23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
#!/usr/bin/env python3
"""
MastOSINT - Mastodon OSINT Search Tool
Interactive OSINT investigation tool for Mastodon/Fediverse networks
GitHub: https://github.com/yourusername/mastosint
License: MIT
"""
import requests
import json
import sys
import time
import os
from typing import Optional, Dict, List, Any
from urllib.parse import urljoin
# ASCII-art banner, printed in cyan at startup by print_banner()
ASCII_ART = """
.................:-==+++++++++++++++++++++++*++++++++++++===--:.............
.........:-=+***++++**+++++++++++++++++++++++++++++++++++++++++++*++=-.........
......-+**+++*++++++++++++++++***+****++++++***++*++++++++++++++++*+++**=:......
.........-+**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*++**=.......
.......=*+***+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++***+*+*+:....
.....-*+***+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*+*+*+*+*=...
...:+*******************************************************************************+**-.....
.....-*************************************************************************************-....
.....-***********************************+****************++********+********************+***-...
.....-*+****************+****=:.........:=**+*+*+*********+-:.........:+***+**+****************:....
.....++***+***********++**-.................=*+*+**+***+:................:+**+*****************+....
....=****+**************-.....................=******+:.....................=*******************-...
....****************+*+.... ...............:*+*++............ . ..-******************=...
...:*****************+..... ................:*+=............. . ...-*****************=...
...-*****************:..... .....:...........:=.................... ....=****************+...
...=****************=. .....:+*+*****+:...............-+*******-... .....+***************+...
...=***************+:. ....=***********=.............+***********:. .....=***************+...
...+***************+.. ...:*************-.... ..+************=. .....=***************+...
...+***************+.. ...-*************+.... .:*************+. .....-***************+.
...+***************+.. ...-*************+.... .=**************. .....-***************+.
...+***************+.. ...-*************+.... .-*************+. .....-***************+.
...=***************+.. ...-*************+.... .-**************. .....-***************+...
...=***************+.. ...-*************+.... .-**************. .....-***************+...
...=***************+.. ...-*************+.... .-**************. .....-***************=...
...=***************+.. ...-*************+.... .-**************. .....-***************=..
...=***************+.. ...-*************+.... .-**************. .....-***************=..
...-***************+.. ...-*************+.... .-**************. .....-***************=..
...-***************+.. ...-*************+..........-**************. .....-***************=...
...-***************+.. ...-***************************************. .....-***************-...
...:***************+.. ...-***************************************. .....-***************-...
...:#**************+.. ...-***************************************. .....-**************#:...
....#**************+.. ...-***************************************. .....-**************+....
....#**************+.. ...-***************************************. .....-**************-....
....+***************==========+#**************************************==========+*************+...
....=*****************************************************************************************....
....-****************************************************************************************.....
.....**************************************************************************************+......
.....+************************************************************************************-.......
.....=**********************************************************************************-.........
...:#******************************************************************************=:.....
....***************************************************************************+=.........
....-*#*********************#+-=*****************************************+=:....... ......
..***********************#*..........::--=============---::::...............
..-#***********************-...........................................
...+***********************+.............................. ................
....+**#*******************#+....
.....=#***********************......... .................
......:*************************:.............................:...
..=##*##*******************##*-................:-=*#####+...
....=##**##******************##*####**######**##**###**#*...
......-*#**#*#*#*#**#######*#*######********##*#**###**#*...
....=#####*#**########################*****#***#*##*...
.......-*####*##*###########################**###*-....
...........-+**#####*#############**###**##**+-........
...........-=+*####**###**###*+=-.............
"""
# Host names of the instances that MastOSINT.universal_search queries, in this
# order (one search request per entry). main() also lists the first ten as
# suggestions for its "specific instance" menu option.
POPULAR_INSTANCES = [
"mastodon.social",
"mastodon.online",
"mstdn.social",
"fosstodon.org",
"techhub.social",
"infosec.exchange",
"hachyderm.io",
"mas.to",
"universeodon.com",
"mathstodon.xyz",
"mstdn.jp",
"pawoo.net",
"baraag.net",
"pixelfed.social",
"mastodon.world",
]
class MastOSINT:
    """Search client bound to a single Mastodon-compatible instance.

    Each object owns a ``requests.Session`` that carries the tool's
    User-Agent and, when a token is supplied, a Bearer ``Authorization``
    header. All searches go to ``/api/v2/search`` on the bound instance.
    """

    def __init__(self, instance: str = "mastodon.social", token: Optional[str] = None):
        """Bind the client to *instance*.

        Args:
            instance: Host name of the instance. Surrounding whitespace, an
                ``http://`` / ``https://`` prefix and trailing slashes are
                removed before it is stored in ``self.instance``.
            token: Optional OAuth bearer token sent with every request.
        """
        host = instance.strip().replace("https://", "").replace("http://", "")
        # A pasted URL such as "https://example.org/" must not leave a slash
        # in the name that is later shown to the user and stored as _source.
        self.instance = host.rstrip("/")
        self.base_url = f"https://{self.instance}"
        self.api_endpoint = urljoin(self.base_url, "/api/v2/search")
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MastOSINT/3.0 (OSINT Investigation Tool)'
        })
        if self.token:
            self.session.headers.update({
                'Authorization': f'Bearer {self.token}'
            })

    def search(self, query: str, search_type: Optional[str] = None,
               resolve: bool = True, limit: int = 40) -> Dict[str, Any]:
        """Perform a search on the bound instance.

        Args:
            query: Search text, sent as the ``q`` parameter.
            search_type: Optional value for the ``type`` parameter (the rest
                of this file uses ``accounts``, ``hashtags`` and ``statuses``).
            resolve: Ask the server to resolve remote resources. Only sent
                when a token is configured.
            limit: Requested number of results; values above 40 are capped.

        Returns:
            The decoded JSON object on success. On any failure a dict with
            the keys ``error`` (message) and ``instance`` (host name); this
            method does not raise for network, HTTP or decoding problems.
        """
        params: Dict[str, Any] = {
            'q': query,
            'limit': min(limit, 40),
        }
        if search_type:
            params['type'] = search_type
        if resolve and self.token:
            params['resolve'] = 'true'

        try:
            response = self.session.get(self.api_endpoint, params=params, timeout=10)
            response.raise_for_status()
        except requests.exceptions.Timeout:
            return {"error": "Request timeout", "instance": self.instance}
        except requests.exceptions.HTTPError as e:
            # Read the status from the exception itself so this handler never
            # depends on the local ``response`` having been bound.
            status = e.response.status_code if e.response is not None else "error"
            return {"error": f"HTTP {status}", "instance": self.instance}
        except requests.exceptions.RequestException as e:
            return {"error": f"Request failed: {e}", "instance": self.instance}

        # A 200 response is not necessarily JSON (HTML error pages, proxies).
        # Every JSON decoding error raised by requests is a ValueError.
        try:
            payload = response.json()
        except ValueError:
            return {"error": "Invalid JSON response", "instance": self.instance}
        if not isinstance(payload, dict):
            # Callers use ``"error" in result`` and ``result.get(...)``.
            return {"error": "Unexpected response format", "instance": self.instance}
        return payload

    def universal_search(self, query: str, search_type: Optional[str] = None) -> Dict[str, Any]:
        """Search every host in ``POPULAR_INSTANCES`` and merge the results.

        Each instance is queried anonymously (no token, ``resolve=False``)
        through a short-lived client, with a 0.1 s pause between requests.
        Progress is printed as one line per instance.

        Args:
            query: Search text.
            search_type: Optional result type forwarded to :meth:`search`.

        Returns:
            A dict with ``query``, ``search_type``, the counters
            ``instances_searched`` / ``successful`` / ``failed``, the lists
            ``aggregated_accounts`` / ``aggregated_statuses`` /
            ``aggregated_hashtags`` (every item tagged with ``_source``, the
            instance it came from) and ``errors`` (one
            ``{"instance", "error"}`` entry per failed instance).
        """
        results = {
            "query": query,
            "search_type": search_type or "all",
            "instances_searched": 0,
            "successful": 0,
            "failed": 0,
            "aggregated_accounts": [],
            "aggregated_statuses": [],
            "aggregated_hashtags": [],
            "errors": []
        }
        total = len(POPULAR_INSTANCES)

        print(f"\n{'='*80}")
        print(f"UNIVERSAL SEARCH: '{query}' (Type: {search_type or 'all'})")
        print(f"Searching {total} instances...")
        print(f"{'='*80}\n")

        for idx, instance in enumerate(POPULAR_INSTANCES, 1):
            # Flush so the prefix is visible while the request is in flight.
            print(f"[{idx}/{total}] {instance:30}", end=" ", flush=True)
            results["instances_searched"] += 1

            # Don't use token for cross-instance searches - use public API
            searcher = MastOSINT(instance=instance, token=None)
            try:
                search_results = searcher.search(query=query, search_type=search_type, resolve=False)
            finally:
                # One session is created per instance; release its sockets.
                searcher.session.close()

            if "error" in search_results:
                print(f"❌ {search_results['error']}")
                results["failed"] += 1
                results["errors"].append({"instance": instance, "error": search_results["error"]})
            else:
                # ``or []`` also covers a key that is present but null.
                buckets = (
                    ("aggregated_accounts", search_results.get('accounts') or []),
                    ("aggregated_statuses", search_results.get('statuses') or []),
                    ("aggregated_hashtags", search_results.get('hashtags') or []),
                )
                accounts, statuses, hashtags = (len(items) for _, items in buckets)
                print(f"✓ {accounts}a {statuses}s {hashtags}h")
                results["successful"] += 1
                for key, items in buckets:
                    for item in items:
                        item['_source'] = instance
                        results[key].append(item)

            time.sleep(0.1)  # Be polite to servers

        return results
def print_banner():
    """Show the cyan ASCII-art logo followed by the framed two-line title."""
    reset = "\033[0m"
    bold_cyan = "\033[1;96m"
    rule = "=" * 80

    print("\033[96m" + ASCII_ART + reset)
    headline = (
        rule,
        "MastOSINT - Mastodon OSINT Investigation Tool".center(80),
        "Professional OSINT Tool for Fediverse Networks".center(80),
    )
    for text in headline:
        print(bold_cyan + text + reset)
    # The closing rule carries an extra newline to separate it from what follows.
    print(bold_cyan + rule + reset + "\n")
def print_menu():
    """Print the main menu: a yellow header, options 1-6 in green, exit in red."""
    yellow_rule = "\033[1;93m" + "=" * 80 + "\033[0m"
    # (colour escape, menu key, description) for every selectable entry
    entries = (
        ("\033[92m", "1", "Search for USERNAME (across all instances)"),
        ("\033[92m", "2", "Search for HASHTAG (across all instances)"),
        ("\033[92m", "3", "Search for KEYWORD in posts (across all instances)"),
        ("\033[92m", "4", "Search SPECIFIC INSTANCE (focused search)"),
        ("\033[92m", "5", "Search for ACCOUNT HANDLE (e.g., @user@instance.com)"),
        ("\033[92m", "6", "Export last results to JSON"),
        ("\033[91m", "0", "Exit"),
    )

    print("\n" + yellow_rule)
    print("\033[1;93mSEARCH OPTIONS:\033[0m")
    print(yellow_rule)
    for colour, key, label in entries:
        print(f"{colour}{key}.\033[0m {label}")
    print(yellow_rule)
def print_results_summary(results: Dict[str, Any]):
    """Print the headline figures of an aggregated search result.

    Reads ``query``, ``instances_searched``, ``successful``, ``failed`` and
    the three ``aggregated_*`` lists from *results*; ``search_type`` falls
    back to ``'all'`` when absent.
    """
    green_rule = f"\033[1;92m{'='*80}\033[0m"

    print("\n" + green_rule)
    print("\033[1;92mSEARCH COMPLETE\033[0m")
    print(green_rule)

    print(f"Query: \033[96m{results['query']}\033[0m")
    print(f"Type: \033[96m{results.get('search_type', 'all')}\033[0m")
    print(f"Instances searched: {results['instances_searched']}")
    print(f"Successful: \033[92m{results['successful']}\033[0m")
    print(f"Failed: \033[91m{results['failed']}\033[0m")

    print("\n\033[1;96mRESULTS:\033[0m")
    totals = (
        (" Accounts", 'aggregated_accounts'),
        (" Statuses", 'aggregated_statuses'),
        (" Hashtags", 'aggregated_hashtags'),
    )
    for label, key in totals:
        print(f"{label}: \033[1;93m{len(results[key])}\033[0m")
    print(green_rule + "\n")
def print_accounts(accounts: List[Dict[str, Any]], limit: Optional[int] = None):
    """Print one summary card per account.

    Args:
        accounts: Account dicts as returned by the search API; missing
            fields are shown as ``N/A`` or ``0``.
        limit: Maximum number of accounts to display. ``None`` shows all of
            them; zero or a negative value shows none.

    The bio (``note``) arrives as HTML: tags are removed, character
    references such as ``&amp;`` are decoded, and the text is cut to a
    150-character preview.
    """
    # Imported here because this module does not import them at file level.
    import html
    import re

    if not accounts:
        print("\n\033[91mNo accounts found.\033[0m\n")
        return

    print(f"\n\033[1;96m{'='*80}\033[0m")
    print(f"\033[1;96mACCOUNTS FOUND: {len(accounts)}\033[0m")
    print(f"\033[1;96m{'='*80}\033[0m")

    # Clamp at zero: a negative slice bound would silently drop trailing items.
    shown = accounts if limit is None else accounts[:max(limit, 0)]

    for idx, account in enumerate(shown, 1):
        print(f"\n\033[1;93m[{idx}] @{account.get('acct', 'N/A')}\033[0m")
        print(f" Display Name: {account.get('display_name', 'N/A')}")
        print(f" Profile URL: \033[94m{account.get('url', 'N/A')}\033[0m")
        print(f" Source: \033[96m{account.get('_source', 'N/A')}\033[0m")
        print(f" Followers: {account.get('followers_count', 0)} | Following: {account.get('following_count', 0)} | Posts: {account.get('statuses_count', 0)}")
        print(f" Created: {account.get('created_at', 'N/A')}")
        print(f" Bot: {'Yes' if account.get('bot') else 'No'}")

        note = account.get('note')
        if note:
            bio = note.replace('<p>', '').replace('</p>', '\n')
            bio = re.sub(r'<[^<]+?>', '', bio)
            # Decode entities only after the tags are gone, so escaped text
            # like "&lt;b&gt;" is shown literally instead of being stripped.
            bio = html.unescape(bio).strip()
            if bio:
                bio_preview = bio[:150] + "..." if len(bio) > 150 else bio
                print(f" Bio: {bio_preview}")
        print(f" {'-'*76}")
    print()
def print_hashtags(hashtags: List[Dict[str, Any]], limit: int = None):
    """Print name, URL, source and usage totals for each hashtag.

    With ``limit`` left as ``None`` every hashtag is listed; otherwise at
    most ``limit`` entries are shown. Usage totals are summed over the
    entries of the tag's ``history`` list when it is present and non-empty.
    """
    if not hashtags:
        print("\n\033[91mNo hashtags found.\033[0m\n")
        return

    cyan_rule = f"\033[1;96m{'='*80}\033[0m"
    print("\n" + cyan_rule)
    print(f"\033[1;96mHASHTAGS FOUND: {len(hashtags)}\033[0m")
    print(cyan_rule)

    # Display ALL hashtags if no limit specified
    if limit is None:
        how_many = len(hashtags)
    else:
        how_many = min(limit, len(hashtags))

    position = 0
    for tag in hashtags[:how_many]:
        position += 1
        print(f"\n\033[1;93m[{position}] #{tag.get('name', 'N/A')}\033[0m")
        print(f" URL: \033[94m{tag.get('url', 'N/A')}\033[0m")
        print(f" Source: \033[96m{tag.get('_source', 'N/A')}\033[0m")

        if not tag.get('history'):
            continue
        # Counts are passed through int() before summing, one pass per field.
        total_uses = sum(map(lambda day: int(day.get('uses', 0)), tag['history']))
        total_accounts = sum(map(lambda day: int(day.get('accounts', 0)), tag['history']))
        print(f" Recent Uses: {total_uses} | Accounts: {total_accounts}")
    print()
def print_statuses(statuses: List[Dict[str, Any]], limit: Optional[int] = None):
    """Print one summary card per status (post).

    Args:
        statuses: Status dicts as returned by the search API; missing
            fields are shown as ``N/A`` or ``0``.
        limit: Maximum number of statuses to display. ``None`` shows all of
            them; zero or a negative value shows none.

    The post body (``content``) arrives as HTML: line breaks and paragraph
    boundaries become spaces, remaining tags are removed, character
    references are decoded, and the text is cut to a 200-character preview.
    """
    # Imported here because this module does not import them at file level.
    import html
    import re

    if not statuses:
        print("\n\033[91mNo statuses found.\033[0m\n")
        return

    print(f"\n\033[1;96m{'='*80}\033[0m")
    print(f"\033[1;96mSTATUSES/POSTS FOUND: {len(statuses)}\033[0m")
    print(f"\033[1;96m{'='*80}\033[0m")

    # Clamp at zero: a negative slice bound would silently drop trailing items.
    shown = statuses if limit is None else statuses[:max(limit, 0)]

    for idx, status in enumerate(shown, 1):
        # ``or`` also covers keys that are present but null.
        account = status.get('account') or {}
        print(f"\n\033[1;93m[{idx}] @{account.get('acct', 'N/A')}\033[0m")
        print(f" Posted: {status.get('created_at', 'N/A')}")
        print(f" URL: \033[94m{status.get('url', 'N/A')}\033[0m")
        print(f" Source: \033[96m{status.get('_source', 'N/A')}\033[0m")
        print(f" Replies: {status.get('replies_count', 0)} | Boosts: {status.get('reblogs_count', 0)} | Favs: {status.get('favourites_count', 0)}")

        content = status.get('content') or ''
        # Keep words on either side of a break from fusing together.
        content = re.sub(r'<br\s*/?>|</p>\s*<p>', ' ', content)
        content = re.sub(r'<[^<]+?>', '', content)
        # Decode entities only after the tags are gone, so escaped text
        # like "&lt;b&gt;" is shown literally instead of being stripped.
        content = html.unescape(content).strip()
        content_preview = content[:200] + "..." if len(content) > 200 else content
        print(f" Content: {content_preview}")
        print(f" {'-'*76}")
    print()
def export_results(results: Dict[str, Any], filename: str):
    """Write *results* to *filename* as indented, non-ASCII-preserving JSON.

    Prints a green confirmation on success. Any exception raised while
    opening, serialising or writing is reported in red instead of being
    propagated.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as out_file:
            json.dump(results, out_file, indent=2, ensure_ascii=False)
        confirmation = f"\n\033[92m✓ Results exported to: {filename}\033[0m\n"
        print(confirmation)
    except Exception as exc:
        failure = f"\n\033[91m✗ Error exporting: {exc}\033[0m\n"
        print(failure)
def load_token() -> Optional[str]:
    """Load the OAuth token from the environment or a ``.token`` file.

    The ``MASTODON_TOKEN`` environment variable is tried first, then a file
    named ``.token`` in the same directory as this module. Surrounding
    whitespace is removed from either source.

    Returns:
        The token, or ``None`` when neither source yields a non-empty value
        or the file cannot be read.
    """
    # Try environment variable first
    token = os.environ.get('MASTODON_TOKEN', '').strip()
    if token:
        return token

    # Try .token file. Opening directly avoids a check-then-open race, and
    # only read failures are suppressed, so Ctrl-C is no longer swallowed.
    token_file = os.path.join(os.path.dirname(__file__), '.token')
    try:
        with open(token_file, 'r', encoding='utf-8') as f:
            token = f.read().strip()
    except (OSError, UnicodeDecodeError):
        return None
    return token or None
def _print_section(title: str) -> None:
    """Print *title* in bold cyan between two 80-column rules."""
    rule = "\033[1;96m" + "=" * 80 + "\033[0m"
    print("\n" + rule)
    print(f"\033[1;96m{title}\033[0m")
    print(rule)


def main():
    """Run the interactive menu loop until the user chooses to exit.

    Shows the banner, loads an optional OAuth token, then repeatedly
    prompts for a menu option:

    * 1, 2, 3, 5 - search all popular instances for accounts, hashtags,
      statuses or an account handle;
    * 4 - search a single instance (uses the token when one was loaded);
    * 6 - export the most recent successful results to a JSON file;
    * 0 - exit via ``sys.exit(0)``.
    """
    print_banner()

    # Load token if available
    token = load_token()
    if token:
        print("\033[92m✓ OAuth token loaded\033[0m")
    else:
        print("\033[93m⚠ No OAuth token found (running in public API mode)\033[0m")
        print("\033[93m For enhanced features, set MASTODON_TOKEN environment variable\033[0m")
        print("\033[93m or create a .token file. See README for details.\033[0m")

    osint = MastOSINT(token=token)
    last_results = None  # most recent successful search, kept for option 6

    while True:
        print_menu()
        choice = input("\n\033[1;96mSelect option (0-6): \033[0m").strip()

        if choice == '0':
            print("\n\033[92mThank you for using MastOSINT. Stay safe!\033[0m\n")
            sys.exit(0)

        elif choice == '1':
            # Username search
            _print_section("USERNAME SEARCH")
            query = input("\033[93mEnter username to search: \033[0m").strip()
            if query:
                last_results = osint.universal_search(query, search_type='accounts')
                print_results_summary(last_results)
                print_accounts(last_results['aggregated_accounts'])

        elif choice == '2':
            # Hashtag search
            _print_section("HASHTAG SEARCH")
            query = input("\033[93mEnter hashtag (with or without #): \033[0m").strip()
            query = query.lstrip('#')
            if query:
                last_results = osint.universal_search(query, search_type='hashtags')
                print_results_summary(last_results)
                print_hashtags(last_results['aggregated_hashtags'])

        elif choice == '3':
            # Keyword search in posts
            _print_section("KEYWORD SEARCH IN POSTS")
            query = input("\033[93mEnter keyword/phrase to search: \033[0m").strip()
            if query:
                last_results = osint.universal_search(query, search_type='statuses')
                print_results_summary(last_results)
                print_statuses(last_results['aggregated_statuses'])

        elif choice == '4':
            # Specific instance search
            _print_section("SPECIFIC INSTANCE SEARCH")
            shortlist = POPULAR_INSTANCES[:10]
            print("\nPopular instances:")
            for i, inst in enumerate(shortlist, 1):
                print(f" {i}. {inst}")
            print(" Or enter custom instance...")
            instance = input("\033[93mEnter instance name: \033[0m").strip()
            # The list above is numbered, so accept the number as a shortcut
            # instead of treating e.g. "3" as a host name.
            if instance.isdecimal() and 1 <= int(instance) <= len(shortlist):
                instance = shortlist[int(instance) - 1]
            query = input("\033[93mEnter search query: \033[0m").strip()
            if instance and query:
                searcher = MastOSINT(instance=instance, token=token)
                results = searcher.search(query, resolve=bool(token))
                if "error" not in results:
                    # Format as universal results for consistency
                    last_results = {
                        "query": query,
                        "search_type": "all",
                        "instances_searched": 1,
                        "successful": 1,
                        "failed": 0,
                        "aggregated_accounts": results.get('accounts', []),
                        "aggregated_statuses": results.get('statuses', []),
                        "aggregated_hashtags": results.get('hashtags', []),
                        "errors": []
                    }
                    # Tag with the host as normalised by the client (no URL
                    # scheme), matching what universal_search records.
                    for key in ('aggregated_accounts', 'aggregated_statuses', 'aggregated_hashtags'):
                        for item in last_results[key]:
                            item['_source'] = searcher.instance
                    print_results_summary(last_results)
                    print_accounts(last_results['aggregated_accounts'])
                    print_statuses(last_results['aggregated_statuses'])
                    print_hashtags(last_results['aggregated_hashtags'])
                else:
                    print(f"\n\033[91m✗ Error: {results['error']}\033[0m\n")

        elif choice == '5':
            # Account handle search
            _print_section("ACCOUNT HANDLE SEARCH")
            print("Example: @username@mastodon.social")
            query = input("\033[93mEnter full account handle: \033[0m").strip()
            if query:
                last_results = osint.universal_search(query, search_type='accounts')
                print_results_summary(last_results)
                print_accounts(last_results['aggregated_accounts'])

        elif choice == '6':
            # Export results
            if last_results:
                # The prompt names the real default: a timestamped file name.
                filename = input("\033[93mEnter filename (default: mastosint_results_<timestamp>.json): \033[0m").strip()
                if not filename:
                    filename = f"mastosint_results_{int(time.time())}.json"
                if not filename.endswith('.json'):
                    filename += '.json'
                export_results(last_results, filename)
            else:
                print("\n\033[91mNo results to export. Perform a search first.\033[0m\n")

        else:
            print("\n\033[91mInvalid option. Please select 0-6.\033[0m\n")

        input("\n\033[96mPress Enter to continue...\033[0m")
if __name__ == '__main__':
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C, or stdin closed while input() was waiting (Ctrl-D or an
        # exhausted pipe): leave quietly instead of showing a traceback.
        print("\n\n\033[92mExiting MastOSINT. Stay safe!\033[0m\n")
        sys.exit(0)