feat(worker-datasets-raw): add redump command to re-extract block ranges #1815

Open

Theodus wants to merge 2 commits into main from theodus/redump

Conversation

@Theodus (Member) commented on Feb 19, 2026

Operators can now submit redump requests via `ampctl dataset redump` to force re-extraction of a block range after `ampctl verify` finds corruption.

- Add `redump_requests` table and metadata-db CRUD module
- Add `POST /datasets/{ns}/{name}/versions/{rev}/redump` admin-api endpoint
- Add `expand_to_segment_boundaries()` in `segments.rs` for per-table range alignment
- Integrate redump request processing into the raw dataset dump loop, merging expanded ranges into each table's `missing_ranges` and deleting completed requests after a successful dump
- Add `ampctl dataset redump` CLI command

Closes #1758
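
For illustration only, here is a minimal sketch of what the per-table range alignment could look like. The `BlockRange` type, the half-open range convention, and the fixed `segment_size` parameter are assumptions made for this sketch; the actual `expand_to_segment_boundaries()` in `segments.rs` may differ.

```rust
/// A half-open block range [start, end), used here only for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockRange {
    pub start: u64,
    pub end: u64,
}

/// Widen a requested range to whole segments of `segment_size` blocks,
/// since re-extraction rewrites entire segments rather than single blocks.
pub fn expand_to_segment_boundaries(range: BlockRange, segment_size: u64) -> BlockRange {
    let start = (range.start / segment_size) * segment_size;
    // Round the exclusive end up to the next segment boundary.
    let end = range.end.div_ceil(segment_size) * segment_size;
    BlockRange { start, end }
}

fn main() {
    // A redump request for blocks 1_234..5_678 with 1_000-block segments
    // expands to 1_000..6_000.
    let expanded = expand_to_segment_boundaries(BlockRange { start: 1_234, end: 5_678 }, 1_000);
    assert_eq!(expanded, BlockRange { start: 1_000, end: 6_000 });
}
```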

@leoyvens (Collaborator)

> Integrate redump request processing into the raw dataset dump loop, merging expanded ranges into each table's `missing_ranges` and deleting completed requests after a successful dump

Fancy!

Even with the manifest hash involved, identifying a physical operation by the manifest can be ambiguous if there are multiple copies of the table. We can keep the interface by name and hash, but when this hits the database we should have it resolved to a physical table revision id.
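
A hedged sketch of that resolution step follows. The `table_revisions` table, its columns, and the disambiguation policy are assumptions for illustration, not the repository's actual schema.

```rust
// Illustrative only: resolve (namespace, name, manifest_hash) to a single
// physical table revision id before the redump request is stored.
async fn resolve_table_revision(
    pool: &sqlx::PgPool,
    namespace: &str,
    name: &str,
    manifest_hash: &str,
) -> Result<i64, sqlx::Error> {
    // If several physical copies match, pick one deterministically (or reject).
    sqlx::query_scalar(
        "SELECT id FROM table_revisions \
         WHERE dataset_namespace = $1 AND dataset_name = $2 AND manifest_hash = $3 \
         ORDER BY id DESC LIMIT 1",
    )
    .bind(namespace)
    .bind(name)
    .bind(manifest_hash)
    .fetch_one(pool)
    .await
}
```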

Comment on lines +94 to +103
```rust
let RedumpRequest {
    start_block,
    end_block,
} = match json {
    Ok(Json(request)) => request,
    Err(err) => {
        tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid request body");
        return Err(Error::InvalidBody(err).into());
    }
};
```
Contributor

Can we add a validation that `start_block <= end_block`?
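
Something along these lines, as a sketch; `Error::InvalidRange` is a hypothetical variant that would need to be added to the handler's error type:

```rust
// Hypothetical validation, placed right after the body is parsed.
if start_block > end_block {
    return Err(Error::InvalidRange {
        start_block,
        end_block,
    }
    .into());
}
```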

Comment on lines +138 to +161
```rust
let request_id = metadata_db::redump_requests::insert(
    &ctx.metadata_db,
    reference.namespace(),
    reference.name(),
    reference.hash(),
    start_block,
    end_block,
)
.await
.map_err(|err| {
    // Unique constraint violation -> duplicate request
    if let metadata_db::Error::Database(ref db_err) = err
        && let Some(pg_err) = db_err.as_database_error()
    {
        // PostgreSQL unique violation code: 23505
        if pg_err.code().as_deref() == Some("23505") {
            return Error::DuplicateRequest {
                start_block,
                end_block,
            };
        }
    }
    Error::Database(err)
})?;
```
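
As a side note, if the project's sqlx version is recent enough (0.7+), the SQLSTATE comparison could be replaced with the `is_unique_violation()` helper. Shown here as a hedged alternative, not a required change:

```rust
// Equivalent check without hard-coding the "23505" SQLSTATE string,
// assuming a sqlx version that exposes DatabaseError::is_unique_violation().
if pg_err.is_unique_violation() {
    return Error::DuplicateRequest {
        start_block,
        end_block,
    };
}
```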
Contributor

We follow a pattern of doing `metadata_db` actions through `DataStore`.
cc: @LNSD
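
For illustration, that pattern usually amounts to a thin wrapper on the shared store so handlers never call `metadata_db` directly. The method name and the `DataStore` field layout below are assumptions, not code from this repository:

```rust
// Hypothetical sketch of routing the insert through DataStore.
impl DataStore {
    pub async fn create_redump_request(
        &self,
        namespace: &str,
        name: &str,
        manifest_hash: &str,
        start_block: i64,
        end_block: i64,
    ) -> Result<RequestId, metadata_db::Error> {
        metadata_db::redump_requests::insert(
            &self.metadata_db,
            namespace,
            name,
            manifest_hash,
            start_block,
            end_block,
        )
        .await
    }
}
```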

Contributor

Yes, that's correct. We should avoid performing any raw metadata_db operations in the Admin API handlers.

I am not sure if this should go through the DataStore or the Scheduler. I am leaning towards the second, as this is "scheduling a redump of a segment".

Comment on lines +12 to +36
```rust
#[derive(Debug, Clone)]
pub struct RedumpRequest {
    pub id: RequestId,
    pub dataset_namespace: String,
    pub dataset_name: String,
    pub manifest_hash: String,
    pub start_block: i64,
    pub end_block: i64,
    pub created_at: DateTime<Utc>,
}

impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for RedumpRequest {
    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        use sqlx::Row;
        Ok(Self {
            id: row.try_get("id")?,
            dataset_namespace: row.try_get("dataset_namespace")?,
            dataset_name: row.try_get("dataset_name")?,
            manifest_hash: row.try_get("manifest_hash")?,
            start_block: row.try_get("start_block")?,
            end_block: row.try_get("end_block")?,
            created_at: row.try_get("created_at")?,
        })
    }
}
```
Contributor

Suggested change (replace the manual `FromRow` impl with a derive and typed fields):

```rust
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RedumpRequest {
    pub id: RequestId,
    pub dataset_namespace: DatasetNamespaceOwned,
    pub dataset_name: DatasetNameOwned,
    pub manifest_hash: ManifestHashOwned,
    pub start_block: i64,
    pub end_block: i64,
    pub created_at: DateTime<Utc>,
}
```
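
For the derive to compile, each of the owned newtypes has to implement sqlx's column decoding traits (`Type` and `Decode`). One common way to get that, shown here as an assumption about how these types could be defined rather than how they are defined in this repo, is a transparent derive:

```rust
// Hypothetical definition: #[sqlx(transparent)] delegates encoding/decoding
// to the inner String, so the field can be read directly by the derived FromRow.
#[derive(Debug, Clone, sqlx::Type)]
#[sqlx(transparent)]
pub struct DatasetNamespaceOwned(String);
```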

```rust
///
/// Returns [`Error`] for invalid paths/URLs, API errors (400/404/409/500), or network failures.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, %dataset_ref, start_block, end_block))]
pub async fn run(
```
Contributor

Can we place the run fn at the top (after Args)?

Contributor

I am not convinced that adding a new table is the best option here. I think defining a job kind would be a better fit: this functionality effectively requires a raw dataset materialization job in the RUNNING state in order to work.
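
To make that concrete, a purely illustrative sketch of what a dedicated job kind might look like; the enum name, existing variants, and fields are all assumptions, not code from this repository:

```rust
// Hypothetical job kind: a redump becomes a scheduled job rather than
// a row in a new redump_requests table.
#[derive(Debug, Clone)]
pub enum JobKind {
    DumpRawDataset,
    RedumpRawDataset {
        manifest_hash: String,
        start_block: i64,
        end_block: i64,
    },
}
```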

Contributor

Based on the rationale above, I am leaning towards adding this endpoint to the /jobs resource APIs.

Development

Successfully merging this pull request may close these issues.

feat(ampctl): add support for re-dumping existing segments