Skip to content

Commit 03434fe

Browse files
bgmello and kdeden authored
Adding signed ingestion (#58)
* adding signed ingestion flag without modifying regular crawldir flow --------- Co-authored-by: Klil Eden <keden@protonmail.com>
1 parent b29539f commit 03434fe

5 files changed

Lines changed: 134 additions & 7 deletions

File tree

alephclient/api.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import importlib.metadata
22
import json
3+
import mimetypes
34
import uuid
45
import logging
56
from itertools import count
@@ -509,6 +510,84 @@ def ingest_upload(
509510
backoff(ae, attempt)
510511
return {}
511512

513+
def signed_url_upload(
    self,
    collection_id: str,
    file_path: Optional[Path] = None,
    metadata: Optional[Dict] = None,
    index: bool = True,
) -> Dict:
    """Upload a document using the signed URL workflow.

    For directories (no file), falls back to the standard ingest endpoint
    since there is no file content to upload.

    The workflow is:
    1. POST /file/uploadUrl -> {url, id}
    2. PUT file content to the signed url
    3. POST /collections/{id}/document with the upload_id and metadata

    params
    ------
    collection_id: id of the collection to upload to
    file_path: path of the file to upload. None while creating folders
    metadata: dict containing metadata for the file or folders
    index: whether to index the document after creation
    """
    # Folders (or a missing path) carry no bytes to PUT, so delegate to
    # the regular ingest endpoint.
    if not file_path or file_path.is_dir():
        return self.ingest_upload(
            collection_id, file_path, metadata=metadata, index=index
        )

    mime_type = mimetypes.guess_type(file_path.name)[0] or MIME
    meta = dict(metadata or {})
    meta["file_name"] = file_path.name
    meta["mime_type"] = mime_type

    for attempt in count(1):
        try:
            # Step 1: request a signed upload URL
            upload_url = self._make_url("file/uploadUrl")
            try:
                result = self._request("POST", upload_url)
            except AlephException as ae:
                # A 404 here means the server does not expose the signed
                # upload API at all — surface a clearer message.
                if ae.status == 404:
                    raise AlephException(
                        "Upload endpoint not found. Is this an Aleph Pro instance?"
                    ) from ae
                raise
            signed_url = result["url"]
            upload_id = result["id"]
            # Fix: log the id once; the previous extra log.debug line
            # duplicated the same upload_id with no added information.
            log.info("Signed URL id [%s]: %s", upload_id, file_path.name)

            # Step 2: PUT file content to the signed URL
            try:
                with file_path.open("rb") as fh:
                    response = self.session.put(
                        signed_url,
                        data=fh,
                        headers={"Content-Type": "application/octet-stream"},
                    )
                response.raise_for_status()
            except (RequestException, HTTPError) as exc:
                raise AlephException(exc) from exc

            # Step 3: create the document record
            doc_url_path = f"collections/{collection_id}/document"
            doc_url = self._make_url(doc_url_path, params={"index": index})
            payload = {"upload_id": upload_id, "Meta": meta}
            result = self._request("POST", doc_url, json=payload)
            if not result:
                # Server returned an empty body; synthesize a minimal result.
                return {"id": upload_id, "status": "ok"}
            return result
        except AlephException as ae:
            # Only transient failures are retried, up to self.retries.
            if not ae.transient or attempt > self.retries:
                # Fix: bare `raise` re-raises with the original traceback;
                # `raise ae from ae` self-chained the exception pointlessly.
                raise
            backoff(ae, attempt)
    return {}
590+
512591
def create_entityset(
513592
self, collection_id: str, type: str, label: str, summary: Optional[str]
514593
) -> Dict:

alephclient/cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ def cli(ctx, host, api_key, retries):
8787
help="maximum number of parallel uploads",
8888
)
8989
@click.option("-f", "--foreign-id", required=True, help="foreign-id of the collection")
90+
@click.option(
91+
"--signed-url",
92+
is_flag=True,
93+
default=False,
94+
help="use signed URL workflow for file uploads",
95+
)
9096
@click.argument("path", type=click.Path(exists=True))
9197
@click.pass_context
9298
def crawldir(
@@ -98,6 +104,7 @@ def crawldir(
98104
noindex=False,
99105
nojunk=False,
100106
parallel=1,
107+
signed_url=False,
101108
):
102109
"""Crawl a directory recursively and upload the documents in it to a
103110
collection."""
@@ -112,6 +119,7 @@ def crawldir(
112119
index=not noindex,
113120
nojunk=nojunk,
114121
parallel=parallel,
122+
signed_url=signed_url,
115123
)
116124
except AlephException as exc:
117125
raise click.ClickException(str(exc))

alephclient/crawldir.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ def __init__(
2222
path: Path,
2323
index: bool = True,
2424
nojunk: bool = False,
25+
signed_url: bool = False,
2526
):
2627
self.api = api
2728
self.index = index
29+
self.signed_url = signed_url
2830
self.exclude = (
2931
{
3032
"f": re.compile(r"\..*|thumbs\.db|desktop\.ini", re.I),
@@ -128,12 +130,20 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str:
128130
log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id)
129131
if parent_id is not None:
130132
metadata["parent_id"] = parent_id
131-
result = self.api.ingest_upload(
132-
self.collection_id,
133-
path,
134-
metadata=metadata,
135-
index=self.index,
136-
)
133+
if self.signed_url:
134+
result = self.api.signed_url_upload(
135+
self.collection_id,
136+
path,
137+
metadata=metadata,
138+
index=self.index,
139+
)
140+
else:
141+
result = self.api.ingest_upload(
142+
self.collection_id,
143+
path,
144+
metadata=metadata,
145+
index=self.index,
146+
)
137147
if "id" not in result and not hasattr(result, "id"):
138148
raise AlephException("Upload failed")
139149
return result["id"]
@@ -147,6 +157,7 @@ def crawl_dir(
147157
index: bool = True,
148158
nojunk: bool = False,
149159
parallel: int = 1,
160+
signed_url: bool = False,
150161
):
151162
"""Crawl a directory and upload its content to a collection
152163
@@ -158,7 +169,9 @@ def crawl_dir(
158169
"""
159170
root = Path(path).resolve()
160171
collection = api.load_collection_by_foreign_id(foreign_id, config)
161-
crawler = CrawlDirectory(api, collection, root, index=index, nojunk=nojunk)
172+
crawler = CrawlDirectory(
173+
api, collection, root, index=index, nojunk=nojunk, signed_url=signed_url
174+
)
162175
consumers = []
163176

164177
# Use one thread to produce using scandir and at least one to consume

alephclient/tests/test_crawldir.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,13 @@ def test_is_excluded_exclude_dir(self):
4848
crawldir.exclude["d"] = re.compile(r"week1\/*", re.I)
4949
is_excluded = crawldir.is_excluded(path)
5050
assert is_excluded
51+
52+
def test_signed_url_default_false(self):
    """When not passed, the signed_url flag must default to False."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    walker = CrawlDirectory(AlephAPI, {}, target)
    assert walker.signed_url is False
56+
57+
def test_signed_url_true(self):
    """Passing signed_url=True must be stored on the crawler."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    walker = CrawlDirectory(AlephAPI, {}, target, signed_url=True)
    assert walker.signed_url is True

alephclient/tests/test_tasks.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,20 @@ def test_ingest(self, mocker):
120120
]
121121
for call in expected_calls:
122122
assert call in self.api.ingest_upload.mock_calls
123+
124+
def test_ingest_signed_url(self, mocker):
    """A crawl with signed_url=True routes every upload through
    signed_url_upload instead of ingest_upload."""
    mocker.patch.object(self.api, "update_collection")
    mocker.patch.object(
        self.api, "load_collection_by_foreign_id", return_value={"id": 2}
    )
    mocker.patch.object(self.api, "signed_url_upload", return_value={"id": 42})
    crawl_dir(
        self.api,
        "alephclient/tests/testdata",
        "test153",
        {},
        True,
        True,
        signed_url=True,
    )
    # testdata contains 6 filesystem entries -> 6 signed uploads expected
    assert self.api.signed_url_upload.call_count == 6

0 commit comments

Comments
 (0)