diff --git a/alephclient/api.py b/alephclient/api.py
index fb912ab..781d93f 100644
--- a/alephclient/api.py
+++ b/alephclient/api.py
@@ -1,5 +1,6 @@
 import importlib.metadata
 import json
+import mimetypes
 import uuid
 import logging
 from itertools import count
@@ -509,6 +510,84 @@ def ingest_upload(
                 backoff(ae, attempt)
         return {}
 
+    def signed_url_upload(
+        self,
+        collection_id: str,
+        file_path: Optional[Path] = None,
+        metadata: Optional[Dict] = None,
+        index: bool = True,
+    ) -> Dict:
+        """
+        Upload a document using the signed URL workflow.
+
+        For directories (no file), falls back to the standard ingest endpoint
+        since there is no file content to upload.
+
+        The workflow is:
+        1. POST /file/uploadUrl -> {url, id}
+        2. PUT file content to the signed url
+        3. POST /collections/{id}/document with the upload_id and metadata
+
+        params
+        ------
+        collection_id: id of the collection to upload to
+        file_path: path of the file to upload. None while creating folders
+        metadata: dict containing metadata for the file or folders
+        index: whether to index the document after creation
+        """
+        if not file_path or file_path.is_dir():
+            return self.ingest_upload(
+                collection_id, file_path, metadata=metadata, index=index
+            )
+
+        mime_type = mimetypes.guess_type(file_path.name)[0] or MIME
+        meta = dict(metadata or {})
+        meta["file_name"] = file_path.name
+        meta["mime_type"] = mime_type
+
+        for attempt in count(1):
+            try:
+                # Step 1: request a signed upload URL
+                upload_url = self._make_url("file/uploadUrl")
+                try:
+                    result = self._request("POST", upload_url)
+                except AlephException as ae:
+                    if ae.status == 404:
+                        raise AlephException(
+                            "Upload endpoint not found. Is this an Aleph Pro instance?"
+                        ) from ae
+                    raise
+                signed_url = result["url"]
+                upload_id = result["id"]
+                log.info("Signed URL id [%s]: %s", upload_id, file_path.name)
+                log.debug("Signed URL id [%s]", upload_id)
+
+                # Step 2: PUT file content to the signed URL
+                try:
+                    with file_path.open("rb") as fh:
+                        response = self.session.put(
+                            signed_url,
+                            data=fh,
+                            headers={"Content-Type": "application/octet-stream"},
+                        )
+                        response.raise_for_status()
+                except (RequestException, HTTPError) as exc:
+                    raise AlephException(exc) from exc
+
+                # Step 3: create the document record
+                doc_url_path = f"collections/{collection_id}/document"
+                doc_url = self._make_url(doc_url_path, params={"index": index})
+                payload = {"upload_id": upload_id, "Meta": meta}
+                result = self._request("POST", doc_url, json=payload)
+                if not result:
+                    return {"id": upload_id, "status": "ok"}
+                return result
+            except AlephException as ae:
+                if not ae.transient or attempt > self.retries:
+                    raise ae from ae
+                backoff(ae, attempt)
+        return {}
+
 def create_entityset(
         self, collection_id: str, type: str, label: str, summary: Optional[str]
     ) -> Dict:
diff --git a/alephclient/cli.py b/alephclient/cli.py
index b9399b0..83d8cd4 100644
--- a/alephclient/cli.py
+++ b/alephclient/cli.py
@@ -87,6 +87,12 @@ def cli(ctx, host, api_key, retries):
     help="maximum number of parallel uploads",
 )
 @click.option("-f", "--foreign-id", required=True, help="foreign-id of the collection")
+@click.option(
+    "--signed-url",
+    is_flag=True,
+    default=False,
+    help="use signed URL workflow for file uploads",
+)
 @click.argument("path", type=click.Path(exists=True))
 @click.pass_context
 def crawldir(
@@ -98,6 +104,7 @@
     noindex=False,
     nojunk=False,
     parallel=1,
+    signed_url=False,
 ):
     """Crawl a directory recursively and upload the documents in it to a
     collection."""
