1616import re
1717import typing
1818import warnings
19+ from time import sleep
1920from enum import Enum
2021from typing import Optional , Type , Union
2122
2223from .common import (
24+ SYNC_ATTEMPT_WAIT ,
25+ SYNC_ATTEMPTS ,
26+ SYNC_CALLBACK_WAIT ,
2327 ClientError ,
2428 LoginError ,
2529 WorkspaceRole ,
4044 download_diffs_finalize ,
4145)
4246from .client_pull import pull_project_async , pull_project_wait , pull_project_finalize
43- from .client_push import push_project_async , push_project_wait , push_project_finalize
47+ from .client_push import (
48+ get_push_changes_batch ,
49+ push_project_async ,
50+ push_project_is_running ,
51+ push_project_wait ,
52+ push_project_finalize ,
53+ UploadChunksCache ,
54+ )
55+ from .utils import DateTimeEncoder , get_versions_with_file_changes , int_version , is_version_acceptable
4456from .utils import (
4557 DateTimeEncoder ,
4658 get_versions_with_file_changes ,
@@ -127,6 +139,8 @@ def __init__(
127139 self ._user_info = None
128140 self ._server_type = None
129141 self ._server_version = None
142+ self ._server_features = {}
143+ self .upload_chunks_cache = UploadChunksCache ()
130144 self .client_version = "Python-client/" + __version__
131145 if plugin_version is not None : # this could be e.g. "Plugin/2020.1 QGIS/3.14"
132146 self .client_version += " " + plugin_version
@@ -386,8 +400,7 @@ def server_type(self):
386400 """
387401 if not self ._server_type :
388402 try :
389- resp = self .get ("/config" , validate_auth = False )
390- config = json .load (resp )
403+ config = self .server_config ()
391404 stype = config .get ("server_type" )
392405 if stype == "ce" :
393406 self ._server_type = ServerType .CE
@@ -412,14 +425,26 @@ def server_version(self):
412425 """
413426 if self ._server_version is None :
414427 try :
415- resp = self .get ("/config" , validate_auth = False )
416- config = json .load (resp )
428+ config = self .server_config ()
417429 self ._server_version = config ["version" ]
418430 except (ClientError , KeyError ):
419431 self ._server_version = ""
420432
421433 return self ._server_version
422434
435+ def server_features (self ):
436+ """
437+ Returns feature flags of the server.
438+ """
439+ if self ._server_features :
440+ return self ._server_features
441+ config = self .server_config ()
442+ self ._server_features = {
443+ "v2_push_enabled" : config .get ("v2_push_enabled" , False ),
444+ "v2_pull_enabled" : config .get ("v2_pull_enabled" , False ),
445+ }
446+ return self ._server_features
447+
423448 def workspaces_list (self ):
424449 """
425450 Find all available workspaces
@@ -540,7 +565,7 @@ def create_project_and_push(self, project_name, directory, is_public=False, name
540565 MerginProject .write_metadata (directory , project_info )
541566 mp = MerginProject (directory )
542567 if mp .inspect_files ():
543- self .push_project (directory )
568+ self .sync_project (directory )
544569
545570 def paginated_projects_list (
546571 self ,
@@ -810,7 +835,7 @@ def download_project(self, project_path, directory, version=None):
810835 def user_info (self ):
811836 server_type = self .server_type ()
812837 if server_type == ServerType .OLD :
813- resp = self .get ("/v1/user/" + self .username ())
838+ resp = self .get (f "/v1/user/{ self .username ()} " )
814839 else :
815840 resp = self .get ("/v1/user/profile" )
816841 return json .load (resp )
@@ -1527,3 +1552,56 @@ def create_invitation(self, workspace_id: int, email: str, workspace_role: Union
15271552 params = {"email" : email , "role" : role_enum .value }
15281553 ws_inv = self .post (f"v2/workspaces/{ workspace_id } /invitations" , params , json_headers )
15291554 return json .load (ws_inv )
1555+
1556+ def sync_project_generator (self , project_directory ):
1557+ """
1558+ Syncs project by loop with these steps:
1559+ 1. Pull server version
1560+ 2. Get local changes
1561+ 3. Push first change batch
1562+ Repeat if there are more local changes.
1563+
1564+ :param project_directory: Project's directory
1565+ """
1566+ mp = MerginProject (project_directory )
1567+ has_changes = True
1568+ server_conflict_attempts = 0
1569+ while has_changes :
1570+ self .pull_project (project_directory )
1571+ try :
1572+ job = push_project_async (self , project_directory )
1573+ if not job :
1574+ break
1575+ # waiting for progress
1576+ last_size = 0
1577+ while push_project_is_running (job ):
1578+ sleep (SYNC_CALLBACK_WAIT )
1579+ current_size = job .transferred_size
1580+ yield (current_size - last_size , job ) # Yields the size change and the job object
1581+ last_size = current_size
1582+ push_project_finalize (job )
1583+ _ , has_changes = get_push_changes_batch (self , project_directory )
1584+ server_conflict_attempts = 0
1585+ except ClientError as e :
1586+ if e .is_retryable_sync () and server_conflict_attempts < SYNC_ATTEMPTS - 1 :
1587+ # retry on conflict, e.g. when server has changes that we do not have yet
1588+ mp .log .info (
1589+ f"Restarting sync process (conflict on server) - { server_conflict_attempts + 1 } /{ SYNC_ATTEMPTS } "
1590+ )
1591+ server_conflict_attempts += 1
1592+ sleep (SYNC_ATTEMPT_WAIT )
1593+ continue
1594+ raise e
1595+
1596+ def sync_project (self , project_directory ):
1597+ """
1598+ Syncs project by pulling server changes and pushing local changes. There is intorduced retry mechanism
1599+ for handling server conflicts (when server has changes that we do not have yet or somebody else is syncing).
1600+ See description of _sync_project_generator().
1601+
1602+ :param project_directory: Project's directory
1603+ """
1604+ # walk through the generator to perform the sync
1605+ # in this method we do not yield anything to the caller
1606+ for _ in self .sync_project_generator (project_directory ):
1607+ pass
0 commit comments