diff --git a/sagemaker-train/src/sagemaker/train/tuner.py b/sagemaker-train/src/sagemaker/train/tuner.py
index d1af08e2f1..f99f055f0b 100644
--- a/sagemaker-train/src/sagemaker/train/tuner.py
+++ b/sagemaker-train/src/sagemaker/train/tuner.py
@@ -35,7 +35,6 @@
     HyperParameterTuningInstanceConfig,
     TuningJobCompletionCriteria,
     Channel,
-
 )
 from sagemaker.core.resources import HyperParameterTuningJob
 from sagemaker.core.common_utils import (
@@ -48,8 +47,10 @@
 )
 from sagemaker.core.helper.pipeline_variable import PipelineVariable
 from sagemaker.core.workflow.pipeline_context import PipelineSession, runnable_by_pipeline
+
 # Lazy import to avoid circular dependency - ModelTrainer imports from core
 from typing import TYPE_CHECKING
+
 if TYPE_CHECKING:
     from sagemaker.train.model_trainer import ModelTrainer
     from sagemaker.core.training.configs import InputData
@@ -220,6 +221,7 @@ def __init__(
         self.objective_metric_name_dict = None
         self._hyperparameter_ranges_dict = None
         self.metric_definitions_dict = None
+        self.static_hyperparameters = None
         self.static_hyperparameters_dict = None
         self.auto_parameters = None
         self.auto_parameters_dict = None
@@ -254,7 +256,10 @@ def __init__(
 
     def override_resource_config(
         self,
-        instance_configs: Union[List[HyperParameterTuningInstanceConfig], Dict[str, List[HyperParameterTuningInstanceConfig]]],
+        instance_configs: Union[
+            List[HyperParameterTuningInstanceConfig],
+            Dict[str, List[HyperParameterTuningInstanceConfig]],
+        ],
     ):
         """Override the instance configuration of the model_trainers used by the tuner.
@@ -301,7 +306,9 @@ def _prepare_tags_for_tuning(self):
         """Add tags to tuning job (from ModelTrainer and JumpStart tags)."""
         # Add tags from ModelTrainer class
-        model_trainer = self.model_trainer or self.model_trainer_dict[sorted(self.model_trainer_dict.keys())[0]]
+        model_trainer = (
+            self.model_trainer or self.model_trainer_dict[sorted(self.model_trainer_dict.keys())[0]]
+        )
 
         model_trainer_tags = getattr(model_trainer, "tags", []) or []
@@ -327,7 +334,8 @@ def _prepare_job_name_for_tuning(self, job_name=None):
             base_name = self.base_tuning_job_name
             if base_name is None:
                 model_trainer = (
-                    self.model_trainer or self.model_trainer_dict[sorted(self.model_trainer_dict.keys())[0]]
+                    self.model_trainer
+                    or self.model_trainer_dict[sorted(self.model_trainer_dict.keys())[0]]
                 )
                 base_name = base_name_from_image(
                     model_trainer.training_image,
@@ -393,14 +401,12 @@ def _prepare_auto_parameters_for_tuning(self):
             self.auto_parameters_dict[model_trainer_name] = auto_parameters
 
     @classmethod
-    def _prepare_static_hyperparameters(
-        cls, model_trainer, hyperparameter_ranges
-    ):
+    def _prepare_static_hyperparameters(cls, model_trainer, hyperparameter_ranges):
         """Prepare static hyperparameters for one model_trainer before tuning."""
         # Initialize hyperparameters if None
         if model_trainer.hyperparameters is None:
             model_trainer.hyperparameters = {}
-
+
         # Remove any hyperparameter that will be tuned
         static_hyperparameters = {
             str(k): to_string(v) for (k, v) in model_trainer.hyperparameters.items()
@@ -439,11 +445,11 @@ def _prepare_auto_parameters(self, static_hyperparameters, hyperparameters_to_ke
     @classmethod
     def _prepare_model_trainer_for_tuning(cls, model_trainer, inputs=None, job_name=None, **kwargs):
         """Prepare ModelTrainer before tuning by uploading source code and configuring hyperparameters.
-
+
         This method mimics V2's _prepare_estimator_for_tuning() pattern, adapted for V3's ModelTrainer architecture.
         It ensures that script mode hyperparameters are set before the tuning job is created, which framework containers (PyTorch, TensorFlow) require.
-
+
         Args:
             model_trainer: ModelTrainer instance to prepare
             inputs: Training inputs (unused, for V2 compatibility)
@@ -451,21 +457,21 @@ def _prepare_model_trainer_for_tuning(cls, model_trainer, inputs=None, job_name=
             **kwargs: Additional arguments (unused, for V2 compatibility)
         """
         # Only proceed if source_code is configured
-        if hasattr(model_trainer, 'source_code') and model_trainer.source_code is not None:
+        if hasattr(model_trainer, "source_code") and model_trainer.source_code is not None:
             cls._upload_source_code_and_configure_hyperparameters(model_trainer)
 
     @classmethod
     def _upload_source_code_and_configure_hyperparameters(cls, model_trainer):
         """Upload source code to S3 and add script mode hyperparameters.
-
+
         Framework containers (PyTorch, TensorFlow) expect sagemaker_program and sagemaker_submit_directory
         hyperparameters for script mode execution. This method:
         1. Checks if source_dir is a local path or S3 URI
         2. Creates a tar.gz archive and uploads to S3
         3. Adds required script mode hyperparameters to model_trainer.hyperparameters
-
+
         This follows V2's pattern of creating sourcedir.tar.gz files.
-
+
         Args:
             model_trainer: ModelTrainer instance with source_code configured
         """
@@ -473,13 +479,13 @@ def _upload_source_code_and_configure_hyperparameters(cls, model_trainer):
         import tarfile
         import tempfile
         import time
-
+
         source_code = model_trainer.source_code
-
+
         # Get source directory and entry script
         source_dir = source_code.source_dir
         entry_script = source_code.entry_script
-
+
         # Check if already an S3 URI
         if _is_valid_s3_uri(source_dir):
             # Already uploaded, use as-is
@@ -488,18 +494,20 @@ def _upload_source_code_and_configure_hyperparameters(cls, model_trainer):
             source_s3_uri = source_dir
         else:
             # Local directory - need to create tar.gz and upload
             session = model_trainer.sagemaker_session
             bucket = session.default_bucket()
-
+
             # Generate S3 key
             timestamp = int(time.time())
-            s3_key = f"{model_trainer.base_job_name or 'source'}/source-{timestamp}/sourcedir.tar.gz"
-
+            s3_key = (
+                f"{model_trainer.base_job_name or 'source'}/source-{timestamp}/sourcedir.tar.gz"
+            )
+
             # Create tar.gz file
-            with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp_file:
+            with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file:
                 tar_path = tmp_file.name
-
+
             try:
                 # Create tar.gz archive
-                with tarfile.open(tar_path, 'w:gz') as tar:
+                with tarfile.open(tar_path, "w:gz") as tar:
                     # Add all files from source_dir
                     for root, dirs, files in os.walk(source_dir):
                         for file in files:
@@ -507,25 +515,25 @@ def _upload_source_code_and_configure_hyperparameters(cls, model_trainer):
                             # Calculate arcname to preserve directory structure
                             arcname = os.path.relpath(file_path, source_dir)
                             tar.add(file_path, arcname=arcname)
-
+
                 # Upload to S3
-                s3_client = session.boto_session.client('s3', region_name=session.boto_region_name)
+                s3_client = session.boto_session.client("s3", region_name=session.boto_region_name)
                 s3_client.upload_file(tar_path, bucket, s3_key)
-
+
                 # Construct S3 URI
                 source_s3_uri = f"s3://{bucket}/{s3_key}"
             finally:
                 # Clean up temp file
                 if os.path.exists(tar_path):
                     os.remove(tar_path)
-
+
         # Initialize hyperparameters dict if None
         if model_trainer.hyperparameters is None:
             model_trainer.hyperparameters = {}
-
+
         # Add script mode hyperparameters required by framework containers
-        model_trainer.hyperparameters['sagemaker_program'] = entry_script
-        model_trainer.hyperparameters['sagemaker_submit_directory'] = source_s3_uri
+        model_trainer.hyperparameters["sagemaker_program"] = entry_script
+        model_trainer.hyperparameters["sagemaker_submit_directory"] = source_s3_uri
 
     @runnable_by_pipeline
     def tune(
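Reviewer note: a minimal standalone sketch of what the upload step above produces, assuming a local `source_dir` and a made-up bucket/prefix. It mirrors the helper's behavior for illustration only and skips the actual S3 upload; it is not the SDK code itself.

```python
import os
import tarfile
import tempfile
import time


def pack_source(source_dir: str, entry_script: str, bucket: str) -> dict:
    """Tar source_dir and return the script-mode hyperparameters the containers expect."""
    s3_key = f"source/source-{int(time.time())}/sourcedir.tar.gz"
    with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp:
        tar_path = tmp.name
    with tarfile.open(tar_path, "w:gz") as tar:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                path = os.path.join(root, name)
                tar.add(path, arcname=os.path.relpath(path, source_dir))
    # The real helper uploads tar_path to s3://{bucket}/{s3_key} via boto3 at this point.
    return {
        "sagemaker_program": entry_script,
        "sagemaker_submit_directory": f"s3://{bucket}/{s3_key}",
    }
```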
@@ -636,16 +644,17 @@ def _get_best_training_job(self):
             hyperparameter tuning job.
         """
         self._ensure_last_tuning_job()
-
+
         # Refresh the tuning job to get latest status
         tuning_job = self.latest_tuning_job.refresh()
-
+
         if tuning_job.best_training_job:
             # Convert the best training job to the expected format
             best_job = tuning_job.best_training_job
             return {
                 "TrainingJobName": best_job.training_job_name,
-                "TrainingJobDefinitionName": best_job.training_job_definition_name or "training-job-definition"
+                "TrainingJobDefinitionName": best_job.training_job_definition_name
+                or "training-job-definition",
             }
         else:
             raise Exception(
@@ -753,9 +762,7 @@ def hyperparameter_ranges(self):
         if self._hyperparameter_ranges is None:
             return None
 
-        return self._prepare_parameter_ranges_for_tuning(
-            self._hyperparameter_ranges
-        )
+        return self._prepare_parameter_ranges_for_tuning(self._hyperparameter_ranges)
 
     def hyperparameter_ranges_dict(self):
         """Return a dictionary of hyperparameter ranges for all model_trainers in ``model_trainer_dict``"""
@@ -783,7 +790,9 @@ def _prepare_parameter_ranges_for_tuning(cls, parameter_ranges):
                 tuning_range_snake = {}
                 for key, value in tuning_range.items():
                     # Convert PascalCase to snake_case
-                    snake_key = ''.join(['_' + c.lower() if c.isupper() else c for c in key]).lstrip('_')
+                    snake_key = "".join(
+                        ["_" + c.lower() if c.isupper() else c for c in key]
+                    ).lstrip("_")
                     tuning_range_snake[snake_key] = value
                 hp_ranges.append(tuning_range_snake)
             processed_parameter_ranges[range_type + "ParameterRanges"] = hp_ranges
@@ -809,8 +818,7 @@ def analytics(self):
         """
         self._ensure_last_tuning_job()
         return HyperparameterTuningJobAnalytics(
-            self.latest_tuning_job.hyper_parameter_tuning_job_name,
-            self.sagemaker_session
+            self.latest_tuning_job.hyper_parameter_tuning_job_name, self.sagemaker_session
         )
 
     def _validate_parameter_ranges(self, model_trainer, hyperparameter_ranges):
@@ -935,7 +943,9 @@ def _create_warm_start_tuner(self, additional_parents, warm_start_type, model_tr
             max_jobs=self.max_jobs,
             max_parallel_jobs=self.max_parallel_jobs,
             max_runtime_in_seconds=self.max_runtime_in_seconds,
-            warm_start_config=HyperParameterTuningJobWarmStartConfig(warm_start_type=warm_start_type, parents=all_parents),
+            warm_start_config=HyperParameterTuningJobWarmStartConfig(
+                warm_start_type=warm_start_type, parents=all_parents
+            ),
             early_stopping_type=self.early_stopping_type,
             random_seed=self.random_seed,
         )
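Reviewer note: the PascalCase-to-snake_case comprehension reformatted in `_prepare_parameter_ranges_for_tuning` (and reused for metric definitions further down) behaves like this small, hypothetical helper:

```python
def to_snake_case(key: str) -> str:
    """Mirror of the inline comprehension used in the tuner."""
    return "".join(["_" + c.lower() if c.isupper() else c for c in key]).lstrip("_")


assert to_snake_case("MinValue") == "min_value"
assert to_snake_case("S3Uri") == "s3_uri"
assert to_snake_case("Name") == "name"
```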
@@ -1203,24 +1213,25 @@ def _add_model_trainer(
         if metric_definitions is not None:
             self.metric_definitions_dict[model_trainer_name] = metric_definitions
 
-
     def _start_tuning_job(self, inputs):
         """Start a new hyperparameter tuning job using HyperParameterTuningJob."""
         tuning_job_config = self._build_tuning_job_config()
         training_job_definition = self._build_training_job_definition(inputs)
-
+
         # Prepare autotune parameter
         autotune_param = None
         if self.autotune:
             from sagemaker.core.shapes import Autotune
+
             autotune_param = Autotune(mode="Enabled")
-
+
         # Convert tags to proper Tag objects
         tag_objects = None
         if self.tags:
             from sagemaker.core.shapes import Tag
+
             tag_objects = [Tag(key=tag["Key"], value=tag["Value"]) for tag in self.tags]
-
+
         # Build tuning request
         tuning_request = {
             "hyper_parameter_tuning_job_name": self._current_job_name,
@@ -1230,12 +1241,12 @@ def _start_tuning_job(self, inputs):
             "tags": tag_objects,
             "autotune": autotune_param,
         }
-
+
         # Handle PipelineSession
         if isinstance(self.sagemaker_session, PipelineSession):
             from sagemaker.core.utils.utils import serialize
             from sagemaker.core.apiutils._boto_functions import to_pascal_case
-
+
             # Remove job name for pipeline as it's auto-generated at execution time
             tuning_request.pop("hyper_parameter_tuning_job_name", None)
             # Convert snake_case to PascalCase for AWS API
@@ -1243,42 +1254,49 @@ def _start_tuning_job(self, inputs):
             serialized_request = serialize(pipeline_request)
             self.sagemaker_session._intercept_create_request(serialized_request, None, "tune")
             return None
-
+
         # Create the tuning job using HyperParameterTuningJob for regular session
         tuning_job = HyperParameterTuningJob.create(
-            session=self.sagemaker_session.boto_session if hasattr(self.sagemaker_session, 'boto_session') else None,
-            region=self.sagemaker_session.boto_region_name if hasattr(self.sagemaker_session, 'boto_region_name') else None,
-            **tuning_request
+            session=(
+                self.sagemaker_session.boto_session
+                if hasattr(self.sagemaker_session, "boto_session")
+                else None
+            ),
+            region=(
+                self.sagemaker_session.boto_region_name
+                if hasattr(self.sagemaker_session, "boto_region_name")
+                else None
+            ),
+            **tuning_request,
         )
-
+
         return tuning_job
-
+
     def _build_tuning_job_config(self):
         """Build the hyperparameter tuning job configuration."""
         from sagemaker.core.shapes import (
             HyperParameterTuningJobConfig,
             HyperParameterTuningJobObjective,
             ResourceLimits,
-            ParameterRanges
+            ParameterRanges,
         )
-
+
         # Build objective
         objective = None
         if self.objective_metric_name:
             objective = HyperParameterTuningJobObjective(
-                type=self.objective_type,
-                metric_name=self.objective_metric_name
+                type=self.objective_type, metric_name=self.objective_metric_name
             )
-
+
         # Build resource limits
         resource_limits = ResourceLimits(
             max_number_of_training_jobs=self.max_jobs,
-            max_parallel_training_jobs=self.max_parallel_jobs
+            max_parallel_training_jobs=self.max_parallel_jobs,
        )
-
+
         if self.max_runtime_in_seconds:
             resource_limits.max_runtime_in_seconds = self.max_runtime_in_seconds
-
+
         # Build parameter ranges
         parameter_ranges = None
         if self._hyperparameter_ranges:
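Reviewer note: for readers following `_build_tuning_job_config`, the `ranges_dict` consumed in the next hunk has roughly this shape — outer keys come from `range_type + "ParameterRanges"`, inner keys are already snake_cased by the conversion shown earlier; the values here are illustrative only.

```python
ranges_dict = {
    "ContinuousParameterRanges": [
        {"name": "learning_rate", "min_value": "0.001", "max_value": "0.1"}
    ],
    "IntegerParameterRanges": [
        {"name": "batch_size", "min_value": "32", "max_value": "256"}
    ],
    "CategoricalParameterRanges": [
        {"name": "optimizer", "values": ["sgd", "adam"]}
    ],
}
```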
@@ -1286,28 +1304,28 @@ def _build_tuning_job_config(self):
             parameter_ranges = ParameterRanges(
                 integer_parameter_ranges=ranges_dict.get("IntegerParameterRanges", []),
                 continuous_parameter_ranges=ranges_dict.get("ContinuousParameterRanges", []),
-                categorical_parameter_ranges=ranges_dict.get("CategoricalParameterRanges", [])
+                categorical_parameter_ranges=ranges_dict.get("CategoricalParameterRanges", []),
             )
-
+
         config = HyperParameterTuningJobConfig(
             strategy=self.strategy,
             hyper_parameter_tuning_job_objective=objective,
             resource_limits=resource_limits,
             parameter_ranges=parameter_ranges,
-            training_job_early_stopping_type=self.early_stopping_type
+            training_job_early_stopping_type=self.early_stopping_type,
         )
-
+
         if self.random_seed:
             config.random_seed = self.random_seed
-
+
         if self.strategy_config:
             config.strategy_config = self.strategy_config
-
+
         if self.completion_criteria_config:
             config.tuning_job_completion_criteria = self.completion_criteria_config
-
+
         return config
-
+
     def _build_training_job_definition(self, inputs):
         """Build the training job definition for the tuning job."""
         from sagemaker.core.shapes import (
@@ -1318,17 +1336,17 @@ def _build_training_job_definition(self, inputs):
             StoppingCondition,
             Channel,
             DataSource,
-            S3DataSource
+            S3DataSource,
         )
-
+
         model_trainer = self.model_trainer
-
+
         # Build algorithm specification - use HyperParameterAlgorithmSpecification for tuning
         algorithm_spec = HyperParameterAlgorithmSpecification(
             training_image=model_trainer.training_image,
-            training_input_mode=model_trainer.training_input_mode or "File"
+            training_input_mode=model_trainer.training_input_mode or "File",
         )
-
+
         if self.metric_definitions:
             # Convert metric definitions to snake_case for v3 Pydantic models
             metric_defs_snake = []
@@ -1336,75 +1354,104 @@ def _build_training_job_definition(self, inputs):
                 metric_def_snake = {}
                 for key, value in metric_def.items():
                     # Convert PascalCase to snake_case
-                    snake_key = ''.join(['_' + c.lower() if c.isupper() else c for c in key]).lstrip('_')
+                    snake_key = "".join(
+                        ["_" + c.lower() if c.isupper() else c for c in key]
+                    ).lstrip("_")
                     metric_def_snake[snake_key] = value
                 metric_defs_snake.append(metric_def_snake)
             algorithm_spec.metric_definitions = metric_defs_snake
-
+
         # Build input data config from inputs
         input_data_config = []
         if inputs:
             if isinstance(inputs, str):
                 # Single S3 URI string
-                input_data_config = [Channel(
-                    channel_name="training",
-                    data_source=DataSource(
-                        s3_data_source=S3DataSource(
-                            s3_data_type="S3Prefix",
-                            s3_uri=inputs,
-                            s3_data_distribution_type="FullyReplicated"
-                        )
+                input_data_config = [
+                    Channel(
+                        channel_name="training",
+                        data_source=DataSource(
+                            s3_data_source=S3DataSource(
+                                s3_data_type="S3Prefix",
+                                s3_uri=inputs,
+                                s3_data_distribution_type="FullyReplicated",
+                            )
+                        ),
                     )
-                )]
+                ]
             elif isinstance(inputs, list):
                 # List of InputData or Channel objects
                 for inp in inputs:
                     if isinstance(inp, InputData):
                         # Convert InputData to Channel
-                        input_data_config.append(Channel(
-                            channel_name=inp.channel_name,
-                            data_source=DataSource(
-                                s3_data_source=S3DataSource(
-                                    s3_data_type="S3Prefix",
-                                    s3_uri=inp.data_source,
-                                    s3_data_distribution_type="FullyReplicated"
-                                )
+                        input_data_config.append(
+                            Channel(
+                                channel_name=inp.channel_name,
+                                data_source=DataSource(
+                                    s3_data_source=S3DataSource(
+                                        s3_data_type="S3Prefix",
+                                        s3_uri=inp.data_source,
+                                        s3_data_distribution_type="FullyReplicated",
+                                    )
+                                ),
                             )
-                        ))
+                        )
                     elif isinstance(inp, Channel):
                         # Already a Channel object
                         input_data_config.append(inp)
             elif isinstance(inputs, dict):
                 # Dict mapping channel names to S3 URIs
                 for channel_name, s3_uri in inputs.items():
-                    input_data_config.append(Channel(
-                        channel_name=channel_name,
-                        data_source=DataSource(
-                            s3_data_source=S3DataSource(
-                                s3_data_type="S3Prefix",
-                                s3_uri=s3_uri,
-                                s3_data_distribution_type="FullyReplicated"
-                            )
+                    input_data_config.append(
+                        Channel(
+                            channel_name=channel_name,
+                            data_source=DataSource(
+                                s3_data_source=S3DataSource(
+                                    s3_data_type="S3Prefix",
+                                    s3_uri=s3_uri,
+                                    s3_data_distribution_type="FullyReplicated",
+                                )
+                            ),
                         )
-                    ))
-
+                    )
+
+        # Include ModelTrainer's internal channels (code, sm_drivers, etc.)
+        # These are created by ModelTrainer and are required for custom training logic
+        if hasattr(model_trainer, "input_data_config") and model_trainer.input_data_config:
+            for channel in model_trainer.input_data_config:
+                # Add internal channels that aren't already in input_data_config
+                if not any(c.channel_name == channel.channel_name for c in input_data_config):
+                    input_data_config.append(channel)
+
         # Build output data config
         output_config = OutputDataConfig(
-            s3_output_path=model_trainer.output_data_config.s3_output_path if model_trainer.output_data_config else None
+            s3_output_path=(
+                model_trainer.output_data_config.s3_output_path
+                if model_trainer.output_data_config
+                else None
+            )
         )
-
+
         # Build resource config
         resource_config = ResourceConfig(
-            instance_type=model_trainer.compute.instance_type if model_trainer.compute else "ml.m5.xlarge",
+            instance_type=(
+                model_trainer.compute.instance_type if model_trainer.compute else "ml.m5.xlarge"
+            ),
             instance_count=model_trainer.compute.instance_count if model_trainer.compute else 1,
-            volume_size_in_gb=model_trainer.compute.volume_size_in_gb if model_trainer.compute else 30
+            volume_size_in_gb=(
+                model_trainer.compute.volume_size_in_gb if model_trainer.compute else 30
+            ),
         )
-
+
         # Build stopping condition
         stopping_condition = StoppingCondition()
-        if model_trainer.stopping_condition and model_trainer.stopping_condition.max_runtime_in_seconds:
-            stopping_condition.max_runtime_in_seconds = model_trainer.stopping_condition.max_runtime_in_seconds
-
+        if (
+            model_trainer.stopping_condition
+            and model_trainer.stopping_condition.max_runtime_in_seconds
+        ):
+            stopping_condition.max_runtime_in_seconds = (
+                model_trainer.stopping_condition.max_runtime_in_seconds
+            )
+
         definition = HyperParameterTrainingJobDefinition(
             algorithm_specification=algorithm_spec,
             role_arn=model_trainer.role,
@@ -1412,7 +1459,7 @@ def _build_training_job_definition(self, inputs):
             output_data_config=output_config,
             resource_config=resource_config,
             stopping_condition=stopping_condition,
-            static_hyper_parameters=self.static_hyperparameters or {}
+            static_hyper_parameters=self.static_hyperparameters or {},
         )
-
+
         return definition
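Reviewer note: the channel handling added above (the behavior exercised by the new test for issue #5508 in the test file below) amounts to a merge that keeps user-supplied channels and appends ModelTrainer's internal ones only when a channel of the same name is not already present. A minimal sketch of that merge, not the SDK implementation itself:

```python
def merge_channels(user_channels, trainer_channels):
    """Dedup-by-name merge: user channels win, internal channels are appended."""
    merged = list(user_channels)
    for channel in trainer_channels or []:
        if not any(existing.channel_name == channel.channel_name for existing in merged):
            merged.append(channel)
    return merged
```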
+ """ + trainer = MagicMock() + trainer.sagemaker_session = MagicMock() + trainer.hyperparameters = {"learning_rate": 0.1, "batch_size": 32, "optimizer": "adam"} + trainer.training_image = "test-image:latest" + trainer.training_input_mode = "File" + trainer.role = "arn:aws:iam::123456789012:role/SageMakerRole" + trainer.output_data_config = MagicMock() + trainer.output_data_config.s3_output_path = "s3://bucket/output" + trainer.compute = MagicMock() + trainer.compute.instance_type = "ml.m5.xlarge" + trainer.compute.instance_count = 1 + trainer.compute.volume_size_in_gb = 30 + trainer.stopping_condition = MagicMock() + trainer.stopping_condition.max_runtime_in_seconds = 3600 + trainer.input_data_config = None + + if with_internal_channels: + trainer.input_data_config = [ + _create_channel("code", "s3://bucket/code"), + _create_channel("sm_drivers", "s3://bucket/drivers"), + ] + return trainer + + +def _create_hyperparameter_ranges(): + """Create sample hyperparameter ranges.""" + return { + "learning_rate": ContinuousParameter(0.001, 0.1), + "batch_size": IntegerParameter(32, 256), + "optimizer": CategoricalParameter(["sgd", "adam"]), + } + + +def _create_single_hp_range(): + """Create a single hyperparameter range for simple tests.""" + return {"learning_rate": ContinuousParameter(0.001, 0.1)} + + +def _create_channel(name: str, uri: str) -> Channel: + """Create a Channel with S3 data source.""" + return Channel( + channel_name=name, + data_source=DataSource( + s3_data_source=S3DataSource( + s3_data_type="S3Prefix", s3_uri=uri, s3_data_distribution_type="FullyReplicated" + ) + ), + ) + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- class TestWarmStartTypes: @@ -47,19 +119,12 @@ class TestHyperparameterTunerInit: @pytest.fixture def mock_model_trainer(self): """Create a mock ModelTrainer.""" - trainer = MagicMock() - trainer.sagemaker_session = MagicMock() - trainer.hyperparameters = {"learning_rate": 0.1} - return trainer + return _create_mock_model_trainer() @pytest.fixture def hyperparameter_ranges(self): """Create sample hyperparameter ranges.""" - return { - "learning_rate": ContinuousParameter(0.001, 0.1), - "batch_size": IntegerParameter(32, 256), - "optimizer": CategoricalParameter(["sgd", "adam"]), - } + return _create_hyperparameter_ranges() def test_init_with_basic_params(self, mock_model_trainer, hyperparameter_ranges): """Test initialization with basic parameters.""" @@ -266,14 +331,10 @@ class TestHyperparameterTunerProperties: @pytest.fixture def tuner(self): """Create a basic tuner instance.""" - mock_trainer = MagicMock() - mock_trainer.sagemaker_session = MagicMock() return HyperparameterTuner( - model_trainer=mock_trainer, + model_trainer=_create_mock_model_trainer(), objective_metric_name="accuracy", - hyperparameter_ranges={ - "learning_rate": ContinuousParameter(0.001, 0.1), - }, + hyperparameter_ranges=_create_single_hp_range(), ) def test_sagemaker_session_property(self, tuner): @@ -293,15 +354,10 @@ def test_hyperparameter_ranges_dict_property_returns_none(self, tuner): def test_hyperparameter_ranges_dict_property_with_dict(self): """Test hyperparameter_ranges_dict property with model_trainer_dict.""" - mock_trainer = MagicMock() - mock_trainer.sagemaker_session = MagicMock() - tuner = HyperparameterTuner( - model_trainer=mock_trainer, + model_trainer=_create_mock_model_trainer(), objective_metric_name="accuracy", - hyperparameter_ranges={ 
- "learning_rate": ContinuousParameter(0.001, 0.1), - }, + hyperparameter_ranges=_create_single_hp_range(), model_trainer_name="trainer1", ) @@ -316,14 +372,10 @@ class TestHyperparameterTunerMethods: @pytest.fixture def tuner_with_job(self): """Create a tuner with a latest_tuning_job.""" - mock_trainer = MagicMock() - mock_trainer.sagemaker_session = MagicMock() tuner = HyperparameterTuner( - model_trainer=mock_trainer, + model_trainer=_create_mock_model_trainer(), objective_metric_name="accuracy", - hyperparameter_ranges={ - "learning_rate": ContinuousParameter(0.001, 0.1), - }, + hyperparameter_ranges=_create_single_hp_range(), ) tuner.latest_tuning_job = MagicMock() tuner._current_job_name = "test-tuning-job" @@ -331,13 +383,10 @@ def tuner_with_job(self): def test_ensure_last_tuning_job_raises_error_when_none(self): """Test _ensure_last_tuning_job raises error when no job exists.""" - mock_trainer = MagicMock() tuner = HyperparameterTuner( - model_trainer=mock_trainer, + model_trainer=_create_mock_model_trainer(), objective_metric_name="accuracy", - hyperparameter_ranges={ - "learning_rate": ContinuousParameter(0.001, 0.1), - }, + hyperparameter_ranges=_create_single_hp_range(), ) with pytest.raises(ValueError): @@ -363,7 +412,7 @@ def test_best_training_job(self, tuner_with_job): mock_best_job = MagicMock() mock_best_job.training_job_name = "best-job-123" mock_best_job.training_job_definition_name = "training-def" - + mock_tuning_job = MagicMock() mock_tuning_job.best_training_job = mock_best_job tuner_with_job.latest_tuning_job.refresh.return_value = mock_tuning_job @@ -378,7 +427,9 @@ def test_analytics(self, tuner_with_job): # Analytics is called with positional args assert mock_analytics.called call_args = mock_analytics.call_args - assert call_args[0][0] == tuner_with_job.latest_tuning_job.hyper_parameter_tuning_job_name + assert ( + call_args[0][0] == tuner_with_job.latest_tuning_job.hyper_parameter_tuning_job_name + ) class TestHyperparameterTunerValidation: @@ -397,9 +448,7 @@ def test_validate_model_trainer_dict_with_empty_dict(self): def test_validate_dict_argument_with_none(self): """Test _validate_dict_argument with None returns without error.""" # None is allowed and returns without raising - HyperparameterTuner._validate_dict_argument( - "test_arg", None, ["key1", "key2"] - ) + HyperparameterTuner._validate_dict_argument("test_arg", None, ["key1", "key2"]) def test_validate_dict_argument_with_invalid_keys(self): """Test _validate_dict_argument with invalid keys.""" @@ -426,16 +475,8 @@ class TestHyperparameterTunerStaticMethods: def test_prepare_static_hyperparameters(self): """Test _prepare_static_hyperparameters method.""" - mock_trainer = MagicMock() - mock_trainer.hyperparameters = { - "learning_rate": 0.1, - "batch_size": 32, - "optimizer": "adam", - } - - hyperparameter_ranges = { - "learning_rate": ContinuousParameter(0.001, 0.1), - } + mock_trainer = _create_mock_model_trainer() + hyperparameter_ranges = _create_single_hp_range() static_hps = HyperparameterTuner._prepare_static_hyperparameters( mock_trainer, hyperparameter_ranges @@ -451,9 +492,7 @@ def test_prepare_parameter_ranges_from_job_description(self): "ContinuousParameterRanges": [ {"Name": "learning_rate", "MinValue": "0.001", "MaxValue": "0.1"} ], - "IntegerParameterRanges": [ - {"Name": "batch_size", "MinValue": "32", "MaxValue": "256"} - ], + "IntegerParameterRanges": [{"Name": "batch_size", "MinValue": "32", "MaxValue": "256"}], "CategoricalParameterRanges": [ {"Name": "optimizer", "Values": 
["sgd", "adam", "rmsprop"]} ], @@ -476,9 +515,7 @@ def test_extract_hyperparameters_from_parameter_ranges(self): "ContinuousParameterRanges": [ {"Name": "learning_rate", "MinValue": "0.001", "MaxValue": "0.1"} ], - "IntegerParameterRanges": [ - {"Name": "batch_size", "MinValue": "32", "MaxValue": "256"} - ], + "IntegerParameterRanges": [{"Name": "batch_size", "MinValue": "32", "MaxValue": "256"}], "CategoricalParameterRanges": [], } @@ -491,11 +528,7 @@ def test_extract_hyperparameters_from_parameter_ranges(self): def test_prepare_parameter_ranges_for_tuning(self): """Test _prepare_parameter_ranges_for_tuning method.""" - parameter_ranges = { - "learning_rate": ContinuousParameter(0.001, 0.1), - "batch_size": IntegerParameter(32, 256), - "optimizer": CategoricalParameter(["sgd", "adam"]), - } + parameter_ranges = _create_hyperparameter_ranges() processed_ranges = HyperparameterTuner._prepare_parameter_ranges_for_tuning( parameter_ranges @@ -507,3 +540,37 @@ def test_prepare_parameter_ranges_for_tuning(self): assert len(processed_ranges["ContinuousParameterRanges"]) == 1 assert len(processed_ranges["IntegerParameterRanges"]) == 1 assert len(processed_ranges["CategoricalParameterRanges"]) == 1 + + def test_build_training_job_definition_includes_internal_channels(self): + """Test that _build_training_job_definition includes ModelTrainer's internal channels. + + This test verifies the fix for GitHub issue #5508 where tuning jobs were missing + internal channels (code, sm_drivers) that ModelTrainer creates for custom training. + """ + from sagemaker.core.training.configs import InputData + + # Create mock ModelTrainer with internal channels (code, sm_drivers) + mock_trainer = _create_mock_model_trainer(with_internal_channels=True) + + # User-provided inputs + user_inputs = [ + InputData(channel_name="train", data_source="s3://bucket/train"), + InputData(channel_name="validation", data_source="s3://bucket/val"), + ] + + tuner = HyperparameterTuner( + model_trainer=mock_trainer, + objective_metric_name="accuracy", + hyperparameter_ranges=_create_single_hp_range(), + ) + + # Build training job definition + definition = tuner._build_training_job_definition(user_inputs) + + # Verify all channels are included + channel_names = [ch.channel_name for ch in definition.input_data_config] + assert "code" in channel_names, "Internal 'code' channel should be included" + assert "sm_drivers" in channel_names, "Internal 'sm_drivers' channel should be included" + assert "train" in channel_names, "User 'train' channel should be included" + assert "validation" in channel_names, "User 'validation' channel should be included" + assert len(channel_names) == 4, "Should have exactly 4 channels"