-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscrape_swc_data.py
More file actions
325 lines (262 loc) · 10.7 KB
/
scrape_swc_data.py
File metadata and controls
325 lines (262 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/env python3
"""
Script to scrape SWC Holiday Spree user data and store in MariaDB database.
"""
import sys
import time
import uuid
import os
from datetime import datetime
from typing import Optional, Dict, Any
from pathlib import Path

try:
    import requests
    import pymysql
    import yaml
except ImportError:
    # Best-effort bootstrap: install the third-party deps via pip, then
    # retry the imports (a second ImportError propagates and aborts).
    print("Missing required packages. Installing...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "requests", "pymysql", "pyyaml"])
    import requests
    import pymysql
    import yaml
def load_config(config_path: str = "config.yaml") -> Dict[str, Any]:
    """
    Load configuration from a YAML file.

    Args:
        config_path: Path to the configuration file.

    Returns:
        Configuration dictionary parsed from the file.

    Exits the process with status 1 when the file is missing, cannot be
    parsed, or does not contain a mapping at the top level.
    """
    if not os.path.exists(config_path):
        print(f"Error: Configuration file '{config_path}' not found.")
        print("Please copy 'config.yaml.example' to 'config.yaml' and fill in your credentials.")
        sys.exit(1)
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        print(f"Error parsing configuration file: {e}")
        sys.exit(1)
    # safe_load returns None for an empty file and can return a scalar or
    # list for malformed configs; callers index into the result with
    # config['api'] / config['database'], so fail fast with a clear message.
    if not isinstance(config, dict):
        print(f"Error: Configuration file '{config_path}' must contain a YAML mapping.")
        sys.exit(1)
    return config
def generate_uuid7() -> str:
    """
    Generate a UUID v7 (time-ordered UUID) as a string.

    The top 48 bits carry the current Unix time in milliseconds (so keys
    sort roughly by creation time, which indexes well), the version
    nibble is forced to 7, the variant bits to RFC 4122, and every other
    bit stays random.
    """
    millis = int(datetime.now().timestamp() * 1000)

    # Start from fully random bits, then stamp in the fixed fields.
    value = uuid.uuid4().int

    # Version nibble (bits 76-79) -> 0b0111 (version 7).
    value = (value & ~(0xF000 << 64)) | (0x7000 << 64)

    # Variant bits (bits 62-63) -> 0b10 (RFC 4122).
    value = (value & ~0xC000000000000000) | 0x8000000000000000

    # Replace the top 48 bits with the millisecond timestamp.
    value = (value & ((1 << 80) - 1)) | (millis << 80)

    return str(uuid.UUID(int=value))
def fetch_user_data(user_id: int, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Fetch a single user's record from the API.

    Args:
        user_id: The user ID to fetch.
        config: Configuration dictionary (reads the 'api' section).

    Returns:
        The user data dictionary, or None when the request fails, the
        response is not valid JSON, or the API reports no such user.
    """
    api = config['api']
    url = f"{api['base_url']}?{api['user_id_param']}={user_id}"

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching user ID {user_id}: {e}")
        return None
    except ValueError as e:
        print(f"Error parsing JSON for user ID {user_id}: {e}")
        return None

    if payload.get("success") and payload.get("user"):
        return payload["user"]

    print(f"No data found for user ID {user_id}")
    return None
def log_audit_trail(cursor, id_external: int, field_name: str, old_value: Any, new_value: Any, change_type: str) -> None:
    """
    Record a single field change in the audit_trail table.

    Any failure is reported as a warning and swallowed so that audit
    logging can never abort the surrounding upsert.

    Args:
        cursor: Database cursor.
        id_external: External user ID.
        field_name: Name of the field that changed.
        old_value: Previous value (stringified; stored as NULL when None).
        new_value: New value (stringified; stored as NULL when None).
        change_type: Type of change (INSERT or UPDATE).
    """
    try:
        row = (
            generate_uuid7(),
            id_external,
            field_name,
            None if old_value is None else str(old_value),
            None if new_value is None else str(new_value),
            change_type,
        )
        cursor.execute(
            """
            INSERT INTO `audit_trail`
            (`id`, `id_external`, `field_name`, `old_value`, `new_value`, `change_type`)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            row,
        )
    except Exception as e:
        print(f"Warning: Error logging audit trail: {e}")
def upsert_user_data(cursor, user_data: Dict[str, Any]) -> bool:
    """
    Insert or update one user row in swc_data, auditing field changes.

    New users get a fresh time-ordered primary key and an INSERT audit
    entry per tracked field; existing users are updated and only the
    fields whose values actually changed are audited.

    Args:
        cursor: Database cursor.
        user_data: User data dictionary from the API (keys: id,
            full_name, email_address, number_of_entries, accumulated_amount).

    Returns:
        True on success, False when any step raises.
    """
    try:
        ext_id = user_data["id"]
        full_name = user_data["full_name"]
        email = user_data["email_address"]
        entries = int(user_data["number_of_entries"])
        amount = float(user_data["accumulated_amount"])

        # Look up the current row (if any) so changes can be audited.
        cursor.execute(
            """
            SELECT `full_name`, `email_address`, `number_of_entries`, `accumulated_amount`
            FROM `swc_data`
            WHERE `id_external` = %s
            """,
            (ext_id,),
        )
        existing = cursor.fetchone()

        if existing is None:
            # First sighting of this user: insert a brand-new row.
            cursor.execute(
                """
                INSERT INTO `swc_data`
                (`id`, `id_external`, `full_name`, `email_address`, `number_of_entries`, `accumulated_amount`)
                VALUES (%s, %s, %s, %s, %s, %s)
                """,
                (generate_uuid7(), ext_id, full_name, email, entries, amount),
            )
            for field, value in (
                ('number_of_entries', entries),
                ('accumulated_amount', amount),
                ('full_name', full_name),
                ('email_address', email),
            ):
                log_audit_trail(cursor, ext_id, field, None, value, 'INSERT')
            return True

        # Known user: overwrite the row, then audit each changed field.
        prev_name, prev_email, prev_entries, prev_amount = existing
        cursor.execute(
            """
            UPDATE `swc_data`
            SET `full_name` = %s,
                `email_address` = %s,
                `number_of_entries` = %s,
                `accumulated_amount` = %s,
                `updated_at` = CURRENT_TIMESTAMP
            WHERE `id_external` = %s
            """,
            (full_name, email, entries, amount, ext_id),
        )
        if prev_entries != entries:
            log_audit_trail(cursor, ext_id, 'number_of_entries', prev_entries, entries, 'UPDATE')
        # Coerce the DB value (may come back as Decimal) before comparing.
        if float(prev_amount) != amount:
            log_audit_trail(cursor, ext_id, 'accumulated_amount', prev_amount, amount, 'UPDATE')
        if prev_name != full_name:
            log_audit_trail(cursor, ext_id, 'full_name', prev_name, full_name, 'UPDATE')
        if prev_email != email:
            log_audit_trail(cursor, ext_id, 'email_address', prev_email, email, 'UPDATE')
        return True
    except Exception as e:
        print(f"Error upserting user data (ID: {user_data.get('id')}): {e}")
        return False
def main():
    """Drive the scrape: connect to the DB, walk user IDs upward from the
    configured start, upsert every hit, and stop after a configured run of
    consecutive misses."""
    config = load_config()

    db_config = config['database']
    scraping = config['scraping']
    start_id = scraping['start_id']
    max_consecutive_404 = scraping['max_consecutive_404']
    rate_limit_delay = scraping['rate_limit_delay']

    print("Connecting to database...")
    try:
        connection = pymysql.connect(**db_config)
        cursor = connection.cursor()
        print(f"Connected to database: {db_config['database']}")
        print("Starting adaptive data scraping (auto-detecting range)...")
        print(f"Stop condition: {max_consecutive_404} consecutive 404 errors\n")

        success_count = fail_count = 0
        insert_count = update_count = 0
        consecutive_404 = 0
        user_id = start_id

        while True:
            print(f"Processing user ID {user_id}...", end=" ")
            user_data = fetch_user_data(user_id, config)

            if user_data:
                consecutive_404 = 0

                # Probe first so the log line can say insert vs update.
                cursor.execute(
                    "SELECT COUNT(*) FROM `swc_data` WHERE `id_external` = %s",
                    (user_data["id"],),
                )
                already_present = cursor.fetchone()[0] > 0

                if upsert_user_data(cursor, user_data):
                    connection.commit()
                    if already_present:
                        print(f"✓ Updated - {user_data['full_name']}")
                        update_count += 1
                    else:
                        print(f"✓ Inserted - {user_data['full_name']}")
                        insert_count += 1
                    success_count += 1
                else:
                    print("✗ Failed to upsert")
                    fail_count += 1
                    consecutive_404 += 1
            else:
                print("✗ No data")
                fail_count += 1
                consecutive_404 += 1

            if consecutive_404 >= max_consecutive_404:
                print(f"\n{'='*60}")
                print(f"Reached {max_consecutive_404} consecutive 404 errors.")
                print(f"Stopping at ID {user_id}. Likely reached end of data.")
                print(f"{'='*60}")
                break

            user_id += 1
            # Rate limiting - be nice to the server.
            time.sleep(rate_limit_delay)

        print(f"\n{'='*60}")
        print("Scraping completed!")
        print(f"ID Range Checked: {start_id} to {user_id}")
        print(f"Success: {success_count} (Inserted: {insert_count}, Updated: {update_count})")
        print(f"Failed: {fail_count}")
        print(f"Total: {success_count + fail_count}")
        print(f"{'='*60}")
    except pymysql.Error as e:
        print(f"Database error: {e}")
        sys.exit(1)
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'connection' in locals():
            connection.close()
            # NOTE(review): the scraped source lost its indentation; this
            # print is assumed to belong inside the connection guard — confirm.
            print("\nDatabase connection closed.")


if __name__ == "__main__":
    main()