XML MPU v/s Resumable Upload #1702
@@ -16,11 +16,15 @@

"""Create / interact with Google Cloud Storage blobs."""

import time
import base64
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import copy
import hashlib
from io import BytesIO
from io import TextIOWrapper
import itertools
import logging
import mimetypes
import os
@@ -33,12 +37,15 @@

from urllib.parse import urlunsplit
import warnings

from google.cloud.storage import retry
from google.cloud.storage._media.requests import ChunkedDownload
from google.cloud.storage._media.requests import Download
from google.cloud.storage._media.requests import RawDownload
from google.cloud.storage._media.requests import RawChunkedDownload
from google.cloud.storage._media.requests import MultipartUpload
from google.cloud.storage._media.requests import ResumableUpload
from google.cloud.storage._media.requests import XMLMPUContainer
from google.cloud.storage._media.requests import XMLMPUPart

from google.api_core.iam import Policy
from google.cloud import exceptions
@@ -81,6 +88,16 @@

from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter

METADATA_HEADER_TRANSLATION = {
    "cacheControl": "Cache-Control",
    "contentDisposition": "Content-Disposition",
    "contentEncoding": "Content-Encoding",
    "contentLanguage": "Content-Language",
    "customTime": "x-goog-custom-time",
    "storageClass": "x-goog-storage-class",
}
XML_CHUNK_SIZE = 100 * 1024 * 1024  # 100 MiB
_DEFAULT_CONTENT_TYPE = "application/octet-stream"
_DOWNLOAD_URL_TEMPLATE = "{hostname}/download/storage/{api_version}{path}?alt=media"

@@ -1889,6 +1906,131 @@ def _get_upload_arguments(self, client, content_type, filename=None, command=None):

        object_metadata = self._get_writable_metadata()
        return headers, object_metadata, content_type
    def _headers_from_metadata(self, metadata):
        """Helper function to translate object metadata into a header dictionary."""

        headers = {}
        # Handle standard writable metadata
        for key, value in metadata.items():
            if key in METADATA_HEADER_TRANSLATION:
                headers[METADATA_HEADER_TRANSLATION[key]] = value
        # Handle custom metadata
        if "metadata" in metadata:
            for key, value in metadata["metadata"].items():
                headers["x-goog-meta-" + key] = value
        return headers

    def _do_xml_multipart_upload(
        self, file_obj, retry=None, content_type=None, num_of_threads=6
    ):
| """ | ||
| 1. This should initialize XMLMPUContainer, container.initate(), you can keep filename as None. | ||
| 2. read chunks of data from stream, read at most `n` chunks (even if the file_stream is more, hold the stream there) | ||
| Where each `chunk_size` is provided as `XML_CHUNK_SIZE` | ||
| 3. Spawn multiple threads to upload each chunk using | ||
| part = XMLMPUPart() | ||
| part.upload() -> | ||
| each part upload should return (part_number, etag) | ||
| store these part numbers in a list/dictionary | ||
| using `container.register_part(part_number, etag)` | ||
|
|
||
| 4. read further chunks from stream and repeat step 3 until stream is exhausted | ||
|
|
||
|
|
||
|
|
||
| 5. Once all parts are uploaded, call | ||
| `container.finalize(blob._get_transport(client))` | ||
| to complete the multipart upload | ||
|
|
||
| """ | ||
|
Comment on lines +1926 to +1945

Contributor:
The docstring for this method appears to be a set of implementation notes. It should be converted into a proper docstring that explains what the method does, its parameters (`file_obj`, `retry`, `content_type`, `num_of_threads`), and what it returns.
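For example, a docstring along these lines could replace the notes (a sketch only, not the author's wording; the Sphinx-style field list is an assumption about the module's house style):

        """Upload the blob's contents with the XML multipart-upload (MPU) API.

        The stream is read in XML_CHUNK_SIZE chunks; each chunk is uploaded as a
        separate part on a thread pool and registered with the MPU container,
        which is finalized once the stream is exhausted. On failure the MPU is
        cancelled.

        :type file_obj: file-like object
        :param file_obj: A file handle opened in binary read mode.

        :type retry: google.api_core.retry.Retry
        :param retry: (Optional) How to retry the part uploads.

        :type content_type: str
        :param content_type: (Optional) Type of content being uploaded.

        :type num_of_threads: int
        :param num_of_threads: (Optional) Number of parts uploaded concurrently.

        :rtype: dict
        :returns: The object metadata, reloaded after the upload completes.
        """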
        bucket = self.bucket
        client = self.client
        transport = self._get_transport(client)

        hostname = _get_host_name(client._connection)
        url = "{hostname}/{bucket}/{blob}".format(
            hostname=hostname, bucket=bucket.name, blob=_quote(self.name)
        )

        base_headers, object_metadata, content_type = self._get_upload_arguments(
            client, content_type, filename=None, command="tm.upload_sharded"
        )
        headers = {**base_headers, **self._headers_from_metadata(object_metadata)}

        if self.user_project is not None:
            headers["x-goog-user-project"] = self.user_project

        if (
            self.kms_key_name is not None
            and "cryptoKeyVersions" not in self.kms_key_name
        ):
            headers["x-goog-encryption-kms-key-name"] = self.kms_key_name

        container = XMLMPUContainer(url, filename=None, headers=headers, retry=retry)
        container.initiate(transport=transport, content_type=content_type)
        upload_id = container.upload_id

        def _upload_part_from_data(data, part_number, checksum="auto"):
            data_stream = BytesIO(data)
            part = XMLMPUPart(
                url,
                upload_id,
                filename=None,
                file_obj=data_stream,
                start=0,
                end=len(data),
                part_number=part_number,
                checksum=checksum,
                headers=headers.copy(),
                retry=retry,
            )
            part.upload(transport)
            return (part_number, part.etag)

        def read_chunks(stream, chunk_size):
            while True:
                data = stream.read(chunk_size)
                if not data:
                    break
                yield data

        chunk_iterator = read_chunks(file_obj, XML_CHUNK_SIZE)
        part_number = 1

        try:
            with ThreadPoolExecutor(max_workers=num_of_threads) as executor:
                while True:
                    # Read a batch of chunks to be processed concurrently.
                    chunk_batch = list(itertools.islice(chunk_iterator, num_of_threads))
                    if not chunk_batch:
                        break

                    futures = []
                    # Submit upload tasks for the current batch of chunks.
                    for i, chunk_data in enumerate(chunk_batch):
                        current_part_number = part_number + i
                        future = executor.submit(
                            _upload_part_from_data, chunk_data, current_part_number
                        )
                        futures.append(future)

                    # Wait for the current batch to complete.
                    for future in futures:
                        part_num, etag = future.result()
                        container.register_part(part_num, etag)

                    part_number += len(chunk_batch)
                    print("num parts uploaded:", part_number - 1)
            res = container.finalize(transport)
            print("MPU Complete Response:", res)
            self.reload(client=client)
            return self._properties

        except Exception:
            container.cancel(transport)
            raise

    def _do_multipart_upload(
        self,
        client,
@@ -2483,6 +2625,7 @@ def _do_upload(

        retry=None,
        command=None,
        crc32c_checksum_value=None,
        perform_xml_mpu=True,
    ):
        """Determine an upload strategy and then perform the upload.
@@ -2626,23 +2769,20 @@ def _do_upload(

        }
        retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        if size is not None and size <= _MAX_MULTIPART_SIZE:
            response = self._do_multipart_upload(
                client,
                stream,
                content_type,
                size,
                predefined_acl,
                if_generation_match,
                if_generation_not_match,
                if_metageneration_match,
                if_metageneration_not_match,
                timeout=timeout,
                checksum=checksum,
                retry=retry,
                command=command,
        st_time = time.monotonic_ns()
        if perform_xml_mpu:
            print("Performing XML MPU .")
            response = self._do_xml_multipart_upload(
                stream, retry=None, content_type=None, num_of_threads=1
            )
            print(
                "Performed XMLMPU in ",
                (time.monotonic_ns() - st_time) / 1_000_000,
                response,
            )
            return response
        else:
            print("Performing Resumable Upload!!!! .")
            response = self._do_resumable_upload(
                client,
                stream,
@@ -2659,6 +2799,11 @@

                command=command,
                crc32c_checksum_value=crc32c_checksum_value,
            )
            print(
                "Performed Resumable upload.",
                response,
                (time.monotonic_ns() - st_time) / 1_000_000,
            )

        return response.json()
This condition can be simplified for better readability. A more Pythonic way to express that exactly one of `filename` or `file_obj` must be provided is `if (filename is None) == (file_obj is None):`.
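For instance, in a hypothetical helper that accepts both arguments (the helper name below is illustrative and not part of this diff), the suggested guard would read:

def _open_upload_source(filename=None, file_obj=None):
    # Exactly one of the two sources must be provided.
    if (filename is None) == (file_obj is None):
        raise ValueError("Pass exactly one of 'filename' or 'file_obj'.")
    if filename is not None:
        return open(filename, "rb")
    return file_obj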