-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigrate_server.py
More file actions
373 lines (305 loc) · 14.3 KB
/
migrate_server.py
File metadata and controls
373 lines (305 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#!/usr/bin/env python3
#
# Migrate all rooms from an old to a new matrix server:
# 1. Create all rooms on new server which exist on the old server instance
# 2. Copy all contents from rooms on old server to newly created roooms
#
# For forward references
from __future__ import annotations
from dataclasses import dataclass
import toml
import argparse
import logging
import pathlib
import sys
import nio
import asyncio
import io
HOME = pathlib.Path.home()
def sys_exit(msg, exit=True) -> None:
print(msg, file=sys.stderr)
logging.error(msg)
if exit:
sys.exit(-1)
@dataclass(kw_only=True)
class Server:
server: str = ''
user: str = ''
password: str = ''
token: str = ''
class Config:
SERVER_CREDS = '.server_creds.toml'
# Get server config TOML and return as map
def __init__(self, creds: str = None) -> None:
self.old_list = ['1', 't', 'p', 'v']
self.new_list = ['2', 'u', 'q', 'w']
if creds == None:
self.creds = HOME / pathlib.Path(self.SERVER_CREDS)
else:
self.creds = pathlib.Path(creds)
ns_map = self.parse_cmdline()
self.read_creds()
self.get_cmdline(ns_map)
def get_verbose(self) -> bool:
return self.verbose
def parse_cmdline(self) -> dict[str,str]:
parser = argparse.ArgumentParser('Process server configurations')
tokens = ['server', 'user', 'password', 'token']
self.old_map = {}
enum0 = list(enumerate(tokens))
help_list = []
# Consume copy only in first loop
enum1 = enum0.copy()
for old in self.old_list:
tup = enum1.pop(0)
self.old_map[old] = tup[1]
help_list.append('old ' + tup[1])
self.new_map = {}
for new in self.new_list:
tup = enum0.pop(0)
self.new_map[new] = tup[1]
help_list.append('new ' + tup[1])
for opt in self.old_list+self.new_list:
help = help_list.pop(0)
parser.add_argument('-' + opt, type=str, help=help)
parser.add_argument('-V', '--verbose', action='store_true')
parser.add_argument('-c', '--config', type=str)
# Construct map from parsed option namespace (skip empty elements)
ns_map = dict(filter(lambda item: item[1] is not None, vars(parser.parse_args()).items()))
if 'config' in ns_map:
self.creds = ns_map['config']
self.verbose = ns_map['verbose']
return ns_map
def read_creds(self) -> None:
with open(self.creds) as f:
config = toml.load(f)
self.old = Server(**config['old'])
self.new = Server(**config['new'])
# Parse cmdline parms overwriting existing config map from TOML file
def get_cmdline(self, ns_map: dict[str, str]) -> None:
for key in ns_map:
if key in self.old_list:
setattr(self.old, self.old_map[key], ns_map[key])
elif key in self.new_list:
setattr(self.new, self.new_map[key], ns_map[key])
def get_verb(self) -> bool:
return self.verbose
# HACK:
# Get media from old server before uploading to new server
async def download_mxc(server: Matrix_Server, url: str) -> bytearray:
response = await server.client.download(mxc=url)
if hasattr(response, 'body'):
return response
else:
return b''
class Matrix_Server:
device_cnt: int = 1
def __init__(self, server:Server, verbose=False, old: Matrix_Server = None) -> None:
self.server = server
self.verb = verbose
self.old = old
self.device = f'migrate_server_{self.device_cnt}'
self.device_cnt += 1
# For the display names of created rooms
self.room_names = []
async def login(self) -> nio.AsyncClient:
self.client = nio.AsyncClient(
homeserver=self.server.server,
user=self.server.user,
config=nio.AsyncClientConfig(store=nio.store.database.SqliteMemoryStore),
)
self.client.device_id = self.device
# We prefer passwords over access tokens
if len(self.server.password) > 0:
login_resp = await self.client.login(password=self.server.password)
else:
login_resp = await self.client.login(token=self.server.token)
if self.verb:
sys_exit(f'Logged into server {self.server.server}', False)
if isinstance(login_resp, (nio.LoginResponse)):
await self.sync()
self.client.load_store()
self.rooms = self.client.rooms
for room in self.rooms:
self.room_names.append(self.rooms[room].display_name)
else:
logging.error(f'{str(login_resp)}')
sys_exit(f'Cannot log into {self.server.server}, aborting')
async def logout(self) -> None:
resp = await self.client.logout(False)
if isinstance(resp, nio.LogoutError):
logging.error(f'Logging out from {self.server.server} failed with {str(resp)}')
if self.verb:
sys_exit(f'Logged out of server {self.server.server}', False)
async def sync(self) -> None:
sync_resp = await self.client.sync(full_state=True, timeout=30000,
# Limit fetch of room events as they will be fetched later
sync_filter={"room": {"timeline": {"limit": 1}}})
if isinstance(sync_resp, (nio.SyncError)):
logging(f'Error syncing: {str(sync_resp)}', False)
def get_rooms(self) -> list[nio.MatrixRoom]:
return self.rooms
async def create_room(self, room_name: str) -> nio.MatrixRoom:
old_room = self.old.get_room(room_name)
resp = await self.client.room_create(visibility=nio.RoomVisibility.public,
room_version=old_room.room_version,
name=room_name, topic=old_room.topic)
if isinstance(resp, (nio.RoomCreateError)):
logging.error(f'Room creation error {str(resp)}')
sys_exit(f'Cannot create room {room_name}, aborting')
elif self.verb:
sys_exit(f'Created room {room_name}', False)
await self.sync()
# Update cached data
self.rooms = self.client.rooms
self.room_names.append(room_name)
room = self.get_room(room_name)
avatar_url = old_room.gen_avatar_url
# Create avatar for new room if possible
if avatar_url != None and len(avatar_url) > 0:
avatar = await download_mxc(self.old, avatar_url)
if isinstance(avatar, nio.DownloadResponse):
body = avatar.body
size = len(body)
if size > 0:
resp, _ = await self.client.upload(io.BytesIO(avatar.body), avatar.content_type, filesize=size)
if self.verb:
if isinstance(resp, nio.UploadResponse):
sys_exit(f'Uploaded room avatar, obtained URL {resp.content_uri}', False)
else:
sys_exit(f'Error when uploading room avatar: {str(resp)}', False)
content = {
'url': resp.content_uri
}
resp = await self.client.room_put_state(room.room_id, 'm.room.avatar', content)
if self.verb:
if isinstance(resp, nio.RoomPutStateError):
sys_exit(f'Setting room state of {room.room_id} resulted in {str(resp)}', False)
return room
async def fetch_room_events(self, start_token: str, room: nio.MatrixRoom, direction: nio.MessageDirection) -> list[nio.Event]:
events = []
while True:
resp = await self.client.room_messages(room.room_id, start_token,
limit=1000, direction=direction)
if isinstance(resp, nio.RoomMessagesError):
logging.error(f'Failed to get messages for room {room.display_name} @ start_token {start_token}')
if len(resp.chunk) == 0:
break
events.extend(event for event in resp.chunk if isinstance(event, (nio.RoomMessageFormatted, nio.RedactedEvent,
nio.RoomMessageMedia, nio.RoomEncryptedMedia)))
start_token = resp.end
if self.verb:
sys_exit(f'Fetched {len(events)} from room {room.display_name} in direction {str(direction)}', False)
return events
async def get_room_events(self, room: nio.MatrixRoom) -> list[nio.Event]:
sync_resp = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
start_token = sync_resp.rooms.join[room.room_id].timeline.prev_batch
events = await self.fetch_room_events(start_token, room, nio.MessageDirection.back)
events.reverse()
events += await self.fetch_room_events(start_token, room, nio.MessageDirection.front)
if self.verb:
sys_exit(f'Fetched {len(events)} in total from room {room.display_name}', False)
return events
def get_room(self, display_name: str) -> nio.MatrixRoom:
if display_name in self.room_names:
for room in self.rooms:
if self.rooms[room].display_name == display_name:
return self.rooms[room]
else:
return None
def get_room_from_id(self, room_id: str) -> nio.MatrixRoom:
if room_id in self.rooms:
return self.rooms[room_id]
return None
async def post_event(self, room: nio.MatrixRoom, event: nio.Event) -> None:
if isinstance(event, (nio.RoomMessageMedia, nio.RoomEncryptedMedia)):
# mime = event.mimetype
media_data_resp = await download_mxc(self.old, event.url)
if hasattr(media_data_resp, 'body'):
body = media_data_resp.body
name = media_data_resp.filename
mime = media_data_resp.content_type
body_size = len(body)
resp, _ = await self.client.upload(data_provider=io.BytesIO(body), content_type=mime,
filename=name, filesize=body_size)
if isinstance(resp, nio.UploadResponse):
if self.verb:
sys_exit(f'Uploaded {name}, obtained URL {resp.content_uri}', False)
content = {
'body': name,
'info': {
'size': body_size,
'mimetype': mime,
},
'url': resp.content_uri
}
else:
err_str = str(resp)
sys_exit(f'Error when uploading {name}: {err_str}', False)
content = {
'body': err_str
}
else:
content = {
'body': 'Empty body, something went wrong'
}
if self.verb:
sys_exit(f'mxc_download returned empty body from url {event.url}', False)
else:
content = {
'body': event.body,
}
msgtype = event.source['content']['msgtype']
content['msgtype'] = msgtype
try:
await self.client.room_send(room.room_id, message_type='m.room.message', content=content)
except Exception as e:
sys_exit(f'Exception {str(e)} occurred during message sending')
if self.verb:
strexc = str(content['body'])[:20] + '...'
sys_exit(f'Posted {msgtype} with body {strexc} to room {room.display_name}', False)
async def send_events(self, room: nio.MatrixRoom, events: list[nio.Event]):
for event in events:
# Filter messages
if isinstance(event, (nio.RoomMessageText,nio.RoomMessageMedia, nio.RoomEncryptedMedia)):
await self.post_event(room, event)
if self.verb:
sys_exit(f'Posted {len(events)} to room {room.display_name}', False)
def get_room_name(self, room: nio.MatrixRoom) -> str:
return room.display_name
def get_room_names(self) -> list[str]:
return self.room_names
# Worker class helper for asynchronous execution
class Worker:
def __init__(self, old: Matrix_Server, new: Matrix_Server) -> None:
self.old = old
self.new = new
async def process_events(self, room_obj: nio.MatrixRoom) -> None:
room_name = self.old.get_room_name(room_obj)
events = await self.old.get_room_events(room_obj)
if len(events) > 0:
if room_name not in self.new.get_room_names():
new_room = await self.new.create_room(room_name)
else:
new_room = self.new.get_room(room_name)
# Copy events from old to new room
await self.new.send_events(new_room, events)
async def main() -> None:
LOG_DIR = pathlib.Path(HOME, 'log')
logging.basicConfig(filename=str(LOG_DIR/pathlib.Path(__file__).stem)+'.log', filemode='a', level=logging.DEBUG, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
config = Config()
verb = config.get_verbose()
old = Matrix_Server(config.old, verbose=verb)
await old.login()
new = Matrix_Server(config.new, verbose=verb, old=old)
await new.login()
aws = []
for room in old.rooms:
room_obj = old.get_room_from_id(room)
worker = Worker(old, new)
aws.append(worker.process_events(room_obj))
await asyncio.gather(*aws)
await new.logout()
await old.logout()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())