-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: main.py
More file actions
100 lines (74 loc) · 3.33 KB
/
main.py
File metadata and controls
100 lines (74 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
import csv, json
from requests import session
from utils import format_company, format_profile, find_username
scrape_mode = 'company' # 'person' or 'company'
timeout = 10  # seconds to sleep between consecutive requests (basic rate limiting)
class Client:
    """Minimal LinkedIn Voyager API client using a cookie-authenticated session."""

    def __init__(self, cookies_filepath='cookies.json'):
        """Create a session primed with cookies and headers from a JSON file.

        The file must be a JSON object with 'cookies' and 'headers' keys
        (as demonstrated by the updates below).
        """
        self.session = session()
        with open(cookies_filepath) as f:
            data = json.load(f)
        self.session.cookies.update(data['cookies'])
        self.session.headers.update(data['headers'])

    def _check_status(self, res, name):
        """Return True when *res* is a usable 200; warn and return False on 404/403.

        Any other status code is treated as a hard failure and raised, so
        unexpected errors are never silently skipped.
        """
        if res.status_code not in (200, 404, 403):
            print('** ERROR: Request failed, status code: ', res.status_code)
            raise Exception('Request failed.')  # fixed typo: was 'Requst failed.'
        if res.status_code == 404:
            # fixed unbalanced quote: message previously read "{name} not found.
            print(f'** WARN: "{name}" not found.')
            return False
        if res.status_code == 403:
            print(f'** WARN: "{name}" profile is forbidden.')
            return False
        return True

    def get_profile(self, username):
        """Fetch a person's profileView; return formatted text, or "" on 404/403."""
        res = self.session.get(
            f'https://linkedin.com/voyager/api/identity/profiles/{username}/profileView')
        if not self._check_status(res, username):
            return ""
        return format_profile(res.json()['included'])

    def get_company(self, universal_name):
        """Fetch a company by its universal name; return formatted text, or "" on 404/403."""
        params = {
            "decorationId": "com.linkedin.voyager.deco.organization.web.WebFullCompanyMain-12",
            "q": "universalName",
            "universalName": universal_name,
        }
        url = "https://www.linkedin.com/voyager/api/organization/companies"
        res = self.session.get(url, params=params)
        if not self._check_status(res, universal_name):
            return ""
        return format_company(res.json()['included'], universal_name)
def get_input_profiles(input_filepath='profiles.txt', type_='person'):
    """Read one profile URL/handle per line and return the extracted usernames.

    Lines for which no username can be extracted (blank or unrecognized
    input) are silently dropped.
    """
    with open(input_filepath) as fh:
        extracted = (find_username(raw.strip(), type_) for raw in fh)
        return [name for name in extracted if name]
def scrape_profiles(profiles, output_filepath, type_='person'):
    """Scrape each profile and write (username, text) rows to a CSV file.

    Sleeps `timeout` seconds between requests (but not after the last one)
    as a basic rate-limiting measure.
    """
    client = Client()
    # newline='' is required by the csv module (otherwise blank rows appear on
    # Windows); the context manager guarantees the file is closed even when a
    # request raises mid-run — the original leaked the handle on error.
    with open(output_filepath, 'w', encoding='utf-8', newline='') as output_file:
        writer = csv.DictWriter(output_file, fieldnames=['username', 'text'])
        writer.writeheader()
        for i, p in enumerate(profiles):
            print(f'#{i+1}. Scraping: {p}')
            if type_ == 'person':
                text = client.get_profile(p)
            else:
                text = client.get_company(p)
            writer.writerow({'username': p, 'text': text})
            if i != len(profiles) - 1:
                time.sleep(timeout)
if __name__ == '__main__':
    linkedin_type = scrape_mode
    profiles = get_input_profiles(type_=linkedin_type)
    # Report the configured profile type; the message previously hard-coded
    # "company" even when scrape_mode was 'person'.
    print(f'Found {len(profiles)} linkedin {linkedin_type} profiles.')
    if len(profiles) > 80:
        print('** ERROR: Cannot scrape more than 80 profiles at a time.')
    elif not profiles:
        print('** WARN: No linkedin profiles found.')
    else:
        scrape_profiles(profiles, output_filepath='profiles_output.csv',
                        type_=linkedin_type)