Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/bin/ampctl/src/cmd/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod deploy;
pub mod inspect;
pub mod list;
pub mod manifest;
pub mod redump;
pub mod register;
pub mod restore;
pub mod versions;
Expand Down Expand Up @@ -54,6 +55,13 @@ pub enum Commands {
/// restoration.
#[command(after_help = include_str!("dataset/restore__after_help.md"))]
Restore(restore::Args),

/// Re-extract blocks in a given range for a dataset
///
/// Submits a redump request to re-extract blockchain data for the specified block range.
/// Use this when `ampctl verify` identifies corrupted data. If no active dump job exists for
/// the dataset, one is automatically started.
Redump(redump::Args),
}

/// Execute the dataset command with the given subcommand.
Expand All @@ -66,6 +74,7 @@ pub async fn run(command: Commands) -> anyhow::Result<()> {
Commands::Versions(args) => versions::run(args).await?,
Commands::Manifest(args) => manifest::run(args).await?,
Commands::Restore(args) => restore::run(args).await?,
Commands::Redump(args) => redump::run(args).await?,
}
Ok(())
}
132 changes: 132 additions & 0 deletions crates/bin/ampctl/src/cmd/dataset/redump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! Dataset redump command.
//!
//! Submits a request to re-extract blocks in a given range for a dataset.
//!
//! This is useful when `ampctl verify` identifies corrupted data and the operator
//! needs to re-extract specific block ranges.
//!
//! # Dataset Reference Format
//!
//! `namespace/name@version` (e.g., `graph/eth_mainnet@1.0.0`)
//!
//! # Configuration
//!
//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`)
//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)

use datasets_common::reference::Reference;

use crate::{args::GlobalArgs, client::datasets::RedumpResponse};

/// Command-line arguments for the `dataset redump` command.
// NOTE(review): clap's derive presumably surfaces the `///` text on the fields below as
// `--help` output, so reviewer notes in this struct use plain `//` comments — confirm.
#[derive(Debug, clap::Args)]
pub struct Args {
// Shared CLI options, flattened into this command. `run` uses them to build the admin API
// client and to print the result.
#[command(flatten)]
pub global: GlobalArgs,

/// The dataset reference in format: namespace/name@version
///
/// Examples: my_namespace/my_dataset@1.0.0, my_namespace/my_dataset@latest
// Parsed into a `Reference` by the configured value parser before `run` is called.
#[arg(value_name = "REFERENCE", required = true, value_parser = clap::value_parser!(Reference))]
pub dataset_ref: Reference,

/// First block of the range to re-extract (inclusive)
// Forwarded unchanged to the admin API; this module does not compare it with `end_block`.
#[arg(long, value_name = "BLOCK_NUMBER")]
pub start_block: u64,

/// Last block of the range to re-extract (inclusive)
// Forwarded unchanged to the admin API; this module does not compare it with `start_block`.
#[arg(long, value_name = "BLOCK_NUMBER")]
pub end_block: u64,
}

/// Result of a dataset redump operation.
///
/// Built in `run` from the `request_id` and `job_id` of the admin API response and passed to
/// `GlobalArgs::print`. The type is `Serialize` and also implements `Display` for
/// human-readable output.
#[derive(serde::Serialize)]
struct RedumpResult {
/// Identifier of the submitted redump request.
request_id: i64,
/// Identifier of the newly scheduled dump job.
///
/// `None` is rendered by `Display` as "Active dump job already running".
job_id: Option<i64>,
}

impl std::fmt::Display for RedumpResult {
    /// Render the human-readable summary of a submitted redump request.
    ///
    /// Always writes one line with the request id, followed by one line that either names
    /// the newly scheduled job or states that an active job will pick up the request.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let check_mark = console::style("✓").green().bold();
        writeln!(
            f,
            "{} Redump request submitted (request_id: {})",
            check_mark, self.request_id,
        )?;

        // Both follow-up lines share the same styled arrow prefix.
        let arrow = console::style("→").cyan();
        if let Some(job_id) = self.job_id {
            return writeln!(
                f,
                "{} New dump job scheduled (job_id: {})",
                arrow, job_id,
            );
        }

        writeln!(
            f,
            "{} Active dump job already running; it will pick up this request",
            arrow,
        )
    }
}

