index.py
137 lines (108 loc) · 5.02 KB
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import csv
from collections import deque
import time


class WebCrawler:
    def __init__(self, start_url, output_file):
        self.start_url = self.normalize_url(start_url)
        # self.start_url = self.start_url if self.start_url.endswith('/') else f"{self.start_url}/"
        self.output_file = output_file
        self.domain = urlparse(self.start_url).netloc.removeprefix('www.')
        self.visited = set()
        self.queue = deque([self.start_url])
        # Create/open CSV file with status_code column
        with open(output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['URL', 'Status_Code'])

    def normalize_url(self, url):
        """Normalize URL by removing 'www.', standardizing protocol, and handling trailing slashes"""
        parsed = urlparse(url)
        # Remove leading 'www.' from netloc if present ('removeprefix' avoids
        # stripping 'www.' occurrences elsewhere in the hostname)
        netloc = parsed.netloc.removeprefix('www.')
        # Normalize the path - remove trailing slash unless it's the root path
        path = parsed.path
        if path.endswith('/') and len(path) > 1:
            path = path.rstrip('/')
        # Rebuild URL with normalized components
        normalized = parsed._replace(
            scheme='https',  # Standardize to https
            netloc=netloc,
            path=path
        ).geturl()
        return normalized.split('#')[0].split('?')[0]  # Remove fragments and query params

    def is_valid_url(self, url):
        """Check if URL belongs to the same domain and is a web URL"""
        web_schemes = {'http', 'https'}
        parsed = urlparse(url)
        if parsed.scheme not in web_schemes:
            return False
        # Normalize domain comparison by removing 'www.'
        site_domain = self.domain.removeprefix('www.')
        url_domain = parsed.netloc.removeprefix('www.')
        return url_domain == site_domain or not parsed.netloc

    def get_links(self, url, response):
        """Extract all links from a webpage"""
        try:
            if response.status_code < 300:  # Only parse content for successful responses
                soup = BeautifulSoup(response.text, 'html.parser')
                links = set()
                for link in soup.find_all('a'):
                    href = link.get('href')
                    if href:
                        absolute_url = urljoin(url, href)
                        if self.is_valid_url(absolute_url):
                            normalized_url = self.normalize_url(absolute_url)
                            links.add(normalized_url)
                return links
            return set()
        except Exception as e:
            print(f"Error processing {url}: {str(e)}")
            return set()

    def save_url(self, url, status_code=None):
        """Save URL and status code to CSV file"""
        with open(self.output_file, 'a', newline='') as f:
            writer = csv.writer(f)
            # Record the status code only for problems: redirects/errors (>= 300)
            # or failed requests logged as 0; leave the column blank on success.
            if status_code is not None and (status_code >= 300 or status_code == 0):
                writer.writerow([url, status_code])
            else:
                writer.writerow([url, ''])

    def crawl(self):
        """Main crawling method"""
        print(f"Starting crawl of {self.start_url}")
        while self.queue:
            current_url = self.queue.popleft()
            normalized_url = self.normalize_url(current_url)
            if normalized_url in self.visited:
                continue
            print(f"Processing: {normalized_url}")
            self.visited.add(normalized_url)
            try:
                # Get the response and status code
                response = requests.get(current_url, timeout=10)
                status_code = response.status_code
                # Save URL with status code only if >= 300
                self.save_url(normalized_url, status_code)
                # Only process links if it's a successful response
                new_links = self.get_links(current_url, response)
                # Add new links to queue (get_links already returns normalized URLs)
                for link in new_links:
                    if link not in self.visited:
                        self.queue.append(link)
            except requests.RequestException as e:
                print(f"Error accessing {normalized_url}: {str(e)}")
                # Save failed requests with status code 0
                self.save_url(normalized_url, 0)
            # Be nice to the server
            time.sleep(1)
        print(f"Crawl complete! Found {len(self.visited)} URLs")


def main():
    # Get input from user
    start_url = input("Enter the website URL to crawl: ")
    output_file = input("Enter the output CSV filename (e.g., links.csv): ")
    # Create and run crawler
    crawler = WebCrawler(start_url, output_file)
    crawler.crawl()


if __name__ == "__main__":
    main()
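
The class can also be driven without the interactive prompts, which is handy for scripting or testing. A minimal sketch, assuming this file is importable as `index` from the same directory; the module name, URL, and filename below are illustrative placeholders, not part of the script:

# Non-interactive usage sketch; `index` is assumed to be this file's module
# name, and the URL/filename are placeholders. The __main__ guard above keeps
# main() from running on import.
from index import WebCrawler

crawler = WebCrawler("https://example.com", "links.csv")
# normalize_url standardizes scheme/host/path before crawling, e.g.:
#   crawler.normalize_url("http://www.example.com/about/?ref=nav#top")
#   -> "https://example.com/about"
crawler.crawl()  # writes one row per visited URL to links.csv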