From 2962c21eaa9c73a4a27450d118c65a199f6cc8ae Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 29 May 2026 14:13:48 +0200 Subject: [PATCH 01/11] server.rest / rest backend: support stdio transport --- src/borgstore/backends/rest.py | 180 +++++++++++++++++++++++++++++++-- src/borgstore/server/rest.py | 98 +++++++++++++++++- tests/test_backends.py | 23 +++++ tests/test_server_rest.py | 77 +++++++++++++- 4 files changed, 362 insertions(+), 16 deletions(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index ede3444..4e52647 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -5,7 +5,11 @@ import os import re import json +import sys +import logging import hashlib +import threading +import subprocess from typing import Iterator, Dict, Optional from types import ModuleType from http import HTTPStatus as HTTP @@ -35,15 +39,124 @@ BackendMustNotBeOpen, ) +logger = logging.getLogger(__name__) + + +class StdioSession: + def __init__(self, command, auth=None, headers=None, timeout=30): + self.command = command + self.auth = auth + self.headers = headers or {} + self.timeout = timeout + self.process = None + self._stderr_thread = None + + def _drain_stderr(self): + if self.process is None or self.process.stderr is None: + return + for line in self.process.stderr: + logger.warning("REST stdio server: %s", line.decode("utf-8", errors="replace").rstrip()) + + def open(self): + if self.process is not None: + return + self.process = subprocess.Popen( + self.command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + self._stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True) + self._stderr_thread.start() + + def close(self): + if self.process is None: + return + try: + if self.process.stdin is not None: + self.process.stdin.close() + self.process.wait(timeout=self.timeout) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=self.timeout) + finally: + if self.process.stdout is not None: + self.process.stdout.close() + if self.process.stderr is not None: + self.process.stderr.close() + self.process = None + self._stderr_thread = None + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + + def request(self, method, url, params=None, data=None, headers=None, timeout=None): + if self.process is None or self.process.stdin is None or self.process.stdout is None: + raise BackendError("stdio session is not open") + + request_headers = dict(self.headers) + if headers: + request_headers.update(headers) + request_headers["Connection"] = "keep-alive" + + prepared = requests.Request( + method=method, url=url, params=params, data=data, headers=request_headers, auth=self.auth + ).prepare() + + body = prepared.body + if body is None: + body = b"" + elif isinstance(body, bytes): + pass # ok + elif isinstance(body, str): + body = body.encode("utf-8") + else: + raise BackendError(f"unsupported body type: {type(body).__name__}") + + request_line = f"{prepared.method} {prepared.path_url} HTTP/1.1\r\n" + header_lines = "".join(f"{k}: {v}\r\n" for k, v in prepared.headers.items()) + self.process.stdin.write((request_line + header_lines + "\r\n").encode("ascii")) + if body: + self.process.stdin.write(body) + self.process.stdin.flush() + + line = self.process.stdout.readline() + if not line: + raise BackendError("stdio server closed connection unexpectedly") + status_line = line.decode("iso-8859-1").strip() + parts = status_line.split(" ", 2) + if len(parts) < 2: + raise BackendError(f"invalid HTTP status line from stdio server: {status_line!r}") + status_code = int(parts[1]) + reason = parts[2] if len(parts) > 2 else "" + + response_headers = requests.structures.CaseInsensitiveDict() + while True: + line = self.process.stdout.readline() + if line in (b"\r\n", b"\n", b""): + break + header_line = line.decode("iso-8859-1").strip() + if ":" in header_line: + key, value = header_line.split(":", 1) + response_headers[key.strip()] = value.strip() + + content_length = int(response_headers.get("Content-Length", "0")) + response_body = self.process.stdout.read(content_length) if content_length else b"" + + response = requests.Response() + response.status_code = status_code + response.headers = response_headers + response._content = response_body + response.url = prepared.url + response.reason = reason + response.encoding = requests.utils.get_encoding_from_headers(response_headers) + response.request = prepared + return response + def get_rest_backend(base_url: str): - # http(s)://username:password@hostname:port/sub/path or - # http(s)://hostname:port/sub/path + authentication from environment - # - # note: borgstore.server.rest does not support sub-paths, but sub-paths are - # supported in the rest client for use with reverse-proxy setups (see contrib/) - # or custom REST servers. - if not base_url.startswith(("http:", "https:")): + if not base_url.startswith(("http:", "https:", "rest:")): return None if requests is None: @@ -51,6 +164,12 @@ def get_rest_backend(base_url: str): "The REST backend requires dependencies. Install them with: 'pip install borgstore[rest]'" ) + # http(s)://username:password@hostname:port/sub/path or + # http(s)://hostname:port/sub/path + authentication from environment + # + # note: borgstore.server.rest does not support sub-paths, but sub-paths are + # supported in the rest client for use with reverse-proxy setups (see contrib/) + # or custom REST servers. http_regex = r""" (?Phttp|https):// ((?P[^:]+):(?P[^@]+)@)? @@ -74,6 +193,34 @@ def get_rest_backend(base_url: str): return REST(base_url, username=username, password=password) + # rest protocol means: use stdio to talk to a borgstore.server.rest process, + # either locally (empty host) or via ssh to the given host. The given path + # is used to construct a "file:" backend URL used by the rest server. + # + # rest:///path - talk to local rest server, path must be abs. fs path + # rest://user@host:port/path - ssh to rest server on host, abs. fs path + rest_regex = r""" + rest:// + (((?P[^@]+)@)(?P[^:/]+)(:(?P\d+))?)? + / # separator always required + (?P/[^?#]*) # absolute path for now + """ + m = re.match(rest_regex, base_url, re.VERBOSE) + if m: + path = m.group("path") + user = m.group("user") + host = m.group("host") + port = m.group("port") or "22" + if not host: + # empty host: don't use ssh, just run the rest server here + command = [] + python = sys.executable + else: + command = ["ssh", "-p", port, f"{user}@{host}"] + python = "python3" + command.extend([python, "-m", "borgstore.server.rest", "--stdio", "--backend", f"file://{path}"]) + return REST(base_url="http://stdio-backend", command=command) + class REST(BackendBase): def __init__( @@ -83,12 +230,14 @@ def __init__( password: Optional[str] = None, headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = 30, + command=None, ): self.base_url = base_url.rstrip("/") # _url method adds slash self.headers = headers or {} self.headers["Accept"] = "application/vnd.x.borgstore.rest.v1" self.timeout = timeout self.auth = HTTPBasicAuth(username, password) if username and password else None + self.command = command self.session = None def _url(self, path: str) -> str: @@ -108,6 +257,11 @@ def _request(self, method, url, *, headers=None, data=None, params=None): else: # .create() and .destroy() are called when backend is not opened if headers is not None: raise ValueError("custom headers are not supported outside of an open session") + if self.command is not None: + with StdioSession( + command=self.command, auth=self.auth, headers=self.headers, timeout=self.timeout + ) as session: + return session.request(method, url, params=params, data=data, timeout=self.timeout) return requests.request( method, url, auth=self.auth, params=params, data=data, headers=self.headers, timeout=self.timeout ) @@ -150,9 +304,15 @@ def destroy(self) -> None: def open(self): self._assert_closed() - self.session = requests.Session() - self.session.auth = self.auth - self.session.headers.update(self.headers) + if self.command is not None: + self.session = StdioSession( + command=self.command, auth=self.auth, headers=self.headers, timeout=self.timeout + ) + self.session.open() + else: + self.session = requests.Session() + self.session.auth = self.auth + self.session.headers.update(self.headers) def close(self): self._assert_open() diff --git a/src/borgstore/server/rest.py b/src/borgstore/server/rest.py index 2656596..4f7a407 100644 --- a/src/borgstore/server/rest.py +++ b/src/borgstore/server/rest.py @@ -6,9 +6,10 @@ import logging import os import socket +import sys import itertools from http import HTTPStatus as HTTP -from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler +from http.server import ThreadingHTTPServer, HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlsplit, parse_qs from ..backends.errors import ( @@ -370,6 +371,82 @@ def get_pre_bound_socket(): return socket.fromfd(3, socket.AF_UNIX, socket.SOCK_STREAM) +class StdinStdoutSocket: + """A mock socket that redirects reads to stdin and writes to stdout.""" + + def __init__(self): + # Use .buffer to handle raw bytes instead of text strings + self.rfile = sys.stdin.buffer + self.wfile = sys.stdout.buffer + + def makefile(self, mode="r", buffering=None, encoding=None, errors=None, newline=None): + """The HTTP request handler calls makefile() to get read/write streams.""" + if "r" in mode: + return self.rfile + elif "w" in mode: + return self.wfile + + def sendall(self, data, flags=0): + """Directly writes all data to stdout and flushes.""" + self.wfile.write(data) + self.wfile.flush() + + def send(self, data, flags=0): + """Writes data to stdout and returns the number of bytes written.""" + self.wfile.write(data) + self.wfile.flush() + return len(data) + + def recv(self, bufsize, flags=0): + """Reads up to bufsize bytes from stdin.""" + return self.rfile.read(bufsize) + + def getsockname(self): + """Required by the server to log or bind addresses.""" + return ("stdin/stdout", 0) + + def getpeername(self): + """Required by the handler for logging client info.""" + return ("stdio-client", 0) + + def close(self): + """Prevent closing the actual sys.stdin/stdout prematurely.""" + pass + + +class StdIOHTTPServer(HTTPServer): + """An HTTPServer variant that handles requests over stdin/stdout.""" + + def __init__(self, RequestHandlerClass): + # Skip the base TCPServer __init__ entirely because we aren't binding to a network port + self.server_address = ("stdin/stdout", 0) + self.RequestHandlerClass = RequestHandlerClass + + # Instantiate our fake socket + self.socket = StdinStdoutSocket() + + def serve_forever(self, poll_interval=0.5): + """Continuously handle requests until stdin is empty/closed.""" + # Instantiate the handler once + handler = self.RequestHandlerClass(self.socket, ("stdio-client", 0), self) + + while True: + # Check if stdin has reached EOF + # If the client closes the stream or sends 0 bytes, we break + if self.socket.rfile.peek(1) == b"": + break + + handler.handle_one_request() + + +class BorgStoreStdioRESTServer(StdIOHTTPServer): + def __init__(self, backend, username=None, password=None): + self.backend = backend + self.username = username + self.password = password + super().__init__(BorgStoreRESTRequestHandler) + + class BorgStoreRESTServer(ThreadingHTTPServer): """ BorgStore REST Server. @@ -456,11 +533,24 @@ def resolve_permissions(permissions): raise ValueError(f"Invalid --permissions value: {permissions!r}. Use a shortcut ({valid}) or a JSON object.") -def serve(host, port, backend_url, username=None, password=None, permissions=None, quota=None, socket_activation=False): +def serve( + host, + port, + backend_url, + username=None, + password=None, + permissions=None, + quota=None, + socket_activation=False, + stdio=False, +): backend = get_backend(backend_url, permissions=permissions, quota=quota) if backend is None: raise ValueError(f"Invalid backend URL: {backend_url}") - if socket_activation: + if stdio: + server = BorgStoreStdioRESTServer(backend, username, password) + logger.info("BorgStore REST server listening on stdin/stdout") + elif socket_activation: adopted = get_pre_bound_socket() adopted.setblocking(True) server_address = adopted.getsockname() @@ -493,6 +583,7 @@ def serve(host, port, backend_url, username=None, password=None, permissions=Non parser.add_argument( "--socket-activation", action="store_true", help="Adopt pre-bound socket from systemd (SD_LISTEN_FDS)" ) + parser.add_argument("--stdio", action="store_true", help="Serve on stdio") args = parser.parse_args() permissions = resolve_permissions(args.permissions) serve( @@ -504,4 +595,5 @@ def serve(host, port, backend_url, username=None, password=None, permissions=Non permissions, args.quota, args.socket_activation, + args.stdio, ) diff --git a/tests/test_backends.py b/tests/test_backends.py index ad5e115..5c567ca 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -360,6 +360,29 @@ def test_rest_url(url, base_url, username, password): assert backend.auth is None +@pytest.mark.skipif(not rest_is_available, reason="REST is not available (requests missing)") +def test_rest_stdio_backend(tmp_path): + backend_url = f"rest:///{tmp_path}" # empty host == no ssh + backend = get_rest_backend(backend_url) + assert isinstance(backend, REST) + + backend.create() + backend.open() + try: + backend.store("item1", b"stdio-backend-data") + assert backend.load("item1") == b"stdio-backend-data" + + items = [item.name for item in backend.list(ROOTNS)] + assert "item1" in items + + backend.delete("item1") + with pytest.raises(ObjectNotFound): + backend.load("item1") + finally: + backend.close() + backend.destroy() + + @pytest.mark.parametrize( "url,bucket,path", [("s3:/bucket/path", "bucket", "path"), ("s3:/bucket/my%20path", "bucket", "my path")] ) diff --git a/tests/test_server_rest.py b/tests/test_server_rest.py index 03eacdf..6463d08 100644 --- a/tests/test_server_rest.py +++ b/tests/test_server_rest.py @@ -1,4 +1,7 @@ import hashlib +import json +import subprocess +import sys import threading import pytest @@ -12,6 +15,7 @@ from borgstore.backends.rest import get_rest_backend from borgstore.backends.posixfs import get_file_backend from borgstore.backends.errors import ObjectNotFound, BackendAlreadyExists, QuotaExceeded +from borgstore.store import get_backend def start_server(backend_url, address, port, username=None, password=None, permissions=None, quota=None): @@ -281,9 +285,6 @@ def test_rest_server_hash(rest_server_with_auth): def test_rest_server_defrag(tmp_path): - import json - import requests - backend_url = tmp_path.as_uri() address, port = "127.0.0.1", 0 username, password = "testuser", "testpassword" @@ -568,3 +569,73 @@ def test_rest_server_quota_method_no_quota(rest_server_with_auth): assert q["usage"] == -1 finally: be.close() + + +def test_rest_server_stdio(tmp_path): + backend_url = tmp_path.as_uri() + proc = subprocess.Popen( + [sys.executable, "-m", "borgstore.server.rest", "--stdio", "--backend", backend_url], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + def do_request(method, path, body=b""): + headers = {"Accept": "application/vnd.x.borgstore.rest.v1", "Connection": "keep-alive"} + req = f"{method} {path} HTTP/1.1\r\n" + for k, v in headers.items(): + req += f"{k}: {v}\r\n" + if body: + req += f"Content-Length: {len(body)}\r\n" + req += "\r\n" + proc.stdin.write(req.encode("ascii")) + if body: + proc.stdin.write(body) + proc.stdin.flush() + + # Read status line + line = proc.stdout.readline() + if not line: + return None, b"EOF" + status_line = line.decode("ascii").strip() + status = int(status_line.split(" ")[1]) + + # Read headers + resp_headers = {} + while True: + line = proc.stdout.readline() + if line == b"\r\n" or line == b"\n" or not line: + break + header_line = line.decode("ascii").strip() + if ": " in header_line: + k, v = header_line.split(": ", 1) + resp_headers[k] = v + + # Read body + resp_body = b"" + if "Content-Length" in resp_headers: + resp_body = proc.stdout.read(int(resp_headers["Content-Length"])) + return status, resp_body + + try: + # 1. Create store + status, body = do_request("POST", "/?cmd=create") + assert status == 200 + + # 2. Store something + item_data = b"stdio data" + status, body = do_request("POST", "/item1", body=item_data) + assert status == 200 + + # 3. List the store + status, body = do_request("GET", "/") + assert status == 200 + items = json.loads(body.decode("utf-8")) + assert any(item["name"] == "item1" for item in items) + + finally: + proc.stdin.close() + try: + proc.wait(timeout=2) + except subprocess.TimeoutExpired: + proc.kill() From 08d09be0662a7778c9b76ee284973df2891598b6 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 29 May 2026 23:51:46 +0200 Subject: [PATCH 02/11] rest: URIs can support relative paths now rest://user@host/rel/path rest://user@host/~/path rest://user@host/~user/path rest://user@host//abs/path --- src/borgstore/backends/rest.py | 7 +++++-- src/borgstore/server/rest.py | 7 +++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index 4e52647..2671306 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -203,7 +203,7 @@ def get_rest_backend(base_url: str): rest:// (((?P[^@]+)@)(?P[^:/]+)(:(?P\d+))?)? / # separator always required - (?P/[^?#]*) # absolute path for now + (?P[^?#]*) # rel/path or /abs/path or even ~/path or ~user/path """ m = re.match(rest_regex, base_url, re.VERBOSE) if m: @@ -218,7 +218,10 @@ def get_rest_backend(base_url: str): else: command = ["ssh", "-p", port, f"{user}@{host}"] python = "python3" - command.extend([python, "-m", "borgstore.server.rest", "--stdio", "--backend", f"file://{path}"]) + # hack: we do NOT use a standards-compliant file:// URI here, because they only support absolute paths. + # we just use FILE:path and that path can be relative or absolute or even have ~ or ~user. + # borgstore.server.rest will translate it to an absolute file:// URI internally. + command.extend([python, "-m", "borgstore.server.rest", "--stdio", "--backend", f"FILE:{path}"]) return REST(base_url="http://stdio-backend", command=command) diff --git a/src/borgstore/server/rest.py b/src/borgstore/server/rest.py index 4f7a407..e57b82e 100644 --- a/src/borgstore/server/rest.py +++ b/src/borgstore/server/rest.py @@ -10,6 +10,7 @@ import itertools from http import HTTPStatus as HTTP from http.server import ThreadingHTTPServer, HTTPServer, BaseHTTPRequestHandler +from pathlib import Path from urllib.parse import urlsplit, parse_qs from ..backends.errors import ( @@ -544,6 +545,12 @@ def serve( socket_activation=False, stdio=False, ): + if backend_url.startswith("FILE:"): + # FILE: URIs are special: they are relative to the current working directory. + path = backend_url[5:] + # If path is relative, make it absolute. expand ~ and ~user. + backend_url = Path(path).expanduser().resolve().as_uri() + # Now we have a valid file:// URI! backend = get_backend(backend_url, permissions=permissions, quota=quota) if backend is None: raise ValueError(f"Invalid backend URL: {backend_url}") From 07088dc555393a51945baf4004314e4a7c68310b Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 00:20:54 +0200 Subject: [PATCH 03/11] refine rest URI parsing, ssh_cmd --- src/borgstore/backends/rest.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index 2671306..3ca84a6 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -4,6 +4,7 @@ import os import re +import shlex import json import sys import logging @@ -155,6 +156,16 @@ def request(self, method, url, params=None, data=None, headers=None, timeout=Non return response +def ssh_cmd(rsh, user, host, port): + """return an ssh command line that can be prefixed to another command line""" + rsh = rsh or os.environ.get("BORGSTORE_RSH", "ssh") + args = shlex.split(rsh) + if port: + args += ["-p", str(port)] + args += [f"{user}@{host}"] if user else [host] + return args + + def get_rest_backend(base_url: str): if not base_url.startswith(("http:", "https:", "rest:")): return None @@ -195,15 +206,23 @@ def get_rest_backend(base_url: str): # rest protocol means: use stdio to talk to a borgstore.server.rest process, # either locally (empty host) or via ssh to the given host. The given path - # is used to construct a "file:" backend URL used by the rest server. + # is used to construct a "FILE:" (hack!) backend URI used by the rest server. # # rest:///path - talk to local rest server, path must be abs. fs path # rest://user@host:port/path - ssh to rest server on host, abs. fs path rest_regex = r""" rest:// - (((?P[^@]+)@)(?P[^:/]+)(:(?P\d+))?)? + ( + (?:(?P[^@:/]+)@)? # optional user + (?P( + (?!\[)[^:/]+(?\d+))? # optional port + )? / # separator always required - (?P[^?#]*) # rel/path or /abs/path or even ~/path or ~user/path + (?P[^?#]+) # non-empty rel/path or /abs/path or even ~/path or ~user/path """ m = re.match(rest_regex, base_url, re.VERBOSE) if m: @@ -216,7 +235,7 @@ def get_rest_backend(base_url: str): command = [] python = sys.executable else: - command = ["ssh", "-p", port, f"{user}@{host}"] + command = ssh_cmd("ssh", user, host, port) python = "python3" # hack: we do NOT use a standards-compliant file:// URI here, because they only support absolute paths. # we just use FILE:path and that path can be relative or absolute or even have ~ or ~user. From 280430dbfe7060af951072c117aca454ed8367cb Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 01:40:03 +0200 Subject: [PATCH 04/11] avoid that stdin/stdout is closed --- src/borgstore/server/rest.py | 61 +++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/src/borgstore/server/rest.py b/src/borgstore/server/rest.py index e57b82e..489acc6 100644 --- a/src/borgstore/server/rest.py +++ b/src/borgstore/server/rest.py @@ -372,6 +372,36 @@ def get_pre_bound_socket(): return socket.fromfd(3, socket.AF_UNIX, socket.SOCK_STREAM) +class _UnclosableStream: + """Wraps a binary stream and makes close() a no-op so that + StreamRequestHandler.finish() cannot close sys.stdin/sys.stdout.""" + + def __init__(self, stream): + self._stream = stream + + def close(self): + pass # intentionally do nothing + + @property + def closed(self): + return self._stream.closed + + def read(self, *args, **kwargs): + return self._stream.read(*args, **kwargs) + + def readline(self, *args, **kwargs): + return self._stream.readline(*args, **kwargs) + + def peek(self, *args, **kwargs): + return self._stream.peek(*args, **kwargs) + + def write(self, *args, **kwargs): + return self._stream.write(*args, **kwargs) + + def flush(self, *args, **kwargs): + return self._stream.flush(*args, **kwargs) + + class StdinStdoutSocket: """A mock socket that redirects reads to stdin and writes to stdout.""" @@ -381,11 +411,13 @@ def __init__(self): self.wfile = sys.stdout.buffer def makefile(self, mode="r", buffering=None, encoding=None, errors=None, newline=None): - """The HTTP request handler calls makefile() to get read/write streams.""" + """The HTTP request handler calls makefile() to get read/write streams. + We wrap the underlying buffer in _UnclosableStream so that + StreamRequestHandler.finish() cannot close sys.stdin/sys.stdout.""" if "r" in mode: - return self.rfile + return _UnclosableStream(self.rfile) elif "w" in mode: - return self.wfile + return _UnclosableStream(self.wfile) def sendall(self, data, flags=0): """Directly writes all data to stdout and flushes.""" @@ -428,17 +460,24 @@ def __init__(self, RequestHandlerClass): def serve_forever(self, poll_interval=0.5): """Continuously handle requests until stdin is empty/closed.""" - # Instantiate the handler once - handler = self.RequestHandlerClass(self.socket, ("stdio-client", 0), self) - while True: - # Check if stdin has reached EOF - # If the client closes the stream or sends 0 bytes, we break - if self.socket.rfile.peek(1) == b"": + # Instantiate a fresh handler per request so that handler state + # (close_connection, headers, etc.) never carries over between + # requests. BaseHTTPRequestHandler.__init__ calls handle() which + # calls handle_one_request() internally, so construction == handling. + # + # handle_one_request() sets raw_requestline=b"" and returns without + # sending a response when readline() hits EOF (client closed stdin). + # We detect that here and exit cleanly. + # + # Note: close_connection is NOT a reliable EOF signal — send_error() + # and parse_request() both set it True for non-EOF reasons (e.g. 404). + # _UnclosableStream ensures finish() cannot close sys.stdin/sys.stdout, + # so the stream stays open across requests even after error responses. + handler = self.RequestHandlerClass(self.socket, ("stdio-client", 0), self) + if getattr(handler, "raw_requestline", b"") == b"": break - handler.handle_one_request() - class BorgStoreStdioRESTServer(StdIOHTTPServer): def __init__(self, backend, username=None, password=None): From 40201092e166c6c3622a20b92c7f5d45bfeffbc9 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 01:40:46 +0200 Subject: [PATCH 05/11] log server stderr at debug level there is one log line per http request. --- src/borgstore/backends/rest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index 3ca84a6..befd888 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -56,7 +56,7 @@ def _drain_stderr(self): if self.process is None or self.process.stderr is None: return for line in self.process.stderr: - logger.warning("REST stdio server: %s", line.decode("utf-8", errors="replace").rstrip()) + logger.debug("Remote: %s", line.decode("utf-8", errors="replace").rstrip()) def open(self): if self.process is not None: From 4f4f5d1a1bca957abc2adf41a70ea180d92bef00 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 13:30:27 +0200 Subject: [PATCH 06/11] server/rest: fix StdIOHTTPServer missing server_name/server_port attributes --- src/borgstore/server/rest.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/borgstore/server/rest.py b/src/borgstore/server/rest.py index 489acc6..4fff0f8 100644 --- a/src/borgstore/server/rest.py +++ b/src/borgstore/server/rest.py @@ -436,7 +436,7 @@ def recv(self, bufsize, flags=0): def getsockname(self): """Required by the server to log or bind addresses.""" - return ("stdin/stdout", 0) + return ("stdio", 0) def getpeername(self): """Required by the handler for logging client info.""" @@ -452,7 +452,10 @@ class StdIOHTTPServer(HTTPServer): def __init__(self, RequestHandlerClass): # Skip the base TCPServer __init__ entirely because we aren't binding to a network port - self.server_address = ("stdin/stdout", 0) + host, port = "stdio", 0 + self.server_name = host + self.server_port = port + self.server_address = (host, port) self.RequestHandlerClass = RequestHandlerClass # Instantiate our fake socket From a6c3704b93e918087467b13ca61ce36a3044cbab Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 14:17:14 +0200 Subject: [PATCH 07/11] backends/rest: include stderr tail in EOF error message from StdioSession --- src/borgstore/backends/rest.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index befd888..a803d8b 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -2,6 +2,7 @@ REST http client based backend implementation (use with borgstore.server.rest). """ +import collections import os import re import shlex @@ -51,12 +52,15 @@ def __init__(self, command, auth=None, headers=None, timeout=30): self.timeout = timeout self.process = None self._stderr_thread = None + self._stderr_lines: collections.deque = collections.deque(maxlen=10) # recent stderr for error messages def _drain_stderr(self): if self.process is None or self.process.stderr is None: return for line in self.process.stderr: - logger.debug("Remote: %s", line.decode("utf-8", errors="replace").rstrip()) + decoded = line.decode("utf-8", errors="replace").rstrip() + self._stderr_lines.append(decoded) + logger.debug("Remote: %s", decoded) def open(self): if self.process is not None: @@ -84,6 +88,7 @@ def close(self): self.process.stderr.close() self.process = None self._stderr_thread = None + self._stderr_lines.clear() def __enter__(self): self.open() @@ -124,7 +129,9 @@ def request(self, method, url, params=None, data=None, headers=None, timeout=Non line = self.process.stdout.readline() if not line: - raise BackendError("stdio server closed connection unexpectedly") + stderr_tail = "\n".join(self._stderr_lines) + detail = f":\n{stderr_tail}" if stderr_tail else "" + raise BackendError(f"stdio server closed connection unexpectedly{detail}") status_line = line.decode("iso-8859-1").strip() parts = status_line.split(" ", 2) if len(parts) < 2: From fd5bf224558fc4a4d5e43bdf07b6e153b630dd13 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 14:19:02 +0200 Subject: [PATCH 08/11] backends/rest: join stderr thread before reading error buffer on EOF --- src/borgstore/backends/rest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index a803d8b..ac8c624 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -129,6 +129,8 @@ def request(self, method, url, params=None, data=None, headers=None, timeout=Non line = self.process.stdout.readline() if not line: + if self._stderr_thread is not None: + self._stderr_thread.join(timeout=0.5) stderr_tail = "\n".join(self._stderr_lines) detail = f":\n{stderr_tail}" if stderr_tail else "" raise BackendError(f"stdio server closed connection unexpectedly{detail}") From a02bef1b91832b164b7cb859dea4db0d11b08cdc Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 May 2026 16:46:50 +0200 Subject: [PATCH 09/11] backends/rest: raise BackendError if stdio server exits with non-zero code on close --- src/borgstore/backends/rest.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index ac8c624..1e277c4 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -74,10 +74,12 @@ def open(self): def close(self): if self.process is None: return + returncode = None try: if self.process.stdin is not None: self.process.stdin.close() self.process.wait(timeout=self.timeout) + returncode = self.process.returncode except subprocess.TimeoutExpired: self.process.kill() self.process.wait(timeout=self.timeout) @@ -86,9 +88,16 @@ def close(self): self.process.stdout.close() if self.process.stderr is not None: self.process.stderr.close() + if self._stderr_thread is not None: + self._stderr_thread.join(timeout=0.5) self.process = None self._stderr_thread = None + if returncode: + stderr_tail = "\n".join(self._stderr_lines) + detail = f":\n{stderr_tail}" if stderr_tail else "" self._stderr_lines.clear() + raise BackendError(f"stdio server exited with code {returncode}{detail}") + self._stderr_lines.clear() def __enter__(self): self.open() From af72377727cfa19174ea563d58efe7b740d25a41 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 1 Jun 2026 22:18:44 +0200 Subject: [PATCH 10/11] ssh_cmd tweaking --- src/borgstore/backends/rest.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index 1e277c4..e2d5922 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -174,12 +174,15 @@ def request(self, method, url, params=None, data=None, headers=None, timeout=Non return response -def ssh_cmd(rsh, user, host, port): +def ssh_cmd(user, host, port): """return an ssh command line that can be prefixed to another command line""" - rsh = rsh or os.environ.get("BORGSTORE_RSH", "ssh") - args = shlex.split(rsh) - if port: - args += ["-p", str(port)] + rsh = os.environ.get("BORGSTORE_RSH") + if rsh: + args = shlex.split(rsh) + else: + args = ["ssh"] + if port: + args += ["-p", str(port)] args += [f"{user}@{host}"] if user else [host] return args @@ -253,7 +256,7 @@ def get_rest_backend(base_url: str): command = [] python = sys.executable else: - command = ssh_cmd("ssh", user, host, port) + command = ssh_cmd(user, host, port) python = "python3" # hack: we do NOT use a standards-compliant file:// URI here, because they only support absolute paths. # we just use FILE:path and that path can be relative or absolute or even have ~ or ~user. From e1637a210e215ea4d63d9ed7e095d3526266fd57 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 1 Jun 2026 22:47:11 +0200 Subject: [PATCH 11/11] cli command: borgstore-server-rest --- contrib/server/nginx-systemd/README.md | 2 +- .../server/nginx-systemd/borgstore@.service | 2 +- docs/servers.rst | 74 +++++++++++-------- pyproject.toml | 3 + src/borgstore/backends/rest.py | 12 +-- src/borgstore/server/rest.py | 6 +- tests/test_server_rest.py | 2 +- 7 files changed, 59 insertions(+), 42 deletions(-) diff --git a/contrib/server/nginx-systemd/README.md b/contrib/server/nginx-systemd/README.md index 5609cd4..3e4d52f 100644 --- a/contrib/server/nginx-systemd/README.md +++ b/contrib/server/nginx-systemd/README.md @@ -18,7 +18,7 @@ compatibility. For production use, you **must** add SSL/TLS configuration. - **`borgstore@.socket`** — systemd creates `/run/borgstore/.sock` and starts the matching service on the first incoming connection. -- **`borgstore@.service`** — runs `borgstore.server.rest --socket-activation`, +- **`borgstore@.service`** — runs `borgstore-server-rest --socket-activation`, adopting the pre-bound socket from systemd. - **`nginx-borgstore.conf`** — a single wildcard `location` block routes any `/repos//` URL to the matching Unix socket; nginx strips the path diff --git a/contrib/server/nginx-systemd/borgstore@.service b/contrib/server/nginx-systemd/borgstore@.service index f7f38e5..9ee0b03 100644 --- a/contrib/server/nginx-systemd/borgstore@.service +++ b/contrib/server/nginx-systemd/borgstore@.service @@ -27,7 +27,7 @@ Group=borgstore # Minimum required: BORGSTORE_BACKEND, BORGSTORE_USERNAME, BORGSTORE_PASSWORD EnvironmentFile=/etc/borgstore/%i.env -ExecStart=/usr/bin/python3 -m borgstore.server.rest \ +ExecStart=/usr/bin/borgstore-server-rest \ --backend ${BORGSTORE_BACKEND} \ --username ${BORGSTORE_USERNAME} \ --password ${BORGSTORE_PASSWORD} \ diff --git a/docs/servers.rst b/docs/servers.rst index a89b9ec..2bb18f0 100644 --- a/docs/servers.rst +++ b/docs/servers.rst @@ -13,18 +13,27 @@ cloud storage servers: - server-side hash computation (e.g. sha256) for item content - server-side defragmentation helper (copies blocks to new items) -Running the server ------------------- +Running the server on host:port +------------------------------- Run a server with a file: backend (for a local directory), using HTTP Basic Authentication:: - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore For production deployments, consider using systemd socket activation (see contrib/server/nginx-systemd/README.md). +Running the server via stdio +---------------------------- + +Run a server with a file: backend (for a local directory), talking http via +stdin/stdout, logging via stderr: + + borgstore-server-rest --stdio \ + --backend file:///tmp/teststore + Accessing the server from a client ---------------------------------- @@ -32,6 +41,13 @@ The borgstore REST client can then access via:: http://user:pass@127.0.0.1:5618/ +or (when using http over stdio): + + rest://user@host:port//tmp/teststore # via ssh, abs. path + rest://user@host:port/teststore # via ssh, rel. path + rest:////tmp/teststore # locally, without ssh, abs. path + rest:///teststore # locally, without ssh, rel. path + Permissions ----------- @@ -45,24 +61,24 @@ Examples: Read-only access:: - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore \ - --permissions '{"": "lr"}' + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore \ + --permissions '{"": "lr"}' No-delete, no-overwrite (allow adding new items):: - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore \ - --permissions '{"": "lrw"}' + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore \ + --permissions '{"": "lrw"}' Full access:: - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore \ - --permissions '{"": "lrwWD"}' + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore \ + --permissions '{"": "lrwWD"}' BorgBackup shortcuts ~~~~~~~~~~~~~~~~~~~~ @@ -87,10 +103,10 @@ Example — restrict a backup server to no-delete access: .. code-block:: bash - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///home/user/repos/repo1 \ - --permissions borgbackup-no-delete + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///home/user/repos/repo1 \ + --permissions borgbackup-no-delete Custom JSON permissions ~~~~~~~~~~~~~~~~~~~~~~~ @@ -99,10 +115,10 @@ You can also pass an arbitrary JSON-encoded permissions mapping directly. Hierarchical rules (list-only at root, read/write in ``data/``):: - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore \ - --permissions '{"": "l", "data": "lrw"}' + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore \ + --permissions '{"": "l", "data": "lrw"}' Quota ----- @@ -120,8 +136,8 @@ Example — limit storage to 1 GiB: .. code-block:: bash - python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \ - --username user --password pass \ - --backend file:///tmp/teststore \ - --quota 1073741824 + borgstore-server-rest --host 127.0.0.1 --port 5618 \ + --username user --password pass \ + --backend file:///tmp/teststore \ + --quota 1073741824 diff --git a/pyproject.toml b/pyproject.toml index 401a9f4..7c7dda1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,9 @@ s3 = [ ] none = [] +[project.scripts] +borgstore-server-rest = "borgstore.server.rest:main" + [project.urls] Homepage = "https://github.com/borgbackup/borgstore" diff --git a/src/borgstore/backends/rest.py b/src/borgstore/backends/rest.py index e2d5922..64f2125 100644 --- a/src/borgstore/backends/rest.py +++ b/src/borgstore/backends/rest.py @@ -7,7 +7,6 @@ import re import shlex import json -import sys import logging import hashlib import threading @@ -251,17 +250,12 @@ def get_rest_backend(base_url: str): user = m.group("user") host = m.group("host") port = m.group("port") or "22" - if not host: - # empty host: don't use ssh, just run the rest server here - command = [] - python = sys.executable - else: - command = ssh_cmd(user, host, port) - python = "python3" + # empty host: don't use ssh, just run the rest server here + command = [] if not host else ssh_cmd(user, host, port) # hack: we do NOT use a standards-compliant file:// URI here, because they only support absolute paths. # we just use FILE:path and that path can be relative or absolute or even have ~ or ~user. # borgstore.server.rest will translate it to an absolute file:// URI internally. - command.extend([python, "-m", "borgstore.server.rest", "--stdio", "--backend", f"FILE:{path}"]) + command.extend(["borgstore-server-rest", "--stdio", "--backend", f"FILE:{path}"]) return REST(base_url="http://stdio-backend", command=command) diff --git a/src/borgstore/server/rest.py b/src/borgstore/server/rest.py index 4fff0f8..b30df3c 100644 --- a/src/borgstore/server/rest.py +++ b/src/borgstore/server/rest.py @@ -616,7 +616,7 @@ def serve( server.server_close() -if __name__ == "__main__": +def main(): logging.basicConfig(level=logging.INFO, format="%(message)s") logger.setLevel(logging.INFO) parser = argparse.ArgumentParser(description="BorgStore REST Server") @@ -646,3 +646,7 @@ def serve( args.socket_activation, args.stdio, ) + + +if __name__ == "__main__": + main() diff --git a/tests/test_server_rest.py b/tests/test_server_rest.py index 6463d08..ccd532a 100644 --- a/tests/test_server_rest.py +++ b/tests/test_server_rest.py @@ -574,7 +574,7 @@ def test_rest_server_quota_method_no_quota(rest_server_with_auth): def test_rest_server_stdio(tmp_path): backend_url = tmp_path.as_uri() proc = subprocess.Popen( - [sys.executable, "-m", "borgstore.server.rest", "--stdio", "--backend", backend_url], + ["borgstore-server-rest", "--stdio", "--backend", backend_url], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,