Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions alephclient/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import importlib.metadata
import json
import mimetypes
import uuid
import logging
from itertools import count
Expand Down Expand Up @@ -509,6 +510,84 @@ def ingest_upload(
backoff(ae, attempt)
return {}

def signed_url_upload(
    self,
    collection_id: str,
    file_path: Optional[Path] = None,
    metadata: Optional[Dict] = None,
    index: bool = True,
) -> Dict:
    """
    Upload a document using the signed URL workflow.

    For directories (no file), falls back to the standard ingest endpoint
    since there is no file content to upload.

    The workflow is:
    1. POST /file/uploadUrl -> {url, id}
    2. PUT file content to the signed url
    3. POST /collections/{id}/document with the upload_id and metadata

    params
    ------
    collection_id: id of the collection to upload to
    file_path: path of the file to upload. None while creating folders
    metadata: dict containing metadata for the file or folders
    index: whether to index the document after creation
    """
    # Folder creation (no path) and directories carry no file content,
    # so defer to the classic multipart ingest endpoint.
    if not file_path or file_path.is_dir():
        return self.ingest_upload(
            collection_id, file_path, metadata=metadata, index=index
        )

    mime_type = mimetypes.guess_type(file_path.name)[0] or MIME
    meta = dict(metadata or {})
    meta["file_name"] = file_path.name
    meta["mime_type"] = mime_type

    # Retry the whole three-step workflow on transient API errors.
    for attempt in count(1):
        try:
            # Step 1: request a signed upload URL
            upload_url = self._make_url("file/uploadUrl")
            try:
                result = self._request("POST", upload_url)
            except AlephException as ae:
                # A 404 here means the server has no signed-URL support
                # at all; surface a clearer error instead of retrying.
                if ae.status == 404:
                    raise AlephException(
                        "Upload endpoint not found. Is this an Aleph Pro instance?"
                    ) from ae
                raise
            signed_url = result["url"]
            upload_id = result["id"]
            log.info("Signed URL id [%s]: %s", upload_id, file_path.name)

            # Step 2: PUT file content to the signed URL. Streaming the
            # open file handle avoids loading the whole file in memory.
            try:
                with file_path.open("rb") as fh:
                    response = self.session.put(
                        signed_url,
                        data=fh,
                        headers={"Content-Type": "application/octet-stream"},
                    )
                    response.raise_for_status()
            except (RequestException, HTTPError) as exc:
                # Wrap transport errors so the retry logic below sees
                # a single exception type.
                raise AlephException(exc) from exc

            # Step 3: create the document record
            doc_url_path = f"collections/{collection_id}/document"
            doc_url = self._make_url(doc_url_path, params={"index": index})
            # NOTE(review): metadata is sent under the capitalised "Meta"
            # key -- confirm this matches the document endpoint schema.
            payload = {"upload_id": upload_id, "Meta": meta}
            result = self._request("POST", doc_url, json=payload)
            if not result:
                # Some servers return an empty body on success; give the
                # caller something with an "id" anyway.
                return {"id": upload_id, "status": "ok"}
            return result
        except AlephException as ae:
            if not ae.transient or attempt > self.retries:
                # Bare re-raise keeps the original traceback intact.
                raise
            backoff(ae, attempt)
    return {}

def create_entityset(
self, collection_id: str, type: str, label: str, summary: Optional[str]
) -> Dict:
Expand Down
8 changes: 8 additions & 0 deletions alephclient/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ def cli(ctx, host, api_key, retries):
help="maximum number of parallel uploads",
)
@click.option("-f", "--foreign-id", required=True, help="foreign-id of the collection")
@click.option(
"--signed-url",
is_flag=True,
default=False,
help="use signed URL workflow for file uploads",
)
@click.argument("path", type=click.Path(exists=True))
@click.pass_context
def crawldir(
Expand All @@ -98,6 +104,7 @@ def crawldir(
noindex=False,
nojunk=False,
parallel=1,
signed_url=False,
):
"""Crawl a directory recursively and upload the documents in it to a
collection."""
Expand All @@ -112,6 +119,7 @@ def crawldir(
index=not noindex,
nojunk=nojunk,
parallel=parallel,
signed_url=signed_url,
)
except AlephException as exc:
raise click.ClickException(str(exc))
Expand Down
27 changes: 20 additions & 7 deletions alephclient/crawldir.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ def __init__(
path: Path,
index: bool = True,
nojunk: bool = False,
signed_url: bool = False,
):
self.api = api
self.index = index
self.signed_url = signed_url
self.exclude = (
{
"f": re.compile(r"\..*|thumbs\.db|desktop\.ini", re.I),
Expand Down Expand Up @@ -128,12 +130,20 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str:
log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id)
if parent_id is not None:
metadata["parent_id"] = parent_id
result = self.api.ingest_upload(
self.collection_id,
path,
metadata=metadata,
index=self.index,
)
if self.signed_url:
result = self.api.signed_url_upload(
self.collection_id,
path,
metadata=metadata,
index=self.index,
)
else:
result = self.api.ingest_upload(
self.collection_id,
path,
metadata=metadata,
index=self.index,
)
if "id" not in result and not hasattr(result, "id"):
raise AlephException("Upload failed")
return result["id"]
Expand All @@ -147,6 +157,7 @@ def crawl_dir(
index: bool = True,
nojunk: bool = False,
parallel: int = 1,
signed_url: bool = False,
):
"""Crawl a directory and upload its content to a collection

Expand All @@ -158,7 +169,9 @@ def crawl_dir(
"""
root = Path(path).resolve()
collection = api.load_collection_by_foreign_id(foreign_id, config)
crawler = CrawlDirectory(api, collection, root, index=index, nojunk=nojunk)
crawler = CrawlDirectory(
api, collection, root, index=index, nojunk=nojunk, signed_url=signed_url
)
consumers = []

# Use one thread to produce using scandir and at least one to consume
Expand Down
10 changes: 10 additions & 0 deletions alephclient/tests/test_crawldir.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ def test_is_excluded_exclude_dir(self):
crawldir.exclude["d"] = re.compile(r"week1\/*", re.I)
is_excluded = crawldir.is_excluded(path)
assert is_excluded

def test_signed_url_default_false(self):
    """Without an explicit flag, signed-URL uploads stay disabled."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    directory = CrawlDirectory(AlephAPI, {}, target)
    assert directory.signed_url is False

def test_signed_url_true(self):
    """Passing signed_url=True is reflected on the crawler instance."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    directory = CrawlDirectory(AlephAPI, {}, target, signed_url=True)
    assert directory.signed_url is True
17 changes: 17 additions & 0 deletions alephclient/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,20 @@ def test_ingest(self, mocker):
]
for call in expected_calls:
assert call in self.api.ingest_upload.mock_calls

def test_ingest_signed_url(self, mocker):
    """With signed_url=True, crawl_dir must route every file through
    the signed-URL workflow instead of the classic ingest call."""
    upload_mock = mocker.patch.object(
        self.api, "signed_url_upload", return_value={"id": 42}
    )
    mocker.patch.object(
        self.api, "load_collection_by_foreign_id", return_value={"id": 2}
    )
    mocker.patch.object(self.api, "update_collection")

    crawl_dir(
        self.api,
        "alephclient/tests/testdata",
        "test153",
        {},
        True,
        True,
        signed_url=True,
    )

    # Six files in the testdata fixture tree -> six signed uploads.
    assert upload_mock.call_count == 6
Loading