1010import math
1111import hashlib
1212from datetime import datetime , timezone
13+ import concurrent .futures
1314
1415this_dir = os .path .dirname (os .path .realpath (__file__ ))
15- CHUNK_SIZE = 10 * 1024 * 1024
16+
17+ CHUNK_SIZE = 100 * 1024 * 1024
18+ # the server enforces an upper limit on upload chunk size; ideally this value should be requested from the server once that is implemented
19+ UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024
20+
1621IGNORE_EXT = re .compile (r'({})$' .format (
1722 '|' .join (re .escape (x ) for x in ['-shm' , '-wal' , '~' , 'pyc' , 'swap' ])
1823))
@@ -360,7 +365,7 @@ def project_versions(self, project_path):
360365 resp = self .get ("/v1/project/version/{}" .format (project_path ))
361366 return json .load (resp )
362367
363- def download_project (self , project_path , directory ):
368+ def download_project (self , project_path , directory , parallel = True ):
364369 """
365370 Download latest version of project into given directory.
366371
@@ -369,6 +374,9 @@ def download_project(self, project_path, directory):
369374
370375 :param directory: Target directory
371376 :type directory: String
377+
378+ :param parallel: Use multi-thread approach to download files in parallel requests, default True
379+ :type parallel: Boolean
372380 """
373381 if os .path .exists (directory ):
374382 raise Exception ("Project directory already exists" )
@@ -377,8 +385,23 @@ def download_project(self, project_path, directory):
377385 project_info = self .project_info (project_path )
378386 version = project_info ['version' ] if project_info ['version' ] else 'v0'
379387
380- for file in project_info ['files' ]:
381- self ._download_file (project_path , version , file , directory )
388+ # sending parallel requests is good for projects with a lot of small files
389+ if parallel :
390+ with concurrent .futures .ThreadPoolExecutor () as executor :
391+ futures_map = {}
392+ for file in project_info ['files' ]:
393+ future = executor .submit (self ._download_file , project_path , version , file , directory , parallel )
394+ futures_map [future ] = file
395+
396+ for future in concurrent .futures .as_completed (futures_map ):
397+ file = futures_map [future ]
398+ try :
399+ future .result (60 )
400+ except concurrent .futures .TimeoutError :
401+ raise ClientError ("Timeout error: failed to download {}" .format (file ))
402+ else :
403+ for file in project_info ['files' ]:
404+ self ._download_file (project_path , version , file , directory , parallel )
382405
383406 data = {
384407 "name" : project_path ,
@@ -387,12 +410,14 @@ def download_project(self, project_path, directory):
387410 }
388411 save_project_file (directory , data )
389412
390- def push_project (self , directory ):
413+ def push_project (self , directory , parallel = True ):
391414 """
392415 Upload local changes to the repository.
393416
394417 :param directory: Project's directory
395418 :type directory: String
419+ :param parallel: Use multi-thread approach to upload files in parallel requests, defaults to True
420+ :type parallel: Boolean
396421 """
397422 local_info = inspect_project (directory )
398423 project_path = local_info ["name" ]
@@ -408,7 +433,7 @@ def push_project(self, directory):
408433
409434 upload_files = changes ["added" ] + changes ["updated" ]
410435 for f in upload_files :
411- f ["chunks" ] = [str (uuid .uuid4 ()) for i in range (math .ceil (f ["size" ] / CHUNK_SIZE ))]
436+ f ["chunks" ] = [str (uuid .uuid4 ()) for i in range (math .ceil (f ["size" ] / UPLOAD_CHUNK_SIZE ))]
412437
413438 data = {
414439 "version" : local_info .get ("version" ),
@@ -419,19 +444,23 @@ def push_project(self, directory):
419444
420445 # upload files' chunks and close transaction
421446 if upload_files :
422- headers = {"Content-Type" : "application/octet-stream" }
423- for f in upload_files :
424- with open (os .path .join (directory , f ["path" ]), 'rb' ) as file :
425- for chunk in f ["chunks" ]:
426- data = file .read (CHUNK_SIZE )
427- checksum = hashlib .sha1 ()
428- checksum .update (data )
429- size = len (data )
430- resp = self .post ("/v1/project/push/chunk/%s/%s" % (info ["transaction" ], chunk ), data , headers )
431- data = json .load (resp )
432- if not (data ['size' ] == size and data ['checksum' ] == checksum .hexdigest ()):
433- self .post ("/v1/project/push/cancel/%s" % info ["transaction" ])
434- raise ClientError ("Mismatch between uploaded file and local one" )
447+ if parallel :
448+ with concurrent .futures .ThreadPoolExecutor () as executor :
449+ futures_map = {}
450+ for file in upload_files :
451+ future = executor .submit (self ._upload_file , info ["transaction" ], directory , file , parallel )
452+ futures_map [future ] = file
453+
454+ for future in concurrent .futures .as_completed (futures_map ):
455+ file = futures_map [future ]
456+ try :
457+ future .result (60 )
458+ except concurrent .futures .TimeoutError :
459+ raise ClientError ("Timeout error: failed to upload {}" .format (file ))
460+ else :
461+ for file in upload_files :
462+ self ._upload_file (info ["transaction" ], directory , file , parallel )
463+
435464 try :
436465 resp = self .post ("/v1/project/push/finish/%s" % info ["transaction" ])
437466 info = json .load (resp )
@@ -443,12 +472,14 @@ def push_project(self, directory):
443472 local_info ["version" ] = info ["version" ]
444473 save_project_file (directory , local_info )
445474
446- def pull_project (self , directory ):
475+ def pull_project (self , directory , parallel = True ):
447476 """
448477 Fetch and apply changes from repository.
449478
450479 :param directory: Project's directory
451480 :type directory: String
481+ :param parallel: Use multi-thread approach to fetch files in parallel requests, defaults to True
482+ :type parallel: Boolean
452483 """
453484
454485 local_info = inspect_project (directory )
@@ -485,12 +516,31 @@ def backup_if_conflict(path, checksum):
485516 fetch_files = pull_changes ["added" ] + pull_changes ["updated" ]
486517 if fetch_files :
487518 temp_dir = os .path .join (directory , '.mergin' , 'fetch_{}-{}' .format (local_info ["version" ], server_info ["version" ]))
488- for file in fetch_files :
489- self ._download_file (project_path , server_info ['version' ], file , temp_dir )
490- src = os .path .join (temp_dir , file ["path" ])
491- dest = local_path (file ["path" ])
492- backup_if_conflict (file ["path" ], file ["checksum" ])
493- move_file (src , dest )
519+ # sending parallel requests is good for projects with a lot of small files
520+ if parallel :
521+ with concurrent .futures .ThreadPoolExecutor () as executor :
522+ futures_map = {}
523+ for file in fetch_files :
524+ future = executor .submit (project_path , server_info ['version' ], file , temp_dir , parallel )
525+ futures_map [future ] = file
526+
527+ for future in concurrent .futures .as_completed (futures_map ):
528+ file = futures_map [future ]
529+ try :
530+ future .result (60 )
531+ except concurrent .futures .TimeoutError :
532+ raise ClientError ("Timeout error: failed to download {}" .format (file ))
533+ src = os .path .join (temp_dir , file ["path" ])
534+ dest = local_path (file ["path" ])
535+ backup_if_conflict (file ["path" ], file ["checksum" ])
536+ move_file (src , dest )
537+ else :
538+ for file in fetch_files :
539+ self ._download_file (project_path , server_info ['version' ], file , temp_dir , parallel )
540+ src = os .path .join (temp_dir , file ["path" ])
541+ dest = local_path (file ["path" ])
542+ backup_if_conflict (file ["path" ], file ["checksum" ])
543+ move_file (src , dest )
494544 shutil .rmtree (temp_dir )
495545
496546 for file in pull_changes ["removed" ]:
@@ -505,7 +555,7 @@ def backup_if_conflict(path, checksum):
505555 local_info ["version" ] = server_info ["version" ] if server_info ["version" ] else 'v0'
506556 save_project_file (directory , local_info )
507557
508- def _download_file (self , project_path , project_version , file , directory ):
558+ def _download_file (self , project_path , project_version , file , directory , parallel = True ):
509559 """
510560 Helper to download single project file from server in chunks.
511561
@@ -517,29 +567,49 @@ def _download_file(self, project_path, project_version, file, directory):
517567 :type file: dict
518568 :param directory: Project's directory
519569 :type directory: String
570+ :param parallel: Use multi-thread approach to download parts in parallel requests, default True
571+ :type parallel: Boolean
520572 """
521573 query_params = {
522574 "file" : file ['path' ],
523575 "version" : project_version
524576 }
525577 file_dir = os .path .dirname (os .path .normpath (os .path .join (directory , file ['path' ])))
526578 basename = os .path .basename (file ['path' ])
527- length = 0
528- count = 0
529- while length < file ['size' ]:
530- range_header = {"Range" : "bytes={}-{}" .format (length , length + CHUNK_SIZE )}
579+
580+ def download_file_part (part ):
581+ """Callback to get a part of file using request to server with Range header."""
582+ start = part * (1 + CHUNK_SIZE )
583+ range_header = {"Range" : "bytes={}-{}" .format (start , start + CHUNK_SIZE )}
531584 resp = self .get ("/v1/project/raw/{}" .format (project_path ), data = query_params , headers = range_header )
532585 if resp .status in [200 , 206 ]:
533- save_to_file (resp , os .path .join (file_dir , basename + ".{}" .format (count )))
534- length += (CHUNK_SIZE + 1 )
535- count += 1
586+ save_to_file (resp , os .path .join (file_dir , basename + ".{}" .format (part )))
587+ else :
588+ raise ClientError ('Failed to download part {} of file {}' .format (part , basename ))
589+
590+ # download large files in chunks is beneficial mostly for retry on failure
591+ chunks = math .ceil (file ['size' ] / CHUNK_SIZE )
592+ if parallel :
593+ # create separate n threads, default as cores * 5
594+ with concurrent .futures .ThreadPoolExecutor () as executor :
595+ futures_map = {executor .submit (download_file_part , i ): i for i in range (chunks )}
596+ for future in concurrent .futures .as_completed (futures_map ):
597+ i = futures_map [future ]
598+ try :
599+ future .result (60 )
600+ except concurrent .futures .TimeoutError :
601+ raise ClientError ('Timeout error: failed to download part {} of file {}' .format (i , basename ))
602+ else :
603+ for i in range (chunks ):
604+ download_file_part (i )
536605
537606 # merge chunks together
538607 with open (os .path .join (file_dir , basename ), 'wb' ) as final :
539- for i in range (count ):
540- with open (os .path .join (directory , file ['path' ] + ".{}" .format (i )), 'rb' ) as chunk :
608+ for i in range (chunks ):
609+ file_part = os .path .join (directory , file ['path' ] + ".{}" .format (i ))
610+ with open (file_part , 'rb' ) as chunk :
541611 shutil .copyfileobj (chunk , final )
542- os .remove (os . path . join ( directory , file [ 'path' ] + ".{}" . format ( i )) )
612+ os .remove (file_part )
543613
544614 def delete_project (self , project_path ):
545615 """
@@ -553,3 +623,45 @@ def delete_project(self, project_path):
553623 url = urllib .parse .urljoin (self .url , urllib .parse .quote (path ))
554624 request = urllib .request .Request (url , method = "DELETE" )
555625 self ._do_request (request )
626+
627+ def _upload_file (self , transaction , project_dir , file_meta , parallel = True ):
628+ """
629+ Upload file in open upload transaction.
630+
631+ :param transaction: transaction uuid
632+ :type transaction: String
633+ :param project_dir: local project directory
634+ :type project_dir: String
635+ :param file_meta: metadata for file to upload
636+ :type file_meta: Dict
637+ :param parallel: Use multi-thread approach to upload file chunks in parallel requests, defaults to True
638+ :type parallel: Boolean
639+ :raises ClientError: raise on data integrity check failure
640+ """
641+ headers = {"Content-Type" : "application/octet-stream" }
642+ file_path = os .path .join (project_dir , file_meta ["path" ])
643+
644+ def upload_chunk (chunk_id , data ):
645+ checksum = hashlib .sha1 ()
646+ checksum .update (data )
647+ size = len (data )
648+ resp = self .post ("/v1/project/push/chunk/{}/{}" .format (transaction , chunk_id ), data , headers )
649+ data = json .load (resp )
650+ if not (data ['size' ] == size and data ['checksum' ] == checksum .hexdigest ()):
651+ self .post ("/v1/project/push/cancel/{}" .format (transaction ))
652+ raise ClientError ("Mismatch between uploaded file chunk {} and local one" .format (chunk ))
653+
654+ with open (file_path , 'rb' ) as file :
655+ if parallel :
656+ with concurrent .futures .ThreadPoolExecutor () as executor :
657+ futures_map = {executor .submit (upload_chunk , chunk , file .read (UPLOAD_CHUNK_SIZE )): chunk for chunk in file_meta ["chunks" ]}
658+ for future in concurrent .futures .as_completed (futures_map ):
659+ chunk = futures_map [future ]
660+ try :
661+ future .result (60 )
662+ except concurrent .futures .TimeoutError :
663+ raise ClientError ('Timeout error: failed to upload chunk {}' .format (chunk ))
664+ else :
665+ for chunk in file_meta ["chunks" ]:
666+ data = file .read (UPLOAD_CHUNK_SIZE )
667+ upload_chunk (chunk , data )
0 commit comments