Skip to content

Commit 7a144b1

Browse files
committed
Merge remote-tracking branch 'origin/v2-pull-integration' into pull-initial
2 parents 9c82771 + 2bca5bd commit 7a144b1

19 files changed

+1374
-282
lines changed

.github/workflows/autotests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ env:
1010
concurrency:
1111
group: ci-${{github.ref}}-autotests
1212
cancel-in-progress: true
13-
13+
1414
jobs:
1515
tests:
1616
runs-on: ubuntu-latest

.github/workflows/code_style.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ jobs:
99
- uses: actions/checkout@v2
1010
- uses: psf/black@stable
1111
with:
12+
# bump this version as needed
13+
version: 26.1.0
1214
options: "--check --diff --verbose -l 120"
13-
src: "./mergin"
15+
src: "./mergin"

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ htmlcov
1212
.pytest_cache
1313
deps
1414
venv
15-
.vscode/settings.json
1615
debug.py
16+
.vscode/

mergin/cli.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,58 @@ def pull(ctx):
469469
_print_unhandled_exception()
470470

471471

472+
@cli.command()
473+
@click.pass_context
474+
def sync(ctx):
475+
"""Synchronize the project. Pull latest project version from the server and push split changes."""
476+
mc = ctx.obj["client"]
477+
if mc is None:
478+
return
479+
directory = os.getcwd()
480+
current_job = None
481+
current_bar = None
482+
try:
483+
# Iterate over the generator to get updates
484+
for size_change, job in mc.sync_project_generator(directory):
485+
# Check if this is a new job (a new push operation)
486+
if job and job != current_job:
487+
# If a previous bar exists, close it
488+
if current_bar:
489+
current_bar.finish()
490+
491+
# A new push job has started. Initialize a new progress bar.
492+
click.echo(f"\nStarting upload")
493+
current_job = job
494+
495+
# The length of the progress bar should be the total size of the job
496+
# You'll need to get this from your job object (e.g., job.total_size)
497+
total_size = job.total_size
498+
current_bar = click.progressbar(
499+
length=total_size,
500+
label=f"Uploading project",
501+
)
502+
503+
# Update the current progress bar with the size increment
504+
current_bar.update(size_change)
505+
506+
# After the loop finishes, make sure to close the final progress bar
507+
if current_bar:
508+
current_bar.finish()
509+
click.secho("\nProject synced successfully", fg="green")
510+
511+
except InvalidProject as e:
512+
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
513+
except ClientError as e:
514+
click.secho("Error: " + str(e), fg="red")
515+
return
516+
except KeyboardInterrupt:
517+
click.secho("Cancelling...")
518+
if current_job:
519+
push_project_cancel(current_job)
520+
except Exception as e:
521+
_print_unhandled_exception()
522+
523+
472524
@cli.command()
473525
@click.argument("version")
474526
@click.pass_context

mergin/client.py

Lines changed: 116 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import re
1717
import typing
1818
import warnings
19+
from time import sleep
20+
from enum import Enum
21+
from typing import Optional, Type, Union
1922

2023
from .models import (
2124
ProjectDelta,
@@ -27,6 +30,9 @@
2730
)
2831

2932
from .common import (
33+
SYNC_ATTEMPT_WAIT,
34+
SYNC_ATTEMPTS,
35+
SYNC_CALLBACK_WAIT,
3036
ClientError,
3137
LoginError,
3238
WorkspaceRole,
@@ -47,8 +53,22 @@
4753
download_diffs_finalize,
4854
)
4955
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
50-
from .client_push import push_project_async, push_project_wait, push_project_finalize
56+
from .client_push import (
57+
get_push_changes_batch,
58+
push_project_async,
59+
push_project_is_running,
60+
push_project_wait,
61+
push_project_finalize,
62+
UploadChunksCache,
63+
)
5164
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
65+
from .utils import (
66+
DateTimeEncoder,
67+
get_versions_with_file_changes,
68+
int_version,
69+
is_version_acceptable,
70+
normalize_role,
71+
)
5272
from .version import __version__
5373

