feat(worker-datasets-raw): add redump command to re-extract block ranges #1815
Conversation
Operators can now submit redump requests via `ampctl dataset redump` to
force re-extraction of a block range after `ampctl verify` finds corruption.
- Add `redump_requests` table and metadata-db CRUD module
- Add `POST /datasets/{ns}/{name}/versions/{rev}/redump` admin-api endpoint
- Add `expand_to_segment_boundaries()` in `segments.rs` for per-table range alignment (a rough sketch follows the description)
- Integrate redump request processing into the raw dataset dump loop,
merging expanded ranges into each table's `missing_ranges` and deleting
completed requests after a successful dump
- Add `ampctl dataset redump` CLI command
Closes #1758
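A rough sketch of the kind of alignment `expand_to_segment_boundaries()` performs, assuming fixed-size segments per table; the real helper in `segments.rs` may align against actual segment metadata instead:

```rust
/// Hypothetical sketch: widen a requested [start, end] block range so it covers
/// whole segments. Assumes a fixed segment size per table; names and signature
/// are illustrative, not the PR's actual implementation.
fn expand_to_segment_boundaries(
    start_block: u64,
    end_block: u64,
    segment_size: u64,
) -> (u64, u64) {
    // Round the start down to the first block of its segment.
    let aligned_start = (start_block / segment_size) * segment_size;
    // Round the end up to the last block of its segment (inclusive range).
    let aligned_end = ((end_block / segment_size) + 1) * segment_size - 1;
    (aligned_start, aligned_end)
}
```

For example, with a segment size of 1000, a request for blocks 1500–2200 would expand to 1000–2999, so whole segments are re-extracted.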
Fancy! Even with the manifest hash involved, identifying a physical operation by the manifest can be ambiguous if there are multiple copies of the table. We can keep the interface by name and hash, but when this hits the database we should have it resolved to a physical table revision id.
```rust
let RedumpRequest {
    start_block,
    end_block,
} = match json {
    Ok(Json(request)) => request,
    Err(err) => {
        tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid request body");
        return Err(Error::InvalidBody(err).into());
    }
};
```
Can we add a validation that `start_block <= end_block`?
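A minimal sketch of what that check might look like in the handler; `Error::InvalidRange` is an assumed variant, not necessarily what the PR defines:

```rust
// Hypothetical sketch of the requested validation; the error variant name is
// an assumption for illustration only.
if start_block > end_block {
    tracing::debug!(start_block, end_block, "invalid block range");
    return Err(Error::InvalidRange {
        start_block,
        end_block,
    }
    .into());
}
```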
```rust
let request_id = metadata_db::redump_requests::insert(
    &ctx.metadata_db,
    reference.namespace(),
    reference.name(),
    reference.hash(),
    start_block,
    end_block,
)
.await
.map_err(|err| {
    // Unique constraint violation -> duplicate request
    if let metadata_db::Error::Database(ref db_err) = err
        && let Some(pg_err) = db_err.as_database_error()
    {
        // PostgreSQL unique violation code: 23505
        if pg_err.code().as_deref() == Some("23505") {
            return Error::DuplicateRequest {
                start_block,
                end_block,
            };
        }
    }
    Error::Database(err)
})?;
```
We follow a pattern of doing metadata_db actions through DataStore
cc: @LNSD
Yes, that's correct. We should avoid performing any raw metadata_db operations in the Admin API handlers.
I am not sure if this should go through the DataStore or the Scheduler. I am leaning towards the second, as this is "scheduling a redump of a segment".
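For illustration, moving the raw insert behind `DataStore` might look roughly like this; the method name, parameter types, and error handling are assumptions, and per the comment above the Scheduler may be the better home:

```rust
// Hypothetical DataStore wrapper; the method name, signature, and error type
// are assumptions for illustration only.
impl DataStore {
    pub async fn create_redump_request(
        &self,
        namespace: &str,
        name: &str,
        manifest_hash: &str,
        start_block: i64,
        end_block: i64,
    ) -> Result<RequestId, Error> {
        metadata_db::redump_requests::insert(
            &self.metadata_db,
            namespace,
            name,
            manifest_hash,
            start_block,
            end_block,
        )
        .await
        .map_err(Error::Database)
    }
}
```

The Admin API handler would then call this method (or a Scheduler equivalent) instead of touching `metadata_db` directly.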
```rust
#[derive(Debug, Clone)]
pub struct RedumpRequest {
    pub id: RequestId,
    pub dataset_namespace: String,
    pub dataset_name: String,
    pub manifest_hash: String,
    pub start_block: i64,
    pub end_block: i64,
    pub created_at: DateTime<Utc>,
}

impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for RedumpRequest {
    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        use sqlx::Row;
        Ok(Self {
            id: row.try_get("id")?,
            dataset_namespace: row.try_get("dataset_namespace")?,
            dataset_name: row.try_get("dataset_name")?,
            manifest_hash: row.try_get("manifest_hash")?,
            start_block: row.try_get("start_block")?,
            end_block: row.try_get("end_block")?,
            created_at: row.try_get("created_at")?,
        })
    }
}
```
Suggested change (replace the manual `FromRow` impl with the derive and typed field wrappers):

```rust
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RedumpRequest {
    pub id: RequestId,
    pub dataset_namespace: DatasetNamespaceOwned,
    pub dataset_name: DatasetNameOwned,
    pub manifest_hash: ManifestHashOwned,
    pub start_block: i64,
    pub end_block: i64,
    pub created_at: DateTime<Utc>,
}
```
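For the derive to compile with the typed wrappers, each would need to map to a SQL type; a minimal sketch, assuming `DatasetNamespaceOwned` is a `String` newtype (the real definitions may already provide this):

```rust
// Hypothetical sketch: a transparent newtype so sqlx decodes it as TEXT.
// The actual DatasetNamespaceOwned/DatasetNameOwned/ManifestHashOwned types
// in this codebase may be defined differently.
#[derive(Debug, Clone, sqlx::Type)]
#[sqlx(transparent)]
pub struct DatasetNamespaceOwned(String);
```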
```rust
///
/// Returns [`Error`] for invalid paths/URLs, API errors (400/404/409/500), or network failures.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, %dataset_ref, start_block, end_block))]
pub async fn run(
```
Can we place the `run` fn at the top (after `Args`)?
I am not convinced that adding a new table is the best option here. I think defining a job kind would be a better fit: basically, this functionality requires a raw dataset materialization job in the RUNNING state to work.
Based on the rationale above, I am leaning towards adding this endpoint to the /jobs resource APIs.
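Purely as an illustration of that direction, a redump expressed as a job kind might look roughly like this; the enum, variant, and field names are assumptions, not the project's existing job model:

```rust
// Hypothetical sketch of a job-kind-based alternative; all names are assumptions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum JobKind {
    // ... existing job kinds ...
    Redump {
        dataset_namespace: String,
        dataset_name: String,
        manifest_hash: String,
        start_block: i64,
        end_block: i64,
    },
}
```

A request to the `/jobs` resource could then carry this payload instead of a row in a dedicated `redump_requests` table.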