-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmergeBackups.py
More file actions
279 lines (243 loc) · 11 KB
/
mergeBackups.py
File metadata and controls
279 lines (243 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import os, sys
import sqlite3
from pathlib import Path
''' Merge any number of Readerware 3 SQLite exports into one master.
Handles all lookup re-mapping and deduplication.
'''
# Trace log for lookup handling. Opened (and truncated) as soon as the module
# is loaded, written to by merge_books() while it resolves contributors, and
# closed at the very end of the script.
fout = open("mergeBackups.log", "w", encoding="utf-8")
def merge_books(source_dbs, target_db):
    """Merge the BOOKS rows of several source databases into one target.

    For every source database the lookup tables (CONTRIBUTOR and the
    ``*_LIST`` tables) are read into memory, each book's lookup keys are
    translated to the matching rows of the target (rows are created on
    demand and tagged with the book's PROVENANCE in ``merge_source``), and
    the book is inserted unless the target already holds a book with the
    same HASH value.

    Args:
        source_dbs: Iterable of source SQLite database file paths.
        target_db: Path of the target SQLite database. Its tables must
            already exist.

    Returns:
        None. Progress, warnings and per-source totals are printed;
        contributor lookups are additionally traced to the module-level
        ``fout`` log file.

    Note:
        The original author recorded that some failures occur when a source
        database contains repeated lookup entries; such inserts are reported
        and the affected book field is set to None.
    """
    lookup_tables = (
        "CONTRIBUTOR",
        "PUBLISHER_LIST",
        "PUBLICATION_PLACE_LIST",
        "CATEGORY_LIST",
        "FORMAT_LIST",
        "LANGUAGE_LIST",
    )
    # BOOKS column -> lookup table its key refers to.
    book_lookup_fields = (
        ("AUTHOR", "CONTRIBUTOR"),
        ("AUTHOR2", "CONTRIBUTOR"),
        ("AUTHOR3", "CONTRIBUTOR"),
        ("PUBLISHER", "PUBLISHER_LIST"),
        ("PUB_PLACE", "PUBLICATION_PLACE_LIST"),
        ("CATEGORY1", "CATEGORY_LIST"),
        ("CATEGORY2", "CATEGORY_LIST"),
        ("CATEGORY3", "CATEGORY_LIST"),
        ("FORMAT", "FORMAT_LIST"),
        ("CONTENT_LANGUAGE", "LANGUAGE_LIST"),
    )
    # Per-table cache: stripped name -> target ROWKEY ("Asimov, Isaac" -> 42).
    lookup_maps = {table: {} for table in lookup_tables}
    errors = 0

    target = sqlite3.connect(target_db)
    tc = target.cursor()

    def get_or_create_lookup(table, item, provenance=None):
        """Return the target ROWKEY for a lookup value, inserting it if needed.

        ``item`` is a ``(name, sort_name)`` pair for CONTRIBUTOR and a plain
        string for every other table. ``provenance`` is stored in the
        ``merge_source`` column of newly created rows. Returns None when the
        name is blank or the value can be neither found nor inserted.
        """
        is_contributor = table == "CONTRIBUTOR"
        if is_contributor:
            try:
                raw_name, sort_name = item[0], item[1]
            except (TypeError, IndexError):
                return None
            field = "NAME"
        else:
            raw_name, sort_name = item, None
            field = "LISTITEM"

        name = (raw_name or "").strip()
        if not name:
            return None
        if is_contributor:
            fout.write(f"\nLooking up CONTRIBUTOR: '{name}' ")

        # The cache is keyed by the same stripped name that is queried and
        # inserted, so a hit and a miss can never disagree about the key.
        cache = lookup_maps[table]
        if name in cache:
            return cache[name]

        try:
            tc.execute(f"SELECT ROWKEY FROM {table} WHERE {field} = ?", (name,))
        except sqlite3.Error as e:
            if not is_contributor:
                raise
            # Contributor queries were always best-effort: report the error
            # and leave the field unmapped instead of reading a stale cursor.
            print(e)
            return None
        print(f"Queried {table} for '{name}'")

        row = tc.fetchone()
        if row:
            rowkey = row[0]
            print(f"Found existing {table} ROWKEY {rowkey} for '{item}'")
        else:
            try:
                if is_contributor:
                    fout.write(f" => Inserting '{name}' ")
                    tc.execute(
                        f"INSERT INTO {table} ({field}, SORT_NAME, merge_source ) VALUES (?,?,?)",
                        (name, sort_name, provenance),
                    )
                    fout.write(f" -> ROWKEY {tc.lastrowid}")
                else:
                    tc.execute(
                        f"INSERT INTO {table} ({field},merge_source ) VALUES (?,?)",
                        (name, provenance),
                    )
                rowkey = tc.lastrowid
            except sqlite3.IntegrityError as e:
                fout.write(f"Fail !! -> ROWKEY {tc.lastrowid}")
                print(f"Integrity error inserting ({field}) ({item}) into ({table}): {e}")
                return None
            except sqlite3.Error as e:
                print(f"Error inserting ({field}) ({item}) into ({table}): {e}")
                return None

        cache[name] = rowkey
        print(f"Mapped {table} '{item}' to ROWKEY {rowkey}")
        return rowkey

    try:
        target.execute("PRAGMA foreign_keys = OFF")  # speed

        # --------------------------------------------------------------
        # 1. Make sure every lookup table has the provenance column
        # --------------------------------------------------------------
        # Each table gets its own try: a table that already has the column
        # must not stop the remaining tables from being altered.
        for table in lookup_tables:
            try:
                tc.execute(f"ALTER TABLE {table} ADD COLUMN merge_source TEXT")
            except sqlite3.OperationalError:
                continue  # already done
            except Exception as e:
                print("Error altering target DB:", e)
                return
        target.commit()

        # --------------------------------------------------------------
        # 2. Process each source database
        # --------------------------------------------------------------
        for src_path in source_dbs:
            print(f"\nMerging {src_path} → {target_db}")
            src = sqlite3.connect(src_path)
            try:
                sc = src.cursor()
                try:
                    sc.execute("PRAGMA integrity_check")
                    result = sc.fetchone()
                    if result == ("ok",):
                        print(f"Database '{src_path}' is valid and connection is solid.")
                    else:
                        print(f"Database '{src_path}' has integrity issues: {result}")
                except sqlite3.Error as e:
                    print(f"Failed to connect or verify database integrity: {e}")

                # ROWKEYs are only meaningful inside one source database, so
                # the key -> name maps are rebuilt for every source. Reusing
                # an earlier source's map would resolve a dangling key to an
                # unrelated name.
                src_lookups = _read_source_lookups(sc, lookup_tables)

                # ------------------------------------------------------
                # 3. Iterate every book in the source
                # ------------------------------------------------------
                sc.execute("SELECT * FROM BOOKS")
                columns = [desc[0] for desc in sc.description]
                inserted = updated = skipped = 0
                for row in sc:
                    row_dict = dict(zip(columns, row))
                    provenance = row_dict.get("PROVENANCE")

                    # 3a. Translate source lookup keys into target keys.
                    for field, table in book_lookup_fields:
                        old_key = row_dict.get(field)
                        if old_key is None:
                            continue
                        old_name = src_lookups[table].get(old_key, "")
                        if old_name == "":
                            # The key is left as it was; only a warning is issued.
                            print(f"Warning: In source {src_path}, no name found for {table} ROWKEY {old_key} (field {field}) in book '{row_dict.get('TITLE', '')}'")
                        else:
                            row_dict[field] = get_or_create_lookup(table, old_name, provenance)

                    # 3b. Skip books whose hash is already in the target.
                    book_hash = row_dict["HASH"]
                    tc.execute("SELECT ROWKEY FROM BOOKS WHERE HASH = ?", (book_hash,))
                    if tc.fetchone():
                        skipped += 1
                        continue

                    # 3c. Insert the book and let the target assign a new ROWKEY.
                    if "ROWKEY" in row_dict:
                        row_dict["ROWKEY"] = None
                    cols = ", ".join(f'"{c}"' for c in row_dict)
                    placeholders = ", ".join("?" for _ in row_dict)
                    try:
                        tc.execute(
                            f"INSERT INTO BOOKS ({cols}) VALUES ({placeholders})",
                            list(row_dict.values()),
                        )
                    except sqlite3.Error as e:
                        print(f"{errors} {inserted} {row_dict.get('TITLE')} {e}")
                        errors += 1
                    else:
                        # Only successful inserts are counted.
                        inserted += 1
            finally:
                src.close()
            print(f" → {inserted} inserted, {updated} covers upgraded, {skipped} duplicates skipped")
            target.commit()

        # VACUUM cannot run inside an open transaction.
        target.commit()
        target.execute("VACUUM")
    finally:
        target.close()
    print("\nAll done. Master database ready.")


def _read_source_lookups(sc, lookup_tables):
    """Read one source's lookup tables as ``{table: {ROWKEY: value}}``.

    The value is a ``(name, sort_name)`` pair for CONTRIBUTOR and the
    LISTITEM text for every other table. Names are stripped and rows with a
    blank name are left out.
    """
    lookups = {}
    for table in lookup_tables:
        names = lookups[table] = {}
        if table == "CONTRIBUTOR":
            # CONTRIBUTOR uses different text columns than the list tables.
            sc.execute("SELECT ROWKEY, NAME, SORT_NAME FROM CONTRIBUTOR")
            for rowkey, name, sort_name in sc.fetchall():
                name = (name or "").strip()
                if name:
                    names[rowkey] = (name, sort_name)
        else:
            sc.execute(f"SELECT ROWKEY, LISTITEM FROM {table}")
            for rowkey, name in sc.fetchall():
                name = (name or "").strip()
                if name:
                    names[rowkey] = name
    return lookups
if __name__ == "__main__":
    # Hard-coded paths for the author's own setup; adjust as needed.
    target_db = "converted_readerware/books.db"
    source_dir = "rw_converted"
    # Hard-coded database names, ordered by size/date.
    sources = [
        "Books To Read Next.db",
        "BorrowedBooks.db",
        "MyOwnBooks.db",
        "BookCatalog.db",
        "McCollough.db",
        "NewMcCollough.db",
    ]
    schema_path = os.path.join(os.path.dirname(target_db), "schema.sql")

    try:
        # Read the schema before touching the old target, so a missing or
        # unreadable schema.sql does not cost us the existing master database.
        try:
            with open(schema_path, "r") as f:
                schema_sql = f.read()
        except OSError as e:
            print("Error creating target DB schema:", e)
            sys.exit(1)

        # The target is rebuilt from scratch on every run.
        if os.path.exists(target_db):
            os.unlink(target_db)

        target = None
        try:
            target = sqlite3.connect(target_db)
            target.executescript(schema_sql)
            target.commit()
        except sqlite3.Error as e:
            print("Error creating target DB schema:", e)
            sys.exit(1)
        finally:
            # Release the connection on failure as well as on success.
            if target is not None:
                target.close()

        source_dbs = [os.path.join(source_dir, name) for name in sources]
        merge_books(source_dbs, target_db)
    finally:
        # Flush and close the trace log even when the merge aborts or exits.
        fout.close()