5474
this_dir = os.path.dirname(os.path.realpath(__file__))
@@ -129,6 +149,7 @@ def __init__(
129149
self._server_type = None
130150
self._server_version = None
131151
self._server_features = {}
152+
self.upload_chunks_cache = UploadChunksCache()
132153
self.client_version = "Python-client/" + __version__
133154
if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14"
134155
self.client_version += " " + plugin_version
@@ -388,8 +409,7 @@ def server_type(self):
388409
"""
389410
if not self._server_type:
390411
try:
391-
resp = self.get("/config", validate_auth=False)
392-
config = json.load(resp)
412+
config = self.server_config()
393413
stype = config.get("server_type")
394414
if stype == "ce":
395415
self._server_type = ServerType.CE
@@ -414,8 +434,7 @@ def server_version(self):
414434
"""
415435
if self._server_version is None:
416436
try:
417-
resp = self.get("/config", validate_auth=False)
418-
config = json.load(resp)
437+
config = self.server_config()
419438
self._server_version = config["version"]
420439
except (ClientError, KeyError):
421440
self._server_version = ""
@@ -555,7 +574,7 @@ def create_project_and_push(self, project_name, directory, is_public=False, name
555574
MerginProject.write_metadata(directory, project_info)
556575
mp = MerginProject(directory)
557576
if mp.inspect_files():
558-
self.push_project(directory)
577+
self.sync_project(directory)
559578

560579
def paginated_projects_list(
561580
self,
@@ -909,7 +928,7 @@ def download_project(self, project_path, directory, version=None):
909928
def user_info(self):
910929
server_type = self.server_type()
911930
if server_type == ServerType.OLD:
912-
resp = self.get("/v1/user/" + self.username())
931+
resp = self.get(f"/v1/user/{self.username()}")
913932
else:
914933
resp = self.get("/v1/user/profile")
915934
return json.load(resp)
@@ -1428,8 +1447,8 @@ def create_user(
14281447
email: str,
14291448
password: str,
14301449
workspace_id: int,
1431-
workspace_role: WorkspaceRole,
1432-
username: typing.Optional[str] = None,
1450+
workspace_role: Union[str, WorkspaceRole],
1451+
username: Optional[str] = None,
14331452
notify_user: bool = False,
14341453
) -> dict:
14351454
"""
@@ -1443,11 +1462,15 @@ def create_user(
14431462
param notify_user: flag for email notifications - confirmation email will be sent
14441463
"""
14451464
self.check_collaborators_members_support()
1465+
role_enum = normalize_role(workspace_role, WorkspaceRole)
1466+
if role_enum is None:
1467+
raise ValueError(f"Invalid role: {workspace_role}")
1468+
14461469
params = {
14471470
"email": email,
14481471
"password": password,
14491472
"workspace_id": workspace_id,
1450-
"role": workspace_role.value,
1473+
"role": role_enum.value,
14511474
"notify_user": notify_user,
14521475
}
14531476
if username:
@@ -1472,17 +1495,26 @@ def list_workspace_members(self, workspace_id: int) -> typing.List[dict]:
14721495
return json.load(resp)
14731496

14741497
def update_workspace_member(
1475-
self, workspace_id: int, user_id: int, workspace_role: WorkspaceRole, reset_projects_roles: bool = False
1498+
self,
1499+
workspace_id: int,
1500+
user_id: int,
1501+
workspace_role: Union[str, WorkspaceRole],
1502+
reset_projects_roles: bool = False,
14761503
) -> dict:
14771504
"""
14781505
Update workspace role of a workspace member, optionally resets the projects role
14791506
14801507
param reset_projects_roles: all project specific roles will be removed
14811508
"""
14821509
self.check_collaborators_members_support()
1510+
1511+
role_enum = normalize_role(workspace_role, WorkspaceRole)
1512+
if role_enum is None:
1513+
raise ValueError(f"Invalid role: {workspace_role}")
1514+
14831515
params = {
14841516
"reset_projects_roles": reset_projects_roles,
1485-
"workspace_role": workspace_role.value,
1517+
"workspace_role": role_enum.value,
14861518
}
14871519
workspace_member = self.patch(f"v2/workspaces/{workspace_id}/members/{user_id}", params, json_headers)
14881520
return json.load(workspace_member)
@@ -1502,25 +1534,35 @@ def list_project_collaborators(self, project_id: str) -> typing.List[dict]:
15021534
project_collaborators = self.get(f"v2/projects/{project_id}/collaborators")
15031535
return json.load(project_collaborators)
15041536

1505-
def add_project_collaborator(self, project_id: str, user: str, project_role: ProjectRole) -> dict:
1537+
def add_project_collaborator(self, project_id: str, user: str, project_role: Union[str, ProjectRole]) -> dict:
15061538
"""
15071539
Add a user to project collaborators and grant them a project role.
15081540
Fails if user is already a member of the project.
15091541
15101542
param user: login (username or email) of the user
15111543
"""
15121544
self.check_collaborators_members_support()
1545+
1546+
role_enum = normalize_role(project_role, ProjectRole)
1547+
if role_enum is None:
1548+
raise ValueError(f"Invalid role: {project_role}")
1549+
15131550
params = {"role": project_role.value, "user": user}
15141551
project_collaborator = self.post(f"v2/projects/{project_id}/collaborators", params, json_headers)
15151552
return json.load(project_collaborator)
15161553

1517-
def update_project_collaborator(self, project_id: str, user_id: int, project_role: ProjectRole) -> dict:
1554+
def update_project_collaborator(self, project_id: str, user_id: int, project_role: Union[str, ProjectRole]) -> dict:
15181555
"""
15191556
Update project role of the existing project collaborator.
15201557
Fails if user is not a member of the project yet.
15211558
"""
15221559
self.check_collaborators_members_support()
1560+
1561+
role_enum = normalize_role(project_role, ProjectRole)
1562+
if role_enum is None:
1563+
raise ValueError(f"Invalid role: {project_role}")
15231564
params = {"role": project_role.value}
1565+
15241566
project_collaborator = self.patch(f"v2/projects/{project_id}/collaborators/{user_id}", params, json_headers)
15251567
return json.load(project_collaborator)
15261568

@@ -1596,13 +1638,71 @@ def send_logs(
15961638
request = urllib.request.Request(url, data=payload, headers=header)
15971639
return self._do_request(request)
15981640

1599-
def create_invitation(self, workspace_id: int, email: str, workspace_role: WorkspaceRole):
1641+
def create_invitation(self, workspace_id: int, email: str, workspace_role: Union[str, WorkspaceRole]):
16001642
"""
16011643
Create invitation to workspace for specific role
16021644
"""
16031645
min_version = "2025.6.1"
16041646
if not is_version_acceptable(self.server_version(), min_version):
16051647
raise NotImplementedError(f"This needs server at version {min_version} or later")
1606-
params = {"email": email, "role": workspace_role.value}
1648+
1649+
role_enum = normalize_role(workspace_role, WorkspaceRole)
1650+
if role_enum is None:
1651+
raise ValueError(f"Invalid role: {workspace_role}")
1652+
1653+
params = {"email": email, "role": role_enum.value}
16071654
ws_inv = self.post(f"v2/workspaces/{workspace_id}/invitations", params, json_headers)
16081655
return json.load(ws_inv)
1656+
1657+
def sync_project_generator(self, project_directory):
1658+
"""
1659+
Syncs project by loop with these steps:
1660+
1. Pull server version
1661+
2. Get local changes
1662+
3. Push first change batch
1663+
Repeat if there are more local changes.
1664+
1665+
:param project_directory: Project's directory
1666+
"""
1667+
mp = MerginProject(project_directory)
1668+
has_changes = True
1669+
server_conflict_attempts = 0
1670+
while has_changes:
1671+
self.pull_project(project_directory)
1672+
try:
1673+
job = push_project_async(self, project_directory)
1674+
if not job:
1675+
break
1676+
# waiting for progress
1677+
last_size = 0
1678+
while push_project_is_running(job):
1679+
sleep(SYNC_CALLBACK_WAIT)
1680+
current_size = job.transferred_size
1681+
yield (current_size - last_size, job) # Yields the size change and the job object
1682+
last_size = current_size
1683+
push_project_finalize(job)
1684+
_, has_changes = get_push_changes_batch(self, project_directory)
1685+
server_conflict_attempts = 0
1686+
except ClientError as e:
1687+
if e.is_retryable_sync() and server_conflict_attempts < SYNC_ATTEMPTS - 1:
1688+
# retry on conflict, e.g. when server has changes that we do not have yet
1689+
mp.log.info(
1690+
f"Restarting sync process (conflict on server) - {server_conflict_attempts + 1}/{SYNC_ATTEMPTS}"
1691+
)
1692+
server_conflict_attempts += 1
1693+
sleep(SYNC_ATTEMPT_WAIT)
1694+
continue
1695+
raise e
1696+
1697+
def sync_project(self, project_directory):
1698+
"""
1699+
Syncs project by pulling server changes and pushing local changes. There is intorduced retry mechanism
1700+
for handling server conflicts (when server has changes that we do not have yet or somebody else is syncing).
1701+
See description of _sync_project_generator().
1702+
1703+
:param project_directory: Project's directory
1704+
"""
1705+
# walk through the generator to perform the sync
1706+
# in this method we do not yield anything to the caller
1707+
for _ in self.sync_project_generator(project_directory):
1708+
pass

0 commit comments

Comments
 (0)