diff --git a/sagemaker-core/src/sagemaker/core/helper/pipeline_variable.py b/sagemaker-core/src/sagemaker/core/helper/pipeline_variable.py
index b3d3c6d6e2..6e8cc3ec54 100644
--- a/sagemaker-core/src/sagemaker/core/helper/pipeline_variable.py
+++ b/sagemaker-core/src/sagemaker/core/helper/pipeline_variable.py
@@ -80,3 +80,7 @@ def __get_pydantic_core_schema__(cls, source_type, handler):
 
 # This is a type that could be either string or pipeline variable
 StrPipeVar = Union[str, PipelineVariable]
+# This is a type that could be either integer or pipeline variable
+IntPipeVar = Union[int, PipelineVariable]
+# This is a type that could be either boolean or pipeline variable
+BoolPipeVar = Union[bool, PipelineVariable]
diff --git a/sagemaker-core/src/sagemaker/core/shapes/shapes.py b/sagemaker-core/src/sagemaker/core/shapes/shapes.py
index f0d7361b12..c0715fc4ae 100644
--- a/sagemaker-core/src/sagemaker/core/shapes/shapes.py
+++ b/sagemaker-core/src/sagemaker/core/shapes/shapes.py
@@ -16,7 +16,7 @@
 from pydantic import BaseModel, ConfigDict, Field
 from typing import List, Dict, Optional, Any, Union
 from sagemaker.core.utils.utils import Unassigned
-from sagemaker.core.helper.pipeline_variable import StrPipeVar
+from sagemaker.core.helper.pipeline_variable import StrPipeVar, IntPipeVar, BoolPipeVar
 
 # Suppress Pydantic warnings about field names shadowing parent attributes
 warnings.filterwarnings("ignore", message=".*shadows an attribute.*")
@@ -1324,10 +1324,10 @@ class ResourceConfig(Base):
     """
 
     instance_type: Optional[StrPipeVar] = Unassigned()
-    instance_count: Optional[int] = Unassigned()
-    volume_size_in_gb: Optional[int] = Unassigned()
+    instance_count: Optional[IntPipeVar] = Unassigned()
+    volume_size_in_gb: Optional[IntPipeVar] = Unassigned()
     volume_kms_key_id: Optional[StrPipeVar] = Unassigned()
-    keep_alive_period_in_seconds: Optional[int] = Unassigned()
+    keep_alive_period_in_seconds: Optional[IntPipeVar] = Unassigned()
     capacity_reservation_ids: Optional[List[StrPipeVar]] = Unassigned()
     instance_groups: Optional[List[InstanceGroup]] = Unassigned()
     capacity_schedules_config: Optional[CapacitySchedulesConfig] = Unassigned()
diff --git a/sagemaker-core/src/sagemaker/core/training/configs.py b/sagemaker-core/src/sagemaker/core/training/configs.py
index e0b0445a80..a308ed40ee 100644
--- a/sagemaker-core/src/sagemaker/core/training/configs.py
+++ b/sagemaker-core/src/sagemaker/core/training/configs.py
@@ -25,7 +25,7 @@
 from pydantic import BaseModel, model_validator, ConfigDict
 
 import sagemaker.core.shapes as shapes
-from sagemaker.core.helper.pipeline_variable import StrPipeVar
+from sagemaker.core.helper.pipeline_variable import StrPipeVar, IntPipeVar, BoolPipeVar
 
 # TODO: Can we add custom logic to some of these to set better defaults?
 from sagemaker.core.shapes import (
@@ -158,23 +158,23 @@ class Compute(shapes.ResourceConfig):
         instance_type (Optional[StrPipeVar]): The ML compute instance type. For information about
             available instance types, see https://aws.amazon.com/sagemaker/pricing/.
-        instance_count (Optional[int]): The number of ML compute instances to use. For distributed
+        instance_count (Optional[IntPipeVar]): The number of ML compute instances to use. For distributed
             training, provide a value greater than 1.
-        volume_size_in_gb (Optional[int]):
+        volume_size_in_gb (Optional[IntPipeVar]):
             The size of the ML storage volume that you want to provision. ML storage volumes store
             model artifacts and incremental states.
             Training algorithms might also use the ML storage volume for scratch space. Default: 30
         volume_kms_key_id (Optional[StrPipeVar]):
             The Amazon Web Services KMS key that SageMaker uses to encrypt data on the storage
             volume attached to the ML compute instance(s) that run the training job.
-        keep_alive_period_in_seconds (Optional[int]):
+        keep_alive_period_in_seconds (Optional[IntPipeVar]):
             The duration of time in seconds to retain configured resources in a warm pool for
             subsequent training jobs.
         instance_groups (Optional[List[InstanceGroup]]):
             A list of instance groups for heterogeneous clusters to be used in the training job.
         training_plan_arn (Optional[StrPipeVar]):
             The Amazon Resource Name (ARN) of the training plan to use for this resource
             configuration.
-        enable_managed_spot_training (Optional[bool]):
+        enable_managed_spot_training (Optional[BoolPipeVar]):
             To train models using managed spot training, choose True. Managed spot training
             provides a fully managed and scalable infrastructure for training machine learning
             models. this option is useful when training jobs can be interrupted and when there
diff --git a/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py b/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py
index 04e2a4e6a1..90c1eb3aaf 100644
--- a/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py
+++ b/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py
@@ -6,7 +6,12 @@
 from sagemaker.train import ModelTrainer
 from sagemaker.train.configs import InputData, Compute
 from sagemaker.core.processing import ScriptProcessor
-from sagemaker.core.shapes import ProcessingInput, ProcessingS3Input, ProcessingOutput, ProcessingS3Output
+from sagemaker.core.shapes import (
+    ProcessingInput,
+    ProcessingS3Input,
+    ProcessingOutput,
+    ProcessingS3Output,
+)
 from sagemaker.serve.model_builder import ModelBuilder
 from sagemaker.core.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.mlops.workflow.pipeline import Pipeline
@@ -37,22 +42,27 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
     bucket = sagemaker_session.default_bucket()
     prefix = "integ-test-v3-pipeline"
     base_job_prefix = "train-registry-job"
-    
+
     # Upload abalone data to S3
-    s3_client = boto3.client('s3')
+    s3_client = boto3.client("s3")
     abalone_path = os.path.join(os.path.dirname(__file__), "data", "pipeline", "abalone.csv")
     s3_client.upload_file(abalone_path, bucket, f"{prefix}/input/abalone.csv")
     input_data_s3 = f"s3://{bucket}/{prefix}/input/abalone.csv"
-    
+
     # Parameters
     processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
+    training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
+    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
     input_data = ParameterString(
         name="InputDataUrl",
         default_value=input_data_s3,
     )
-    
+    hyper_parameter_objective = ParameterString(
+        name="TrainingObjective", default_value="reg:linear"
+    )
+
     cache_config = CacheConfig(enable_caching=True, expire_after="30d")
-    
+
     # Processing step
     sklearn_processor = ScriptProcessor(
         image_uri=image_uris.retrieve(
@@ -62,13 +72,13 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
             py_version="py3",
             instance_type="ml.m5.xlarge",
         ),
-        instance_type="ml.m5.xlarge",
+        instance_type=instance_type,
         instance_count=processing_instance_count,
        base_job_name=f"{base_job_prefix}-sklearn",
        sagemaker_session=pipeline_session,
        role=role,
     )
-    
+
     processor_args = sklearn_processor.run(
         inputs=[
             ProcessingInput(
@@ -79,7 +89,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
                 s3_data_type="S3Prefix",
                 s3_input_mode="File",
                 s3_data_distribution_type="ShardedByS3Key",
-                )
+                ),
             )
         ],
         outputs=[
@@ -88,36 +98,36 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
                 s3_output=ProcessingS3Output(
                     s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/train",
                     local_path="/opt/ml/processing/train",
-                    s3_upload_mode="EndOfJob"
-                )
+                    s3_upload_mode="EndOfJob",
+                ),
             ),
             ProcessingOutput(
                 output_name="validation",
                 s3_output=ProcessingS3Output(
                     s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/validation",
                     local_path="/opt/ml/processing/validation",
-                    s3_upload_mode="EndOfJob"
-                )
+                    s3_upload_mode="EndOfJob",
+                ),
             ),
             ProcessingOutput(
                 output_name="test",
                 s3_output=ProcessingS3Output(
                     s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/test",
                     local_path="/opt/ml/processing/test",
-                    s3_upload_mode="EndOfJob"
-                )
+                    s3_upload_mode="EndOfJob",
+                ),
             ),
         ],
         code=os.path.join(os.path.dirname(__file__), "code", "pipeline", "preprocess.py"),
         arguments=["--input-data", input_data],
     )
-    
+
     step_process = ProcessingStep(
         name="PreprocessData",
         step_args=processor_args,
         cache_config=cache_config,
     )
-    
+
     # Training step
     image_uri = image_uris.retrieve(
         framework="xgboost",
@@ -126,34 +136,36 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
         py_version="py3",
         instance_type="ml.m5.xlarge",
     )
-    
+
     model_trainer = ModelTrainer(
         training_image=image_uri,
-        compute=Compute(instance_type="ml.m5.xlarge", instance_count=1),
+        compute=Compute(instance_type=instance_type, instance_count=training_instance_count),
         base_job_name=f"{base_job_prefix}-xgboost",
         sagemaker_session=pipeline_session,
         role=role,
         hyperparameters={
-            "objective": "reg:linear",
+            "objective": hyper_parameter_objective,
             "num_round": 50,
             "max_depth": 5,
         },
         input_data_config=[
             InputData(
                 channel_name="train",
-                data_source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
-                content_type="text/csv"
+                data_source=step_process.properties.ProcessingOutputConfig.Outputs[
+                    "train"
+                ].S3Output.S3Uri,
+                content_type="text/csv",
             ),
         ],
     )
-    
+
     train_args = model_trainer.train()
     step_train = TrainingStep(
         name="TrainModel",
         step_args=train_args,
         cache_config=cache_config,
     )
-    
+
     # Model step
     model_builder = ModelBuilder(
         s3_model_data_url=step_train.properties.ModelArtifacts.S3ModelArtifacts,
@@ -161,12 +173,9 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
         sagemaker_session=pipeline_session,
         role_arn=role,
     )
-    
-    step_create_model = ModelStep(
-        name="CreateModel",
-        step_args=model_builder.build()
-    )
-    
+
+    step_create_model = ModelStep(name="CreateModel", step_args=model_builder.build())
+
     # Register step
     model_package_group_name = f"integ-test-model-group-{uuid.uuid4().hex[:8]}"
     step_register_model = ModelStep(
@@ -176,33 +185,39 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
             content_types=["application/json"],
             response_types=["application/json"],
             inference_instances=["ml.m5.xlarge"],
-            approval_status="Approved"
-        )
+            approval_status="Approved",
+        ),
     )
-    
+
     # Pipeline
     pipeline_name = f"integ-test-train-registry-{uuid.uuid4().hex[:8]}"
     pipeline = Pipeline(
         name=pipeline_name,
-        parameters=[processing_instance_count, input_data],
+        parameters=[
+            processing_instance_count,
+            training_instance_count,
+            instance_type,
+            input_data,
+            hyper_parameter_objective,
+        ],
         steps=[step_process, step_train, step_create_model, step_register_model],
         sagemaker_session=pipeline_session,
     )
-    
+
     model_name = None
     try:
         # Upsert and execute pipeline
         pipeline.upsert(role_arn=role)
         execution = pipeline.start()
-        
+
         # Poll execution status with 30 minute timeout
         timeout = 1800
         start_time = time.time()
-        
+
         while time.time() - start_time < timeout:
             execution_desc = execution.describe()
             execution_status = execution_desc["PipelineExecutionStatus"]
-            
+
             if execution_status == "Succeeded":
                 # Get model name from execution steps
                 steps = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
@@ -219,33 +234,39 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
                 steps = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
                     PipelineExecutionArn=execution_desc["PipelineExecutionArn"]
                 )["PipelineExecutionSteps"]
-                
+
                 failed_steps = []
                 for step in steps:
                     if step.get("StepStatus") == "Failed":
                         failure_reason = step.get("FailureReason", "Unknown reason")
                         failed_steps.append(f"{step['StepName']}: {failure_reason}")
-                
-                failure_details = "\n".join(failed_steps) if failed_steps else "No detailed failure information available"
-                pytest.fail(f"Pipeline execution {execution_status}. Failed steps:\n{failure_details}")
-                
+
+                failure_details = (
+                    "\n".join(failed_steps)
+                    if failed_steps
+                    else "No detailed failure information available"
+                )
+                pytest.fail(
+                    f"Pipeline execution {execution_status}. Failed steps:\n{failure_details}"
+                )
+
             time.sleep(60)
         else:
             pytest.fail(f"Pipeline execution timed out after {timeout} seconds")
-        
+
     finally:
         # Cleanup S3 resources
-        s3 = boto3.resource('s3')
+        s3 = boto3.resource("s3")
         bucket_obj = s3.Bucket(bucket)
-        bucket_obj.objects.filter(Prefix=f'{prefix}/').delete()
-        
+        bucket_obj.objects.filter(Prefix=f"{prefix}/").delete()
+
         # Cleanup model
         if model_name:
             try:
                 sagemaker_session.sagemaker_client.delete_model(ModelName=model_name)
             except Exception:
                 pass
-        
+
         # Cleanup model package group
         try:
             sagemaker_session.sagemaker_client.delete_model_package_group(
@@ -253,7 +274,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
             )
         except Exception:
             pass
-        
+
         # Cleanup pipeline
         try:
             sagemaker_session.sagemaker_client.delete_pipeline(PipelineName=pipeline_name)
diff --git a/sagemaker-train/src/sagemaker/train/utils.py b/sagemaker-train/src/sagemaker/train/utils.py
index ac066b232b..0abd7596b5 100644
--- a/sagemaker-train/src/sagemaker/train/utils.py
+++ b/sagemaker-train/src/sagemaker/train/utils.py
@@ -26,6 +26,7 @@
 from sagemaker.core.helper.session_helper import Session
 from sagemaker.core.shapes import Unassigned
 from sagemaker.train import logger
+from sagemaker.core.workflow.parameters import PipelineVariable
 
 
 def _default_bucket_and_prefix(session: Session) -> str:
@@ -172,9 +173,10 @@
     This function handles the following cases:
 
     1. If `data` is a string, it returns the string as-is without wrapping in quotes.
-    2. If `data` is serializable (e.g., a dictionary, list, int, float), it returns
+    2. If `data` is a `PipelineVariable`, it returns the `PipelineVariable` unchanged.
+    3. If `data` is serializable (e.g., a dictionary, list, int, float), it returns
        the JSON-encoded string using `json.dumps()`.
-    3. If `data` cannot be serialized (e.g., a custom object), it returns the string
+    4. If `data` cannot be serialized (e.g., a custom object), it returns the string
        representation of the data using `str(data)`.
 
     Args:
@@ -185,6 +187,8 @@
     """
     if isinstance(data, str):
         return data
+    elif isinstance(data, PipelineVariable):
+        return data
     try:
         return json.dumps(data)
     except TypeError:
diff --git a/v3-examples/ml-ops-examples/v3-pipeline-train-create-registry.ipynb b/v3-examples/ml-ops-examples/v3-pipeline-train-create-registry.ipynb
index 0b46e48550..2db40fa50e 100644
--- a/v3-examples/ml-ops-examples/v3-pipeline-train-create-registry.ipynb
+++ b/v3-examples/ml-ops-examples/v3-pipeline-train-create-registry.ipynb
@@ -92,6 +92,7 @@
     "model_approval_status = ParameterString(\n",
     "    name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\"\n",
     ")\n",
+    "hyperparameter_max_depth = ParameterString(name=\"MaxDepth\", default_value=\"5\")\n",
     "\n",
     "# Cache Pipeline steps to reduce execution time on subsequent executions\n",
     "cache_config = CacheConfig(enable_caching=True, expire_after=\"30d\")"
@@ -344,7 +345,7 @@
     "model_trainer = ModelTrainer(\n",
     "    training_image=image_uri,\n",
     "    compute=Compute(\n",
-    "        instance_type=\"ml.m5.xlarge\",\n",
+    "        instance_type=training_instance_type,\n",
     "        instance_count=1,\n",
     "    ),\n",
     "    base_job_name=f\"{base_job_prefix}-xgboost-train\",\n",
@@ -353,7 +354,7 @@
     "    hyperparameters={\n",
     "        \"objective\": \"reg:linear\",\n",
     "        \"num_round\": 50,\n",
-    "        \"max_depth\": 5,\n",
+    "        \"max_depth\": hyperparameter_max_depth,\n",
     "        \"eta\": 0.2,\n",
     "        \"gamma\": 4,\n",
     "        \"min_child_weight\": 6,\n",
@@ -450,8 +451,10 @@
     "    name=\"pipeline-v3\",\n",
     "    parameters=[\n",
     "        processing_instance_count,\n",
+    "        training_instance_type,\n",
     "        input_data,\n",
     "        model_approval_status,\n",
+    "        hyperparameter_max_depth\n",
     "    ],\n",
     "    steps=[step_process, step_train, step_create_model, step_register_model],\n",
     "    sagemaker_session=pipeline_session,\n",
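Usage sketch (not part of the patch): a minimal illustration of what these changes enable, assuming the import paths used elsewhere in this diff (sagemaker.core.workflow.parameters, sagemaker.train.configs, sagemaker.train.utils) and that the Parameter classes are PipelineVariable subtypes.

    # Hypothetical sketch -- names and behavior follow the changes above, not a published API.
    from sagemaker.core.workflow.parameters import ParameterInteger, ParameterString
    from sagemaker.train.configs import Compute
    from sagemaker.train.utils import safe_serialize

    # Pipeline parameters are placeholders that are only resolved when the pipeline executes.
    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
    instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

    # With instance_count typed as Optional[IntPipeVar] rather than Optional[int],
    # Pydantic validation accepts the ParameterInteger placeholder as well as a plain int.
    compute = Compute(instance_type=instance_type, instance_count=instance_count)

    # safe_serialize passes PipelineVariable values through unchanged instead of calling
    # json.dumps() on them, so hyperparameters defined as pipeline parameters survive
    # serialization and are resolved at execution time.
    assert safe_serialize({"num_round": 50}) == '{"num_round": 50}'
    assert safe_serialize(instance_type) is instance_type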