/// Submit a redump request for a dataset block range and print the outcome.
///
/// Sends the request through `submit_redump`, copies `request_id` and `job_id` from the
/// response into a `RedumpResult`, and prints it via `GlobalArgs::print`.
///
/// # Errors
///
/// - [`Error::ClientBuild`] if the admin API client cannot be built.
/// - [`Error::Redump`] if the client call fails (API errors such as 400/404/409/500, network
///   failures, or unexpected responses).
/// - [`Error::JsonSerialization`] if printing the result fails.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, %dataset_ref, start_block, end_block))]
pub async fn run(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we place the run fn to top? (After Args)

Args {
global,
dataset_ref,
start_block,
end_block,
}: Args,
) -> Result<(), Error> {
tracing::debug!(%dataset_ref, start_block, end_block, "Submitting redump request");

let response = submit_redump(&global, &dataset_ref, start_block, end_block).await?;
let result = RedumpResult {
request_id: response.request_id,
job_id: response.job_id,
};
global.print(&result).map_err(Error::JsonSerialization)?;

Ok(())
}

/// Submit a redump request via the admin API.
#[tracing::instrument(skip_all, fields(%dataset_ref, start_block, end_block))]
async fn submit_redump(
global: &GlobalArgs,
dataset_ref: &Reference,
start_block: u64,
end_block: u64,
) -> Result<RedumpResponse, Error> {
let client = global.build_client().map_err(Error::ClientBuild)?;
let response = client
.datasets()
.redump(dataset_ref, start_block, end_block)
.await
.map_err(Error::Redump)?;

Ok(response)
}

/// Errors for dataset redump operations.
///
/// Each variant wraps the underlying error as its `source`; the display message only names
/// the failing step.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to build client
///
/// Produced in `submit_redump` when `GlobalArgs::build_client` fails.
#[error("failed to build admin API client")]
ClientBuild(#[source] crate::args::BuildClientError),

/// Redump error from the client
///
/// Produced in `submit_redump` when the admin client's `redump` call returns an error.
#[error("redump failed")]
Redump(#[source] crate::client::datasets::RedumpError),

/// Failed to serialize result to JSON
///
/// Produced in `run` when `GlobalArgs::print` fails.
#[error("failed to serialize result to JSON")]
JsonSerialization(#[source] serde_json::Error),
}
168 changes: 168 additions & 0 deletions crates/clients/admin/src/datasets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ fn dataset_list_jobs(reference: &Reference) -> String {
)
}

/// Build URL path for submitting a redump request for a dataset version.
///
/// POST `/datasets/{namespace}/{name}/versions/{version}/redump`
///
/// The returned path is relative (no leading slash); the `redump` client method joins it
/// onto the client's base URL.
fn dataset_redump(namespace: &Namespace, name: &Name, version: &Revision) -> String {
format!("datasets/{namespace}/{name}/versions/{version}/redump")
}

/// Client for dataset-related API operations.
///
/// Created via [`Client::datasets`](crate::client::Client::datasets).
Expand Down Expand Up @@ -437,6 +444,114 @@ impl<'a> DatasetsClient<'a> {
}
}

