feat: add async job mode to Docling Serve converter#3371
Conversation
Coverage report (docling_serve)Click to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||
| source=source, | ||
| error=e, | ||
| ) | ||
| except DoclingServeConversionError as e: |
There was a problem hiding this comment.
This execption DoclingServeConversionError raised on timeout ends up being caught by the same except block in run() / run_async() that also handles ordinary conversion failures:
If a job exceeds job_timeout (default 600s), the caller silently gets an empty documents list after waiting 10 minutes — indistinguishable from "file had no content". A timeout feels like a different signal from a conversion failure.
One option would be a dedicated subclass:
class DoclingServeTimeoutError(DoclingServeConversionError):
"""Raised when a job exceeds job_timeout."""That way callers can catch it separately, and you could re-raise it in run() or at least use a more specific log message.
Same for the run_async()
| Conversion mode. `sync` uses DoclingServe's synchronous endpoints. `async` submits | ||
| conversion jobs to DoclingServe's async endpoints and polls until completion. | ||
| :param poll_interval: | ||
| Maximum server-side long-poll wait in seconds when `mode="async"`. |
There was a problem hiding this comment.
| Maximum server-side long-poll wait in seconds when `mode="async"`. | |
| Controls both the server-side long-poll wait (?wait= parameter) and the maximum local sleep between polls. A higher value reduces round-trips; a lower value increases polling frequency. |
| :param job_timeout: | ||
| Maximum time in seconds to wait for each async conversion job. | ||
| """ | ||
| if poll_interval <= 0: |
There was a problem hiding this comment.
those are only used in async, so we can do:
if self.mode == ConversionMode.ASYNC:
if poll_interval <= 0:
msg = "poll_interval must be greater than 0."
raise ValueError(msg)
if job_timeout <= 0:
msg = "job_timeout must be greater than 0."
raise ValueError(msg)| @@ -184,6 +264,24 @@ def _post_file(self, client: httpx.Client, source: str | Path | ByteStream) -> d | |||
| response.raise_for_status() | |||
| return response.json() | |||
There was a problem hiding this comment.
This is a pre-existing issue, not introduced by this PR, but worth fixing.
The sync endpoints (_post_file, _post_url) return the raw response JSON without checking the status field in the body. This means if DoclingServe returns a 200 OK with {"status": "failure", "errors": [...]}, the error details are silently discarded — the code falls through to _extract_content, gets None, and logs "No content returned for source" with no indication of why.
The async job path handles this correctly via _raise_for_failed_conversion in _fetch_job_result.
Let's apply the same check to _post_file and _post_url to make the behaviour consistent.
| assert len(result["documents"]) == 2 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_run_async_async_mode_uses_job_endpoints(self): |
There was a problem hiding this comment.
There's a test for run_async() with ASYNC mode and a URL source, but no equivalent for a file source.
Could you add a test that verifies run_async() with mode=ASYNC and a file path hits /v1/convert/file/async and returns the converted document?
| assert data["to_formats"] == "md" | ||
| assert data["target_type"] == "inbody" | ||
|
|
||
| def test_run_async_mode_skips_failed_job(self, caplog): |
There was a problem hiding this comment.
Could you add a test for the timeout path?
When job_timeout is exceeded, DoclingServeConversionError is raised and caught — the source should be skipped with a warning containing "Timed out".
There was a problem hiding this comment.
Also a test for the "skipped" status?
A job can complete with task_status: "success" but the conversion result can still have status: "skipped"
Summary
Closes #3345
Validation