Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ def wrapper(self, args, **kwargs):
make_parent_dirs = getattr(args, 'make_parent_dirs', False)
if argument(args, fake) ^ invert_fake:
return method(self, args, repository=None, **kwargs)
elif location.proto == 'ssh':

elif location.proto == 'ssh' or location.proto == 'serve':
repository = RemoteRepository(location.omit_archive(), create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
make_parent_dirs=make_parent_dirs, args=args)
make_parent_dirs=make_parent_dirs, args=args, serve=(location.proto == 'serve'))
else:
repository = Repository(location.path, create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
Expand Down Expand Up @@ -296,6 +297,7 @@ def do_serve(self, args):
restrict_to_repositories=args.restrict_to_repositories,
append_only=args.append_only,
storage_quota=args.storage_quota,
pull_command=args.pull_command
).serve()

def do_version(self, args):
Expand Down Expand Up @@ -5252,6 +5254,9 @@ def diff_sort_spec_validator(s):
'When a new repository is initialized, sets the storage quota on the new '
'repository as well. Default: no quota.')

subparser.add_argument('--pull-command', metavar='cmd', dest='pull_command',
help='command to use for pulling from a borg server started in serve:// mode')

# borg umount
umount_epilog = process_epilog("""
This command unmounts a FUSE filesystem that was mounted with ``borg mount``.
Expand Down
10 changes: 10 additions & 0 deletions src/borg/helpers/parseformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ class Location:
(?::(?P<port>\d+))? # :port (optional)
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive

serve_re = re.compile(r"""
(?P<proto>serve):// # serve://
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive

file_re = re.compile(r"""
(?P<proto>file):// # file://
""" + file_path_re + optional_archive_re, re.VERBOSE) # servername/path, path or path::archive
Expand Down Expand Up @@ -460,6 +464,12 @@ def normpath_special(p):
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.serve_re.match(text)
if m:
self.proto = m.group('proto')
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.file_re.match(text)
if m:
self.proto = m.group('proto')
Expand Down
68 changes: 46 additions & 22 deletions src/borg/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class RepositoryServer: # pragma: no cover
'inject_exception',
)

def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, pull_command=None):
self.repository = None
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
Expand All @@ -187,6 +187,7 @@ def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, sto
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information
self.pull_command = pull_command

def positional_to_named(self, method, argv):
"""Translate from positional protocol to named protocol."""
Expand All @@ -206,13 +207,27 @@ def filter_args(self, f, kwargs):
return {name: kwargs[name] for name in kwargs if name in known}

def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)

if self.pull_command:
self.p = Popen(shlex.split(self.pull_command), bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE)

stdin_fd = self.p.stdout.fileno()
stdout_fd = self.p.stdin.fileno()
stderr_fd = sys.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)

else:
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)

unpacker = get_limited_unpacker('server')

while True:
r, w, es = select.select([stdin_fd], [], [], 10)
if r:
Expand Down Expand Up @@ -549,7 +564,7 @@ def required_version(self):
dictFormat = False # outside of __init__ for testing of legacy free protocol

def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False,
make_parent_dirs=False, args=None):
make_parent_dirs=False, args=None, serve=False):
self.location = self._location = location
self.preload_ids = []
self.msgid = 0
Expand All @@ -571,20 +586,29 @@ def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock
testing = location.host == '__testsuite__'
# when testing, we invoke and talk to a borg process directly (no ssh).
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
# borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
if serve:
self.stdin_fd = sys.stdout.fileno()
self.stdout_fd = sys.stdin.fileno()
self.stderr_fd = sys.stderr.fileno()

os.set_blocking(self.stdin_fd, True)
os.set_blocking(self.stdout_fd, False)

else:
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
# borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
self.r_fds = [self.stdout_fd, self.stderr_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]

Expand Down
Loading