XML MPU vs. Resumable Upload #1702
base: main
@@ -16,11 +16,15 @@
"""Create / interact with Google Cloud Storage blobs."""

import time
import base64
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import copy
import hashlib
from io import BytesIO
from io import TextIOWrapper
import itertools
import logging
import mimetypes
import os
@@ -33,12 +37,15 @@
from urllib.parse import urlunsplit
import warnings

from google.cloud.storage import retry
from google.cloud.storage._media.requests import ChunkedDownload
from google.cloud.storage._media.requests import Download
from google.cloud.storage._media.requests import RawDownload
from google.cloud.storage._media.requests import RawChunkedDownload
from google.cloud.storage._media.requests import MultipartUpload
from google.cloud.storage._media.requests import ResumableUpload
from google.cloud.storage._media.requests import XMLMPUContainer
from google.cloud.storage._media.requests import XMLMPUPart

from google.api_core.iam import Policy
from google.cloud import exceptions
@@ -81,6 +88,16 @@
from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter

METADATA_HEADER_TRANSLATION = {
    "cacheControl": "Cache-Control",
    "contentDisposition": "Content-Disposition",
    "contentEncoding": "Content-Encoding",
    "contentLanguage": "Content-Language",
    "customTime": "x-goog-custom-time",
    "storageClass": "x-goog-storage-class",
}

XML_CHUNK_SIZE = 100 * 1024 * 1024  # 100 MiB
_DEFAULT_CONTENT_TYPE = "application/octet-stream"
_DOWNLOAD_URL_TEMPLATE = "{hostname}/download/storage/{api_version}{path}?alt=media"
@@ -1889,6 +1906,131 @@ def _get_upload_arguments(self, client, content_type, filename=None, command=None):
        object_metadata = self._get_writable_metadata()
        return headers, object_metadata, content_type

    def _headers_from_metadata(self, metadata):
        """Helper function to translate object metadata into a header dictionary."""
        headers = {}
        # Handle standard writable metadata
        for key, value in metadata.items():
            if key in METADATA_HEADER_TRANSLATION:
                headers[METADATA_HEADER_TRANSLATION[key]] = value
        # Handle custom metadata
        if "metadata" in metadata:
            for key, value in metadata["metadata"].items():
                headers["x-goog-meta-" + key] = value
        return headers
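For illustration, a minimal sketch of the translation this helper performs, using a hypothetical metadata dict (the sample values are not from the PR):

    metadata = {
        "cacheControl": "no-cache",
        "storageClass": "STANDARD",
        "metadata": {"color": "red"},  # custom metadata
    }
    # _headers_from_metadata(metadata) returns:
    # {
    #     "Cache-Control": "no-cache",
    #     "x-goog-storage-class": "STANDARD",
    #     "x-goog-meta-color": "red",
    # }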
    def _do_xml_multipart_upload(
        self, file_obj, retry=None, content_type=None, num_of_threads=6
    ):
        """
        1. Initialize XMLMPUContainer and call container.initiate(); filename can be kept as None.
        2. Read chunks of data from the stream, at most `n` chunks at a time (even if
           the stream holds more, pause it there), where each chunk is `XML_CHUNK_SIZE` bytes.
        3. Spawn multiple threads to upload each chunk using
               part = XMLMPUPart()
               part.upload()
           Each part upload should return (part_number, etag); store these part numbers
           in a list/dictionary using `container.register_part(part_number, etag)`.
        4. Read further chunks from the stream and repeat step 3 until the stream is exhausted.
        5. Once all parts are uploaded, call
           `container.finalize(blob._get_transport(client))`
           to complete the multipart upload.
        """
Contributor (on lines +1926 to +1945):
The docstring for this method appears to be a set of implementation notes. It should be converted into a proper docstring that explains what the method does, its parameters (`file_obj`, `retry`, `content_type`, `num_of_threads`), and what it returns.
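As an illustration of that suggestion, one possible rewrite (a sketch only, in the library's usual docstring style, not the author's wording):

        """Upload the contents of a file-like object via the XML multipart upload API.

        The stream is read in XML_CHUNK_SIZE chunks, and up to `num_of_threads`
        parts are uploaded concurrently; each (part_number, etag) pair is
        registered with the XMLMPUContainer before the upload is finalized.

        :type file_obj: file-like object
        :param file_obj: A file handle opened in binary read mode.

        :type retry: google.api_core.retry.Retry
        :param retry: (Optional) How to retry the RPC.

        :type content_type: str
        :param content_type: (Optional) Type of content being uploaded.

        :type num_of_threads: int
        :param num_of_threads: (Optional) Number of parts to upload concurrently.

        :rtype: dict
        :returns: The reloaded object metadata after the upload completes.
        """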
        bucket = self.bucket
        client = self.client
        transport = self._get_transport(client)

        hostname = _get_host_name(client._connection)
        url = "{hostname}/{bucket}/{blob}".format(
            hostname=hostname, bucket=bucket.name, blob=_quote(self.name)
        )

        base_headers, object_metadata, content_type = self._get_upload_arguments(
            client, content_type, filename=None, command="tm.upload_sharded"
        )
        headers = {**base_headers, **self._headers_from_metadata(object_metadata)}

        if self.user_project is not None:
            headers["x-goog-user-project"] = self.user_project

        if (
            self.kms_key_name is not None
            and "cryptoKeyVersions" not in self.kms_key_name
        ):
            headers["x-goog-encryption-kms-key-name"] = self.kms_key_name

        container = XMLMPUContainer(url, filename=None, headers=headers, retry=retry)
        container.initiate(transport=transport, content_type=content_type)
        upload_id = container.upload_id
        def _upload_part_from_data(data, part_number, checksum="auto"):
            data_stream = BytesIO(data)
            part = XMLMPUPart(
                url,
                upload_id,
                filename=None,
                file_obj=data_stream,
                start=0,
                end=len(data),
                part_number=part_number,
                checksum=checksum,
                headers=headers.copy(),
                retry=retry,
            )
            part.upload(transport)
            return (part_number, part.etag)

        def read_chunks(stream, chunk_size):
            while True:
                data = stream.read(chunk_size)
                if not data:
                    break
                yield data
        chunk_iterator = read_chunks(file_obj, XML_CHUNK_SIZE)
        part_number = 1

        try:
            with ThreadPoolExecutor(max_workers=num_of_threads) as executor:
                while True:
                    # Read a batch of chunks to be processed concurrently.
                    chunk_batch = list(itertools.islice(chunk_iterator, num_of_threads))
                    if not chunk_batch:
                        break

                    futures = []
                    # Submit upload tasks for the current batch of chunks.
                    for i, chunk_data in enumerate(chunk_batch):
                        current_part_number = part_number + i
                        future = executor.submit(
                            _upload_part_from_data, chunk_data, current_part_number
                        )
                        futures.append(future)

                    # Wait for the current batch to complete.
                    for future in futures:
                        part_num, etag = future.result()
                        container.register_part(part_num, etag)

                    part_number += len(chunk_batch)
                    print("num parts uploaded:", part_number - 1)
            res = container.finalize(transport)
            print("MPU Complete Response:", res)
            self.reload(client=client)
            return self._properties

        except Exception:
            container.cancel(transport)
            raise
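For context, a minimal sketch of how this helper would be reached (bucket and object names are hypothetical; `_do_xml_multipart_upload` is internal and would normally be invoked through `upload_from_file`):

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("large-object.bin")
    with open("large-object.bin", "rb") as f:
        blob._do_xml_multipart_upload(f, num_of_threads=6)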
    def _do_multipart_upload(
        self,
        client,
@@ -2483,6 +2625,7 @@ def _do_upload(
         retry=None,
         command=None,
         crc32c_checksum_value=None,
+        perform_xml_mpu=True,
     ):
         """Determine an upload strategy and then perform the upload.
@@ -2626,23 +2769,20 @@ def _do_upload(
             }
             retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

-        if size is not None and size <= _MAX_MULTIPART_SIZE:
-            response = self._do_multipart_upload(
-                client,
-                stream,
-                content_type,
-                size,
-                predefined_acl,
-                if_generation_match,
-                if_generation_not_match,
-                if_metageneration_match,
-                if_metageneration_not_match,
-                timeout=timeout,
-                checksum=checksum,
-                retry=retry,
-                command=command,
-            )
+        st_time = time.monotonic_ns()
+        if perform_xml_mpu:
+            print("Performing XML MPU.")
+            response = self._do_xml_multipart_upload(
+                stream, retry=None, content_type=None, num_of_threads=1
+            )
+            print(
+                "Performed XML MPU in",
+                (time.monotonic_ns() - st_time) / 1_000_000,  # elapsed ms
+                response,
+            )
+            return response
         else:
+            print("Performing resumable upload.")
             response = self._do_resumable_upload(
                 client,
                 stream,
@@ -2659,6 +2799,11 @@
                 command=command,
                 crc32c_checksum_value=crc32c_checksum_value,
             )
+            print(
+                "Performed Resumable upload.",
+                response,
+                (time.monotonic_ns() - st_time) / 1_000_000,  # elapsed ms
+            )

             return response.json()
This condition can be simplified for better readability. A more Pythonic way to express that exactly one of `filename` or `file_obj` must be provided is `if (filename is None) == (file_obj is None):`.
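A minimal sketch of the suggested check, assuming a caller-facing upload helper that accepts either a filename or a file object (the helper name is hypothetical):

    def upload(filename=None, file_obj=None):
        # True when both are None or both are set, which are exactly the invalid cases.
        if (filename is None) == (file_obj is None):
            raise ValueError("Pass exactly one of 'filename' or 'file_obj'.")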