diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100755 index 96a3a2d..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "[python]": { - "editor.defaultFormatter": "charliermarsh.ruff", - "editor.formatOnSave": true, - }, - "python.testing.pytestArgs": [ - "src" - ], - "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true -} \ No newline at end of file diff --git a/README.md b/README.md index 1323e6d..caad6e2 100755 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ χ₀ addresses the systematic distributional shift among the human demonstration distribution ($P_\text{train}$), the inductive bias learned by the policy ($Q_\text{model}$), and the test-time execution distribution ($P_\text{test}$) through three technical modules: - **[Model Arithmetic](#model-arithmetic)**: A weight-space merging strategy that combines models trained on different data subsets, efficiently capturing diverse knowledge without architectural complexity. **[Released]** -- **[Stage Advantage](#stage-advantage-coming-soon)**: A stage-aware advantage estimator that provides stable, dense progress signals for policy training. **[Coming Soon]** +- **[Stage Advantage](#stage-advantage)**: A stage-aware advantage estimator that provides stable, dense progress signals for policy training. **[Released]** - **[Train-Deploy Alignment](#train-deploy-alignment-coming-soon)**: Bridges the distribution gap via spatio-temporal augmentation, heuristic DAgger corrections, and temporal chunk-wise smoothing. **[Coming Soon]** χ₀ enables two sets of dual-arm robots to collaboratively orchestrate long-horizon garment manipulation — flattening, folding, and hanging — surpassing the state-of-the-art $\pi_{0.5}$ baseline by approximately 250% in success rate, with `only 20 hours of data and 8 A100 GPUs`. @@ -46,7 +46,7 @@ https://github.com/user-attachments/assets/3f5f0c48-ff3f-4b9b-985b-59ad0b2ea97c - [Model Arithmetic](#model-arithmetic) - [Workflow](#workflow) - [Quick Start](#quick-start) -- [Stage Advantage (Coming Soon)](#stage-advantage-coming-soon) +- [Stage Advantage](#stage-advantage) - [Train-Deploy Alignment (Coming Soon)](#train-deploy-alignment-coming-soon) - [Citation](#licenseandcitation) - [Troubleshooting](#troubleshooting) @@ -54,6 +54,7 @@ https://github.com/user-attachments/assets/3f5f0c48-ff3f-4b9b-985b-59ad0b2ea97c ## Update +- [Feb 14 2026] Release of the **Stage Advantage** module: advantage estimator training, evaluation, GT labeling, and AWBC training pipeline. - [Feb 10 2026] Initial release of the **Model Arithmetic** module with support for both JAX and PyTorch checkpoints (not tested thoroughly). - [Feb 10 2026] χ₀ paper released. @@ -208,9 +209,9 @@ Checkpoints are written to the config’s checkpoint directory. 
You can then use - [x] kai0 oracle: training and inference code with non-advantage data of three tasks - [x] Model Arithmetic: code of different baselines for weight-space interpolation -- [ ] Stage Advantage: code, data (advantage labels), and checkpoints — **Feb 12** -- [ ] HuggingFace & ModelScope: upload Stage Advantage data and checkpoints — **Feb 12** -- [ ] Train-Deploy Alignment — **Feb 15** +- [x] Stage Advantage: code, data (advantage labels), and checkpoints +- [ ] HuggingFace & ModelScope: upload Stage Advantage data and checkpoints — **Feb 14** +- [ ] Train-Deploy Alignment — **Feb 14** ## Model Arithmetic @@ -265,11 +266,54 @@ python model_arithmetic/arithmetic_torch.py \ For gradient-based optimization, dataset splitting, and all other methods, see the full documentation in [`model_arithmetic/README.md`](model_arithmetic/README.md). -## Stage Advantage (Coming Soon) +## Stage Advantage Stage Advantage decomposes long-horizon tasks into semantic stages and provides stage-aware advantage signals for policy training. It addresses the numerical instability of prior non-stage approaches by computing advantage as progress differentials within each stage, yielding smoother and more stable supervision. -**This module is currently under refinement and will be released soon.** +The full pipeline has four stages: + +``` +Stage 0: GT Labeling → Stage 1: Train Advantage Estimator → Stage 2: Advantage Estimation → Stage 3: AWBC Training +``` + +### Quick Start + +**Stage 0 — GT Data Labeling**: Compute advantage values and discretize into `task_index` labels. + +```bash +cd stage_advantage/annotation +python gt_label.py \ + --threshold 30 --chunk-size 50 --discretion-type binary \ + --advantage-source absolute_advantage +``` + +For batch labeling across multiple dataset variants, see `stage_advantage/annotation/gt_labeling.sh`. + +**Stage 1 — Train Advantage Estimator**: Fine-tune a pi0-based model to predict advantage from observations. + +```bash +uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 +``` + +For a ready-to-use script with environment setup (conda/venv activation, DDP configuration) and automatic log management, see `stage_advantage/annotation/train_estimator.sh`. + +**Stage 2 — Advantage Estimation on New Data**: Use the trained estimator to label datasets with predicted advantage values. + +```bash +uv run python stage_advantage/annotation/eval.py Flatten-Fold KAI0 /path/to/dataset +``` + +For a ready-to-use script with environment setup and status logging, see `stage_advantage/annotation/eval.sh`. + +**Stage 3 — AWBC Training**: Train a policy with Advantage-Weighted Behavior Cloning. + +```bash +XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_flatten_fold_awbc --exp_name=run1 +``` + +For a ready-to-use script with environment setup and automatic log management, see `stage_advantage/awbc/train_awbc.sh`. + +For the full pipeline details, configuration instructions, and all parameters, see [`stage_advantage/README.md`](stage_advantage/README.md). ## Train-Deploy Alignment (Coming Soon) diff --git a/src/openpi/policies/agilex_policy.py b/src/openpi/policies/agilex_policy.py index 24270bd..dfb7073 100755 --- a/src/openpi/policies/agilex_policy.py +++ b/src/openpi/policies/agilex_policy.py @@ -15,7 +15,9 @@ class AgilexInputs(transforms.DataTransformFn): """Inputs for the Agilex policy. Expected inputs: - - images: dict[name, img] where img is [channel, height, width]. 
name must be in EXPECTED_CAMERAS. + - images: dict[name, img] where img is [channel, height, width]. For normal pi05 + training, names must be exactly the keys of required_rename_map. For advantage + estimator, optional_rename_map keys may be included as well. - state: [14] - actions: [action_horizon, 14] """ @@ -28,13 +30,23 @@ class AgilexInputs(transforms.DataTransformFn): # The expected cameras names. All input cameras must be in this set. Missing cameras will be # replaced with black images and the corresponding `image_mask` will be set to False. - EXPECTED_CAMERAS: ClassVar[tuple[str, ...]] = ("top_head", "hand_left", "hand_right") - rename_map = { + required_rename_map = { "top_head": "base_0_rgb", "hand_left": "left_wrist_0_rgb", "hand_right": "right_wrist_0_rgb" } + # Optional cameras for advantage-estimator training (history frames). + optional_rename_map = { + "his_-100_top_head": "base_-100_rgb", + "his_-100_hand_left": "left_wrist_-100_rgb", + "his_-100_hand_right": "right_wrist_-100_rgb", + } + + all_rename_map = {**required_rename_map, **optional_rename_map} + + EXPECTED_CAMERAS: ClassVar[tuple[str, ...]] = tuple(required_rename_map.keys()) + EXTRA_CAMERAS: ClassVar[tuple[str, ...]] = tuple(optional_rename_map.keys()) # if set all state to zeros mask_state: bool = False @@ -43,6 +55,11 @@ def __call__(self, data: dict) -> dict: # We only mask padding for pi0 model, not pi0-FAST mask_padding = self.model_type == _model.ModelType.PI0 + in_images = data["images"] + + if set(in_images) - set(self.EXPECTED_CAMERAS) - set(self.EXTRA_CAMERAS): + raise ValueError(f"Expected images to contain {self.EXPECTED_CAMERAS}, got {tuple(in_images)}") + # Pad the proprioceptive input to the action dimension of the model state = transforms.pad_to_dim(data["state"], self.action_dim) # Ensure state has correct shape [batch_size, state_dim] @@ -50,9 +67,10 @@ def __call__(self, data: dict) -> dict: # Parse images to uint8 (H,W,C) since LeRobot automatically stores as float32 (C,H,W) images = {} - for camera in self.EXPECTED_CAMERAS: - if camera in data["images"]: - img = data["images"][camera] + image_masks = {} + for camera in self.EXPECTED_CAMERAS + self.EXTRA_CAMERAS: + if camera in in_images: + img = in_images[camera] # Convert torch tensor to numpy array if needed if isinstance(img, torch.Tensor): img = img.cpu().numpy() @@ -62,12 +80,14 @@ def __call__(self, data: dict) -> dict: # Convert from [C,H,W] to [H,W,C] if needed if img.shape[0] == 3: img = np.transpose(img, (1, 2, 0)) - images[self.rename_map[camera]] = img + images[self.all_rename_map[camera]] = img + image_masks[self.all_rename_map[camera]] = np.True_ + + elif camera not in in_images and camera in self.EXTRA_CAMERAS: + continue # optional camera can be skipped else: raise ValueError(f"Camera {camera} not found in data") - # Create image mask based on available cameras - image_mask = {self.rename_map[camera]: np.True_ for camera in self.EXPECTED_CAMERAS} # filter unnormal state / action value, set to 0 state = np.where(state > np.pi, 0, state) @@ -77,7 +97,7 @@ def __call__(self, data: dict) -> dict: masked_state = np.zeros_like(state) if self.mask_state else state inputs = { "image": images, - "image_mask": image_mask, + "image_mask": image_masks, "state": masked_state, } @@ -91,17 +111,34 @@ def __call__(self, data: dict) -> dict: action_mask = np.ones_like(actions, dtype=bool) action_mask[:, self.action_dim:] = False inputs["action_mask"] = action_mask - - if self.convert_to_eef_position: - actions[..., :14] = 
batch_qpos_to_eef_pos(actions[..., :14]) + inputs["actions"] = actions.squeeze() # Add prompt if present if "prompt" in data: inputs["prompt"] = data["prompt"] - + + # Advantage-estimator optional fields: passthrough or convert to tensor + for key in ("frame_index", "episode_length", "progress", "image_original", "episode_index"): + if key in data: + inputs[key] = data[key] + + def _to_tensor(x, default=None): + if x is None and default is not None: + return default + if isinstance(x, np.ndarray): + return torch.from_numpy(x) + if isinstance(x, torch.Tensor): + return x.detach().clone() + raise NotImplementedError(f"Unsupported type: {type(x)}") + + if "action_advantage" in data: + inputs["action_advantage"] = _to_tensor(data["action_advantage"], default=torch.tensor(1.0)) + if "action_advantage_original" in data: + inputs["action_advantage_original"] = _to_tensor(data["action_advantage_original"]) return inputs + @dataclasses.dataclass(frozen=True) class AgilexOutputs(transforms.DataTransformFn): """Outputs for the Agilex policy.""" diff --git a/src/openpi/policies/arx_policy.py b/src/openpi/policies/arx_policy.py index f8b0733..52150b5 100755 --- a/src/openpi/policies/arx_policy.py +++ b/src/openpi/policies/arx_policy.py @@ -9,12 +9,15 @@ import openpi.models.model as _model import openpi.transforms as transforms + @dataclasses.dataclass(frozen=True) class ARXInputs(transforms.DataTransformFn): """Inputs for the ARX policy. Expected inputs: - - images: dict[name, img] where img is [channel, height, width]. name must be in EXPECTED_CAMERAS. + - images: dict[name, img] where img is [channel, height, width]. For normal pi05 + training, names must be exactly the keys of required_rename_map. For advantage + estimator, optional_rename_map keys may be included as well. - state: [14] - actions: [action_horizon, 14] """ @@ -27,22 +30,36 @@ class ARXInputs(transforms.DataTransformFn): # The expected cameras names. All input cameras must be in this set. Missing cameras will be # replaced with black images and the corresponding `image_mask` will be set to False. - EXPECTED_CAMERAS: ClassVar[tuple[str, ...]] = ("top_head", "hand_left", "hand_right") - rename_map = { + required_rename_map = { "top_head": "base_0_rgb", "hand_left": "left_wrist_0_rgb", "hand_right": "right_wrist_0_rgb" } + # Optional cameras for advantage-estimator training (history frames). 
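+    # The "his_-100_*" keys carry a second frame sampled from the same episode, which the
+    # advantage estimator uses as a comparison frame (see AdvantageLerobotDataset); they are
+    # not present in normal pi05 training.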
+ optional_rename_map = { + "his_-100_top_head": "base_-100_rgb", + "his_-100_hand_left": "left_wrist_-100_rgb", + "his_-100_hand_right": "right_wrist_-100_rgb", + } + + all_rename_map = {**required_rename_map, **optional_rename_map} + + EXPECTED_CAMERAS: ClassVar[tuple[str, ...]] = tuple(required_rename_map.keys()) + EXTRA_CAMERAS: ClassVar[tuple[str, ...]] = tuple(optional_rename_map.keys()) # if set all state to zeros mask_state: bool = False - def __call__(self, data: dict) -> dict: # We only mask padding for pi0 model, not pi0-FAST mask_padding = self.model_type == _model.ModelType.PI0 + in_images = data["images"] + + if set(in_images) - set(self.EXPECTED_CAMERAS) - set(self.EXTRA_CAMERAS): + raise ValueError(f"Expected images to contain {self.EXPECTED_CAMERAS}, got {tuple(in_images)}") + # Pad the proprioceptive input to the action dimension of the model state = transforms.pad_to_dim(data["state"], self.action_dim) # Ensure state has correct shape [batch_size, state_dim] @@ -50,9 +67,10 @@ def __call__(self, data: dict) -> dict: # Parse images to uint8 (H,W,C) since LeRobot automatically stores as float32 (C,H,W) images = {} - for camera in self.EXPECTED_CAMERAS: - if camera in data["images"]: - img = data["images"][camera] + image_masks = {} + for camera in self.EXPECTED_CAMERAS + self.EXTRA_CAMERAS: + if camera in in_images: + img = in_images[camera] # Convert torch tensor to numpy array if needed if isinstance(img, torch.Tensor): img = img.cpu().numpy() @@ -62,38 +80,57 @@ def __call__(self, data: dict) -> dict: # Convert from [C,H,W] to [H,W,C] if needed if img.shape[0] == 3: img = np.transpose(img, (1, 2, 0)) - images[self.rename_map[camera]] = img + images[self.all_rename_map[camera]] = img + image_masks[self.all_rename_map[camera]] = np.True_ + + elif camera not in in_images and camera in self.EXTRA_CAMERAS: + continue # optional camera can be skipped else: raise ValueError(f"Camera {camera} not found in data") - # Create image mask based on available cameras - image_mask = {self.rename_map[camera]: np.True_ for camera in self.EXPECTED_CAMERAS} - # Prepare inputs dictionary masked_state = np.zeros_like(state) if self.mask_state else state inputs = { "image": images, - "image_mask": image_mask, + "image_mask": image_masks, "state": masked_state, } # Add actions if present if "actions" in data: actions = transforms.pad_to_dim(data["actions"], self.action_dim) - # actions = np.where(actions > np.pi, 0, actions) - # actions = np.where(actions < -np.pi, 0, actions) + actions = np.where(actions > np.pi, 0, actions) + actions = np.where(actions < -np.pi, 0, actions) if mask_padding: # Create action mask for padding action_mask = np.ones_like(actions, dtype=bool) action_mask[:, self.action_dim:] = False inputs["action_mask"] = action_mask - + inputs["actions"] = actions.squeeze() # Add prompt if present if "prompt" in data: inputs["prompt"] = data["prompt"] - + + # Advantage-estimator optional fields: passthrough or convert to tensor + for key in ("frame_index", "episode_length", "progress", "image_original", "episode_index"): + if key in data: + inputs[key] = data[key] + + def _to_tensor(x, default=None): + if x is None and default is not None: + return default + if isinstance(x, np.ndarray): + return torch.from_numpy(x) + if isinstance(x, torch.Tensor): + return x.detach().clone() + raise NotImplementedError(f"Unsupported type: {type(x)}") + + if "action_advantage" in data: + inputs["action_advantage"] = _to_tensor(data["action_advantage"], default=torch.tensor(1.0)) + if 
"action_advantage_original" in data: + inputs["action_advantage_original"] = _to_tensor(data["action_advantage_original"]) return inputs diff --git a/src/openpi/training/config.py b/src/openpi/training/config.py index e60dc88..7ee98d9 100755 --- a/src/openpi/training/config.py +++ b/src/openpi/training/config.py @@ -1216,119 +1216,161 @@ def __post_init__(self) -> None: batch_size=256, ), - #**************************FlattenFold AWBC******************************* + #************************Advantage Estimator*************************** + TrainConfig( + name="ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD", + advantage_estimator=True, + model=pi0_config.AdvantageEstimatorConfig( + pi05=True, + loss_value_weight=1., + loss_action_weight=0., + discrete_state_input=False, + ), + data=LerobotAgilexDataConfig( + repo_id = "Path/to/your/advantage/dataset", + assets=AssetsConfig( + assets_dir="Path/to/your/advantage/dataset/assets", + asset_id="Your_advantage_dataset_name", + ), + default_prompt="Flatten and fold the cloth.", + # * why removing "prompt" here will lead to an error in transforms.py + repack_transforms=_transforms.Group( + inputs=[ + _transforms.RepackTransform( + { + "images": { + "top_head": "observation.images.top_head", + "hand_left": "observation.images.hand_left", + "hand_right": "observation.images.hand_right", + "his_-100_top_head": "his_-100_observation.images.top_head", + "his_-100_hand_left": "his_-100_observation.images.hand_left", + "his_-100_hand_right": "his_-100_observation.images.hand_right", + }, + "state": "observation.state", + "actions": "action", + # "prompt": "prompt", # ! Not adding this for default prompt. + "episode_length": "episode_length", + "frame_index": "frame_index", + "episode_index": "episode_index", + "progress_gt": "progress_gt", + "stage_progress_gt": "stage_progress_gt", + "progress": "progress", + # "is_suboptimal": "is_suboptimal", + } + ) + ] + ) + ), + pytorch_weight_path="Path/to/your/pi05_base/checkpoint", + num_train_steps=100_000, + keep_period=10000, + save_interval=10000, + num_workers=8, + batch_size=16, # * 1 gpus + # batch_size=128, # * 8 gpus + skip_norm_stats=True, # * No norm stats used. + ), + TrainConfig( + name="ADVANTAGE_TORCH_PI06_FLATTEN_FOLD", + advantage_estimator=True, + model=pi0_config.AdvantageEstimatorConfig( + pi05=True, + loss_value_weight=1., + loss_action_weight=0., # No action loss in advantage estimator training + discrete_state_input=False, # Not using states into prompt like pi05 + ), + data=LerobotAgilexDataConfig( + # repo_id = "/cpfs01/shared/filtered_cut_data/short_sleeve/flatten_fold/v9-3/1022_20_590_v9-3_2000_lerobot", + repo_id = "Path/to/your/advantage/dataset", + assets=AssetsConfig( + assets_dir="Path/to/your/advantage/dataset/assets", + asset_id="Your_advantage_dataset_name", + ), + default_prompt="Flatten and fold the cloth.", + # * why removing "prompt" here will lead to an error in transforms.py + repack_transforms=_transforms.Group( + inputs=[ + _transforms.RepackTransform( + { + "images": { + "top_head": "observation.images.top_head", + "hand_left": "observation.images.hand_left", + "hand_right": "observation.images.hand_right", + }, + "state": "observation.state", + "actions": "action", + # "prompt": "prompt", # No need if default prompt is used. 
+ "episode_length": "episode_length", + "frame_index": "frame_index", + "episode_index": "episode_index", + "progress_gt": "progress_gt", + "stage_progress_gt": "stage_progress_gt", + "progress": "progress", + # "is_suboptimal": "is_suboptimal", + } + ) + ] + ) + ), + pytorch_weight_path="Path/to/your/pi06_base/checkpoint", + num_train_steps=100_000, + keep_period=10000, + save_interval=10000, + num_workers=55, + # batch_size=16, # * 1 gpus + batch_size=18*8, # * 8 gpus + skip_norm_stats=True, # * No norm stats used. + ), + #************************advantage estimator*************************** + #**************************FlattenFold AWBC******************************* + TrainConfig( + name="pi05_flatten_fold_awbc", + model=pi0_config.Pi0Config(pi05=True), + data = LerobotAgilexDataConfig( + repo_id="/data/FlattenFold/advantage", + default_prompt="Flatten and fold the cloth.", + use_delta_joint_actions=False, + base_config=DataConfig(prompt_from_task=True), + ), + weight_loader=weight_loaders.CheckpointWeightLoader("/cpfs01/shared/checkpoint/pi05_base/params"), + num_train_steps=100_000, + keep_period=5000, + num_workers=8, + batch_size=256, + ), #**************************TeeShirtSort AWBC******************************* - + TrainConfig( + name="pi05_tee_shirt_sort_awbc", + model=pi0_config.Pi0Config(pi05=True), + data = LerobotAgilexDataConfig( + repo_id="/data/TeeShirtSort/advantage", + default_prompt="Fetch the clothes, fold the tee shirts and hand-over the collared shirts.", + use_delta_joint_actions=False, + base_config=DataConfig(prompt_from_task=True), + ), + weight_loader=weight_loaders.CheckpointWeightLoader(""), + num_train_steps=100_000, + keep_period=5000, + num_workers=8, + batch_size=256, + ), #**************************HangCloth AWBC******************************* - - - #************************Advantage Estimator*************************** - # TrainConfig( - # name="ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD", - # advantage_estimator=True, - # model=pi0_config.AdvantageEstimatorConfig( - # pi05=True, - # loss_value_weight=1., - # loss_action_weight=0., - # discrete_state_input=False, - # ), - # data=LerobotAgilexDataConfig( - # repo_id = "Path/to/your/advantage/dataset", - # assets=AssetsConfig( - # assets_dir="Path/to/your/advantage/dataset/assets", - # asset_id="Your_advantage_dataset_name", - # ), - # default_prompt="Flatten and fold the cloth.", - # # * why removing "prompt" here will lead to an error in transforms.py - # repack_transforms=_transforms.Group( - # inputs=[ - # _transforms.RepackTransform( - # { - # "images": { - # "top_head": "observation.images.top_head", - # "hand_left": "observation.images.hand_left", - # "hand_right": "observation.images.hand_right", - # "his_-100_top_head": "his_-100_observation.images.top_head", - # "his_-100_hand_left": "his_-100_observation.images.hand_left", - # "his_-100_hand_right": "his_-100_observation.images.hand_right", - # }, - # "state": "observation.state", - # "actions": "action", - # # "prompt": "prompt", # ! Not adding this for default prompt. 
- # "episode_length": "episode_length", - # "frame_index": "frame_index", - # "episode_index": "episode_index", - # "progress_gt": "progress_gt", - # "stage_progress_gt": "stage_progress_gt", - # "progress": "progress", - # # "is_suboptimal": "is_suboptimal", - # } - # ) - # ] - # ) - # ), - # pytorch_weight_path="Path/to/your/pi05_base/checkpoint", - # num_train_steps=100_000, - # keep_period=10000, - # save_interval=10000, - # num_workers=8, - # batch_size=16, # * 1 gpus - # # batch_size=128, # * 8 gpus - # skip_norm_stats=True, # * No norm stats used. - # ), - # TrainConfig( - # name="ADVANTAGE_TORCH_PI06_FLATTEN_FOLD", - # advantage_estimator=True, - # model=pi0_config.AdvantageEstimatorConfig( - # pi05=True, - # loss_value_weight=1., - # loss_action_weight=0., # No action loss in advantage estimator training - # discrete_state_input=False, # Not using states into prompt like pi05 - # ), - # data=LerobotAgilexDataConfig( - # # repo_id = "/cpfs01/shared/filtered_cut_data/short_sleeve/flatten_fold/v9-3/1022_20_590_v9-3_2000_lerobot", - # repo_id = "Path/to/your/advantage/dataset", - # assets=AssetsConfig( - # assets_dir="Path/to/your/advantage/dataset/assets", - # asset_id="Your_advantage_dataset_name", - # ), - # default_prompt="Flatten and fold the cloth.", - # # * why removing "prompt" here will lead to an error in transforms.py - # repack_transforms=_transforms.Group( - # inputs=[ - # _transforms.RepackTransform( - # { - # "images": { - # "top_head": "observation.images.top_head", - # "hand_left": "observation.images.hand_left", - # "hand_right": "observation.images.hand_right", - # }, - # "state": "observation.state", - # "actions": "action", - # # "prompt": "prompt", # No need if default prompt is used. - # "episode_length": "episode_length", - # "frame_index": "frame_index", - # "episode_index": "episode_index", - # "progress_gt": "progress_gt", - # "stage_progress_gt": "stage_progress_gt", - # "progress": "progress", - # # "is_suboptimal": "is_suboptimal", - # } - # ) - # ] - # ) - # ), - # pytorch_weight_path="Path/to/your/pi06_base/checkpoint", - # num_train_steps=100_000, - # keep_period=10000, - # save_interval=10000, - # num_workers=55, - # # batch_size=16, # * 1 gpus - # batch_size=18*8, # * 8 gpus - # skip_norm_stats=True, # * No norm stats used. - # ), - #************************advantage estimator*************************** + TrainConfig( + name="pi05_hang_cloth_awbc", + model=pi0_config.Pi0Config(pi05=True), + data = LerobotARXDataConfig( + repo_id="/data/HangCloth/advantage", + default_prompt="Fetch and hang the cloth.", + use_delta_joint_actions=False, + base_config=DataConfig(prompt_from_task=True), + ), + weight_loader=weight_loaders.CheckpointWeightLoader("/cpfs01/shared/checkpoint/pi05_base/params"), + num_train_steps=100_000, + keep_period=5000, + num_workers=8, + batch_size=256, + ), # RoboArena & PolaRiS configs. *roboarena_config.get_roboarena_configs(), *polaris_config.get_polaris_configs(), diff --git a/stage_advantage/README.md b/stage_advantage/README.md new file mode 100644 index 0000000..9d835b9 --- /dev/null +++ b/stage_advantage/README.md @@ -0,0 +1,340 @@ +# Stage Advantage Pipeline + +This module implements a pipeline for training an **Advantage Estimator** and using it in **Advantage-Weighted Behavior Cloning (AWBC)**. 
+ +## Pipeline Overview + +``` + ┌──────────────────────────────────────────────────────────────────────────┐ + │ Stage 0: GT Labeling (annotation/gt_labeling.sh + gt_label.py) │ + │ Compute advantage (from progress or from Stage 2 output) → task_index │ + ├──────────────────────────────────────────────────────────────────────────┤ + │ Stage 1: Train Advantage Estimator (scripts/train_pytorch.py) │ + │ Fine-tune pi0 model to predict advantage from observations │ + ├──────────────────────────────────────────────────────────────────────────┤ + │ Stage 2: Advantage Estimation on New Data (annotation/eval.py) │ + │ Use trained estimator → parquets with data_PI06_* / data_KAI0_* │ + ├──────────────────────────────────────────────────────────────────────────┤ + │ Stage 3: AWBC Training (scripts/train.py pi05_*_awbc) │ + │ Train policy with advantage-weighted behavior cloning (prompt_from_task) │ + └──────────────────────────────────────────────────────────────────────────┘ +``` + +**End-to-end order for AWBC:** (1) Stage 0 on data with `progress` → optional for Stage 1. (2) Stage 1 → train estimator. (3) Stage 2 → run eval on your dataset so it gets `data_PI06_100000/` or `data_KAI0_100000/` with advantage columns. (4) Run Stage 0 again with `--advantage-source absolute_advantage` on that dataset (e.g. via `gt_labeling.sh` with `DATA_PATH` = the repo you ran eval on, and source subdirs `data_PI06_100000` / `data_KAI0_100000`). (5) Point AWBC config `repo_id` at the resulting advantage-labeled directory and run Stage 3 training. + +--- + +## Stage 0: GT Data Labeling + +**Goal**: Compute advantage values (from `progress` or from Stage 2’s `absolute_advantage`) and label each frame with a discretized `task_index`; write `meta/tasks.jsonl` (prompt strings per `task_index`). + +**Script**: `annotation/gt_labeling.sh` (calls `annotation/gt_label.py`) + +**For AWBC:** Run Stage 2 (eval) first so the dataset has `data_PI06_100000/` or `data_KAI0_100000/` with advantage columns. Then run Stage 0 with `--advantage-source absolute_advantage` on that output (e.g. set `gt_labeling.sh`’s `DATA_PATH` to the eval repo and use source subdirs `data_PI06_100000` / `data_KAI0_100000`; the script copies them into the target’s `data/` and runs `gt_label.py`). + +### How it works + +1. **Prepare dataset directory**: Copy/link the source (parquet + videos + meta) into a new working directory with standard LeRobot layout. For AWBC, the source parquets are the Stage 2 output (with `absolute_advantage`). +2. **Compute advantage**: For each frame `i`, the advantage is defined as: + ``` + advantage[i] = progress[i + chunk_size] - progress[i] + ``` + where `chunk_size` defaults to 50 frames. For frames near the end of an episode, a normalized extrapolation is used. +3. **Discretize into task_index**: Based on the advantage distribution across the entire dataset: + - **Binary mode** (`--discretion-type binary`): Frames in the top `threshold%` get `task_index=1`, the rest get `task_index=0`. + - **N-slices mode** (`--discretion-type n_slices`): Frames are divided into `n` equal-percentile bins, each assigned `task_index` from `0` to `n-1`. +4. **Stage-aware labeling** (`--stage-nums > 1`): Divides frames by their `stage_progress_gt` value into stages, then computes independent percentile boundaries per stage. +5. **Write back**: Updates `task_index` column in each parquet file and writes `meta/tasks.jsonl`. 
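+
+As a reading aid, the block below is a minimal sketch of the labeling rule described in steps 2–3 above (binary mode with `--advantage-source progress`). It assumes a pandas-readable parquet with a `progress` column; the helper name `label_episode` is illustrative only, and for brevity the percentile cutoff is taken over a single episode, whereas `gt_label.py` computes it over the whole dataset and uses a normalized extrapolation near episode ends.
+
+```python
+import numpy as np
+import pandas as pd
+
+def label_episode(df: pd.DataFrame, chunk_size: int = 50, threshold: float = 30.0) -> pd.DataFrame:
+    """Illustrative binary labeling: top `threshold`% of advantages -> task_index=1."""
+    progress = df["progress"].to_numpy()
+    n = len(progress)
+    # advantage[i] = progress[i + chunk_size] - progress[i]; the lookahead index is simply
+    # clipped at the episode end here (gt_label.py instead extrapolates in a normalized way).
+    future = np.minimum(np.arange(n) + chunk_size, n - 1)
+    advantage = progress[future] - progress
+    # Binary discretization: frames in the top `threshold`% get task_index=1, the rest 0.
+    cutoff = np.percentile(advantage, 100.0 - threshold)
+    df["task_index"] = (advantage >= cutoff).astype(np.int64)
+    return df
+```
+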
+ +### Required Source Data Columns + +The source parquet files must contain these columns for the full pipeline to work: + +| Column | Required By | Description | +|---|---|---| +| `progress` / `absolute_advantage` / `relative_advantage` | `gt_label.py` | Used to compute advantage values | +| `stage_progress_gt` | `AdvantageLerobotDataset` | Stage progress ground truth (0-1), used for random timestep comparison | +| `progress_gt` | Training config repack_transforms | Progress ground truth, mapped as model input | +| `observation.state` | Training config | Robot state | +| `action` | Training config | Robot action sequence | +| `episode_index`, `frame_index` | LeRobot format | Standard metadata | + +### Usage + +```bash +cd stage_advantage/annotation + +# Example: binary labeling using absolute_advantage as the advantage source +python gt_label.py \ + --threshold 30 \ + --chunk-size 50 \ + --discretion-type binary \ + --advantage-source absolute_advantage + +# Example: 2-stage binary labeling +python gt_label.py \ + --threshold 30 \ + --chunk-size 50 \ + --discretion-type binary \ + --advantage-source absolute_advantage \ + --stage-nums 2 + +# Dry run (only print statistics, do not modify files) +python gt_label.py --dry-run +``` + +### Key Parameters + +| Parameter | Default | Description | +|---|---|---| +| `--threshold` | 70.0 | Top percentile for positive advantage (binary mode) | +| `--chunk-size` | 50 | Number of frames to look ahead for progress diff | +| `--discretion-type` | `binary` | `binary` or `n_slices` | +| `--n-slices` | 10 | Number of slices (only for `n_slices` mode) | +| `--advantage-source` | `progress` | `progress`, `absolute_advantage`, or `relative_advantage` | +| `--stage-nums` | 1 | Number of stages to divide data by `stage_progress_gt` | +| `--dry-run` | false | Only compute and print statistics without modifying files | + +See `gt_labeling.sh` for batch labeling examples across multiple dataset variants. + +--- + +## Stage 1: Train Advantage Estimator + +**Goal**: Fine-tune a pi0-based model to predict advantage values from observations (images + state), producing a learned Advantage Estimator. + +**Configs**: `ADVANTAGE_TORCH_PI06_FLATTEN_FOLD` or `ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD` (defined in `src/openpi/training/config.py`) + +### How it works + +1. The training uses `scripts/train_pytorch.py`, which supports single-GPU and multi-GPU (DDP) training via `torchrun`. +2. The model architecture is `AdvantageEstimator` (defined in `src/openpi/models_pytorch/pi0_pytorch.py`), initialized from a pre-trained pi0.5 checkpoint (`pytorch_weight_path`). +3. The model is trained to regress advantage/progress values: + - `loss_value_weight=1.0` (value prediction loss is active) + - `loss_action_weight=0.0` (action prediction loss is disabled) +4. `skip_norm_stats=True` since the advantage estimator does not require normalization statistics. +5. Data is loaded via `AdvantageLerobotDataset` which: + - Reads `task_index` to get the task prompt string + - Samples a random same-episode comparison frame (prefixed with `his_-100_`) + - Computes `progress = stage_progress_gt - his_-100_stage_progress_gt` as the regression target + +### Before Training + +1. **Complete Stage 0** to get a labeled dataset. +2. 
**Update config.py** with the correct paths: + +```python +# In src/openpi/training/config.py, find ADVANTAGE_TORCH_PI06_FLATTEN_FOLD or ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD: +TrainConfig( + name="ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD", # or ADVANTAGE_TORCH_PI06_FLATTEN_FOLD + data=LerobotAgilexDataConfig( + repo_id="", # <-- update this + assets=AssetsConfig( + assets_dir="/assets", # <-- update this + asset_id="", # <-- update this + ), + ), + pytorch_weight_path="", # <-- update this + ... +) +``` + +### Usage + +From the **repository root**, the core training command is: + +```bash +# Single GPU (KAI0 or PI06) +uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 +uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_PI06_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 + +# Multi-GPU (e.g. 8 GPUs on one node) +uv run torchrun --standalone --nproc_per_node=8 scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD \ + --exp_name=run1 --save_interval 10000 + +# Multi-node (e.g. 2 nodes × 8 GPUs): on master node set WORLD_SIZE=2, RANK=0, MASTER_ADDR, MASTER_PORT; +# on worker set RANK=1, then: +uv run torchrun --nnodes=2 --nproc_per_node=8 --node_rank=$RANK --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT \ + scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 + +# Resume from latest checkpoint +uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD --exp_name=run1 --resume +``` + +Logs and checkpoints go to `experiment//` and `experiment//log/.log`. Redirect to a log file if desired, e.g. `2>&1 | tee experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/log/run1.log`. + +For a ready-to-use script with environment setup (conda/venv activation, DDP configuration) and automatic log management, see **`annotation/train_estimator.sh`**: + +```bash +RUNNAME=ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD RUNTIME=run1 bash stage_advantage/annotation/train_estimator.sh + +# Multi-GPU +RUNNAME=ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD RUNTIME=run1 NPROC_PER_NODE=8 bash stage_advantage/annotation/train_estimator.sh +``` + +The shell script handles output directory creation, log redirection (via `tee`), and multi-GPU/multi-node dispatch automatically. + +### Training Outputs + +``` +experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/ # or ADVANTAGE_TORCH_PI06_FLATTEN_FOLD + ├── / + │ ├── 10000/ # checkpoint at step 10000 + │ │ ├── model.safetensors + │ │ ├── optimizer.pt + │ │ ├── metadata.pt + │ │ └── assets/ + │ ├── 20000/ # checkpoint at step 20000 + │ └── ... + └── log/ + └── .log +``` + +--- + +## Stage 2: Advantage Estimation on New Data + +**Goal**: Use the trained Advantage Estimator to label new/unseen datasets with predicted advantage values. + +**Script**: `annotation/eval.py` (uses `annotation/evaluator.py`) + +### How it works + +1. Loads a trained Advantage Estimator checkpoint (from Stage 1). +2. Iterates over all episodes in the target LeRobot dataset. +3. For each episode, reads video frames from three camera views (top_head, hand_left, hand_right). +4. Runs batched GPU inference with parallel data prefetching to predict per-frame advantage values. +5. Writes results as new parquet files with advantage columns appended: + - `relative_advantage`: Predicted progress difference between frame n and frame n+50 (2-timestep mode only). + - `absolute_value`: Predicted cumulative progress from the initial frame to frame n. 
+ - `absolute_advantage`: Difference of absolute values between frame n+50 and frame n, clipped to [-1, 1]. + +### Model Variants + +| Variant | Description | +|---|---| +| `PI06` | Single-timestep (absolute value only) | +| `KAI0` | Two-timestep, stage-level progress (relative + absolute advantage) | + +### Before Evaluation + +1. **Complete Stage 1** to get a trained Advantage Estimator checkpoint. +2. **Update `MODELS_CONFIG_MAP`** in `eval.py` with the correct `ckpt_dir` and `ckpt_steps` for your trained model. + +### Usage + +From the **repository root**, the core evaluation command is: + +```bash +uv run python stage_advantage/annotation/eval.py +``` + +Examples: + +```bash +# KAI0 (two-timestep) on a dataset +uv run python stage_advantage/annotation/eval.py Flatten-Fold KAI0 /path/to/dataset + +# PI06 (single-timestep) +uv run python stage_advantage/annotation/eval.py Flatten-Fold PI06 /path/to/dataset +``` + +`` is a key in `eval.py`'s `MODELS_CONFIG_MAP` (e.g. `Flatten-Fold`); `` is `PI06` or `KAI0`; `` is the path to the LeRobot dataset. Results are written under `/data__/`. + +For a ready-to-use script with environment setup (conda/venv activation, environment variables) and status logging, see **`annotation/eval.sh`**: + +```bash +bash stage_advantage/annotation/eval.sh Flatten-Fold KAI0 /path/to/dataset +``` + +### Evaluation Outputs + +Results are saved alongside the original data directory: + +``` +/ + ├── data/ # Original data (unchanged) + │ chunk-000/ + │ episode_000000.parquet + │ ... + ├── data_KAI0_100000/ # New parquets with advantage columns (or data_PI06_100000) + │ chunk-000/ + │ episode_000000.parquet # = original + relative_advantage, absolute_value, absolute_advantage + │ ... + └── videos/ # Shared videos (unchanged) +``` + +The output parquets can then be used in Stage 3 (AWBC) or fed back into Stage 0 (`gt_label.py --advantage-source absolute_advantage`) for discretized labeling. + +--- + +## Stage 3: AWBC Training + +**Goal**: Train a policy using **Advantage-Weighted Behavior Cloning (AWBC)**. The advantage labels (from Stage 0 + Stage 2) are stored as `task_index` per frame and as prompt strings in `meta/tasks.jsonl`. By setting **`prompt_from_task=True`** in the data config, each sample’s prompt is taken from that mapping, so the policy is conditioned on the advantage-derived label (e.g. high vs low advantage) and effectively does advantage-weighted behavior cloning via the language channel. + +**Configs** (in `src/openpi/training/config.py`): `pi05_flatten_fold_awbc`, `pi05_tee_shirt_sort_awbc`, `pi05_hang_cloth_awbc`. Each uses `LerobotAgilexDataConfig` or `LerobotARXDataConfig` with `base_config=DataConfig(prompt_from_task=True)` and `repo_id` pointing to the **advantage** dataset (e.g. `.../data/FlattenFold/advantage`). + +### What the policy sees as prompt (training) + +The prompt is read from the dataset’s **`meta/tasks.jsonl`**: each frame’s `task_index` is mapped to a task string, and that string is passed to the policy as the language prompt. **`gt_label.py`** (Stage 0) writes these strings when it builds the advantage-labeled dataset. + +- **Binary mode** (typical): `task_index=0` → `", Advantage: negative"`, `task_index=1` → `", Advantage: positive"`. The `` text is set in `gt_label.py` (e.g. `"fold the cloth"` for FlattenFold). +- **n_slices mode**: `task_index=i` → `", Advantage: {i}"`. + +So during AWBC training the model is conditioned on prompts that explicitly include the advantage label (e.g. 
`"fold the cloth, Advantage: positive"` or `"fold the cloth, Advantage: negative"`). + +### Inference with an AWBC-trained model + +At **inference** time you must use the **same prompt format** as in training. To run the policy in the high-advantage regime, pass the **positive**-advantage prompt, e.g. `", Advantage: positive"` (with the same `` wording as in your `tasks.jsonl`). Using a different format or omitting the advantage part can hurt performance, since the model was trained to condition on this exact style of prompt. + +### How it works (data flow) + +1. **Data**: The advantage dataset must contain `task_index` in each parquet and `meta/tasks.jsonl` mapping `task_index` → prompt string. This is produced by running Stage 2 (eval) to get advantage columns, then Stage 0 (`gt_label.py --advantage-source absolute_advantage`) to discretize into `task_index` and write `tasks.jsonl`. +2. **Config**: `prompt_from_task=True` causes the data loader to wrap the dataset with `PromptFromLeRobotTask(dataset_meta.tasks)`, which sets `prompt = tasks[task_index]` for each sample. The repack transform includes `"prompt"` so the policy receives this text as conditioning. +3. **Training**: Standard JAX training via `scripts/train.py` with the AWBC config; the policy is trained with the task-derived prompt, so the language input carries the advantage weighting. + +### Before training + +1. **Produce the advantage dataset:** Run Stage 2 (eval) on your dataset so it has `data_PI06_100000/` or `data_KAI0_100000/`. Then run Stage 0 (e.g. `gt_labeling.sh`) with `DATA_PATH` = that repo and source subdirs `data_PI06_100000` / `data_KAI0_100000`; the script outputs a directory with `data/` (parquets with `task_index`), `meta/tasks.jsonl`, and `videos`. Use that directory as the advantage dataset (e.g. copy or link it to `./data/FlattenFold/advantage`). +2. In `config.py`, set **`repo_id`** to that advantage dataset path and **`weight_loader`** to your π₀.5 base checkpoint for the AWBC config(s) you use. +3. **Compute norm stats:** + `uv run python scripts/compute_norm_states_fast.py --config-name pi05_flatten_fold_awbc` + (and similarly for `pi05_tee_shirt_sort_awbc` / `pi05_hang_cloth_awbc` if needed.) + +### Usage + +From the repository root, the core training command is: + +```bash +XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_flatten_fold_awbc --exp_name=run1 +XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_tee_shirt_sort_awbc --exp_name=run1 +XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_hang_cloth_awbc --exp_name=run1 +``` + +For a ready-to-use script with environment setup (venv activation, `XLA_PYTHON_CLIENT_MEM_FRACTION`, `WANDB_MODE`) and automatic log management, see **`awbc/train_awbc.sh`**: + +```bash +RUNNAME=pi05_flatten_fold_awbc RUNTIME=run1 bash stage_advantage/awbc/train_awbc.sh +``` + +The shell script handles output directory creation and log redirection (via `tee`) automatically. 
+ +--- + +## Directory Structure + +``` +stage_advantage/ +├── README.md # This file +├── annotation/ # Stages 0–2: labeling & estimator training +│ ├── README.md +│ ├── gt_label.py # Core labeling script (progress → advantage → task_index) +│ ├── gt_labeling.sh # Batch labeling for PI06 / KAI0 variants +│ ├── train_estimator.sh # Shell script for Stage 1 training (env + DDP + logging) +│ ├── eval.py # Evaluate trained estimator on datasets +│ ├── eval.sh # Shell script for Stage 2 evaluation (env + logging) +│ └── evaluator.py # SimpleValueEvaluator: batched GPU inference +└── awbc/ # Stage 3: AWBC + ├── README.md + └── train_awbc.sh # Shell script for Stage 3 AWBC training (env + logging) +``` diff --git a/stage_advantage/annotation/README.md b/stage_advantage/annotation/README.md new file mode 100644 index 0000000..4fae7bf --- /dev/null +++ b/stage_advantage/annotation/README.md @@ -0,0 +1,36 @@ +## Annotation: Stage 0–2 (Labeling, Estimator Training, Eval) + +This directory contains **Stage 0** (GT labeling with `gt_label.py` / `gt_labeling.sh`), **Stage 1** (advantage estimator training via `scripts/train_pytorch.py`), and **Stage 2** (advantage estimation on new data via `eval.py`). All commands below assume you are at the **repository root** unless noted. Full pipeline and options are in the [parent README](../README.md). + +### Quick Start + +```bash +# Step 1: Label a dataset with advantage-based task_index (GT labels from progress) +# Edit DATA_PATH in gt_labeling.sh, then from repo root: +bash stage_advantage/annotation/gt_labeling.sh + +# Step 2: Train the Advantage Estimator (update config.py repo_id / pytorch_weight_path first) +# From repo root: +uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 +# Or: uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_PI06_FLATTEN_FOLD --exp_name=run1 --save_interval 10000 + +# Step 3: Evaluate the trained estimator on new data (PI06 or KAI0) +# From repo root: +uv run python stage_advantage/annotation/eval.py Flatten-Fold KAI0 /path/to/dataset + +# Step 4: Use the advantage-labeled data for AWBC (Stage 3) +# After Stage 2, run gt_labeling.sh with DATA_PATH = eval repo (or gt_label.py --advantage-source absolute_advantage). +# Then from repo root: +XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_flatten_fold_awbc --exp_name=run1 +``` + +### File Descriptions + +| File | Stage | Description | +|---|---|---| +| `gt_label.py` | 0 | Core script: computes advantage from progress/absolute_advantage and assigns `task_index` to parquet frames | +| `gt_labeling.sh` | 0 | Batch labeling: prepares dataset dirs and runs `gt_label.py` (only .sh in this dir) | +| `eval.py` | 2 | Evaluates a trained estimator on a dataset, writing predicted advantages to new parquets | +| `evaluator.py` | 2 | `SimpleValueEvaluator`: batched GPU inference with parallel video loading and prefetching | + +For Stage 0 parameters, Stage 1 config fields, Stage 2 `MODELS_CONFIG_MAP`, and end-to-end AWBC order, see the [parent README](../README.md). diff --git a/stage_advantage/annotation/eval.py b/stage_advantage/annotation/eval.py new file mode 100644 index 0000000..89c5a70 --- /dev/null +++ b/stage_advantage/annotation/eval.py @@ -0,0 +1,228 @@ +""" +Evaluate a trained Advantage Estimator on a LeRobot dataset and write +predicted advantage values back into new parquet files. 
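+
+Outputs are written alongside the original data/ directory, as
+data_<model_name>_<ckpt_steps>/chunk-*/episode_*.parquet (e.g. data_KAI0_100000/).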
+ +Expected dataset layout: + dataset_root/ + ├── data/ + │ chunk-000/ + │ episode_000000.parquet + │ episode_000001.parquet + │ ... + ├── videos/ + │ chunk-000/ + │ observation.images.hand_left/ + │ episode_000000.mp4 + │ observation.images.hand_right/ + │ episode_000000.mp4 + │ observation.images.top_head/ + │ episode_000000.mp4 + ├── meta/ + │ info.json + │ episodes.jsonl + │ tasks.jsonl + └── README.md + +Usage: + python eval.py + +Arguments: + model_type : Flatten-Fold / demo_A / demo_B + model_name : PI06 (single-timestep) / KAI0 (two-timestep stage-level) + repo_id : Path to the LeRobot dataset +""" +import os +import argparse +from evaluator import SimpleValueEvaluator +import pyarrow.parquet as pq +import pyarrow.compute as pc +from pathlib import Path +from typing import List, Dict +import pyarrow as pa +from tqdm import tqdm +import lerobot.common.datasets.lerobot_dataset as lerobot_dataset + +# Model configuration registry: maps (model_type, model_name) to checkpoint info. +# Only two variants: PI06 (single-timestep) and KAI0 (two-timestep stage-level). +# Update ckpt_dir / ckpt_steps to point to your trained Advantage Estimator checkpoints. +MODELS_CONFIG_MAP = { + 'Flatten-Fold': { + 'PI06': { + 'name': 'PI06', + 'config_name': 'ADVANTAGE_TORCH_PI06_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_PI06_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + 'KAI0': { + 'name': 'KAI0', + 'config_name': 'ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + }, + 'demo_A': { + 'PI06': { + 'name': 'PI06', + 'config_name': 'ADVANTAGE_TORCH_PI06_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_PI06_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + 'KAI0': { + 'name': 'KAI0', + 'config_name': 'ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + }, + 'demo_B': { + 'PI06': { + 'name': 'PI06', + 'config_name': 'ADVANTAGE_TORCH_PI06_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_PI06_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + 'KAI0': { + 'name': 'KAI0', + 'config_name': 'ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD', + 'ckpt_dir': 'experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/run1', + 'ckpt_steps': 100000 + }, + }, +} + + +def parse_args(): + parser = argparse.ArgumentParser( + description='Evaluate a trained Advantage Estimator on a LeRobot dataset' + ) + parser.add_argument('model_type', type=str, choices=['Flatten-Fold', 'demo_A', 'demo_B'], + help='Model type: Flatten-Fold / demo_A / demo_B') + parser.add_argument('model_name', type=str, choices=['PI06', 'KAI0'], + help='Model variant: PI06 (single-timestep) / KAI0 (two-timestep stage-level)') + parser.add_argument('repo_id', type=str, + help='Path to the LeRobot dataset') + return parser.parse_args() + + +def edit_parquet_file(src_parquet: Path, output_path: Path, advantages_dict: Dict[str, list]): + """Read source parquet, append predicted advantage columns, and write to output_path.""" + table = pq.read_table(src_parquet) + advantages_table = pa.Table.from_pylist(advantages_dict) + + cols_to_add = ["relative_advantage", "absolute_value", "absolute_advantage"] + new_columns = {} + for col in cols_to_add: + if col not in table.column_names and col in advantages_table.column_names: + new_columns[col] = advantages_table[col] + if new_columns: + for name, column in new_columns.items(): + table = table.append_column(name, column) + + 
output_path.parent.mkdir(parents=True, exist_ok=True) + pq.write_table(table, output_path) + + +def main(): + args = parse_args() + + model_type = args.model_type + model_name = args.model_name + repo_id = Path(args.repo_id) + + # Look up model configuration + if model_type not in MODELS_CONFIG_MAP: + raise ValueError(f"Unknown model_type: {model_type}, available: {list(MODELS_CONFIG_MAP.keys())}") + if model_name not in MODELS_CONFIG_MAP[model_type]: + raise ValueError(f"Unknown model_name: {model_name}, available: {list(MODELS_CONFIG_MAP[model_type].keys())}") + + models_config = [MODELS_CONFIG_MAP[model_type][model_name]] + + print(f"Model type: {model_type}") + print(f"Model name: {model_name}") + print(f"Using dataset at {repo_id}") + + # Interval for relative advantage computation (frames to look ahead) + relative_interval = 50 + + for model_cfg in models_config: + config_name = model_cfg['config_name'] + ckpt_dir = f"{model_cfg['ckpt_dir']}/{model_cfg['ckpt_steps']}" + is_1timestep = (model_cfg['name'] == 'PI06') + + # Initialize the evaluator + evaluator = SimpleValueEvaluator( + config_name=config_name, + ckpt_dir=ckpt_dir, + num_workers=64, # Parallel threads for video loading; adjust based on CPU cores + ) + + dataset_metadata = lerobot_dataset.LeRobotDatasetMetadata(repo_id=repo_id) + + for i in tqdm(range(dataset_metadata.total_episodes), desc="Evaluating episodes"): + parquet_file = repo_id / dataset_metadata.data_path.format( + episode_chunk=i // dataset_metadata.chunks_size, episode_index=i + ) + if not parquet_file.exists(): + print(f"Parquet file {parquet_file} not found, skipping") + continue + + # Resolve video paths for all three camera views + top_video = repo_id / dataset_metadata.video_path.format( + episode_chunk=i // dataset_metadata.chunks_size, episode_index=i, video_key='top_head' + ) + left_video = repo_id / dataset_metadata.video_path.format( + episode_chunk=i // dataset_metadata.chunks_size, episode_index=i, video_key='hand_left' + ) + right_video = repo_id / dataset_metadata.video_path.format( + episode_chunk=i // dataset_metadata.chunks_size, episode_index=i, video_key='hand_right' + ) + if not top_video.exists() or not left_video.exists() or not right_video.exists(): + print(f"Missing video file(s) for episode {i}, skipping") + continue + + video_paths = (top_video, left_video, right_video) + + # Read frame index range from parquet + frame_indices = pq.read_table(parquet_file)['frame_index'].to_pylist() + min_frame_index = frame_indices[0] + max_frame_index = frame_indices[-1] + + # Output path: data__/chunk-*/episode_*.parquet + output_path = repo_id / f"data_{model_cfg['name']}_{model_cfg['ckpt_steps']}" / parquet_file.relative_to(repo_id / "data") + if output_path.exists(): + print(f"Output {output_path} already exists, skipping") + continue + + # Run inference + if is_1timestep: + results = evaluator.evaluate_video_1timestep_advantage( + video_paths=video_paths, + prompt="Flatten and fold the cloth.", + batch_size=400, + frame_interval=1, # 1 = evaluate every frame + min_frame_index=min_frame_index, + max_frame_index=max_frame_index, + prefetch=True, + ) + else: + results = evaluator.evaluate_video_2timesteps_advantages( + video_paths=video_paths, + prompt="Flatten and fold the cloth.", + batch_size=160, + frame_interval=1, # 1 = evaluate every frame + relative_interval=relative_interval, + min_frame_index=min_frame_index, + max_frame_index=max_frame_index, + prefetch=True, + ) + + # Write results back as new parquet with advantage columns + 
edit_parquet_file( + src_parquet=parquet_file, + output_path=output_path, + advantages_dict=results, + ) + + +if __name__ == "__main__": + main() diff --git a/stage_advantage/annotation/eval.sh b/stage_advantage/annotation/eval.sh new file mode 100644 index 0000000..d5a7410 --- /dev/null +++ b/stage_advantage/annotation/eval.sh @@ -0,0 +1,70 @@ +#!/bin/bash +############################################################################### +# eval.sh +# +# Use a trained Advantage Estimator to label a dataset with predicted +# advantage values (relative_advantage, absolute_value, absolute_advantage). +# +# This script calls eval.py, which: +# 1. Loads a trained Advantage Estimator checkpoint +# 2. Iterates over all episodes in the LeRobot dataset +# 3. Reads video frames from three camera views (top, left, right) +# 4. Runs batched GPU inference to predict advantage values per frame +# 5. Writes results as new parquet files with advantage columns appended +# +# The output parquets are saved under: +# /data__/chunk-*/episode_*.parquet +# +# Prerequisites: +# - A trained Advantage Estimator checkpoint (from Stage 1) +# - Update MODELS_CONFIG_MAP in eval.py with the correct checkpoint paths +# +# Usage: +# bash eval.sh +# +# Examples: +# bash eval.sh Flatten-Fold KAI0 /path/to/dataset +# bash eval.sh Flatten-Fold PI06 /path/to/dataset +# +# Arguments: +# model_type : Flatten-Fold / demo_A / demo_B +# model_name : PI06 (single-timestep) / KAI0 (two-timestep stage-level) +# repo_id : Path to the LeRobot dataset to evaluate +############################################################################### +set -xe +set -o pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../../" && pwd)" +cd "${PROJECT_ROOT}" +echo "Project root: ${PROJECT_ROOT}" + +# ─── Conda / venv activation ───────────────────────────────────────────────── +source /cpfs01/shared/smch/miniconda3/etc/profile.d/conda.sh +conda activate uv_py311 +source .venv/bin/activate + +export TZ='Asia/Shanghai' + +# ─── Other environment variables ────────────────────────────────────────────── +export UV_DEFAULT_INDEX="https://mirrors.aliyun.com/pypi/simple/" +export WANDB_MODE=offline + +# ─── Parse arguments ───────────────────────────────────────────────────────── +MODEL_TYPE=${1:?"Usage: bash eval.sh "} +MODEL_NAME=${2:?"Usage: bash eval.sh "} +REPO_ID=${3:?"Usage: bash eval.sh "} + +echo "============================================================" +echo " Advantage Estimator Evaluation" +echo " Model type: ${MODEL_TYPE}" +echo " Model name: ${MODEL_NAME}" +echo " Dataset: ${REPO_ID}" +echo "============================================================" + +uv run python "${SCRIPT_DIR}/eval.py" "${MODEL_TYPE}" "${MODEL_NAME}" "${REPO_ID}" + +echo "============================================================" +echo " Evaluation complete!" +echo " Results saved under: ${REPO_ID}/data_${MODEL_NAME}_*/" +echo "============================================================" \ No newline at end of file diff --git a/stage_advantage/annotation/evaluator.py b/stage_advantage/annotation/evaluator.py new file mode 100644 index 0000000..74160dc --- /dev/null +++ b/stage_advantage/annotation/evaluator.py @@ -0,0 +1,694 @@ +""" +Advantage Estimator inference module. + +Provides SimpleValueEvaluator: a class that loads a trained Advantage Estimator +checkpoint, reads multi-view video frames, and outputs per-frame advantage +predictions via batched GPU inference with parallel data prefetching. 
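+
+Typical usage (mirrors eval.py; the video paths are placeholders):
+
+    evaluator = SimpleValueEvaluator(
+        config_name="ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD",
+        ckpt_dir="experiment/ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD/run1/100000",
+        num_workers=8,
+    )
+    results = evaluator.evaluate_video_2timesteps_advantages(
+        video_paths=(top_mp4, left_mp4, right_mp4),
+        prompt="Flatten and fold the cloth.",
+        batch_size=160,
+        relative_interval=50,
+    )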
+""" + +from __future__ import annotations + +import dataclasses +import os +import cv2 +import numpy as np +import logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +import torch +import safetensors.torch +from PIL import Image +from typing import List, Tuple, Dict, Any, Optional, Callable +from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading +from queue import Queue + +from openpi.training import config as _config +from openpi.shared import download +from openpi.models_pytorch.pi0_pytorch import PI0Pytorch_Custom as PI0Pytorch +import openpi.models.tokenizer as _tokenizer +from types import SimpleNamespace +from openpi.shared import image_tools + + +class SimpleValueEvaluator: + """Evaluator that runs inference and returns per-frame advantage predictions.""" + + def __init__(self, config_name: str, ckpt_dir: str, num_workers: int = 4): + """ + Args: + config_name: Training config name (must exist in config registry). + ckpt_dir: Path to the checkpoint directory containing model.safetensors. + num_workers: Number of parallel threads for video loading and image preprocessing. + """ + self.config_name = config_name + self.ckpt_dir = ckpt_dir + self.num_workers = num_workers + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + self._load_model() + self.tokenizer = _tokenizer.PaligemmaTokenizer(self.config.model.max_token_len) + self._executor = ThreadPoolExecutor(max_workers=num_workers) + + logging.info(f"Evaluator initialized on device: {self.device}, num_workers: {num_workers}") + + def __del__(self): + """Shut down the thread pool.""" + if hasattr(self, '_executor'): + self._executor.shutdown(wait=False) + + def shutdown(self): + """Explicitly shut down the thread pool.""" + if hasattr(self, '_executor'): + self._executor.shutdown(wait=True) + logging.info("Thread pool shut down") + + def _load_model(self): + """Load model config and weights from checkpoint.""" + self.config = _config.get_config(self.config_name) + checkpoint_dir = download.maybe_download(self.ckpt_dir) + + new_model = self.config.model.__class__(**{**self.config.model.__dict__}) + self.config = dataclasses.replace(self.config, model=new_model) + + self.model = PI0Pytorch(new_model).to(self.device) + self.model.eval() + model_path = os.path.join(checkpoint_dir, "model.safetensors") + logging.info(f"Loading model weights: {model_path}") + safetensors.torch.load_model(self.model, model_path, strict=True) + logging.info("Model loaded successfully") + + def _load_video_frames(self, video_path: str, frame_interval: int = 1) -> List[np.ndarray]: + """ + Load frames from a video file with optional interval sampling. + + Args: + video_path: Path to the video file. + frame_interval: Sampling interval (1 = every frame, 2 = every other frame, etc.). + + Returns: + List of RGB numpy arrays. 
+ """ + if not os.path.exists(video_path): + raise FileNotFoundError(f"Video file not found: {video_path}") + + cap = cv2.VideoCapture(video_path) + frames = [] + frame_count = 0 + + try: + while True: + ret, frame = cap.read() + if not ret: + break + if frame_count % frame_interval == 0: + # Convert BGR (OpenCV) to RGB + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + frames.append(frame_rgb) + frame_count += 1 + finally: + cap.release() + + logging.info( + f"Loaded {len(frames)} frames from {os.path.basename(video_path)} " + f"(total: {frame_count}, interval: {frame_interval})" + ) + return frames + + def _load_videos_parallel( + self, + video_paths: Tuple[str, str, str], + frame_interval: int = 1 + ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: + """ + Load three video files in parallel. + + Args: + video_paths: Tuple of (top_video, left_video, right_video) paths. + frame_interval: Sampling interval. + + Returns: + Tuple of frame lists for (top, left, right) views. + """ + top_video_path, left_video_path, right_video_path = video_paths + + futures = { + self._executor.submit(self._load_video_frames, top_video_path, frame_interval): 'top', + self._executor.submit(self._load_video_frames, left_video_path, frame_interval): 'left', + self._executor.submit(self._load_video_frames, right_video_path, frame_interval): 'right' + } + + results = {} + for future in as_completed(futures): + video_type = futures[future] + results[video_type] = future.result() + + return results['top'], results['left'], results['right'] + + def _process_single_image(self, rgb_img: np.ndarray) -> torch.Tensor: + """ + Preprocess a single RGB image into a model-ready tensor. + + Args: + rgb_img: RGB numpy array (H, W, 3). + + Returns: + Tensor of shape (C, H, W), normalized to [-1, 1], resized to 224x224 with padding. + """ + tensor = torch.from_numpy(rgb_img).float() / 255.0 + tensor = tensor * 2.0 - 1.0 # Normalize to [-1, 1] + tensor = image_tools.resize_with_pad_torch(tensor, 224, 224) + tensor = tensor.permute(2, 0, 1) # HWC -> CHW + return tensor + + def _batch_numpy_to_tensor_parallel(self, np_images: List[np.ndarray]) -> torch.Tensor: + """ + Convert a list of RGB numpy images to a batched tensor in parallel. + + Args: + np_images: List of RGB numpy arrays. + + Returns: + Tensor of shape (batch_size, C, H, W). + """ + futures = [self._executor.submit(self._process_single_image, img) for img in np_images] + tensors = [future.result() for future in futures] + return torch.stack(tensors, dim=0) + + def _batch_numpy_to_tensor(self, np_images: List[np.ndarray]) -> torch.Tensor: + """ + Convert a list of RGB numpy images to a batched tensor (sequential). + + Args: + np_images: List of RGB numpy arrays. + + Returns: + Tensor of shape (batch_size, C, H, W). + """ + tensors = [] + for rgb_img in np_images: + tensor = torch.from_numpy(rgb_img).float() / 255.0 + tensor = tensor * 2.0 - 1.0 + tensor = image_tools.resize_with_pad_torch(tensor, 224, 224) + tensor = tensor.permute(2, 0, 1) + tensors.append(tensor) + return torch.stack(tensors, dim=0) + + def _prepare_batch_tensors( + self, + top_frames: List[np.ndarray], + left_frames: List[np.ndarray], + right_frames: List[np.ndarray], + batch_indices: List[int], + future_indices: List[int], + initial_tensors: Optional[Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None + ) -> Dict[str, torch.Tensor]: + """ + Prepare a batch of tensors for current and future frames in parallel. 
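+        The six image lists (base/future × top/left/right) are converted
+        concurrently, one thread-pool future per list.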
+ + Args: + top_frames, left_frames, right_frames: Frame lists for each camera view. + batch_indices: Indices of frames in the current batch. + future_indices: Corresponding future frame indices. + initial_tensors: Optional pre-computed initial frame tensors. + + Returns: + Dict with keys 'base_top', 'base_left', 'base_right', + 'future_top', 'future_left', 'future_right'. + """ + base_top_list = [top_frames[j] for j in batch_indices] + base_left_list = [left_frames[j] for j in batch_indices] + base_right_list = [right_frames[j] for j in batch_indices] + future_top_list = [top_frames[j] for j in future_indices] + future_left_list = [left_frames[j] for j in future_indices] + future_right_list = [right_frames[j] for j in future_indices] + + all_lists = [ + base_top_list, base_left_list, base_right_list, + future_top_list, future_left_list, future_right_list + ] + + futures = [ + self._executor.submit(self._batch_numpy_to_tensor_parallel, img_list) + for img_list in all_lists + ] + results = [f.result() for f in futures] + + return { + 'base_top': results[0], + 'base_left': results[1], + 'base_right': results[2], + 'future_top': results[3], + 'future_left': results[4], + 'future_right': results[5] + } + + def evaluate_video_2timesteps_advantages( + self, + video_paths: Tuple[str, str, str], + prompt: str, + batch_size: int = 8, + frame_interval: int = 1, + relative_interval: int = 50, + min_frame_index: int = None, + max_frame_index: int = None, + prefetch: bool = True + ) -> List[Dict[str, Any]]: + """ + Evaluate advantage using two-timestep comparison (relative + absolute). + + For each frame n, computes: + - relative_advantage: model(frame_n, frame_{n+relative_interval}) + - absolute_value: model(frame_0, frame_n) + - absolute_advantage: absolute_value[n+interval] - absolute_value[n] + + Args: + video_paths: Tuple of (top, left, right) video file paths. + prompt: Task prompt string. + batch_size: Batch size for GPU inference. + frame_interval: Sampling interval (1 = every frame). + relative_interval: Number of frames ahead for comparison (default: 50). + min_frame_index: Start frame index (inclusive). + max_frame_index: End frame index (inclusive). + prefetch: Whether to prefetch next batch during GPU inference. + + Returns: + List of dicts, each containing: frame_idx, future_frame_idx, + relative_advantage, absolute_value, absolute_advantage. 
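+
+        Example:
+            With relative_interval=50 in a 150-frame episode, frame 120 is
+            compared against frame min(120 + 50, 149) = 149; since the gap is
+            only 29 frames, the predicted values are rescaled by 50/29 so that
+            advantages remain comparable across the episode.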
+ """ + if len(video_paths) != 3: + raise ValueError("Expected 3 video paths: (top, left, right)") + + # Load video frames in parallel + logging.info(f"Loading video frames in parallel (interval: {frame_interval}, workers: {self.num_workers})...") + top_frames, left_frames, right_frames = self._load_videos_parallel(video_paths, frame_interval) + + # Validate consistent frame counts + if len(left_frames) != len(top_frames) or len(right_frames) != len(top_frames): + raise ValueError( + f"Inconsistent frame counts: top={len(top_frames)}, " + f"left={len(left_frames)}, right={len(right_frames)}" + ) + + top_frames = top_frames[min_frame_index:max_frame_index+1] + left_frames = left_frames[min_frame_index:max_frame_index+1] + right_frames = right_frames[min_frame_index:max_frame_index+1] + num_frames = len(top_frames) + if num_frames < 2: + raise ValueError(f"Insufficient frames: {num_frames}, need at least 2") + + logging.info(f"Total frames after slicing: {num_frames}, relative_interval: {relative_interval}") + + # Preprocess the first frame as the initial reference (for absolute value) + initial_futures = [ + self._executor.submit(self._batch_numpy_to_tensor_parallel, [top_frames[0]]), + self._executor.submit(self._batch_numpy_to_tensor_parallel, [left_frames[0]]), + self._executor.submit(self._batch_numpy_to_tensor_parallel, [right_frames[0]]) + ] + initial_top_tensor = initial_futures[0].result().to(self.device) + initial_left_tensor = initial_futures[1].result().to(self.device) + initial_right_tensor = initial_futures[2].result().to(self.device) + + # Tokenize prompt + tokens, token_masks = self.tokenizer.tokenize(prompt, state=None) + + all_results = [] + + logging.info(f"Starting batched evaluation (n vs n+{relative_interval}, prefetch: {prefetch})...") + if num_frames <= 0: + raise ValueError(f"Insufficient frames ({num_frames}), need at least {relative_interval + 1}") + + max_frame_idx = num_frames - 1 + batch_starts = list(range(0, num_frames, batch_size)) + + def prepare_batch_data(batch_start: int) -> Tuple[Dict, List[int], int]: + """Prepare tensors for a single batch.""" + end_idx = min(batch_start + batch_size, num_frames) + current_batch_size = end_idx - batch_start + batch_indices = list(range(batch_start, end_idx)) + future_frame_indices = [min(j + relative_interval, max_frame_idx) for j in batch_indices] + + # Collect all images and convert in parallel + all_images = [] + for j in batch_indices: + all_images.extend([top_frames[j], left_frames[j], right_frames[j]]) + for fidx in future_frame_indices: + all_images.extend([top_frames[fidx], left_frames[fidx], right_frames[fidx]]) + + tensor_futures = [self._executor.submit(self._process_single_image, img) for img in all_images] + tensors = [f.result() for f in tensor_futures] + + n = current_batch_size + base_top = torch.stack(tensors[0:n*3:3], dim=0) + base_left = torch.stack(tensors[1:n*3:3], dim=0) + base_right = torch.stack(tensors[2:n*3:3], dim=0) + future_top = torch.stack(tensors[n*3::3], dim=0) + future_left = torch.stack(tensors[n*3+1::3], dim=0) + future_right = torch.stack(tensors[n*3+2::3], dim=0) + + return { + 'base_top': base_top, + 'base_left': base_left, + 'base_right': base_right, + 'future_top': future_top, + 'future_left': future_left, + 'future_right': future_right + }, future_frame_indices, current_batch_size + + # Prefetch the first batch + prefetch_future = None + if prefetch and len(batch_starts) > 1: + prefetch_future = self._executor.submit(prepare_batch_data, batch_starts[0]) + + for batch_idx, i in 
enumerate(tqdm(batch_starts, desc="Evaluating")): + # Get current batch data + if prefetch and prefetch_future is not None: + batch_tensors, future_frame_indices, current_batch_size = prefetch_future.result() + else: + batch_tensors, future_frame_indices, current_batch_size = prepare_batch_data(i) + + # Prefetch next batch while GPU is busy + if prefetch and batch_idx + 1 < len(batch_starts): + prefetch_future = self._executor.submit(prepare_batch_data, batch_starts[batch_idx + 1]) + + # Move to device + base_top_batch = batch_tensors['base_top'].to(self.device) + base_left_batch = batch_tensors['base_left'].to(self.device) + base_right_batch = batch_tensors['base_right'].to(self.device) + future_top_batch = batch_tensors['future_top'].to(self.device) + future_left_batch = batch_tensors['future_left'].to(self.device) + future_right_batch = batch_tensors['future_right'].to(self.device) + + # Expand initial frame tensors to batch size + initial_top_batch = initial_top_tensor.expand(current_batch_size, -1, -1, -1) + initial_left_batch = initial_left_tensor.expand(current_batch_size, -1, -1, -1) + initial_right_batch = initial_right_tensor.expand(current_batch_size, -1, -1, -1) + + # Build relative observation: compare frame_n (his_-100) vs frame_{n+interval} (base_0) + relative_observation = { + "state": torch.zeros((current_batch_size, 32), dtype=torch.float32).to(self.device), + "images": { + "base_-100_rgb": base_top_batch, + "left_wrist_-100_rgb": base_left_batch, + "right_wrist_-100_rgb": base_right_batch, + "base_0_rgb": future_top_batch, + "left_wrist_0_rgb": future_left_batch, + "right_wrist_0_rgb": future_right_batch, + }, + "image_masks": {} + } + + # Build absolute observation: compare frame_0 (his_-100) vs frame_n (base_0) + absolute_observation = { + "state": torch.zeros((current_batch_size, 32), dtype=torch.float32).to(self.device), + "images": { + "base_-100_rgb": initial_top_batch, + "left_wrist_-100_rgb": initial_left_batch, + "right_wrist_-100_rgb": initial_right_batch, + "base_0_rgb": base_top_batch, + "left_wrist_0_rgb": base_left_batch, + "right_wrist_0_rgb": base_right_batch, + }, + "image_masks": {} + } + + # Expand tokenized prompt to batch + tokens_batch = np.tile(tokens[np.newaxis, :], (current_batch_size, 1)) + token_masks_batch = np.tile(token_masks[np.newaxis, :], (current_batch_size, 1)) + + relative_observation = { + **relative_observation, + "tokenized_prompt": torch.from_numpy(tokens_batch).to(self.device), + "tokenized_prompt_mask": torch.from_numpy(token_masks_batch).to(self.device) + } + absolute_observation = { + **absolute_observation, + "tokenized_prompt": torch.from_numpy(tokens_batch).to(self.device), + "tokenized_prompt_mask": torch.from_numpy(token_masks_batch).to(self.device) + } + + relative_observation = SimpleNamespace(**relative_observation) + absolute_observation = SimpleNamespace(**absolute_observation) + + # Batched inference + with torch.no_grad(): + relative_val_arr = self.model.sample_values(self.device, relative_observation) # (batch, 1) + absolute_val_arr = self.model.sample_values(self.device, absolute_observation) # (batch, 1) + + # Collect per-frame results + for j in range(current_batch_size): + frame_idx = i + j + + # Normalize relative advantage when interval differs from expected + if future_frame_indices[j] - frame_idx == relative_interval: + relative_val = float(relative_val_arr[j, 0].item()) + elif future_frame_indices[j] == frame_idx: + relative_val = float(0) + else: + relative_val = float(relative_val_arr[j, 0].item()) / 
(future_frame_indices[j] - frame_idx) * relative_interval + + # First frame has zero absolute value by definition + if frame_idx == 0: + absolute_val = float(0) + else: + absolute_val = float(absolute_val_arr[j, 0].item()) + + result = { + "frame_idx": frame_idx, + "future_frame_idx": future_frame_indices[j], + "relative_advantage": relative_val, + "absolute_value": absolute_val + } + all_results.append(result) + + # Compute absolute_advantage from absolute_value differences + all_results_dict = {result["frame_idx"]: result for result in all_results} + for result in all_results: + frame_idx = result["frame_idx"] + future_frame_idx = result["future_frame_idx"] + future_result = all_results_dict.get(future_frame_idx) + if future_frame_idx == frame_idx: + result["absolute_advantage"] = 0.0 + elif future_frame_idx - frame_idx != relative_interval: + result["absolute_advantage"] = (future_result["absolute_value"] - result["absolute_value"]) / (future_frame_idx - frame_idx) * relative_interval + else: + result["absolute_advantage"] = future_result["absolute_value"] - result["absolute_value"] + + result["absolute_advantage"] = max(-1.0, min(1.0, result["absolute_advantage"])) + result["relative_advantage"] = max(-1.0, min(1.0, result["relative_advantage"])) + + logging.info(f"Evaluation complete, processed {len(all_results)} frames") + return all_results + + def evaluate_video_1timestep_advantage( + self, + video_paths: Tuple[str, str, str], + prompt: str, + batch_size: int = 8, + frame_interval: int = 1, + relative_interval: int = 50, + min_frame_index: int = None, + max_frame_index: int = None, + prefetch: bool = True + ) -> List[Dict[str, Any]]: + """ + Evaluate advantage using single-timestep mode (absolute value only). + + For each frame n, computes: + - absolute_value: model(frame_n) + - absolute_advantage: absolute_value[n+interval] - absolute_value[n] + + Args: + video_paths: Tuple of (top, left, right) video file paths. + prompt: Task prompt string. + batch_size: Batch size for GPU inference. + frame_interval: Sampling interval (1 = every frame). + relative_interval: Frames ahead for advantage computation (default: 50). + min_frame_index: Start frame index (inclusive). + max_frame_index: End frame index (inclusive). + prefetch: Whether to prefetch next batch during GPU inference. + + Returns: + List of dicts, each containing: frame_idx, future_frame_idx, + absolute_value, absolute_advantage. 
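+
+        Note:
+            Unlike the two-timestep mode, the model only sees the current frame
+            (base_0_rgb keys); absolute_advantage is derived afterwards from
+            differences of absolute_value and clipped to [-1, 1].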
+ """ + if len(video_paths) != 3: + raise ValueError("Expected 3 video paths: (top, left, right)") + + # Load video frames in parallel + logging.info(f"Loading video frames in parallel (interval: {frame_interval}, workers: {self.num_workers})...") + top_frames, left_frames, right_frames = self._load_videos_parallel(video_paths, frame_interval) + + # Validate consistent frame counts + if len(left_frames) != len(top_frames) or len(right_frames) != len(top_frames): + raise ValueError( + f"Inconsistent frame counts: top={len(top_frames)}, " + f"left={len(left_frames)}, right={len(right_frames)}" + ) + + top_frames = top_frames[min_frame_index:max_frame_index+1] + left_frames = left_frames[min_frame_index:max_frame_index+1] + right_frames = right_frames[min_frame_index:max_frame_index+1] + num_frames = len(top_frames) + if num_frames < 2: + raise ValueError(f"Insufficient frames: {num_frames}, need at least 2") + logging.info(f"Total frames after slicing: {num_frames}, relative_interval: {relative_interval}") + + # Tokenize prompt + tokens, token_masks = self.tokenizer.tokenize(prompt, state=None) + + all_results = [] + + logging.info(f"Starting batched evaluation (1-timestep mode, prefetch: {prefetch})...") + if num_frames <= 0: + raise ValueError(f"Insufficient frames ({num_frames}), need at least {relative_interval + 1}") + + max_frame_idx = num_frames - 1 + batch_starts = list(range(0, num_frames, batch_size)) + + def prepare_batch_data_1timestep(batch_start: int) -> Tuple[Dict, List[int], int]: + """Prepare tensors for a single batch (current frames only).""" + end_idx = min(batch_start + batch_size, num_frames) + current_batch_size = end_idx - batch_start + batch_indices = list(range(batch_start, end_idx)) + future_frame_indices = [min(j + relative_interval, max_frame_idx) for j in batch_indices] + + all_images = [] + for j in batch_indices: + all_images.extend([top_frames[j], left_frames[j], right_frames[j]]) + + tensor_futures = [self._executor.submit(self._process_single_image, img) for img in all_images] + tensors = [f.result() for f in tensor_futures] + + n = current_batch_size + base_top = torch.stack(tensors[0:n*3:3], dim=0) + base_left = torch.stack(tensors[1:n*3:3], dim=0) + base_right = torch.stack(tensors[2:n*3:3], dim=0) + + return { + 'base_top': base_top, + 'base_left': base_left, + 'base_right': base_right, + }, future_frame_indices, current_batch_size + + # Prefetch the first batch + prefetch_future = None + if prefetch and len(batch_starts) > 1: + prefetch_future = self._executor.submit(prepare_batch_data_1timestep, batch_starts[0]) + + for batch_idx, i in enumerate(tqdm(batch_starts, desc="Evaluating")): + if prefetch and prefetch_future is not None: + batch_tensors, future_frame_indices, current_batch_size = prefetch_future.result() + else: + batch_tensors, future_frame_indices, current_batch_size = prepare_batch_data_1timestep(i) + + # Prefetch next batch + if prefetch and batch_idx + 1 < len(batch_starts): + prefetch_future = self._executor.submit(prepare_batch_data_1timestep, batch_starts[batch_idx + 1]) + + # Move to device + base_top_batch = batch_tensors['base_top'].to(self.device) + base_left_batch = batch_tensors['base_left'].to(self.device) + base_right_batch = batch_tensors['base_right'].to(self.device) + + absolute_observation = { + "state": torch.zeros((current_batch_size, 32), dtype=torch.float32).to(self.device), + "images": { + "base_0_rgb": base_top_batch, + "left_wrist_0_rgb": base_left_batch, + "right_wrist_0_rgb": base_right_batch, + }, + 
"image_masks": {} + } + + # Expand tokenized prompt to batch + tokens_batch = np.tile(tokens[np.newaxis, :], (current_batch_size, 1)) + token_masks_batch = np.tile(token_masks[np.newaxis, :], (current_batch_size, 1)) + + absolute_observation = { + **absolute_observation, + "tokenized_prompt": torch.from_numpy(tokens_batch).to(self.device), + "tokenized_prompt_mask": torch.from_numpy(token_masks_batch).to(self.device) + } + + absolute_observation = SimpleNamespace(**absolute_observation) + + # Batched inference + with torch.no_grad(): + absolute_val_arr = self.model.sample_values(self.device, absolute_observation) # (batch, 1) + + for j in range(current_batch_size): + frame_idx = i + j + if frame_idx == 0: + absolute_val = float(0) + else: + absolute_val = float(absolute_val_arr[j, 0].item()) + + result = { + "frame_idx": frame_idx, + "future_frame_idx": future_frame_indices[j], + "absolute_value": absolute_val + } + all_results.append(result) + + # Compute absolute_advantage from absolute_value differences + all_results_dict = {result["frame_idx"]: result for result in all_results} + for result in all_results: + frame_idx = result["frame_idx"] + future_frame_idx = result["future_frame_idx"] + future_result = all_results_dict.get(future_frame_idx) + if future_frame_idx == frame_idx: + result["absolute_advantage"] = 0.0 + elif future_frame_idx - frame_idx != relative_interval: + result["absolute_advantage"] = (future_result["absolute_value"] - result["absolute_value"]) / (future_frame_idx - frame_idx) * relative_interval + else: + result["absolute_advantage"] = future_result["absolute_value"] - result["absolute_value"] + + result["absolute_advantage"] = max(-1.0, min(1.0, result["absolute_advantage"])) + + logging.info(f"Evaluation complete, processed {len(all_results)} frames") + return all_results + + +def main(): + """Example usage for quick testing.""" + config_name = "VALUE_TORCH_Pi05_KAI_cloth_11_15" + ckpt_dir = "/path/to/checkpoint/100000" + + video_root = "/path/to/test_videos" + top_video = os.path.join(video_root, "top_head.mp4") + left_video = os.path.join(video_root, "hand_left.mp4") + right_video = os.path.join(video_root, "hand_right.mp4") + + evaluator = SimpleValueEvaluator( + config_name=config_name, + ckpt_dir=ckpt_dir, + num_workers=48, + ) + + results = evaluator.evaluate_video_2timesteps_advantages( + video_paths=(top_video, left_video, right_video), + prompt="Flatten and fold the cloth.", + batch_size=8, + frame_interval=1, + prefetch=True, + ) + + print(f"\n=== Evaluation complete ===") + print(f"Total results: {len(results)}") + for res in results: + print( + f"frame {res['frame_idx']}, future {res['future_frame_idx']}: " + f"relative_adv={res['relative_advantage']:.4f}, " + f"absolute_adv={res['absolute_advantage']:.4f}, " + f"absolute_val={res['absolute_value']:.4f}" + ) + + evaluator.shutdown() + + +if __name__ == "__main__": + main() diff --git a/stage_advantage/annotation/gt_label.py b/stage_advantage/annotation/gt_label.py new file mode 100644 index 0000000..21e73a5 --- /dev/null +++ b/stage_advantage/annotation/gt_label.py @@ -0,0 +1,527 @@ +#!/usr/bin/env python3 +""" +# python label.py --threshold 30 --chunk-size 50 --discretion-type binary --advantage-source absolute_advantage --stage-nums 2 --dry-run +Script to modify task_index in parquet files based on progress rewards. + +This script: +1. Reads all parquet files from path/data/chunk-*/*.parquet +2. Calculates reward as: progress[i+50] - progress[i] for each frame +3. 
Computes reward distribution statistics across all parquets +4. Labels frames with task_index based on reward percentile threshold + Binary mode: + - task_index=0 for rewards in bottom (1-threshold)% + - task_index=1 for rewards in top threshold% + n_slices mode: + - task_index=0 to (n-1) based on reward percentiles (higher reward -> higher task_index) + - Each slice contains ~(100/n)% of frames + +Stage-based mode (--stage-nums > 1): + - Each frame is assigned to a stage based on its stage_progress_gt value + - Frames with stage_progress_gt in [i/stage_nums, (i+1)/stage_nums) belong to stage i + - Each stage has its own reward statistics and percentile boundaries + - task_index is assigned based on stage-specific percentiles +""" + +import argparse +import glob +import json +import os +from pathlib import Path +from typing import Dict, List, Tuple + +import numpy as np +import pandas as pd +import pyarrow.parquet as pq +from tqdm import tqdm + + +def calculate_rewards(data: pd.DataFrame, chunk_size: int = 50, advantage_source: str = "progress") -> np.ndarray: + """ + Calculate rewards based on progress differences. + + Args: + data: DataFrame containing 'progress' column + chunk_size: Number of frames to look ahead for progress calculation + + Returns: + Array of rewards for each frame + """ + n_frames = len(data) + rewards = np.zeros(n_frames, dtype=np.float32) + if advantage_source == "absolute_advantage": + absolute_advantage = data['absolute_advantage'].values + for i in range(n_frames): + rewards[i] = absolute_advantage[i] + elif advantage_source == "relative_advantage": + relative_advantage = data['relative_advantage'].values + for i in range(n_frames): + rewards[i] = relative_advantage[i] + elif advantage_source == "progress": + progress = data['progress'].values + for i in range(n_frames): + if i + chunk_size < n_frames: + rewards[i] = progress[i + chunk_size] - progress[i] + else: + # For frames near the end, use the last available frame + rewards[i] = (progress[-1] - progress[i]) / (len(progress) - i) * chunk_size + else: + raise ValueError(f"Unknown advantage source: {advantage_source}") + return rewards + + +def get_stage_index(stage_progress_gt: float, stage_nums: int) -> int: + """ + Get the stage index based on stage_progress_gt value. + + Args: + stage_progress_gt: The stage progress value (0-1) for a single frame + stage_nums: Number of stages to divide into + + Returns: + Stage index (0 to stage_nums-1) + """ + if stage_nums == 1: + return 0 + + step = 1.0 / stage_nums + stage_idx = int(stage_progress_gt / step) + # Handle edge case where stage_progress_gt == 1.0 + if stage_idx >= stage_nums: + stage_idx = stage_nums - 1 + return stage_idx + + +def collect_all_rewards(base_path: str, chunk_size: int = 50, advantage_source: str = "progress", + stage_nums: int = 1) -> Tuple[Dict[int, List[float]], List[str]]: + """ + Collect all rewards from all parquet files to compute statistics. 
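+    With stage_nums > 1, each frame's reward is routed to the stage implied by
+    its stage_progress_gt (e.g. stage_nums=2: frames with stage_progress_gt < 0.5
+    go to stage 0, the rest to stage 1), so each stage gets independent statistics.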
+ + Args: + base_path: Base directory path containing data/chunk-*/*.parquet files + chunk_size: Number of frames to look ahead for progress calculation + advantage_source: Source of advantage values + stage_nums: Number of stages to divide data into based on stage_progress_gt + + Returns: + Tuple of (rewards_by_stage, parquet_files) + rewards_by_stage: Dict mapping stage_index to list of rewards + """ + # Find all parquet files + pattern = os.path.join(base_path, "data", "chunk-*", "*.parquet") + parquet_files = sorted(glob.glob(pattern)) + + if not parquet_files: + raise ValueError(f"No parquet files found matching pattern: {pattern}") + + print(f"Found {len(parquet_files)} parquet files") + + # Initialize rewards by stage + rewards_by_stage = {i: [] for i in range(stage_nums)} + + # Collect rewards from all files + print("Collecting rewards from all files...") + for parquet_file in tqdm(parquet_files): + try: + # Read parquet file + df = pd.read_parquet(parquet_file) + + # Calculate rewards for all frames + rewards = calculate_rewards(df, chunk_size, advantage_source) + + if stage_nums == 1: + # No stage division, all rewards go to stage 0 + rewards_by_stage[0].extend(rewards.tolist()) + else: + # Divide rewards by stage based on each frame's stage_progress_gt + if 'stage_progress_gt' not in df.columns: + raise ValueError(f"Column 'stage_progress_gt' not found in {parquet_file}. " + f"Required when stage_nums > 1.") + + stage_progress_gt_values = df['stage_progress_gt'].values + # Each frame has its own stage_progress_gt, assign it to the corresponding stage + for frame_idx in range(len(rewards)): + spg = stage_progress_gt_values[frame_idx] + stage_idx = get_stage_index(spg, stage_nums) + rewards_by_stage[stage_idx].append(rewards[frame_idx]) + + except Exception as e: + print(f"Error processing {parquet_file}: {e}") + continue + + return rewards_by_stage, parquet_files + + +def compute_reward_statistics(rewards: List[float]) -> dict: + """ + Compute reward distribution statistics. + + Args: + rewards: List of all rewards + + Returns: + Dictionary containing percentile information + """ + if len(rewards) == 0: + return { + 'mean': 0.0, + 'std': 0.0, + 'min': 0.0, + 'max': 0.0, + 'percentiles': {p: 0.0 for p in range(0, 101, 10)} + } + + rewards_array = np.array(rewards) + + # Compute percentiles in 10% increments + percentiles = list(range(0, 101, 10)) + percentile_values = np.percentile(rewards_array, percentiles) + + stats = { + 'mean': np.mean(rewards_array), + 'std': np.std(rewards_array), + 'min': np.min(rewards_array), + 'max': np.max(rewards_array), + 'percentiles': dict(zip(percentiles, percentile_values)) + } + + return stats + + +def update_tasks_jsonl(base_path: str, discretion_type: str, n_slices: int = 10) -> None: + """ + Update the tasks.jsonl file based on discretization type. 
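+    In binary mode the resulting file contains, for example:
+        {"task_index": 0, "task": "fold the cloth, Advantage: negative"}
+        {"task_index": 1, "task": "fold the cloth, Advantage: positive"}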
+ + Args: + base_path: Base directory path containing meta/tasks.jsonl + discretion_type: Type of discretization ("binary" or "n_slices") + n_slices: Number of slices for n_slices mode + """ + tasks_file = os.path.join(base_path, "meta", "tasks.jsonl") + + # Ensure meta directory exists + meta_dir = os.path.join(base_path, "meta") + os.makedirs(meta_dir, exist_ok=True) + + tasks = [] + if discretion_type == "binary": + tasks = [ + {"task_index": 0, "task": "fold the cloth, Advantage: negative"}, + {"task_index": 1, "task": "fold the cloth, Advantage: positive"}, + ] + elif discretion_type == "n_slices": + for i in range(n_slices): + tasks.append({"task_index": i, "task": f"fold the cloth, Advantage: {i}"}) + + # Write tasks to jsonl file + with open(tasks_file, 'w') as f: + for task in tasks: + f.write(json.dumps(task) + '\n') + + print(f"\n✓ Updated {tasks_file} with {len(tasks)} task(s)") + + +def assign_task_index(parquet_file: str, threshold_percentile: float, + chunk_size: int = 50, discretion_type: str = "binary", + percentile_boundaries: List[float] = None, n_slices: int = 10, + advantage_source: str = "progress") -> None: + """ + Assign task_index to frames in a parquet file based on reward threshold. + (Used when stage_nums=1) + + Args: + parquet_file: Path to the parquet file + threshold_percentile: Percentile value for threshold (used in binary mode) + chunk_size: Number of frames to look ahead for progress calculation + discretion_type: Type of discretization ("binary" or "n_slices") + percentile_boundaries: List of percentile boundary values (used in n_slices mode) + n_slices: Number of slices for n_slices mode + """ + # Read parquet file + df = pd.read_parquet(parquet_file) + + # Calculate rewards + rewards = calculate_rewards(df, chunk_size, advantage_source) + + if discretion_type == "binary": + # Binary mode: task_index = 0 for rewards below threshold, 1 for >= threshold + task_index = (rewards >= threshold_percentile).astype(np.int32) + elif discretion_type == "n_slices": + # n-slices mode: task_index from 0 to (n_slices-1) based on percentile boundaries + task_index = np.zeros(len(rewards), dtype=np.int32) + for i in range(len(percentile_boundaries) - 1): + mask = (rewards >= percentile_boundaries[i]) & (rewards < percentile_boundaries[i + 1]) + task_index[mask] = i + # Handle the top slice + task_index[rewards >= percentile_boundaries[-1]] = n_slices - 1 + else: + raise ValueError(f"Unknown discretion_type: {discretion_type}") + + # Add or update task_index column + df['task_index'] = task_index + + # Save back to parquet file + df.to_parquet(parquet_file, index=False) + + +def assign_task_index_staged(parquet_file: str, + threshold_percentiles_by_stage: Dict[int, float], + percentile_boundaries_by_stage: Dict[int, List[float]], + chunk_size: int = 50, + discretion_type: str = "binary", + n_slices: int = 10, + advantage_source: str = "progress", + stage_nums: int = 1) -> None: + """ + Assign task_index to frames in a parquet file based on stage-specific thresholds. + Each frame's stage is determined by its own stage_progress_gt value. 
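+    For example, with stage_nums=2 and binary discretization, a frame whose
+    stage_progress_gt is 0.7 is thresholded against stage 1's boundary,
+    independently of stage 0's reward distribution.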
+ + Args: + parquet_file: Path to the parquet file + threshold_percentiles_by_stage: Dict mapping stage_idx to threshold value (binary mode) + percentile_boundaries_by_stage: Dict mapping stage_idx to percentile boundaries (n_slices mode) + chunk_size: Number of frames to look ahead for progress calculation + discretion_type: Type of discretization ("binary" or "n_slices") + n_slices: Number of slices for n_slices mode + advantage_source: Source of advantage values + stage_nums: Number of stages + """ + # Read parquet file + df = pd.read_parquet(parquet_file) + + # Calculate rewards for all frames + rewards = calculate_rewards(df, chunk_size, advantage_source) + + # Get stage_progress_gt values for each frame + if 'stage_progress_gt' not in df.columns: + raise ValueError(f"Column 'stage_progress_gt' not found in {parquet_file}") + stage_progress_gt_values = df['stage_progress_gt'].values + + # Initialize task_index array + task_index = np.zeros(len(rewards), dtype=np.int32) + + # Assign task_index based on each frame's stage and stage-specific thresholds + for frame_idx in range(len(rewards)): + reward = rewards[frame_idx] + spg = stage_progress_gt_values[frame_idx] + stage_idx = get_stage_index(spg, stage_nums) + + if discretion_type == "binary": + threshold = threshold_percentiles_by_stage[stage_idx] + task_index[frame_idx] = 1 if reward >= threshold else 0 + elif discretion_type == "n_slices": + boundaries = percentile_boundaries_by_stage[stage_idx] + # Find the slice this reward belongs to + slice_idx = 0 + for j in range(len(boundaries) - 1): + if reward >= boundaries[j] and reward < boundaries[j + 1]: + slice_idx = j + break + # Handle the top slice + if reward >= boundaries[-1]: + slice_idx = n_slices - 1 + task_index[frame_idx] = slice_idx + + # Add or update task_index column + df['task_index'] = task_index + + # Save back to parquet file + df.to_parquet(parquet_file, index=False) + + +def main(): + parser = argparse.ArgumentParser( + description="Modify task_index in parquet files based on progress rewards" + ) + parser.add_argument( + "data_path", + type=str, + help="Base path containing data/chunk-*/*.parquet files" + ) + parser.add_argument( + "--threshold", + type=float, + default=70.0, + help="Threshold percentile for task_index labeling (default: 70, meaning top 70%% get task_index=1)" + ) + parser.add_argument( + "--chunk-size", + type=int, + default=50, + help="Number of frames to look ahead for progress calculation (default: 50)" + ) + parser.add_argument( + "--discretion-type", + type=str, + default="binary", + choices=["binary", "n_slices"], + help="Discretization type: 'binary' splits into 0/1, 'n_slices' splits into 0 to (n-1) (default: binary)" + ) + parser.add_argument( + "--n-slices", + type=int, + default=10, + help="Number of slices for n_slices mode (default: 10)" + ) + parser.add_argument( + "--advantage-source", + type=str, + default="progress", + choices=["progress", "absolute_advantage", "relative_advantage"] + ) + parser.add_argument( + "--stage-nums", + type=int, + default=1, + help="Number of stages to divide data based on each frame's stage_progress_gt. " + "1 means no division (original behavior). " + "2 means divide by 0.5 (frames with stage_progress_gt < 0.5 and >= 0.5). " + "3 means divide by 1/3 and 2/3, etc. " + "Each stage calculates its own reward percentiles independently. 
(default: 1)" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Only compute statistics without modifying files" + ) + + args = parser.parse_args() + + # Validate path + if not os.path.exists(args.data_path): + raise ValueError(f"Path does not exist: {args.data_path}") + + print(f"Processing data from: {args.data_path}") + print(f"Discretization type: {args.discretion_type}") + if args.discretion_type == "binary": + print(f"Threshold: {args.threshold}% (top {args.threshold}% will be task_index=1)") + elif args.discretion_type == "n_slices": + print(f"Number of slices: {args.n_slices}") + print(f"Progress offset: {args.chunk_size} frames") + print(f"Stage nums: {args.stage_nums}") + if args.stage_nums > 1: + step = 1.0 / args.stage_nums + boundaries = [step * i for i in range(args.stage_nums + 1)] + print(f"Stage boundaries (based on frame's stage_progress_gt): {boundaries}") + print("-" * 80) + + # Step 1: Collect all rewards (by stage if stage_nums > 1) + rewards_by_stage, parquet_files = collect_all_rewards( + args.data_path, args.chunk_size, args.advantage_source, args.stage_nums + ) + + total_frames = sum(len(rewards) for rewards in rewards_by_stage.values()) + print(f"\nTotal frames processed: {total_frames}") + + # Print frames per stage + if args.stage_nums > 1: + print("\nFrames per stage:") + for stage_idx in range(args.stage_nums): + stage_frames = len(rewards_by_stage[stage_idx]) + percentage = 100.0 * stage_frames / total_frames if total_frames > 0 else 0 + step = 1.0 / args.stage_nums + lower = step * stage_idx + upper = step * (stage_idx + 1) + print(f" Stage {stage_idx} (stage_progress_gt in [{lower:.3f}, {upper:.3f})): " + f"{stage_frames} frames ({percentage:.1f}%)") + + # Step 2: Compute statistics for each stage + stats_by_stage = {} + threshold_percentiles_by_stage = {} + percentile_boundaries_by_stage = {} + + for stage_idx in range(args.stage_nums): + print(f"\n{'=' * 80}") + if args.stage_nums > 1: + step = 1.0 / args.stage_nums + lower = step * stage_idx + upper = step * (stage_idx + 1) + print(f"STAGE {stage_idx} REWARD STATISTICS (stage_progress_gt in [{lower:.3f}, {upper:.3f}))") + else: + print("REWARD STATISTICS") + print("=" * 80) + + stage_rewards = rewards_by_stage[stage_idx] + stats = compute_reward_statistics(stage_rewards) + stats_by_stage[stage_idx] = stats + + print(f"Frames count: {len(stage_rewards)}") + print(f"Mean: {stats['mean']:.6f}") + print(f"Std: {stats['std']:.6f}") + print(f"Min: {stats['min']:.6f}") + print(f"Max: {stats['max']:.6f}") + print("\nPercentiles:") + for p, v in stats['percentiles'].items(): + print(f" {p:3d}%: {v:.6f}") + + # Calculate threshold/boundaries for this stage + if len(stage_rewards) > 0: + if args.discretion_type == "binary": + threshold_value = np.percentile(stage_rewards, (100 - args.threshold)) + threshold_percentiles_by_stage[stage_idx] = threshold_value + print(f"\nThreshold value (top {args.threshold}%): {threshold_value:.6f}") + elif args.discretion_type == "n_slices": + n_slices = args.n_slices + step_pct = 100 / n_slices + percentile_points = [step_pct * i for i in range(n_slices)] + boundaries = [np.percentile(stage_rewards, p) for p in percentile_points] + percentile_boundaries_by_stage[stage_idx] = boundaries + + print(f"\n{n_slices}-Slices Boundaries (higher reward -> higher task_index):") + for i in range(len(boundaries)): + if i < len(boundaries) - 1: + print(f" task_index={i}: reward in [{boundaries[i]:.6f}, {boundaries[i+1]:.6f})") + else: + print(f" task_index={i}: reward >= 
{boundaries[i]:.6f}") + else: + # Empty stage - use default values + if args.discretion_type == "binary": + threshold_percentiles_by_stage[stage_idx] = 0.0 + elif args.discretion_type == "n_slices": + percentile_boundaries_by_stage[stage_idx] = [0.0] * args.n_slices + print(f"\nWarning: Stage {stage_idx} has no data!") + + print("=" * 80) + + if args.dry_run: + print("\nDry run mode - no files will be modified") + return + + # Step 3: Update tasks.jsonl + update_tasks_jsonl(args.data_path, args.discretion_type, args.n_slices) + + # Step 4: Assign task_index to all parquet files + print(f"\nAssigning task_index to {len(parquet_files)} files...") + for parquet_file in tqdm(parquet_files): + try: + if args.stage_nums == 1: + # Use original function for backward compatibility + assign_task_index( + parquet_file, + threshold_percentiles_by_stage.get(0, 0.0), + args.chunk_size, + args.discretion_type, + percentile_boundaries_by_stage.get(0, None), + args.n_slices, + args.advantage_source + ) + else: + # Use staged function - each frame's stage determined by its stage_progress_gt + assign_task_index_staged( + parquet_file, + threshold_percentiles_by_stage, + percentile_boundaries_by_stage, + args.chunk_size, + args.discretion_type, + args.n_slices, + args.advantage_source, + args.stage_nums + ) + except Exception as e: + print(f"\nError processing {parquet_file}: {e}") + continue + + print("\n✓ Task completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/stage_advantage/annotation/gt_labeling.sh b/stage_advantage/annotation/gt_labeling.sh new file mode 100755 index 0000000..8fad65f --- /dev/null +++ b/stage_advantage/annotation/gt_labeling.sh @@ -0,0 +1,72 @@ +#!/bin/bash +############################################################################### +# Prepare advantage-labeled datasets for training the Advantage Estimator. +############################################################################### +set -xe +set -o pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# ─── Source dataset path (modify this to your dataset) ──────────────────────── +DATA_PATH="Path/to/your/dataset" + +# ─── Output base directory ──────────────────────────────────────────────────── +base_name=$(basename "$DATA_PATH") +dir_name=$(dirname "$DATA_PATH")/${base_name}_advantage_data + +# ─── Helper function: prepare dataset and run labeling ──────────────────────── +prepare_and_label() { + local data_subdir=$1 # source data subfolder name (e.g. 
data_PI06_100000 or data_KAI0_100000) + local output_name=$2 # output dataset name suffix + local extra_args=$3 # extra arguments for gt_label.py + local target_path="${dir_name}/${output_name}" + + echo "============================================================" + echo " Preparing: ${output_name}" + echo " Source: ${DATA_PATH}/${data_subdir}" + echo " Target: ${target_path}" + echo "============================================================" + + mkdir -p "${target_path}" + + # Symlink videos (shared, read-only) + ln -sfn "${DATA_PATH}/videos" "${target_path}/videos" + + # Copy norm_stats and meta (will be modified by gt_label.py) + cp -f "${DATA_PATH}/norm_stats.json" "${target_path}/norm_stats.json" + cp -rf "${DATA_PATH}/meta" "${target_path}/meta" + + # Copy data parquets into the standard "data" directory + if [ -d "${target_path}/data" ]; then + rm -rf "${target_path}/data" + fi + cp -r "${DATA_PATH}/${data_subdir}" "${target_path}/data" + + # Run gt_label.py to assign task_index and update tasks.jsonl + python "${SCRIPT_DIR}/gt_label.py" "${target_path}" \ + --threshold 30 \ + --chunk-size 50 \ + --discretion-type binary \ + --advantage-source absolute_advantage \ + ${extra_args} + + echo " Done: ${output_name}" + echo "" +} + +# ─── Dataset variants (only PI06 and KAI0) ───────────────────────────────────── +# Source subdirs must match Stage 2 (eval) output: data_PI06_100000 / data_KAI0_100000 +# PI06: single-timestep labeling (1 stage) +prepare_and_label "data_PI06_100000" "${base_name}_PI06_binary" "" + +# KAI0: two-stage, stage-level labeling +prepare_and_label "data_KAI0_100000" "${base_name}_KAI0_abs_binary" "--stage-nums 2" + +echo "============================================================" +echo " All datasets labeled successfully!" 
+echo "" +echo " Output directory: ${dir_name}" +echo "" +echo " Next step: set repo_id in config.py to the target dataset path," +echo " then run: uv run python scripts/train_pytorch.py ADVANTAGE_TORCH_* --exp_name=run1 --save_interval 10000" +echo "============================================================" diff --git a/stage_advantage/annotation/train_estimator.sh b/stage_advantage/annotation/train_estimator.sh new file mode 100644 index 0000000..cd4d07b --- /dev/null +++ b/stage_advantage/annotation/train_estimator.sh @@ -0,0 +1,71 @@ +#!/bin/bash +############################################################################### +# train_estimator.sh +########################################################### +set -xe +set -o pipefail + +# ─── Navigate to project root ──────────────────────────────────────────────── +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../../" && pwd)" +cd "${PROJECT_ROOT}" + +source .venv/bin/activate + +# ─── Training config name ──────────────────────────────────────────────────── +# RUNNAME must be one of: ADVANTAGE_TORCH_PI06_FLATTEN_FOLD, ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD +# Default to ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD if RUNNAME is not set +CFG=${RUNNAME:-ADVANTAGE_TORCH_KAI0_FLATTEN_FOLD} + +# ─── DDP environment variables ─────────────────────────────────────────────── +WORLD_SIZE=${WORLD_SIZE:-1} +MASTER_ADDR=${MASTER_ADDR:-127.0.0.1} +RANK=${RANK:-0} +NPROC_PER_NODE=${NPROC_PER_NODE:-1} +MASTER_PORT=${MASTER_PORT:-12345} + +# ─── Validate required environment variables ───────────────────────────────── +if [ -z "${RUNNAME+x}" ]; then + echo "[WARNING] RUNNAME is not set, using default: ${CFG}" + export RUNNAME=${CFG} +else + echo "RUNNAME is set to: ${RUNNAME}" +fi + +if [ -z "${RUNTIME+x}" ]; then + echo "[ERROR] RUNTIME is not set. Please set RUNTIME for experiment output directory." + echo " Example: RUNTIME=run1 bash train_estimator.sh" + exit 1 +else + echo "RUNTIME is set to: ${RUNTIME}" +fi + +# ─── Create output directories ─────────────────────────────────────────────── +OUTPUT_DIR="./experiment/${RUNNAME}" +LOG_OUTPUT_DIR="${OUTPUT_DIR}/log" +mkdir -p "${OUTPUT_DIR}" "${LOG_OUTPUT_DIR}" + +# Set to "offline" for offline logging; remove or set to "online" for cloud sync +export WANDB_MODE=${WANDB_MODE:-offline} + +if [ "${NPROC_PER_NODE}" -gt 1 ] || [ "${WORLD_SIZE}" -gt 1 ]; then + # Multi-GPU / Multi-Node training via torchrun + echo "Launching DDP training with torchrun..." + uv run torchrun \ + --nnodes=${WORLD_SIZE} \ + --nproc_per_node=${NPROC_PER_NODE} \ + --node_rank=${RANK} \ + --master_addr=${MASTER_ADDR} \ + --master_port=${MASTER_PORT} \ + scripts/train_pytorch.py ${CFG} \ + --exp_name=${RUNTIME} \ + --save_interval 10000 \ + 2>&1 | tee "${LOG_OUTPUT_DIR}/${RUNTIME}.log" +else + # Single-GPU training + echo "Launching single-GPU training..." + uv run python scripts/train_pytorch.py ${CFG} \ + --exp_name=${RUNTIME} \ + --save_interval 10000 \ + 2>&1 | tee "${LOG_OUTPUT_DIR}/${RUNTIME}.log" +fi diff --git a/stage_advantage/awbc/README.md b/stage_advantage/awbc/README.md new file mode 100644 index 0000000..427c78f --- /dev/null +++ b/stage_advantage/awbc/README.md @@ -0,0 +1,66 @@ +# Stage 3: AWBC (Advantage-Weighted Behavior Cloning) + +Train a policy on **advantage-labeled** data so that the prompt conditions the policy on the advantage bin (e.g. high vs low advantage). 
This is implemented by setting **`prompt_from_task=True`** in the data config: each sample’s `task_index` is mapped to a prompt string via `meta/tasks.jsonl`, and that prompt is fed to the policy as language conditioning. Full pipeline (Stage 0 → 1 → 2 → 0 → 3) is in the [parent README](../README.md).
+
+## Configs
+
+All three are defined in `src/openpi/training/config.py`:
+
+| Config name | Task | Data config |
+|-------------|------|-------------|
+| `pi05_flatten_fold_awbc` | FlattenFold | `LerobotAgilexDataConfig`, `repo_id=.../data/FlattenFold/advantage` |
+| `pi05_tee_shirt_sort_awbc` | TeeShirtSort | `LerobotAgilexDataConfig`, `repo_id=.../data/TeeShirtSort/advantage` |
+| `pi05_hang_cloth_awbc` | HangCloth | `LerobotARXDataConfig`, `repo_id=.../data/HangCloth/advantage` |
+
+Each uses `base_config=DataConfig(prompt_from_task=True)` so that the dataset’s `task_index` column and `meta/tasks.jsonl` supply the prompt (advantage-derived label) per frame.
+
+## Prerequisites
+
+1. **Advantage dataset**
+   The data must have `task_index` in each parquet and `meta/tasks.jsonl` (prompt strings per `task_index`). To build it:
+   - Run **Stage 2** (eval) on your dataset → get `data_PI06_100000/` or `data_KAI0_100000/` with advantage columns.
+   - Run **Stage 0** on that output: `gt_label.py --advantage-source absolute_advantage` (or `gt_labeling.sh` with `DATA_PATH` = the eval repo). The resulting directory (with `data/`, `meta/tasks.jsonl`, `videos/`) is your advantage dataset.
+   - Place or link it at e.g. `./data/FlattenFold/advantage` and set `repo_id` in config to that path.
+
+2. **Config paths**
+   In `src/openpi/training/config.py`, for the AWBC config(s) you use:
+   - Set **`repo_id`** to the **absolute path** of the advantage dataset (e.g. `/data/FlattenFold/advantage`).
+   - Set **`weight_loader`** to your **π₀.5 base checkpoint** path.
+
+3. **Norm stats**
+   From the repo root, run:
+   ```bash
+   uv run python scripts/compute_norm_states_fast.py --config-name pi05_flatten_fold_awbc
+   ```
+   (Repeat for `pi05_tee_shirt_sort_awbc` or `pi05_hang_cloth_awbc` if you train those.)
+
+## Usage
+
+From the **repository root**, the core training command is:
+
+```bash
+XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_flatten_fold_awbc --exp_name=run1
+XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_tee_shirt_sort_awbc --exp_name=run1
+XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_hang_cloth_awbc --exp_name=run1
+```
+
+Checkpoints and logs are written under `experiment/<config_name>/<exp_name>/` (e.g. `experiment/pi05_flatten_fold_awbc/run1/`).
+
+For a ready-to-use script with environment setup (venv activation, `XLA_PYTHON_CLIENT_MEM_FRACTION`, `WANDB_MODE`) and automatic log management, see **`train_awbc.sh`**:
+
+```bash
+RUNNAME=pi05_flatten_fold_awbc RUNTIME=run1 bash stage_advantage/awbc/train_awbc.sh
+```
+
+The shell script handles output directory creation and log redirection (via `tee`) automatically.
+
+## Prompt format (training and inference)
+
+During **training**, the prompt is taken from **`meta/tasks.jsonl`**: each sample’s `task_index` is mapped to a string (written by `gt_label.py` when creating the advantage dataset); see the sketch after the list below.
+
+- **Binary mode**: `task_index=0` → `"<task>, Advantage: negative"`, `task_index=1` → `"<task>, Advantage: positive"` (e.g. `"fold the cloth, Advantage: positive"`). The `<task>` text is defined in `annotation/gt_label.py`.
+- **n_slices mode**: `task_index=i` → `"<task>, Advantage: {i}"`.
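+
+A minimal sketch of how such a prompt string is assembled (the helper name `build_awbc_prompt` is ours and not part of the repo; `"fold the cloth"` is the example `<task>` wording that `annotation/gt_label.py` writes into `tasks.jsonl`, so substitute your own):
+
+```python
+def build_awbc_prompt(task_text: str, positive: bool = True) -> str:
+    """Append the advantage tag used in the AWBC tasks.jsonl entries."""
+    label = "positive" if positive else "negative"
+    return f"{task_text}, Advantage: {label}"
+
+prompt = build_awbc_prompt("fold the cloth")
+# -> "fold the cloth, Advantage: positive"
+```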
+
+At **inference**, use the **same format** so the model sees the conditioning it was trained on. To get high-advantage behavior, pass the **positive**-advantage prompt, e.g. `"<task>, Advantage: positive"` (with the same `<task>` wording as in your `tasks.jsonl`). Using a different prompt format or omitting the advantage part can hurt performance.
+
+
diff --git a/stage_advantage/awbc/train_awbc.sh b/stage_advantage/awbc/train_awbc.sh
new file mode 100644
index 0000000..b083967
--- /dev/null
+++ b/stage_advantage/awbc/train_awbc.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+###############################################################################
+# train_awbc.sh
+#
+# Train a policy with Advantage-Weighted Behavior Cloning (AWBC) using
+# advantage-estimator-labeled data. The data must have task_index per frame and
+# meta/tasks.jsonl mapping task_index -> prompt string (from Stage 0 + Stage 2).
+#
+# Configs (see src/openpi/training/config.py):
+#   pi05_flatten_fold_awbc
+#   pi05_tee_shirt_sort_awbc
+#   pi05_hang_cloth_awbc
+#
+# Prerequisites:
+#   - Complete Stage 0 (GT labeling) and Stage 2 (advantage estimation on data),
+#     then run gt_label.py with --advantage-source absolute_advantage to produce
+#     the "advantage" dataset with task_index and tasks.jsonl.
+#   - Set repo_id in the AWBC config to the path of that dataset
+#     (e.g. /data/FlattenFold/advantage).
+#   - Run compute_norm_states_fast.py for the chosen config before training.
+#   - Set weight_loader in config to your π₀.5 base checkpoint.
+#
+# Usage:
+#   RUNNAME=pi05_flatten_fold_awbc RUNTIME=run1 bash stage_advantage/awbc/train_awbc.sh
+###############################################################################
+set -xe
+set -o pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../../" && pwd)"
+cd "${PROJECT_ROOT}"
+
+source .venv/bin/activate
+
+# ─── Training config name ────────────────────────────────────────────────────
+# RUNNAME must be one of: pi05_flatten_fold_awbc, pi05_tee_shirt_sort_awbc, pi05_hang_cloth_awbc
+CFG=${RUNNAME:-pi05_flatten_fold_awbc}
+
+# ─── Validate required environment variables ─────────────────────────────────
+if [ -z "${RUNNAME+x}" ]; then
+    echo "[WARNING] RUNNAME is not set, using default: ${CFG}"
+    export RUNNAME=${CFG}
+else
+    echo "RUNNAME is set to: ${RUNNAME}"
+fi
+
+if [ -z "${RUNTIME+x}" ]; then
+    echo "[ERROR] RUNTIME is not set. Please set RUNTIME for experiment output directory."
+    echo "        Example: RUNTIME=run1 bash stage_advantage/awbc/train_awbc.sh"
+    exit 1
+else
+    echo "RUNTIME is set to: ${RUNTIME}"
+fi
+
+# ─── Output directories ──────────────────────────────────────────────────────
+OUTPUT_DIR="./experiment/${RUNNAME}"
+LOG_OUTPUT_DIR="${OUTPUT_DIR}/log"
+mkdir -p "${OUTPUT_DIR}" "${LOG_OUTPUT_DIR}"
+
+export WANDB_MODE=${WANDB_MODE:-offline}
+export XLA_PYTHON_CLIENT_MEM_FRACTION=${XLA_PYTHON_CLIENT_MEM_FRACTION:-0.9}
+
+# ─── Launch JAX training ─────────────────────────────────────────────────────
+echo "Launching AWBC training (JAX)..."
+uv run scripts/train.py ${CFG} \
+    --exp_name=${RUNTIME} \
+    2>&1 | tee "${LOG_OUTPUT_DIR}/${RUNTIME}.log"
+
+echo "============================================================"
+echo " AWBC training finished. Checkpoints: ${OUTPUT_DIR}/${RUNTIME}/"
+echo "============================================================"