Skip to content

Commit a4d27b9

Browse files
authored
fix: race condition in image uploader (#779)
* fix: race condition in image uploader and add tests * Add tests for ImageSequenceUploader. Follow the following principles: - Test that it works in a multithreaded environment with caches enabled - Test that it works in multithreads with cache disabled - when cache is enabled, test that cache hits for the second run - Do not use mock unless badly needed (See TestSingleImageUploader as example) - Do not import modules inside the test class (always add imports to the top of test file) * I saw that you use multithreads in test_image_sequence_uploader_multithreading_with_cache_enabled. No need to do that because `sequence_uploader.upload_images` is used inside multithreading. We can guarantee that sequence_uploader.upload_images won't be called in multithreads * Using cached_time to determine if it's cached is probably not reliable. Try mocking some internal implementation, e.g. whether single_image_uploader.cache.get() is called and whether the filehandle is cached there * increase num_images * Ok, it looks like it is hard to test with the current implementation. Let's make some changes in uploader.py 1. Make SingleImageUploader fully thread-safe, which means move _maybe_create_persistent_cache_instance to ImageSequenceUploader perhaps 2. Create the cache instance in ImageSequenceUploader and pass in the instance to SingleImageUploader to create the single image uploader instance 3. Use single image uploader instance for all uploads in ImageSequenceUploader Make change in implementation only for now (i.e. do not change tests). * Ok now let's improve tests. As we exposed both ImageSequenceUploader.cache and SingleImageUploader.cache, which means you can update them after construction, and run assertions based on the cache instance * I don't see you are calling `sequence_uploader.upload_images` in test_image_sequence_uploader_cache_hits_second_run Make sure to test these scenarios in that test: 1. make sure cache is enabled 2.
for the first upload_images(a, b, c), make sure no caches are hit 3. for the second upload_images(c, d, e), make sure c is using the cached value, d and e are not cached 4. eventually a, b, c, d, e are all cached * simplify tests * enable upload_cache_path * refactor * update tests * rename * tests * Assert that results from the uploader.upload_images() in TestImageSequenceUploader have no errors * cache path
1 parent e03d5f5 commit a4d27b9

3 files changed

Lines changed: 588 additions & 62 deletions

File tree

mapillary_tools/history.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ def clear_expired(self) -> list[str]:
162162

163163
return expired_keys
164164

165+
def keys(self):
    """Return all keys currently stored in the backing dbm file."""
    # Hold the instance lock so the dbm file is never opened concurrently.
    with self._lock, dbm.open(self._file, flag="c") as db:
        return db.keys()
169+
165170
def _is_expired(self, payload: JSONDict) -> bool:
166171
expires_at = payload.get("expires_at")
167172
if isinstance(expires_at, (int, float)):

mapillary_tools/uploader.py

Lines changed: 81 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import concurrent.futures
44
import dataclasses
5+
import hashlib
56
import io
67
import json
78
import logging
@@ -56,6 +57,9 @@ class UploadOptions:
5657
user_items: config.UserItem
5758
chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024)
5859
num_upload_workers: int = constants.MAX_IMAGE_UPLOAD_WORKERS
60+
# When set, upload cache will be read/write there
61+
# This option is exposed for testing purpose. In PROD, the path is calculated based on envvar and user_items
62+
upload_cache_path: Path | None = None
5963
dry_run: bool = False
6064
nofinish: bool = False
6165
noresume: bool = False
@@ -471,7 +475,7 @@ def _zip_sequence_fp(
471475
# Arcname should be unique, the name does not matter
472476
arcname = f"{idx}.jpg"
473477
zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
474-
zipf.writestr(zipinfo, SingleImageUploader.dump_image_bytes(metadata))
478+
zipf.writestr(zipinfo, CachedImageUploader.dump_image_bytes(metadata))
475479
assert len(sequence) == len(set(zipf.namelist()))
476480
zipf.comment = json.dumps(
477481
{"sequence_md5sum": sequence_md5sum},
@@ -537,6 +541,13 @@ class ImageSequenceUploader:
537541
def __init__(self, upload_options: UploadOptions, emitter: EventEmitter):
538542
self.upload_options = upload_options
539543
self.emitter = emitter
544+
# Create a single shared SingleImageUploader instance that will be used across all uploads
545+
cache = _maybe_create_persistent_cache_instance(self.upload_options)
546+
if cache:
547+
cache.clear_expired()
548+
self.cached_image_uploader = CachedImageUploader(
549+
self.upload_options, cache=cache
550+
)
540551

541552
def upload_images(
542553
self, image_metadatas: T.Sequence[types.ImageMetadata]
@@ -688,10 +699,6 @@ def _upload_images_from_queue(
688699
with api_v4.create_user_session(
689700
self.upload_options.user_items["user_upload_token"]
690701
) as user_session:
691-
single_image_uploader = SingleImageUploader(
692-
self.upload_options, user_session=user_session
693-
)
694-
695702
while True:
696703
# Assert that all images are already pushed into the queue
697704
try:
@@ -710,8 +717,8 @@ def _upload_images_from_queue(
710717
}
711718

712719
# image_progress will be updated during uploading
713-
file_handle = single_image_uploader.upload(
714-
image_metadata, image_progress
720+
file_handle = self.cached_image_uploader.upload(
721+
user_session, image_metadata, image_progress
715722
)
716723

717724
# Update chunk_size (it was constant if set)
@@ -731,24 +738,27 @@ def _upload_images_from_queue(
731738
return indexed_file_handles
732739

733740

734-
class SingleImageUploader:
741+
class CachedImageUploader:
735742
def __init__(
736743
self,
737744
upload_options: UploadOptions,
738-
user_session: requests.Session | None = None,
745+
cache: history.PersistentCache | None = None,
739746
):
740747
self.upload_options = upload_options
741-
self.user_session = user_session
742-
self.cache = self._maybe_create_persistent_cache_instance(
743-
self.upload_options.user_items, upload_options
744-
)
748+
self.cache = cache
749+
if self.cache:
750+
self.cache.clear_expired()
745751

752+
# Thread-safe
746753
def upload(
747-
self, image_metadata: types.ImageMetadata, image_progress: dict[str, T.Any]
754+
self,
755+
user_session: requests.Session,
756+
image_metadata: types.ImageMetadata,
757+
image_progress: dict[str, T.Any],
748758
) -> str:
749759
image_bytes = self.dump_image_bytes(image_metadata)
750760

751-
uploader = Uploader(self.upload_options, user_session=self.user_session)
761+
uploader = Uploader(self.upload_options, user_session=user_session)
752762

753763
session_key = uploader._gen_session_key(io.BytesIO(image_bytes), image_progress)
754764

@@ -786,51 +796,7 @@ def dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes:
786796
f"Failed to dump EXIF bytes: {ex}", metadata.filename
787797
) from ex
788798

789-
@classmethod
790-
def _maybe_create_persistent_cache_instance(
791-
cls, user_items: config.UserItem, upload_options: UploadOptions
792-
) -> history.PersistentCache | None:
793-
if not constants.UPLOAD_CACHE_DIR:
794-
LOG.debug(
795-
"Upload cache directory is set empty, skipping caching upload file handles"
796-
)
797-
return None
798-
799-
if upload_options.dry_run:
800-
LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
801-
return None
802-
803-
# Different python/CLI versions use different cache (dbm) formats.
804-
# Separate them to avoid conflicts
805-
py_version_parts = [str(part) for part in sys.version_info[:3]]
806-
version = f"py_{'_'.join(py_version_parts)}_{VERSION}"
807-
808-
cache_path_dir = (
809-
Path(constants.UPLOAD_CACHE_DIR)
810-
.joinpath(version)
811-
.joinpath(api_v4.MAPILLARY_CLIENT_TOKEN.replace("|", "_"))
812-
.joinpath(
813-
user_items.get("MAPSettingsUserKey", user_items["user_upload_token"])
814-
)
815-
)
816-
cache_path_dir.mkdir(parents=True, exist_ok=True)
817-
cache_path = cache_path_dir.joinpath("cached_file_handles")
818-
819-
# Sanitize sensitive segments for logging
820-
sanitized_cache_path = (
821-
Path(constants.UPLOAD_CACHE_DIR)
822-
.joinpath(version)
823-
.joinpath("***")
824-
.joinpath("***")
825-
.joinpath("cached_file_handles")
826-
)
827-
LOG.debug(f"File handle cache path: {sanitized_cache_path}")
828-
829-
cache = history.PersistentCache(str(cache_path.resolve()))
830-
cache.clear_expired()
831-
832-
return cache
833-
799+
# Thread-safe
834800
def _get_cached_file_handle(self, key: str) -> str | None:
835801
if self.cache is None:
836802
return None
@@ -840,6 +806,7 @@ def _get_cached_file_handle(self, key: str) -> str | None:
840806

841807
return self.cache.get(key)
842808

809+
# Thread-safe
843810
def _set_file_handle_cache(self, key: str, value: str) -> None:
844811
if self.cache is None:
845812
return
@@ -1168,3 +1135,57 @@ def _prefixed_uuid4():
11681135

11691136
def _is_uuid(key: str) -> bool:
11701137
return key.startswith("uuid_") or key.startswith("mly_tools_uuid_")
1138+
1139+
1140+
def _build_upload_cache_path(upload_options: UploadOptions) -> Path:
    """Compute the on-disk path of the upload file-handle cache.

    The path is namespaced by the Python/CLI version (different versions may
    use incompatible dbm formats) and by a hashed user fingerprint (file
    handles are not sharable between users, and hashing keeps sensitive data
    out of logs).
    """
    interpreter = "_".join(str(part) for part in sys.version_info[:3])
    version = f"py_{interpreter}_{VERSION}"

    # File handles are not sharable between different users
    user_items = upload_options.user_items
    user_id = str(user_items.get("MAPSettingsUserKey", user_items["user_upload_token"]))

    # Use hash to avoid log sensitive data
    fingerprint_source = (api_v4.MAPILLARY_CLIENT_TOKEN + user_id).encode("utf-8")
    user_fingerprint = utils.md5sum_fp(
        io.BytesIO(fingerprint_source), md5=hashlib.sha256()
    ).hexdigest()[:24]

    return (
        Path(constants.UPLOAD_CACHE_DIR)
        / version
        / user_fingerprint
        / "cached_file_handles"
    )
1165+
1166+
1167+
def _maybe_create_persistent_cache_instance(
    upload_options: UploadOptions,
) -> history.PersistentCache | None:
    """Return a PersistentCache for upload file handles, or None when caching is off.

    Caching is disabled in dry-run mode, and also when no explicit cache path
    is provided and the upload cache directory is configured empty.
    """
    if upload_options.dry_run:
        LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
        return None

    target = upload_options.upload_cache_path
    if target is None:
        # No explicit path given: fall back to the configured cache directory
        if not constants.UPLOAD_CACHE_DIR:
            LOG.debug(
                "Upload cache directory is set empty, skipping caching upload file handles"
            )
            return None
        target = _build_upload_cache_path(upload_options)

    LOG.debug(f"File handle cache path: {target}")

    # Ensure the containing directory exists before the dbm file is opened
    target.parent.mkdir(parents=True, exist_ok=True)

    return history.PersistentCache(str(target.resolve()))

0 commit comments

Comments
 (0)