@@ -112,6 +119,7 @@
             index=not noindex,
             nojunk=nojunk,
             parallel=parallel,
+            signed_url=signed_url,
         )
     except AlephException as exc:
         raise click.ClickException(str(exc))
diff --git a/alephclient/crawldir.py b/alephclient/crawldir.py
index 36d3d8e..9a0b69b 100644
--- a/alephclient/crawldir.py
+++ b/alephclient/crawldir.py
@@ -22,9 +22,11 @@ def __init__(
         path: Path,
         index: bool = True,
         nojunk: bool = False,
+        signed_url: bool = False,
     ):
         self.api = api
         self.index = index
+        self.signed_url = signed_url
         self.exclude = (
             {
                 "f": re.compile(r"\..*|thumbs\.db|desktop\.ini", re.I),
@@ -128,12 +130,20 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str:
         log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id)
         if parent_id is not None:
             metadata["parent_id"] = parent_id
-        result = self.api.ingest_upload(
-            self.collection_id,
-            path,
-            metadata=metadata,
-            index=self.index,
-        )
+        if self.signed_url:
+            result = self.api.signed_url_upload(
+                self.collection_id,
+                path,
+                metadata=metadata,
+                index=self.index,
+            )
+        else:
+            result = self.api.ingest_upload(
+                self.collection_id,
+                path,
+                metadata=metadata,
+                index=self.index,
+            )
         if "id" not in result and not hasattr(result, "id"):
             raise AlephException("Upload failed")
         return result["id"]
@@ -147,6 +157,7 @@
     index: bool = True,
     nojunk: bool = False,
     parallel: int = 1,
+    signed_url: bool = False,
 ):
     """Crawl a directory and upload its content to a collection
 
@@ -158,7 +169,9 @@
     """
     root = Path(path).resolve()
     collection = api.load_collection_by_foreign_id(foreign_id, config)
-    crawler = CrawlDirectory(api, collection, root, index=index, nojunk=nojunk)
+    crawler = CrawlDirectory(
+        api, collection, root, index=index, nojunk=nojunk, signed_url=signed_url
+    )
     consumers = []
 
     # Use one thread to produce using scandir and at least one to consume
diff --git a/alephclient/tests/test_crawldir.py b/alephclient/tests/test_crawldir.py
index b906c60..c3e340e 100644
--- a/alephclient/tests/test_crawldir.py
+++ b/alephclient/tests/test_crawldir.py
@@ -48,3 +48,13 @@ def test_is_excluded_exclude_dir(self):
         crawldir.exclude["d"] = re.compile(r"week1\/*", re.I)
         is_excluded = crawldir.is_excluded(path)
         assert is_excluded
+
+    def test_signed_url_default_false(self):
+        path = Path(os.path.join(self.base_path, "jan/week1"))
+        crawldir = CrawlDirectory(AlephAPI, {}, path)
+        assert crawldir.signed_url is False
+
+    def test_signed_url_true(self):
+        path = Path(os.path.join(self.base_path, "jan/week1"))
+        crawldir = CrawlDirectory(AlephAPI, {}, path, signed_url=True)
+        assert crawldir.signed_url is True
diff --git a/alephclient/tests/test_tasks.py b/alephclient/tests/test_tasks.py
index c3afd57..2b32391 100644
--- a/alephclient/tests/test_tasks.py
+++ b/alephclient/tests/test_tasks.py
@@ -120,3 +120,20 @@ def test_ingest(self, mocker):
         ]
         for call in expected_calls:
             assert call in self.api.ingest_upload.mock_calls
+
+    def test_ingest_signed_url(self, mocker):
+        mocker.patch.object(self.api, "signed_url_upload", return_value={"id": 42})
+        mocker.patch.object(
+            self.api, "load_collection_by_foreign_id", return_value={"id": 2}
+        )
+        mocker.patch.object(self.api, "update_collection")
+        crawl_dir(
+            self.api,
+            "alephclient/tests/testdata",
+            "test153",
+            {},
+            True,
+            True,
+            signed_url=True,
+        )
+        assert self.api.signed_url_upload.call_count == 6