From a1e00b4587bd4a19618c1fa56bf971e163004576 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20G=C3=B6rresen=20Mello?= Date: Wed, 11 Feb 2026 12:35:32 -0300 Subject: [PATCH 1/6] adding signed ingestion --- alephclient/api.py | 56 ++++++++++++++++++++++++++++++ alephclient/cli.py | 8 +++++ alephclient/crawldir.py | 11 +++++- alephclient/tests/test_crawldir.py | 10 ++++++ alephclient/tests/test_tasks.py | 19 ++++++++++ 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/alephclient/api.py b/alephclient/api.py index fb912ab..4814286 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -1,5 +1,6 @@ import importlib.metadata import json +import mimetypes import uuid import logging from itertools import count @@ -471,6 +472,7 @@ def ingest_upload( metadata: Optional[Dict] = None, sync: bool = False, index: bool = True, + signed_url: bool = False, ) -> Dict: """ Create an empty folder in a collection or upload a document to it @@ -483,6 +485,9 @@ def ingest_upload( files, metadata contains foreign_id of the parent. Metadata for a directory contains foreign_id for itself as well as its parent and the name of the directory. + signed_url: use the signed URL workflow for file uploads. When True, + files are uploaded via a signed URL instead of multipart ingest. + Directories always use the standard ingest endpoint. """ url_path = "collections/{0}/ingest".format(collection_id) params = {"sync": sync, "index": index} @@ -491,6 +496,9 @@ def ingest_upload( data = {"meta": json.dumps(metadata)} return self._request("POST", url, data=data) + if signed_url: + return self._signed_url_upload(collection_id, file_path, metadata, index) + for attempt in count(1): try: with file_path.open("rb") as fh: @@ -509,6 +517,54 @@ def ingest_upload( backoff(ae, attempt) return {} + def _signed_url_upload( + self, + collection_id: str, + file_path: Path, + metadata: Optional[Dict], + index: bool, + ) -> Dict: + mime_type = mimetypes.guess_type(file_path.name)[0] or MIME + meta = dict(metadata or {}) + meta["file_name"] = file_path.name + meta["mime_type"] = mime_type + + for attempt in count(1): + try: + # Request a signed upload URL + upload_url = self._make_url("file/uploadUrl") + result = self._request("POST", upload_url) + signed_url = result["url"] + upload_id = result["id"] + + # PUT file content to the signed URL + try: + with file_path.open("rb") as fh: + response = self.session.put( + signed_url, + data=fh, + headers={"Content-Type": "application/octet-stream"}, + ) + response.raise_for_status() + except (RequestException, HTTPError) as exc: + raise AlephException(exc) from exc + + # Create document record. + # The server returns an empty 200 when a document with + # the same foreign_id already exists in the collection. + doc_url_path = f"collections/{collection_id}/document" + doc_url = self._make_url(doc_url_path, params={"index": index}) + payload = {"upload_id": upload_id, "meta": meta} + result = self._request("POST", doc_url, json=payload) + if not result: + return {"id": upload_id, "status": "ok"} + return result + except AlephException as ae: + if not ae.transient or attempt > self.retries: + raise ae from ae + backoff(ae, attempt) + return {} + def create_entityset( self, collection_id: str, type: str, label: str, summary: Optional[str] ) -> Dict: diff --git a/alephclient/cli.py b/alephclient/cli.py index b9399b0..83d8cd4 100644 --- a/alephclient/cli.py +++ b/alephclient/cli.py @@ -87,6 +87,12 @@ def cli(ctx, host, api_key, retries): help="maximum number of parallel uploads", ) @click.option("-f", "--foreign-id", required=True, help="foreign-id of the collection") +@click.option( + "--signed-url", + is_flag=True, + default=False, + help="use signed URL workflow for file uploads", +) @click.argument("path", type=click.Path(exists=True)) @click.pass_context def crawldir( @@ -98,6 +104,7 @@ def crawldir( noindex=False, nojunk=False, parallel=1, + signed_url=False, ): """Crawl a directory recursively and upload the documents in it to a collection.""" @@ -112,6 +119,7 @@ def crawldir( index=not noindex, nojunk=nojunk, parallel=parallel, + signed_url=signed_url, ) except AlephException as exc: raise click.ClickException(str(exc)) diff --git a/alephclient/crawldir.py b/alephclient/crawldir.py index 36d3d8e..9af3c9e 100644 --- a/alephclient/crawldir.py +++ b/alephclient/crawldir.py @@ -22,9 +22,11 @@ def __init__( path: Path, index: bool = True, nojunk: bool = False, + signed_url: bool = False, ): self.api = api self.index = index + self.signed_url = signed_url self.exclude = ( { "f": re.compile(r"\..*|thumbs\.db|desktop\.ini", re.I), @@ -128,11 +130,15 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str: log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id) if parent_id is not None: metadata["parent_id"] = parent_id + kwargs = {} + if self.signed_url: + kwargs["signed_url"] = True result = self.api.ingest_upload( self.collection_id, path, metadata=metadata, index=self.index, + **kwargs, ) if "id" not in result and not hasattr(result, "id"): raise AlephException("Upload failed") @@ -147,6 +153,7 @@ def crawl_dir( index: bool = True, nojunk: bool = False, parallel: int = 1, + signed_url: bool = False, ): """Crawl a directory and upload its content to a collection @@ -158,7 +165,9 @@ def crawl_dir( """ root = Path(path).resolve() collection = api.load_collection_by_foreign_id(foreign_id, config) - crawler = CrawlDirectory(api, collection, root, index=index, nojunk=nojunk) + crawler = CrawlDirectory( + api, collection, root, index=index, nojunk=nojunk, signed_url=signed_url + ) consumers = [] # Use one thread to produce using scandir and at least one to consume diff --git a/alephclient/tests/test_crawldir.py b/alephclient/tests/test_crawldir.py index b906c60..c3e340e 100644 --- a/alephclient/tests/test_crawldir.py +++ b/alephclient/tests/test_crawldir.py @@ -48,3 +48,13 @@ def test_is_excluded_exclude_dir(self): crawldir.exclude["d"] = re.compile(r"week1\/*", re.I) is_excluded = crawldir.is_excluded(path) assert is_excluded + + def test_signed_url_default_false(self): + path = Path(os.path.join(self.base_path, "jan/week1")) + crawldir = CrawlDirectory(AlephAPI, {}, path) + assert crawldir.signed_url is False + + def test_signed_url_true(self): + path = Path(os.path.join(self.base_path, "jan/week1")) + crawldir = CrawlDirectory(AlephAPI, {}, path, signed_url=True) + assert crawldir.signed_url is True diff --git a/alephclient/tests/test_tasks.py b/alephclient/tests/test_tasks.py index c3afd57..d9ff7ea 100644 --- a/alephclient/tests/test_tasks.py +++ b/alephclient/tests/test_tasks.py @@ -120,3 +120,22 @@ def test_ingest(self, mocker): ] for call in expected_calls: assert call in self.api.ingest_upload.mock_calls + + def test_ingest_signed_url(self, mocker): + mocker.patch.object(self.api, "ingest_upload", return_value={"id": 42}) + mocker.patch.object( + self.api, "load_collection_by_foreign_id", return_value={"id": 2} + ) + mocker.patch.object(self.api, "update_collection") + crawl_dir( + self.api, + "alephclient/tests/testdata", + "test153", + {}, + True, + True, + signed_url=True, + ) + assert self.api.ingest_upload.call_count == 6 + for call in self.api.ingest_upload.call_args_list: + assert call.kwargs.get("signed_url") is True From 7ede56d9df3e2e355a69ca5c102b602a9ffb1db8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20G=C3=B6rresen=20Mello?= Date: Wed, 11 Feb 2026 15:13:04 -0300 Subject: [PATCH 2/6] separating the signed ingestion functionality from the original code and adding new logging on crawldir signed usage --- alephclient/api.py | 47 ++++++++++++++++++++++----------- alephclient/crawldir.py | 22 ++++++++------- alephclient/tests/test_tasks.py | 6 ++--- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/alephclient/api.py b/alephclient/api.py index 4814286..11ecac2 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -472,7 +472,6 @@ def ingest_upload( metadata: Optional[Dict] = None, sync: bool = False, index: bool = True, - signed_url: bool = False, ) -> Dict: """ Create an empty folder in a collection or upload a document to it @@ -485,9 +484,6 @@ def ingest_upload( files, metadata contains foreign_id of the parent. Metadata for a directory contains foreign_id for itself as well as its parent and the name of the directory. - signed_url: use the signed URL workflow for file uploads. When True, - files are uploaded via a signed URL instead of multipart ingest. - Directories always use the standard ingest endpoint. """ url_path = "collections/{0}/ingest".format(collection_id) params = {"sync": sync, "index": index} @@ -496,9 +492,6 @@ def ingest_upload( data = {"meta": json.dumps(metadata)} return self._request("POST", url, data=data) - if signed_url: - return self._signed_url_upload(collection_id, file_path, metadata, index) - for attempt in count(1): try: with file_path.open("rb") as fh: @@ -517,13 +510,36 @@ def ingest_upload( backoff(ae, attempt) return {} - def _signed_url_upload( + def signed_url_upload( self, collection_id: str, - file_path: Path, - metadata: Optional[Dict], - index: bool, + file_path: Optional[Path] = None, + metadata: Optional[Dict] = None, + index: bool = True, ) -> Dict: + """ + Upload a document using the signed URL workflow. + + For directories (no file), falls back to the standard ingest endpoint + since there is no file content to upload. + + The workflow is: + 1. POST /file/uploadUrl -> {url, id} + 2. PUT file content to the signed url + 3. POST /collections/{id}/document with the upload_id and metadata + + params + ------ + collection_id: id of the collection to upload to + file_path: path of the file to upload. None while creating folders + metadata: dict containing metadata for the file or folders + index: whether to index the document after creation + """ + if not file_path or file_path.is_dir(): + return self.ingest_upload( + collection_id, file_path, metadata=metadata, index=index + ) + mime_type = mimetypes.guess_type(file_path.name)[0] or MIME meta = dict(metadata or {}) meta["file_name"] = file_path.name @@ -531,13 +547,14 @@ def _signed_url_upload( for attempt in count(1): try: - # Request a signed upload URL + # Step 1: request a signed upload URL upload_url = self._make_url("file/uploadUrl") result = self._request("POST", upload_url) signed_url = result["url"] upload_id = result["id"] + log.info("Signed URL [%s]: %s", upload_id, signed_url) - # PUT file content to the signed URL + # Step 2: PUT file content to the signed URL try: with file_path.open("rb") as fh: response = self.session.put( @@ -549,9 +566,7 @@ def _signed_url_upload( except (RequestException, HTTPError) as exc: raise AlephException(exc) from exc - # Create document record. - # The server returns an empty 200 when a document with - # the same foreign_id already exists in the collection. + # Step 3: create the document record doc_url_path = f"collections/{collection_id}/document" doc_url = self._make_url(doc_url_path, params={"index": index}) payload = {"upload_id": upload_id, "meta": meta} diff --git a/alephclient/crawldir.py b/alephclient/crawldir.py index 9af3c9e..9a0b69b 100644 --- a/alephclient/crawldir.py +++ b/alephclient/crawldir.py @@ -130,16 +130,20 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str: log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id) if parent_id is not None: metadata["parent_id"] = parent_id - kwargs = {} if self.signed_url: - kwargs["signed_url"] = True - result = self.api.ingest_upload( - self.collection_id, - path, - metadata=metadata, - index=self.index, - **kwargs, - ) + result = self.api.signed_url_upload( + self.collection_id, + path, + metadata=metadata, + index=self.index, + ) + else: + result = self.api.ingest_upload( + self.collection_id, + path, + metadata=metadata, + index=self.index, + ) if "id" not in result and not hasattr(result, "id"): raise AlephException("Upload failed") return result["id"] diff --git a/alephclient/tests/test_tasks.py b/alephclient/tests/test_tasks.py index d9ff7ea..2b32391 100644 --- a/alephclient/tests/test_tasks.py +++ b/alephclient/tests/test_tasks.py @@ -122,7 +122,7 @@ def test_ingest(self, mocker): assert call in self.api.ingest_upload.mock_calls def test_ingest_signed_url(self, mocker): - mocker.patch.object(self.api, "ingest_upload", return_value={"id": 42}) + mocker.patch.object(self.api, "signed_url_upload", return_value={"id": 42}) mocker.patch.object( self.api, "load_collection_by_foreign_id", return_value={"id": 2} ) @@ -136,6 +136,4 @@ def test_ingest_signed_url(self, mocker): True, signed_url=True, ) - assert self.api.ingest_upload.call_count == 6 - for call in self.api.ingest_upload.call_args_list: - assert call.kwargs.get("signed_url") is True + assert self.api.signed_url_upload.call_count == 6 From 349159d1f07bfed1355004f238cb6abf37be1ad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20G=C3=B6rresen=20Mello?= Date: Thu, 12 Feb 2026 17:43:16 -0300 Subject: [PATCH 3/6] moving url to debug and printing only id --- alephclient/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/alephclient/api.py b/alephclient/api.py index 11ecac2..a176d41 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -552,7 +552,8 @@ def signed_url_upload( result = self._request("POST", upload_url) signed_url = result["url"] upload_id = result["id"] - log.info("Signed URL [%s]: %s", upload_id, signed_url) + log.info("Signed URL id [%s]: %s", upload_id, file_path.name) + log.debug("Signed URL: %s", signed_url) # Step 2: PUT file content to the signed URL try: From 50c8fd04e4226011337ad52b9f71a5a17a2c2ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20G=C3=B6rresen=20Mello?= Date: Thu, 12 Feb 2026 17:44:50 -0300 Subject: [PATCH 4/6] improving error message --- alephclient/api.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/alephclient/api.py b/alephclient/api.py index a176d41..3697c49 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -549,7 +549,15 @@ def signed_url_upload( try: # Step 1: request a signed upload URL upload_url = self._make_url("file/uploadUrl") - result = self._request("POST", upload_url) + try: + result = self._request("POST", upload_url) + except AlephException as ae: + if ae.status == 404: + raise AlephException( + "Upload endpoint not found. " + "Is this an Aleph Pro instance?" + ) from ae + raise signed_url = result["url"] upload_id = result["id"] log.info("Signed URL id [%s]: %s", upload_id, file_path.name) From 0d3cc7c6959f238aff8cf162fc18437c720460b6 Mon Sep 17 00:00:00 2001 From: Klil Eden Date: Tue, 3 Mar 2026 00:57:48 -0700 Subject: [PATCH 5/6] fix payload keys and stdout --- alephclient/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alephclient/api.py b/alephclient/api.py index 3697c49..1e4bd28 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -561,7 +561,7 @@ def signed_url_upload( signed_url = result["url"] upload_id = result["id"] log.info("Signed URL id [%s]: %s", upload_id, file_path.name) - log.debug("Signed URL: %s", signed_url) + log.debug("Signed URL id [%s]", upload_id) # Step 2: PUT file content to the signed URL try: @@ -578,7 +578,7 @@ def signed_url_upload( # Step 3: create the document record doc_url_path = f"collections/{collection_id}/document" doc_url = self._make_url(doc_url_path, params={"index": index}) - payload = {"upload_id": upload_id, "meta": meta} + payload = {"upload_id": upload_id, "Meta": meta} result = self._request("POST", doc_url, json=payload) if not result: return {"id": upload_id, "status": "ok"} From a3493ee1500cc101df94f13ed359568178c2b543 Mon Sep 17 00:00:00 2001 From: Klil Eden Date: Tue, 3 Mar 2026 12:39:11 -0700 Subject: [PATCH 6/6] fix linter error --- alephclient/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/alephclient/api.py b/alephclient/api.py index 1e4bd28..781d93f 100644 --- a/alephclient/api.py +++ b/alephclient/api.py @@ -554,8 +554,7 @@ def signed_url_upload( except AlephException as ae: if ae.status == 404: raise AlephException( - "Upload endpoint not found. " - "Is this an Aleph Pro instance?" + "Upload endpoint not found. Is this an Aleph Pro instance?" ) from ae raise signed_url = result["url"]