/// Submit a redump request for a dataset version.
///
/// POSTs to `/datasets/{namespace}/{name}/versions/{version}/redump` endpoint.
///
/// # Errors
///
/// Returns [`RedumpError`] for network errors, API errors (400/404/409/500),
/// or unexpected responses.
#[tracing::instrument(skip(self), fields(dataset_ref = %dataset_ref))]
pub async fn redump(
&self,
dataset_ref: &Reference,
start_block: u64,
end_block: u64,
) -> Result<RedumpResponse, RedumpError> {
let namespace = dataset_ref.namespace();
let name = dataset_ref.name();
let version = dataset_ref.revision();

let url = self
.client
.base_url()
.join(&dataset_redump(namespace, name, version))
.expect("valid URL");

tracing::debug!("Sending dataset redump request");

let body = serde_json::json!({
"start_block": start_block,
"end_block": end_block,
});

let response = self
.client
.http()
.post(url.as_str())
.json(&body)
.send()
.await
.map_err(|err| RedumpError::Network {
url: url.to_string(),
source: err,
})?;

let status = response.status();
tracing::debug!(status = %status, "Received API response");

match status.as_u16() {
200 | 202 => {
let redump_response =
response.json::<RedumpResponse>().await.map_err(|err| {
tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse success response");
RedumpError::UnexpectedResponse {
status: status.as_u16(),
message: format!("Failed to parse response: {}", err),
}
})?;

tracing::debug!(
request_id = redump_response.request_id,
job_id = ?redump_response.job_id,
"Redump request created successfully"
);
Ok(redump_response)
}
400 | 404 | 409 | 500 => {
let text = response.text().await.map_err(|err| {
tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to read error response");
RedumpError::UnexpectedResponse {
status: status.as_u16(),
message: format!("Failed to read error response: {}", err),
}
})?;

let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| {
tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse error response");
RedumpError::UnexpectedResponse {
status: status.as_u16(),
message: text.clone(),
}
})?;

match error_response.error_code.as_str() {
"INVALID_PATH" => Err(RedumpError::InvalidPath(error_response.into())),
"INVALID_BODY" => Err(RedumpError::InvalidBody(error_response.into())),
"DATASET_NOT_FOUND" => Err(RedumpError::DatasetNotFound(error_response.into())),
"DUPLICATE_REQUEST" => {
Err(RedumpError::DuplicateRequest(error_response.into()))
}
_ => Err(RedumpError::UnexpectedResponse {
status: status.as_u16(),
message: text,
}),
}
}
_ => {
let text = response
.text()
.await
.unwrap_or_else(|_| String::from("Failed to read response body"));
Err(RedumpError::UnexpectedResponse {
status: status.as_u16(),
message: text,
})
}
}
}

/// List all registered datasets.
///
/// GETs from `/datasets` endpoint.
Expand Down Expand Up @@ -1892,3 +2007,56 @@ pub enum NodeSelectorParseError {
#[error("invalid glob pattern")]
InvalidGlob(#[source] InvalidGlobError),
}

/// Response from redump operation.
///
/// Deserialized from the JSON body of a 200 or 202 reply in `DatasetsClient::redump`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct RedumpResponse {
/// The ID of the created redump request
pub request_id: i64,
/// The ID of the scheduled dump job, if a new job was started.
/// `null` if an active job already exists for this dataset.
pub job_id: Option<i64>,
}

/// Errors that can occur when creating a redump request.
///
/// The first four variants correspond to the `error_code` values that
/// `DatasetsClient::redump` recognises in an API error body; every other failure is
/// reported as [`RedumpError::Network`] or [`RedumpError::UnexpectedResponse`].
#[derive(Debug, thiserror::Error)]
pub enum RedumpError {
/// Invalid path parameters (400, INVALID_PATH)
///
/// This occurs when:
/// - The namespace, name, or revision in the URL path is invalid
/// - Path parameter parsing fails
#[error("invalid path")]
InvalidPath(#[source] ApiError),

/// Invalid request body (400, INVALID_BODY)
///
/// This occurs when:
/// - The request body is not valid JSON
/// - Required fields are missing or have invalid types
#[error("invalid body")]
InvalidBody(#[source] ApiError),

/// Dataset or revision not found (404, DATASET_NOT_FOUND)
///
/// This occurs when:
/// - The specified dataset name doesn't exist in the namespace
/// - The specified revision doesn't exist for this dataset
#[error("dataset not found")]
DatasetNotFound(#[source] ApiError),

/// A redump request for this block range already exists (409, DUPLICATE_REQUEST)
///
/// This occurs when:
/// - A pending redump request already exists for the same dataset and block range
#[error("duplicate redump request")]
DuplicateRequest(#[source] ApiError),

/// Network or connection error
///
/// Produced when sending the HTTP request fails; `url` is the full request URL and
/// `source` is the underlying `reqwest` error.
#[error("network error connecting to {url}")]
Network { url: String, source: reqwest::Error },

/// Unexpected response from API
///
/// Produced when:
/// - A 200/202 body cannot be parsed as a `RedumpResponse`
/// - A 400/404/409/500 body cannot be read or parsed, or carries an unrecognised error code
/// - The status code is none of the above
///
/// `message` holds either the raw response body or a description of the read/parse failure.
#[error("unexpected response (status {status}): {message}")]
UnexpectedResponse { status: u16, message: String },
}
Loading