
Commit 99a38cc

Add Google cloud storage support (#4)
* Add Google Cloud Storage support. Unlike the other two storage options, we do not assume immutability, so a number of changes were needed to correctly track the differing lifetimes of storage and local paths.
* Rename remaining mentions of dynamic to mutable
* Update README.md
* fix: mark max_concurrent_downloads as required int
1 parent 7d7ec14 commit 99a38cc

5 files changed

Lines changed: 279 additions & 43 deletions

File tree

README.md

Lines changed: 17 additions & 8 deletions
````diff
@@ -10,15 +10,16 @@ A Snakemake storage plugin for downloading files via HTTP with local caching, ch
 **Supported sources:**
 - **zenodo.org** - Zenodo data repository (checksum from API)
 - **data.pypsa.org** - PyPSA data repository (checksum from manifest.yaml)
+- **storage.googleapis.com** - Google Cloud Storage (checksum from GCS JSON API)
 
 ## Features
 
 - **Local caching**: Downloads are cached to avoid redundant transfers (can be disabled)
-- **Checksum verification**: Automatically verifies checksums (from Zenodo API or data.pypsa.org manifests)
+- **Checksum verification**: Automatically verifies checksums (from Zenodo API, data.pypsa.org manifests, or GCS object metadata)
 - **Rate limit handling**: Automatically respects Zenodo's rate limits using `X-RateLimit-*` headers with exponential backoff retry
-- **Concurrent download control**: Limits simultaneous downloads to prevent overwhelming Zenodo
+- **Concurrent download control**: Limits simultaneous downloads to prevent overwhelming servers
 - **Progress bars**: Shows download progress with tqdm
-- **Immutable URLs**: Returns mtime=0 since Zenodo URLs are persistent
+- **Immutable URLs**: Returns mtime=0 for Zenodo and data.pypsa.org (persistent URLs); uses actual mtime for GCS
 - **Environment variable support**: Configure via environment variables for CI/CD workflows
 
 ## Installation
@@ -66,7 +67,7 @@ If you don't explicitly configure it, the plugin will use default settings autom
 
 ## Usage
 
-Use Zenodo or data.pypsa.org URLs directly in your rules. Snakemake automatically detects supported URLs and routes them to this plugin:
+Use Zenodo, data.pypsa.org, or Google Cloud Storage URLs directly in your rules. Snakemake automatically detects supported URLs and routes them to this plugin:
 
 ```python
 rule download_zenodo:
@@ -84,6 +85,14 @@ rule download_pypsa:
         "resources/eez.zip"
     shell:
         "cp {input} {output}"
+
+rule download_gcs:
+    input:
+        storage("https://storage.googleapis.com/open-tyndp-data-store/CBA_projects.zip"),
+    output:
+        "resources/cba_projects.zip"
+    shell:
+        "cp {input} {output}"
 ```
 
 Or if you configured a tagged storage entity:
@@ -107,7 +116,7 @@ The plugin will:
 - Progress bar showing download status
 - Automatic rate limit handling with exponential backoff retry
 - Concurrent download limiting
-- Checksum verification (from Zenodo API or data.pypsa.org manifest)
+- Checksum verification (from Zenodo API, data.pypsa.org manifest, or GCS metadata)
 4. Store in cache for future use (if caching is enabled)
 
 ### Example: CI/CD Configuration
@@ -139,19 +148,19 @@ The plugin automatically:
 
 ## URL Handling
 
-- Handles URLs from `zenodo.org`, `sandbox.zenodo.org`, and `data.pypsa.org`
+- Handles URLs from `zenodo.org`, `sandbox.zenodo.org`, `data.pypsa.org`, and `storage.googleapis.com`
 - Other HTTP(S) URLs are handled by the standard `snakemake-storage-plugin-http`
 - Both plugins can coexist in the same workflow
 
 ### Plugin Priority
 
 When using `storage()` without specifying a plugin name, Snakemake checks all installed plugins:
-- **Cached HTTP plugin**: Only accepts zenodo.org and data.pypsa.org URLs
+- **Cached HTTP plugin**: Only accepts zenodo.org, data.pypsa.org, and storage.googleapis.com URLs
 - **HTTP plugin**: Accepts all HTTP/HTTPS URLs (including zenodo.org)
 
 If both plugins are installed, supported URLs would be ambiguous - both plugins accept them.
 Typically snakemake would raise an error: **"Multiple suitable storage providers found"** if you try to use `storage()` without specifying which plugin to use, ie. one needs to explicitly call the Cached HTTP provider using `storage.cached_http(url)` instead of `storage(url)`,
-but we monkey-patch the http plugin to refuse zenodo.org and data.pypsa.org URLs.
+but we monkey-patch the http plugin to refuse zenodo.org, data.pypsa.org, and storage.googleapis.com URLs.
 
 ## License
````

src/snakemake_storage_plugin_cached_http/__init__.py

Lines changed: 143 additions & 21 deletions
```diff
@@ -2,16 +2,19 @@
 #
 # SPDX-License-Identifier: MIT
 
+import asyncio
+import base64
 import hashlib
 import json
 import shutil
 import time
 from contextlib import asynccontextmanager
 from dataclasses import dataclass, field
+from datetime import datetime
 from logging import Logger
 from pathlib import Path
 from posixpath import basename, dirname, join, normpath, relpath
-from urllib.parse import urlparse
+from urllib.parse import quote, urlparse
 
 import httpx
 import platformdirs
@@ -81,7 +84,7 @@ class StorageProviderSettings(SettingsBase):
             "env_var": True,
         },
     )
-    max_concurrent_downloads: int | None = field(
+    max_concurrent_downloads: int = field(
        default=3,
        metadata={
            "help": "Maximum number of concurrent downloads.",
@@ -92,10 +95,11 @@ class StorageProviderSettings(SettingsBase):
 
 @dataclass
 class FileMetadata:
-    """Metadata for a file in a Zenodo or data.pypsa.org record."""
+    """Metadata for a file in a Zenodo, data.pypsa.org, or GCS record."""
 
     checksum: str | None
     size: int
+    mtime: float = 0  # modification time (Unix timestamp), used for GCS
     redirect: str | None = None  # used to indicate data.pypsa.org redirection
 
 
```
```diff
@@ -144,6 +148,7 @@ def __post_init__(self):
         # Cache for record metadata to avoid repeated API calls
         self._zenodo_record_cache: dict[str, dict[str, FileMetadata]] = {}
         self._pypsa_manifest_cache: dict[str, dict[str, FileMetadata]] = {}
+        self._gcs_metadata_cache: dict[str, FileMetadata] = {}
 
     @override
     def use_rate_limiter(self) -> bool:
@@ -173,6 +178,11 @@ def example_queries(cls) -> list[ExampleQuery]:
                 description="A data pypsa file URL",
                 type=QueryType.INPUT,
             ),
+            ExampleQuery(
+                query="https://storage.googleapis.com/open-tyndp-data-store/CBA_projects.zip",
+                description="A Google Cloud Storage file URL",
+                type=QueryType.INPUT,
+            ),
         ]
 
     @override
@@ -185,7 +195,7 @@ def is_valid_query(cls, query: str) -> StorageQueryValidationResult:
         return StorageQueryValidationResult(
             query=query,
             valid=False,
-            reason="Only zenodo.org and data.pypsa.org URLs are handled by this plugin",
+            reason="Only zenodo.org, data.pypsa.org, and storage.googleapis.com URLs are handled by this plugin",
         )
 
     @override
@@ -288,9 +298,24 @@ async def get_metadata(self, path: str, netloc: str) -> FileMetadata | None:
             return await self.get_zenodo_metadata(path, netloc)
         elif netloc == "data.pypsa.org":
             return await self.get_pypsa_metadata(path, netloc)
+        elif netloc == "storage.googleapis.com":
+            return await self.get_gcs_metadata(path, netloc)
 
         raise WorkflowError(
-            "Cached-http storage plugin is only implemented for zenodo.org and data.pypsa.org urls"
+            "Cached-http storage plugin is only implemented for zenodo.org, data.pypsa.org, and storage.googleapis.com urls"
+        )
+
+    @staticmethod
+    def is_immutable(netloc: str):
+        if netloc in ("zenodo.org", "sandbox.zenodo.org"):
+            return True
+        elif netloc == "data.pypsa.org":
+            return True
+        elif netloc == "storage.googleapis.com":
+            return False
+
+        raise WorkflowError(
+            "Cached-http storage plugin is only implemented for zenodo.org, data.pypsa.org, and storage.googleapis.com urls"
         )
 
     async def get_zenodo_metadata(self, path: str, netloc: str) -> FileMetadata | None:
```
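Stripped of the class and the `WorkflowError` dependency, the immutability policy introduced here reduces to a small pure function. A condensed standalone sketch (the name `is_immutable_netloc` and the `ValueError` are illustrative choices, not the plugin's API):

```python
def is_immutable_netloc(netloc: str) -> bool:
    """True if objects behind this host never change once published."""
    if netloc in ("zenodo.org", "sandbox.zenodo.org", "data.pypsa.org"):
        return True  # Zenodo records and data.pypsa.org files are persistent
    if netloc == "storage.googleapis.com":
        return False  # GCS objects may be overwritten; mtimes must be compared
    raise ValueError(f"unsupported netloc: {netloc}")
```

This single flag is what drives the cache-validity and mtime changes in the rest of the commit.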
```diff
@@ -407,6 +432,73 @@ async def get_pypsa_metadata(self, path: str, netloc: str) -> FileMetadata | Non
         filename = relpath(path, base_path)
         return metadata.get(filename)
 
+    async def get_gcs_metadata(self, path: str, netloc: str) -> FileMetadata | None:
+        """
+        Retrieve and cache file metadata from Google Cloud Storage.
+
+        Uses the GCS JSON API to fetch object metadata including MD5 hash.
+        URL format: https://storage.googleapis.com/{bucket}/{object-path}
+        API endpoint: https://storage.googleapis.com/storage/v1/b/{bucket}/o/{encoded-object}
+
+        Args:
+            path: Server path (bucket/object-path)
+            netloc: Network location (storage.googleapis.com)
+
+        Returns:
+            FileMetadata for the requested file, or None if not found
+        """
+        # Check cache first
+        if path in self._gcs_metadata_cache:
+            return self._gcs_metadata_cache[path]
+
+        # Parse bucket and object path from the URL path
+        # Path format: /{bucket}/{object-path}
+        parts = path.split("/", maxsplit=1)
+        if len(parts) < 2:
+            raise WorkflowError(
+                f"Invalid GCS URL format: http(s)://{netloc}/{path}. "
+                f"Expected format: https://storage.googleapis.com/{{bucket}}/{{object-path}}"
+            )
+
+        bucket, object_path = parts
+
+        # URL-encode the object path for the API request (slashes must be encoded)
+        encoded_object = quote(object_path, safe="")
+
+        # GCS JSON API endpoint for object metadata
+        api_url = f"https://{netloc}/storage/v1/b/{bucket}/o/{encoded_object}"
+
+        async with self.httpr("get", api_url) as response:
+            if response.status_code == 404:
+                return None
+            if response.status_code != 200:
+                raise WorkflowError(
+                    f"Failed to fetch GCS object metadata: HTTP {response.status_code} ({api_url})"
+                )
+
+            content = await response.aread()
+            data = json.loads(content)
+
+        # GCS returns MD5 as base64-encoded bytes
+        md5_base64: str | None = data.get("md5Hash")
+        checksum: str | None = None
+        if md5_base64:
+            # Convert base64 to hex digest
+            md5_bytes = base64.b64decode(md5_base64)
+            checksum = f"md5:{md5_bytes.hex()}"
+
+        size: int = int(data.get("size", 0))
+
+        updated: str | None = data.get("updated")
+        mtime: float = datetime.fromisoformat(updated).timestamp() if updated else 0
+
+        metadata = FileMetadata(checksum=checksum, size=size, mtime=mtime)
+
+        # Store in cache
+        self._gcs_metadata_cache[path] = metadata
+
+        return metadata
+
 
 # Implementation of storage object
 class StorageObject(StorageObjectRead):
```
```diff
@@ -441,15 +533,19 @@ async def managed_exists(self) -> bool:
 
         if self.provider.cache:
             cached = self.provider.cache.get(str(self.query))
-            if cached is not None:
+            if cached is not None and self.provider.is_immutable(self.netloc):
                 return True
 
         metadata = await self.provider.get_metadata(self.path, self.netloc)
         return metadata is not None
 
     @override
     async def managed_mtime(self) -> float:
-        return 0
+        if self.provider.settings.skip_remote_checks:
+            return 0
+
+        metadata = await self.provider.get_metadata(self.path, self.netloc)
+        return metadata.mtime if metadata is not None else 0
 
     @override
     async def managed_size(self) -> int:
@@ -458,11 +554,20 @@ async def managed_size(self) -> int:
 
         if self.provider.cache:
             cached = self.provider.cache.get(str(self.query))
-            if cached is not None:
+            if cached is not None and self.provider.is_immutable(self.netloc):
                 return cached.stat().st_size
+        else:
+            cached = None
 
         metadata = await self.provider.get_metadata(self.path, self.netloc)
-        return metadata.size if metadata is not None else 0
+        if metadata is None:
+            return 0
+
+        if cached is not None:
+            if cached.stat().st_mtime >= metadata.mtime:
+                return cached.stat().st_size
+
+        return metadata.size
 
     @override
     async def inventory(self, cache: IOCacheStorageInterface) -> None:
```
```diff
@@ -483,17 +588,31 @@ async def inventory(self, cache: IOCacheStorageInterface) -> None:
 
         if self.provider.cache:
             cached = self.provider.cache.get(str(self.query))
-            if cached is not None:
+            if cached is not None and self.provider.is_immutable(self.netloc):
                 cache.exists_in_storage[key] = True
-                cache.mtime[key] = Mtime(storage=0)
+                cache.mtime[key] = Mtime(storage=cached.stat().st_mtime)
                 cache.size[key] = cached.stat().st_size
                 return
+        else:
+            cached = None
 
         metadata = await self.provider.get_metadata(self.path, self.netloc)
-        exists = metadata is not None
-        cache.exists_in_storage[key] = exists
-        cache.mtime[key] = Mtime(storage=0)
-        cache.size[key] = metadata.size if exists else 0
+        if metadata is None:
+            cache.exists_in_storage[key] = False
+            cache.mtime[key] = Mtime(storage=0)
+            cache.size[key] = 0
+            return
+
+        if cached is not None:
+            if cached.stat().st_mtime >= metadata.mtime:
+                cache.exists_in_storage[key] = True
+                cache.mtime[key] = Mtime(storage=cached.stat().st_mtime)
+                cache.size[key] = cached.stat().st_size
+                return
+
+        cache.exists_in_storage[key] = True
+        cache.mtime[key] = Mtime(storage=metadata.mtime)
+        cache.size[key] = metadata.size
 
     @override
     def cleanup(self):
@@ -558,17 +677,20 @@ async def managed_retrieve(self):
         if metadata is not None and metadata.redirect is not None:
             query = f"https://{self.netloc}/{metadata.redirect}"
 
-        # If already in cache, just copy
+        # If already in cache, check if still valid
         if self.provider.cache:
             cached = self.provider.cache.get(query)
             if cached is not None:
-                logger.info(f"Retrieved (unknown) from cache ({query})")
-                shutil.copy2(cached, local_path)
-                return
+                if self.provider.is_immutable(self.netloc) or (
+                    metadata is not None and cached.stat().st_mtime >= metadata.mtime
+                ):
+                    logger.info(f"Retrieved (unknown) from cache ({query})")
+                    shutil.copy2(cached, local_path)
+                    return
 
         try:
-            # Download from Zenodo or data.pypsa.org using a get request, rate limit errors are detected and
-            # raise WorkflowError to trigger a retry
+            # Download using a get request, rate limit errors are detected and raise
+            # WorkflowError to trigger a retry
             async with self.provider.httpr("get", query) as response:
                 if response.status_code != 200:
                     raise WorkflowError(
```

src/snakemake_storage_plugin_cached_http/monkeypatch.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -23,6 +23,7 @@ def is_pypsa_or_zenodo_url(url: str) -> bool:
         "zenodo.org",
         "sandbox.zenodo.org",
         "data.pypsa.org",
+        "storage.googleapis.com",
     ) and parsed.scheme in (
         "http",
         "https",
```
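The surrounding `urlparse` call and `return` of `is_pypsa_or_zenodo_url` are not shown in the hunk, so the full check can only be sketched; a plausible standalone reconstruction (hypothetical name `is_handled_url`, assumptions beyond the diff marked):

```python
from urllib.parse import urlparse


def is_handled_url(url: str) -> bool:
    """Hosts claimed by the cached-http plugin.

    The stock http plugin is monkey-patched to refuse these hosts so that
    provider resolution stays unambiguous. The urlparse/return framing here
    is inferred from context, not copied from the diff.
    """
    parsed = urlparse(url)
    return parsed.netloc in (
        "zenodo.org",
        "sandbox.zenodo.org",
        "data.pypsa.org",
        "storage.googleapis.com",
    ) and parsed.scheme in ("http", "https")
```

Matching on `parsed.netloc` rather than a substring of the URL avoids false positives such as `https://example.com/zenodo.org/file`.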
