diff --git a/google/cloud/storage/_media/_upload.py b/google/cloud/storage/_media/_upload.py
index 4a919d18a..b92ad8f9b 100644
--- a/google/cloud/storage/_media/_upload.py
+++ b/google/cloud/storage/_media/_upload.py
@@ -1253,7 +1253,8 @@ class XMLMPUPart(UploadBase):
     Args:
         upload_url (str): The URL of the object (without query parameters).
         upload_id (str): The ID of the upload from the initialization response.
-        filename (str): The name (path) of the file to upload.
+        filename (str): The name (path) of the file to upload. Can be None if
+            file_obj is provided.
         start (int): The byte index of the beginning of the part.
         end (int): The byte index of the end of the part.
         part_number (int): The part number. Part numbers will be assembled in
@@ -1274,6 +1275,8 @@
            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.
+        file_obj (IO[bytes]): file-like object to upload from. Can be None if
+            filename is provided.
 
     Attributes:
         upload_url (str): The URL of the object (without query parameters).
@@ -1297,9 +1300,16 @@ def __init__(
         headers=None,
         checksum="auto",
         retry=DEFAULT_RETRY,
+        file_obj=None,
     ):
         super().__init__(upload_url, headers=headers, retry=retry)
+        if (filename is None and file_obj is None) or (
+            filename is not None and file_obj is not None
+        ):
+            raise ValueError("Exactly one of filename or file_obj must be provided.")
+
         self._filename = filename
+        self._file_obj = file_obj
         self._start = start
         self._end = end
         self._upload_id = upload_id
@@ -1364,9 +1374,13 @@ def _prepare_upload_request(self):
         if self.finished:
             raise ValueError("This part has already been uploaded.")
 
-        with open(self._filename, "br") as f:
-            f.seek(self._start)
-            payload = f.read(self._end - self._start)
+        if self._file_obj:
+            self._file_obj.seek(self._start)
+            payload = self._file_obj.read(self._end - self._start)
+        else:
+            with open(self._filename, "br") as f:
+                f.seek(self._start)
+                payload = f.read(self._end - self._start)
 
         self._checksum_object = _helpers._get_checksum_object(self._checksum_type)
         if self._checksum_object is not None:
diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py
index 746334d1c..07fb30857 100644
--- a/google/cloud/storage/blob.py
+++ b/google/cloud/storage/blob.py
@@ -16,11 +16,15 @@
 """Create / interact with Google Cloud Storage blobs."""
 
+import time
 import base64
+from concurrent.futures import ThreadPoolExecutor
+import concurrent.futures
 import copy
 import hashlib
 from io import BytesIO
 from io import TextIOWrapper
+import itertools
 import logging
 import mimetypes
 import os
@@ -33,12 +37,15 @@
 from urllib.parse import urlunsplit
 import warnings
 
+from google.cloud.storage import retry
 from google.cloud.storage._media.requests import ChunkedDownload
 from google.cloud.storage._media.requests import Download
 from google.cloud.storage._media.requests import RawDownload
 from google.cloud.storage._media.requests import RawChunkedDownload
 from google.cloud.storage._media.requests import MultipartUpload
 from google.cloud.storage._media.requests import ResumableUpload
+from google.cloud.storage._media.requests import XMLMPUContainer
+from google.cloud.storage._media.requests import XMLMPUPart
 
 from google.api_core.iam import Policy
 from google.cloud import exceptions
@@ -81,6 +88,16 @@
 from google.cloud.storage.fileio import BlobReader
 from google.cloud.storage.fileio import BlobWriter
 
+METADATA_HEADER_TRANSLATION = {
+    "cacheControl": "Cache-Control",
"contentDisposition": "Content-Disposition", + "contentEncoding": "Content-Encoding", + "contentLanguage": "Content-Language", + "customTime": "x-goog-custom-time", + "storageClass": "x-goog-storage-class", +} + +XML_CHUNK_SIZE = 100 * 1024 * 1024 # 8 MiB _DEFAULT_CONTENT_TYPE = "application/octet-stream" _DOWNLOAD_URL_TEMPLATE = "{hostname}/download/storage/{api_version}{path}?alt=media" @@ -1889,6 +1906,131 @@ def _get_upload_arguments(self, client, content_type, filename=None, command=Non object_metadata = self._get_writable_metadata() return headers, object_metadata, content_type + def _headers_from_metadata(self, metadata): + """Helper function to translate object metadata into a header dictionary.""" + + headers = {} + # Handle standard writable metadata + for key, value in metadata.items(): + if key in METADATA_HEADER_TRANSLATION: + headers[METADATA_HEADER_TRANSLATION[key]] = value + # Handle custom metadata + if "metadata" in metadata: + for key, value in metadata["metadata"].items(): + headers["x-goog-meta-" + key] = value + return headers + + def _do_xml_multipart_upload( + self, file_obj, retry=None, content_type=None, num_of_threads=6 + ): + """ + 1. This should initialize XMLMPUContainer, container.initate(), you can keep filename as None. + 2. read chunks of data from stream, read at most `n` chunks (even if the file_stream is more, hold the stream there) + Where each `chunk_size` is provided as `XML_CHUNK_SIZE` + 3. Spawn multiple threads to upload each chunk using + part = XMLMPUPart() + part.upload() -> + each part upload should return (part_number, etag) + store these part numbers in a list/dictionary + using `container.register_part(part_number, etag)` + + 4. read further chunks from stream and repeat step 3 until stream is exhausted + + + + 5. Once all parts are uploaded, call + `container.finalize(blob._get_transport(client))` + to complete the multipart upload + + """ + bucket = self.bucket + client = self.client + transport = self._get_transport(client) + + hostname = _get_host_name(client._connection) + url = "{hostname}/{bucket}/{blob}".format( + hostname=hostname, bucket=bucket.name, blob=_quote(self.name) + ) + + base_headers, object_metadata, content_type = self._get_upload_arguments( + client, content_type, filename=None, command="tm.upload_sharded" + ) + headers = {**base_headers, **self._headers_from_metadata(object_metadata)} + + if self.user_project is not None: + headers["x-goog-user-project"] = self.user_project + + if ( + self.kms_key_name is not None + and "cryptoKeyVersions" not in self.kms_key_name + ): + headers["x-goog-encryption-kms-key-name"] = self.kms_key_name + + container = XMLMPUContainer(url, filename=None, headers=headers, retry=retry) + container.initiate(transport=transport, content_type=content_type) + upload_id = container.upload_id + + def _upload_part_from_data(data, part_number, checksum="auto"): + data_stream = BytesIO(data) + part = XMLMPUPart( + url, + upload_id, + filename=None, + file_obj=data_stream, + start=0, + end=len(data), + part_number=part_number, + checksum=checksum, + headers=headers.copy(), + retry=retry, + ) + part.upload(transport) + return (part_number, part.etag) + + def read_chunks(stream, chunk_size): + while True: + data = stream.read(chunk_size) + if not data: + break + yield data + + chunk_iterator = read_chunks(file_obj, XML_CHUNK_SIZE) + part_number = 1 + + try: + with ThreadPoolExecutor(max_workers=num_of_threads) as executor: + while True: + # Read a batch of chunks to be processed concurrently. 
+                    chunk_batch = list(itertools.islice(chunk_iterator, num_of_threads))
+                    if not chunk_batch:
+                        break
+
+                    futures = []
+                    # Submit upload tasks for the current batch of chunks.
+                    for i, chunk_data in enumerate(chunk_batch):
+                        current_part_number = part_number + i
+                        future = executor.submit(
+                            _upload_part_from_data, chunk_data, current_part_number
+                        )
+                        futures.append(future)
+
+                    # Wait for the current batch to complete.
+                    for future in futures:
+                        part_num, etag = future.result()
+                        container.register_part(part_num, etag)
+
+                    part_number += len(chunk_batch)
+                    print("num parts uploaded:", part_number - 1)
+
+            res = container.finalize(transport)
+            print("MPU Complete Response:", res)
+            self.reload(client=client)
+            return self._properties
+
+        except Exception:
+            container.cancel(transport)
+            raise
+
     def _do_multipart_upload(
         self,
         client,
@@ -2483,6 +2625,7 @@ def _do_upload(
         retry=None,
         command=None,
         crc32c_checksum_value=None,
+        perform_xml_mpu=True,
     ):
         """Determine an upload strategy and then perform the upload.
 
@@ -2626,23 +2769,20 @@ def _do_upload(
         }
         retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)
 
-        if size is not None and size <= _MAX_MULTIPART_SIZE:
-            response = self._do_multipart_upload(
-                client,
-                stream,
-                content_type,
-                size,
-                predefined_acl,
-                if_generation_match,
-                if_generation_not_match,
-                if_metageneration_match,
-                if_metageneration_not_match,
-                timeout=timeout,
-                checksum=checksum,
-                retry=retry,
-                command=command,
+        st_time = time.monotonic_ns()
+        if perform_xml_mpu:
+            print("Performing XML MPU.")
+            response = self._do_xml_multipart_upload(
+                stream, retry=None, content_type=None, num_of_threads=1
+            )
+            print(
+                "Performed XML MPU in (ms):",
+                (time.monotonic_ns() - st_time) / 1_000_000,
+                response,
             )
+            return response
         else:
+            print("Performing resumable upload.")
             response = self._do_resumable_upload(
                 client,
                 stream,
@@ -2659,6 +2799,11 @@
                 command=command,
                 crc32c_checksum_value=crc32c_checksum_value,
             )
+            print(
+                "Performed resumable upload in (ms):",
+                response,
+                (time.monotonic_ns() - st_time) / 1_000_000,
+            )
 
         return response.json()
diff --git a/samples/snippets/storage_upload_from_memory.py b/samples/snippets/storage_upload_from_memory.py
index eff3d222a..ffa38e310 100644
--- a/samples/snippets/storage_upload_from_memory.py
+++ b/samples/snippets/storage_upload_from_memory.py
@@ -14,13 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 import sys
+import time
 
 # [START storage_file_upload_from_memory]
 from google.cloud import storage
 
 
-def upload_blob_from_memory(bucket_name, contents, destination_blob_name):
+def upload_blob_from_memory(bucket_name, destination_blob_name, size_in_mb=1):
     """Uploads a file to the bucket."""
 
     # The ID of your GCS bucket
@@ -36,18 +38,52 @@ def upload_blob_from_memory(bucket_name, destination_blob_name):
     bucket = storage_client.bucket(bucket_name)
     blob = bucket.blob(destination_blob_name)
 
+    bytes_to_upload = int(size_in_mb * 1024 * 1024)
+    contents = os.urandom(bytes_to_upload)
+
+    start_time = time.time_ns()
     blob.upload_from_string(contents)
+    end_time = time.time_ns()
+
+    total_bytes_uploaded = len(contents)
+    # Time is in nanoseconds, convert to seconds for printing
+    total_time_taken_ns = end_time - start_time
+    total_time_taken_s = total_time_taken_ns / 1_000_000_000
+
+    if total_time_taken_ns > 0:
+        # Throughput calculation using nanoseconds
+        throughput_mb_s = (
+            total_bytes_uploaded / (total_time_taken_ns / 1_000_000_000)
+        ) / (1024 * 1024)
+    else:
+        throughput_mb_s = float("inf")  # Avoid division by zero
+
+    print(f"Uploaded {total_bytes_uploaded} bytes in {total_time_taken_s:.9f} seconds.")
+    print(f"Throughput: {throughput_mb_s:.2f} MB/s")
 
-    print(
-        f"{destination_blob_name} with contents {contents} uploaded to {bucket_name}."
-    )
 
 # [END storage_file_upload_from_memory]
 
 
 if __name__ == "__main__":
+    if len(sys.argv) < 3 or len(sys.argv) > 4:
+        print(
+            f"Usage: {sys.argv[0]} <bucket_name> <destination_blob_name> [size_in_mb]"
+        )
+        sys.exit(1)
+
+    bucket_name = sys.argv[1]
+    destination_blob_name = sys.argv[2]
+    size_mb = 1
+    if len(sys.argv) == 4:
+        try:
+            size_mb = float(sys.argv[3])
+        except ValueError:
+            print("Please provide a valid number for size_in_mb.")
+            sys.exit(1)
+
     upload_blob_from_memory(
-        bucket_name=sys.argv[1],
-        contents=sys.argv[2],
-        destination_blob_name=sys.argv[3],
+        bucket_name=bucket_name,
+        destination_blob_name=destination_blob_name,
+        size_in_mb=size_mb,
     )
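Note (not part of the patch): the updated sample can be exercised with, for example,
`python samples/snippets/storage_upload_from_memory.py my-bucket my-object 256`, where
`my-bucket` and `my-object` stand in for an existing bucket and the object name to create.

Below is a minimal sketch of how the new `file_obj` parameter on `XMLMPUPart` could be
driven directly, mirroring the calls that `Blob._do_xml_multipart_upload` makes in this
patch. It assumes an existing bucket named "my-bucket", default credentials, and the
public XML API endpoint `https://storage.googleapis.com`; `XMLMPUContainer` and
`XMLMPUPart` are internal classes, so treat this as illustrative rather than a supported
recipe.

    import io
    import os

    from google.cloud import storage
    from google.cloud.storage._media.requests import XMLMPUContainer, XMLMPUPart

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("mpu-demo-object")  # assumed names
    transport = blob._get_transport(client)  # authorized session, as used in blob.py

    # Object URL for the XML API, analogous to what _do_xml_multipart_upload builds.
    url = f"https://storage.googleapis.com/{blob.bucket.name}/{blob.name}"

    data = os.urandom(1024 * 1024)  # a single 1 MiB part is enough for illustration

    container = XMLMPUContainer(url, filename=None)
    container.initiate(transport=transport, content_type="application/octet-stream")

    part = XMLMPUPart(
        url,
        container.upload_id,
        filename=None,               # new in this patch: no filename needed...
        file_obj=io.BytesIO(data),   # ...upload from an in-memory stream instead
        start=0,
        end=len(data),
        part_number=1,               # part numbering starts at 1, as in blob.py
        checksum="auto",
    )
    part.upload(transport)
    container.register_part(1, part.etag)
    container.finalize(transport)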