Skip to content

Improve thread-safety for download #252

@arthurlorenzi

Description

@arthurlorenzi

Context

I've been building a platform that periodically monitors DataSUS sources and downloads files when they are new or updated, with the goal of compiling a collection of epidemiological indicators. While working on this, I ran into some concurrency issues worth discussing.

Current Behavior

There are two related issues when using the download methods concurrently:

1. download in multi-threaded settings

download relies on FTPSingleton, a globally shared FTP connection. When two threads call download concurrently, they end up sharing the same socket, which can interleave FTP commands and corrupt the transfer, even when downloading different files. This will usually raise exceptions.

2. async_download under concurrent async tasks

async_download uses aioftp, which is built on asyncio streams with no internal locking. The event loop can interleave await points inside aioftp's internals, mixing up FTP control channel commands across concurrent transfers. This can silently produce corrupted files even when each call targets a different file.

The tricky part is that when the file gets corrupted, no exceptions are raised (I can reproduce it relatively consistently by download multiple files at once). Eventually one .parquet will get corrupted.

Proposed Solution

Remove FTPSingleton and give each download call its own connection. Establishing an FTP connection is lightweight and the transfer time dominates completely, so there's negligible overhead:

from contextlib import contextmanager
from ftplib import FTP

@contextmanager
def ftp_connection():
    ftp = FTP("ftp.datasus.gov.br")
    ftp.login()
    try:
        yield ftp
    finally:
        try:
            ftp.quit()
        except Exception:
            ftp.close()

Remove aioftp and reimplement async_download using asyncio.to_thread:

async def async_download(self, local_dir: str = CACHEPATH) -> Data:
    return await asyncio.to_thread(self.download, local_dir)

This delegates to the (now thread-safe) download in a thread pool. async/await still works and user code doesn't break. Concurrency control (e.g. limiting simultaneous connections via asyncio.Semaphore) is left to the user.

Trade-offs

  • Performance: to_thread uses system threads, which are slightly heavier than asyncio coroutines. In practice this should be irrelevant as the file download itself dominates the time taken.
  • Dependency: removes aioftp as a dependency.

Worth noting that multi-threading is not going to make downloads faster and FTP servers will likely reject multiple simultaneous connections. The main goal is to prevent that PySUS breaks user code that is multi-threaded.

Happy to work on a PR if you are aligned with this direction!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions