From 95ebc9b38882070d722e6fe7bd7b5d04801464bf Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 19 Feb 2026 08:56:05 -0700 Subject: [PATCH 1/2] feat(worker-datasets-raw): add redump command to re-extract block ranges Operators can now submit redump requests via `ampctl dataset redump` to force re-extraction of a block range after `ampctl verify` finds corruption. - Add `redump_requests` table and metadata-db CRUD module - Add `POST /datasets/{ns}/{name}/versions/{rev}/redump` admin-api endpoint - Add `expand_to_segment_boundaries()` in `segments.rs` for per-table range alignment - Integrate redump request processing into the raw dataset dump loop, merging expanded ranges into each table's `missing_ranges` and deleting completed requests after a successful dump - Add `ampctl dataset redump` CLI command Closes #1758 --- crates/bin/ampctl/src/cmd/dataset.rs | 9 + crates/bin/ampctl/src/cmd/dataset/redump.rs | 132 ++++++++ crates/clients/admin/src/datasets.rs | 168 ++++++++++ crates/core/common/src/metadata/segments.rs | 104 ++++++ ...219083754_create_redump_requests_table.sql | 29 ++ crates/core/metadata-db/src/lib.rs | 2 + .../core/metadata-db/src/redump_requests.rs | 95 ++++++ .../src/redump_requests/request_id.rs | 98 ++++++ .../metadata-db/src/redump_requests/sql.rs | 126 +++++++ .../core/worker-datasets-raw/src/dataset.rs | 105 +++++- .../admin-api/src/handlers/datasets.rs | 1 + .../admin-api/src/handlers/datasets/redump.rs | 315 ++++++++++++++++++ crates/services/admin-api/src/lib.rs | 7 + 13 files changed, 1188 insertions(+), 3 deletions(-) create mode 100644 crates/bin/ampctl/src/cmd/dataset/redump.rs create mode 100644 crates/core/metadata-db/migrations/20260219083754_create_redump_requests_table.sql create mode 100644 crates/core/metadata-db/src/redump_requests.rs create mode 100644 crates/core/metadata-db/src/redump_requests/request_id.rs create mode 100644 crates/core/metadata-db/src/redump_requests/sql.rs create mode 100644 crates/services/admin-api/src/handlers/datasets/redump.rs diff --git a/crates/bin/ampctl/src/cmd/dataset.rs b/crates/bin/ampctl/src/cmd/dataset.rs index 320de3084..b888424f5 100644 --- a/crates/bin/ampctl/src/cmd/dataset.rs +++ b/crates/bin/ampctl/src/cmd/dataset.rs @@ -4,6 +4,7 @@ pub mod deploy; pub mod inspect; pub mod list; pub mod manifest; +pub mod redump; pub mod register; pub mod restore; pub mod versions; @@ -54,6 +55,13 @@ pub enum Commands { /// restoration. #[command(after_help = include_str!("dataset/restore__after_help.md"))] Restore(restore::Args), + + /// Re-extract blocks in a given range for a dataset + /// + /// Submits a redump request to re-extract blockchain data for the specified block range. + /// Use this when `ampctl verify` identifies corrupted data. If no active dump job exists for + /// the dataset, one is automatically started. + Redump(redump::Args), } /// Execute the dataset command with the given subcommand. @@ -66,6 +74,7 @@ pub async fn run(command: Commands) -> anyhow::Result<()> { Commands::Versions(args) => versions::run(args).await?, Commands::Manifest(args) => manifest::run(args).await?, Commands::Restore(args) => restore::run(args).await?, + Commands::Redump(args) => redump::run(args).await?, } Ok(()) } diff --git a/crates/bin/ampctl/src/cmd/dataset/redump.rs b/crates/bin/ampctl/src/cmd/dataset/redump.rs new file mode 100644 index 000000000..3247e9673 --- /dev/null +++ b/crates/bin/ampctl/src/cmd/dataset/redump.rs @@ -0,0 +1,132 @@ +//! Dataset redump command. +//! +//! 
Submits a request to re-extract blocks in a given range for a dataset.
+//!
+//! This is useful when `ampctl verify` identifies corrupted data and the operator
+//! needs to re-extract specific block ranges.
+//!
+//! # Dataset Reference Format
+//!
+//! `namespace/name@version` (e.g., `graph/eth_mainnet@1.0.0`)
+//!
+//! # Configuration
+//!
+//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`)
+//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)
+
+use datasets_common::reference::Reference;
+
+use crate::{args::GlobalArgs, client::datasets::RedumpResponse};
+
+/// Command-line arguments for the `dataset redump` command.
+#[derive(Debug, clap::Args)]
+pub struct Args {
+    #[command(flatten)]
+    pub global: GlobalArgs,
+
+    /// The dataset reference in format: namespace/name@version
+    ///
+    /// Examples: my_namespace/my_dataset@1.0.0, my_namespace/my_dataset@latest
+    #[arg(value_name = "REFERENCE", required = true, value_parser = clap::value_parser!(Reference))]
+    pub dataset_ref: Reference,
+
+    /// First block of the range to re-extract (inclusive)
+    #[arg(long, value_name = "BLOCK_NUMBER")]
+    pub start_block: u64,
+
+    /// Last block of the range to re-extract (inclusive)
+    #[arg(long, value_name = "BLOCK_NUMBER")]
+    pub end_block: u64,
+}
+
+/// Result of a dataset redump operation.
+#[derive(serde::Serialize)]
+struct RedumpResult {
+    request_id: i64,
+    job_id: Option<i64>,
+}
+
+impl std::fmt::Display for RedumpResult {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        writeln!(
+            f,
+            "{} Redump request submitted (request_id: {})",
+            console::style("✓").green().bold(),
+            self.request_id,
+        )?;
+        match self.job_id {
+            Some(job_id) => writeln!(
+                f,
+                "{} New dump job scheduled (job_id: {})",
+                console::style("→").cyan(),
+                job_id,
+            )?,
+            None => writeln!(
+                f,
+                "{} Active dump job already running; it will pick up this request",
+                console::style("→").cyan(),
+            )?,
+        }
+        Ok(())
+    }
+}
+
+/// Submit a redump request for a dataset block range.
+///
+/// # Errors
+///
+/// Returns [`Error`] for invalid paths/URLs, API errors (400/404/409/500), or network failures.
+#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, %dataset_ref, start_block, end_block))]
+pub async fn run(
+    Args {
+        global,
+        dataset_ref,
+        start_block,
+        end_block,
+    }: Args,
+) -> Result<(), Error> {
+    tracing::debug!(%dataset_ref, start_block, end_block, "Submitting redump request");
+
+    let response = submit_redump(&global, &dataset_ref, start_block, end_block).await?;
+    let result = RedumpResult {
+        request_id: response.request_id,
+        job_id: response.job_id,
+    };
+    global.print(&result).map_err(Error::JsonSerialization)?;
+
+    Ok(())
+}
+
+/// Submit a redump request via the admin API.
+#[tracing::instrument(skip_all, fields(%dataset_ref, start_block, end_block))]
+async fn submit_redump(
+    global: &GlobalArgs,
+    dataset_ref: &Reference,
+    start_block: u64,
+    end_block: u64,
+) -> Result<RedumpResponse, Error> {
+    let client = global.build_client().map_err(Error::ClientBuild)?;
+    let response = client
+        .datasets()
+        .redump(dataset_ref, start_block, end_block)
+        .await
+        .map_err(Error::Redump)?;
+
+    Ok(response)
+}
+
+/// Errors for dataset redump operations.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// Failed to build client
+    #[error("failed to build admin API client")]
+    ClientBuild(#[source] crate::args::BuildClientError),
+
+    /// Redump error from the client
+    #[error("redump failed")]
+    Redump(#[source] crate::client::datasets::RedumpError),
+
+    /// Failed to serialize result to JSON
+    #[error("failed to serialize result to JSON")]
+    JsonSerialization(#[source] serde_json::Error),
+}
diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs
index 97b7ef1c4..e174170dd 100644
--- a/crates/clients/admin/src/datasets.rs
+++ b/crates/clients/admin/src/datasets.rs
@@ -105,6 +105,13 @@ fn dataset_list_jobs(reference: &Reference) -> String {
     )
 }
 
+/// Build URL path for submitting a redump request for a dataset version.
+///
+/// POST `/datasets/{namespace}/{name}/versions/{version}/redump`
+fn dataset_redump(namespace: &Namespace, name: &Name, version: &Revision) -> String {
+    format!("datasets/{namespace}/{name}/versions/{version}/redump")
+}
+
 /// Client for dataset-related API operations.
 ///
 /// Created via [`Client::datasets`](crate::client::Client::datasets).
@@ -437,6 +444,114 @@ impl<'a> DatasetsClient<'a> {
         }
     }
 
+    /// Submit a redump request for a dataset version.
+    ///
+    /// POSTs to `/datasets/{namespace}/{name}/versions/{version}/redump` endpoint.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`RedumpError`] for network errors, API errors (400/404/409/500),
+    /// or unexpected responses.
+    #[tracing::instrument(skip(self), fields(dataset_ref = %dataset_ref))]
+    pub async fn redump(
+        &self,
+        dataset_ref: &Reference,
+        start_block: u64,
+        end_block: u64,
+    ) -> Result<RedumpResponse, RedumpError> {
+        let namespace = dataset_ref.namespace();
+        let name = dataset_ref.name();
+        let version = dataset_ref.revision();
+
+        let url = self
+            .client
+            .base_url()
+            .join(&dataset_redump(namespace, name, version))
+            .expect("valid URL");
+
+        tracing::debug!("Sending dataset redump request");
+
+        let body = serde_json::json!({
+            "start_block": start_block,
+            "end_block": end_block,
+        });
+
+        let response = self
+            .client
+            .http()
+            .post(url.as_str())
+            .json(&body)
+            .send()
+            .await
+            .map_err(|err| RedumpError::Network {
+                url: url.to_string(),
+                source: err,
+            })?;
+
+        let status = response.status();
+        tracing::debug!(status = %status, "Received API response");
+
+        match status.as_u16() {
+            200 | 202 => {
+                let redump_response =
+                    response.json::<RedumpResponse>().await.map_err(|err| {
+                        tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse success response");
+                        RedumpError::UnexpectedResponse {
+                            status: status.as_u16(),
+                            message: format!("Failed to parse response: {}", err),
+                        }
+                    })?;
+
+                tracing::debug!(
+                    request_id = redump_response.request_id,
+                    job_id = ?redump_response.job_id,
+                    "Redump request created successfully"
+                );
+                Ok(redump_response)
+            }
+            400 | 404 | 409 | 500 => {
+                let text = response.text().await.map_err(|err| {
+                    tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to read error response");
+                    RedumpError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: format!("Failed to read error response: {}", err),
+                    }
+                })?;
+
+                let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| {
+                    tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse error response");
+                    RedumpError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text.clone(),
+                    }
+                })?;
+
+                match error_response.error_code.as_str() {
+                    "INVALID_PATH" => Err(RedumpError::InvalidPath(error_response.into())),
+                    "INVALID_BODY" => Err(RedumpError::InvalidBody(error_response.into())),
+                    "DATASET_NOT_FOUND" => Err(RedumpError::DatasetNotFound(error_response.into())),
+                    "DUPLICATE_REQUEST" => {
+                        Err(RedumpError::DuplicateRequest(error_response.into()))
+                    }
+                    _ => Err(RedumpError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text,
+                    }),
+                }
+            }
+            _ => {
+                let text = response
+                    .text()
+                    .await
+                    .unwrap_or_else(|_| String::from("Failed to read response body"));
+                Err(RedumpError::UnexpectedResponse {
+                    status: status.as_u16(),
+                    message: text,
+                })
+            }
+        }
+    }
+
     /// List all registered datasets.
     ///
     /// GETs from `/datasets` endpoint.
@@ -1892,3 +2007,56 @@ pub enum NodeSelectorParseError {
     #[error("invalid glob pattern")]
     InvalidGlob(#[source] InvalidGlobError),
 }
+
+/// Response from redump operation.
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct RedumpResponse {
+    /// The ID of the created redump request
+    pub request_id: i64,
+    /// The ID of the scheduled dump job, if a new job was started.
+    /// `null` if an active job already exists for this dataset.
+    pub job_id: Option<i64>,
+}
+
+/// Errors that can occur when creating a redump request.
+#[derive(Debug, thiserror::Error)]
+pub enum RedumpError {
+    /// Invalid path parameters (400, INVALID_PATH)
+    ///
+    /// This occurs when:
+    /// - The namespace, name, or revision in the URL path is invalid
+    /// - Path parameter parsing fails
+    #[error("invalid path")]
+    InvalidPath(#[source] ApiError),
+
+    /// Invalid request body (400, INVALID_BODY)
+    ///
+    /// This occurs when:
+    /// - The request body is not valid JSON
+    /// - Required fields are missing or have invalid types
+    #[error("invalid body")]
+    InvalidBody(#[source] ApiError),
+
+    /// Dataset or revision not found (404, DATASET_NOT_FOUND)
+    ///
+    /// This occurs when:
+    /// - The specified dataset name doesn't exist in the namespace
+    /// - The specified revision doesn't exist for this dataset
+    #[error("dataset not found")]
+    DatasetNotFound(#[source] ApiError),
+
+    /// A redump request for this block range already exists (409, DUPLICATE_REQUEST)
+    ///
+    /// This occurs when:
+    /// - A pending redump request already exists for the same dataset and block range
+    #[error("duplicate redump request")]
+    DuplicateRequest(#[source] ApiError),
+
+    /// Network or connection error
+    #[error("network error connecting to {url}")]
+    Network { url: String, source: reqwest::Error },
+
+    /// Unexpected response from API
+    #[error("unexpected response (status {status}): {message}")]
+    UnexpectedResponse { status: u16, message: String },
+}
diff --git a/crates/core/common/src/metadata/segments.rs b/crates/core/common/src/metadata/segments.rs
index 46a86ae54..c7519f014 100644
--- a/crates/core/common/src/metadata/segments.rs
+++ b/crates/core/common/src/metadata/segments.rs
@@ -311,6 +311,62 @@ pub fn missing_ranges(
     merge_ranges(missing)
 }
 
+/// Expands a block range to include full segment boundaries.
+///
+/// Given a desired range and existing segments, returns the smallest range that:
+/// 1. Contains all blocks in the desired range
+/// 2. Is aligned to segment boundaries (includes full segments that overlap)
+///
+/// This is used for redump operations to ensure we re-extract complete segments
+/// rather than partial ranges, maintaining segment integrity.
+///
+/// This function is designed for single-network segments (raw datasets). 
All segments must
+/// have exactly one network range.
+///
+/// ```text
+///                 ┌───────────────────────────────────────────────┐
+/// segments:       │ 00-99 │ 100-199 │ 200-299 │ 300-399 │ 400-499 │
+///                 └───────────────────────────────────────────────┘
+///                                     ┌───────────┐
+/// desired:                            │  250-350  │
+///                                     └───────────┘
+///                           ┌────────────────────┐
+/// result:                   │       200-399      │
+///                           └────────────────────┘
+/// ```
+///
+/// The desired range 250-350 overlaps with segments 200-299 and 300-399,
+/// so the expanded range is 200-399.
+///
+/// # Panics
+///
+/// Panics if any segment has more than one block range (i.e., not a single-network raw dataset
+/// segment).
+pub fn expand_to_segment_boundaries(
+    segments: &[Segment],
+    desired: RangeInclusive<BlockNum>,
+) -> RangeInclusive<BlockNum> {
+    // Invariant: this function only works for single-network segments (raw datasets)
+    if let Some(first_segment) = segments.first() {
+        assert_eq!(first_segment.ranges.len(), 1);
+    }
+
+    let mut start = *desired.start();
+    let mut end = *desired.end();
+
+    for segment in segments {
+        assert_eq!(segment.ranges.len(), 1);
+        let seg_range = &segment.ranges[0].numbers;
+        // If segment overlaps with desired range, expand to include it
+        if seg_range.end() >= desired.start() && seg_range.start() <= desired.end() {
+            start = start.min(*seg_range.start());
+            end = end.max(*seg_range.end());
+        }
+    }
+
+    start..=end
+}
+
 #[derive(Debug, PartialEq, Eq)]
 struct Chains {
     /// See `canonical_segments`.
@@ -979,4 +1035,52 @@ mod test {
             vec![1..=5, 7..=10]
        );
     }
+
+    #[test]
+    fn expand_to_segment_boundaries() {
+        fn expand(
+            ranges: &[RangeInclusive<BlockNum>],
+            desired: RangeInclusive<BlockNum>,
+        ) -> RangeInclusive<BlockNum> {
+            let segments = ranges
+                .iter()
+                .enumerate()
+                .map(|(i, range)| test_segment(range.clone(), (i as u8, i as u8), 0))
+                .collect::<Vec<_>>();
+            super::expand_to_segment_boundaries(&segments, desired)
+        }
+
+        // empty segments - return desired as-is
+        assert_eq!(expand(&[], 10..=20), 10..=20);
+
+        // single segment fully containing desired
+        assert_eq!(expand(&[0..=100], 10..=20), 0..=100);
+
+        // single segment overlapping start
+        assert_eq!(expand(&[0..=15], 10..=20), 0..=20);
+
+        // single segment overlapping end
+        assert_eq!(expand(&[15..=30], 10..=20), 10..=30);
+
+        // multiple segments spanning desired range
+        assert_eq!(expand(&[0..=99, 100..=199, 200..=299], 50..=150), 0..=199);
+
+        // non-overlapping segment should not expand
+        assert_eq!(expand(&[0..=50, 200..=300], 100..=150), 100..=150);
+
+        // partial overlap on both ends
+        assert_eq!(
+            expand(&[0..=99, 100..=199, 200..=299, 300..=399], 150..=350),
+            100..=399
+        );
+
+        // exact match
+        assert_eq!(expand(&[100..=200], 100..=200), 100..=200);
+
+        // desired fully outside any segment
+        assert_eq!(expand(&[0..=50, 100..=150], 60..=90), 60..=90);
+
+        // adjacent segment (0..=99) touches but does not overlap, so only 100..=199 expands
+        assert_eq!(expand(&[0..=99, 100..=199], 100..=150), 100..=199);
+    }
 }
diff --git a/crates/core/metadata-db/migrations/20260219083754_create_redump_requests_table.sql b/crates/core/metadata-db/migrations/20260219083754_create_redump_requests_table.sql
new file mode 100644
index 000000000..be62804a2
--- /dev/null
+++ b/crates/core/metadata-db/migrations/20260219083754_create_redump_requests_table.sql
@@ -0,0 +1,29 @@
+-- Create redump_requests table for tracking block range re-extraction requests
+-- These requests are picked up by active dump jobs to re-extract corrupted segments
+
+CREATE TABLE IF NOT EXISTS redump_requests (
+    id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
+
+    -- Dataset identification 
(matches job descriptor)
+    dataset_namespace TEXT NOT NULL,
+    dataset_name TEXT NOT NULL,
+    manifest_hash TEXT NOT NULL,
+
+    -- Requested block range (before segment expansion)
+    start_block BIGINT NOT NULL,
+    end_block BIGINT NOT NULL,
+
+    -- Metadata
+    created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
+
+    -- Validate block range
+    CONSTRAINT valid_block_range CHECK (start_block <= end_block)
+);
+
+-- Index for efficient lookup by dataset
+CREATE INDEX IF NOT EXISTS idx_redump_requests_dataset
+ON redump_requests (dataset_namespace, dataset_name, manifest_hash);
+
+-- Unique constraint to prevent duplicate requests for same dataset and range
+CREATE UNIQUE INDEX IF NOT EXISTS idx_redump_requests_unique
+ON redump_requests (dataset_namespace, dataset_name, manifest_hash, start_block, end_block);
diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs
index 7f9987d65..57eba47e5 100644
--- a/crates/core/metadata-db/src/lib.rs
+++ b/crates/core/metadata-db/src/lib.rs
@@ -15,6 +15,7 @@ pub mod manifests;
 pub mod notification_multiplexer;
 pub mod physical_table;
 pub mod physical_table_revision;
+pub mod redump_requests;
 pub mod workers;
 
 pub use self::{
@@ -34,6 +35,7 @@ pub use self::{
             LocationNotification,
         },
     },
+    redump_requests::{RedumpRequest, RequestId as RedumpRequestId},
     workers::{
         Worker, WorkerInfo, WorkerInfoOwned, WorkerNodeId, WorkerNodeIdOwned,
         events::{NotifListener as WorkerNotifListener, NotifRecvError as WorkerNotifRecvError},
diff --git a/crates/core/metadata-db/src/redump_requests.rs b/crates/core/metadata-db/src/redump_requests.rs
new file mode 100644
index 000000000..c404ea645
--- /dev/null
+++ b/crates/core/metadata-db/src/redump_requests.rs
@@ -0,0 +1,95 @@
+//! Redump request management
+//!
+//! This module provides operations for managing redump requests, which are
+//! used to trigger re-extraction of specific block ranges for corrupted segments.
+//!
+//! ## Database Tables
+//!
+//! - **redump_requests**: Stores pending requests for block range re-extraction
+//!
+//! ## Workflow
+//!
+//! 1. Operator creates a redump request via `ampctl dataset redump`
+//! 2. Active dump job checks for pending requests in its main loop
+//! 3. When found, the dump job expands the range to segment boundaries
+//! 4. After successful extraction, the request is deleted
+
+mod request_id;
+pub(crate) mod sql;
+
+pub use self::{request_id::RequestId, sql::RedumpRequest};
+use crate::{DatasetName, DatasetNamespace, Error, Executor, ManifestHash};
+
+/// Insert a new redump request
+///
+/// Creates a request for re-extracting blocks in the given range.
+/// Returns the request ID on success.
+///
+/// Returns an error if a request with the same dataset and block range already exists
+/// (unique constraint violation).
+#[tracing::instrument(skip(exe), err)]
+pub async fn insert<'c, E>(
+    exe: E,
+    namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
+    name: impl Into<DatasetName<'_>> + std::fmt::Debug,
+    manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
+    start_block: u64,
+    end_block: u64,
+) -> Result<RequestId, Error>
+where
+    E: Executor<'c>,
+{
+    sql::insert(
+        exe,
+        namespace.into(),
+        name.into(),
+        manifest_hash.into(),
+        start_block as i64,
+        end_block as i64,
+    )
+    .await
+    .map_err(Error::Database)
+}
+
+/// Get all pending redump requests for a dataset
+///
+/// Returns requests ordered by creation time (oldest first).
+/// The active dump job calls this to check for pending work.
+#[tracing::instrument(skip(exe), err)]
+pub async fn get_pending_for_dataset<'c, E>(
+    exe: E,
+    namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
+    name: impl Into<DatasetName<'_>> + std::fmt::Debug,
+    manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
+) -> Result<Vec<RedumpRequest>, Error>
+where
+    E: Executor<'c>,
+{
+    sql::get_pending_for_dataset(exe, namespace.into(), name.into(), manifest_hash.into())
+        .await
+        .map_err(Error::Database)
+}
+
+/// Delete a redump request by ID
+///
+/// Called after successful re-dump to remove the processed request.
+/// Returns true if the request was found and deleted.
+#[tracing::instrument(skip(exe), err)]
+pub async fn delete<'c, E>(exe: E, id: RequestId) -> Result<bool, Error>
+where
+    E: Executor<'c>,
+{
+    sql::delete_by_id(exe, id).await.map_err(Error::Database)
+}
+
+/// Delete multiple redump requests by their IDs
+///
+/// Called after successful re-dump to remove all processed requests in batch.
+/// Returns the number of requests deleted.
+#[tracing::instrument(skip(exe), err)]
+pub async fn delete_batch<'c, E>(exe: E, ids: &[RequestId]) -> Result<u64, Error>
+where
+    E: Executor<'c>,
+{
+    sql::delete_by_ids(exe, ids).await.map_err(Error::Database)
+}
diff --git a/crates/core/metadata-db/src/redump_requests/request_id.rs b/crates/core/metadata-db/src/redump_requests/request_id.rs
new file mode 100644
index 000000000..bd7fcdf6d
--- /dev/null
+++ b/crates/core/metadata-db/src/redump_requests/request_id.rs
@@ -0,0 +1,98 @@
+//! Redump request ID new-type wrapper for database values
+//!
+//! This module provides a [`RequestId`] new-type wrapper around `i64` that maintains
+//! request ID invariants for database operations.
+
+use sqlx::{Database, Postgres, encode::IsNull, error::BoxDynError};
+
+/// A type-safe identifier for redump request records.
+///
+/// [`RequestId`] is a new-type wrapper around `i64` that maintains the following invariants:
+/// - Values must be positive (> 0)
+/// - Values must fit within the range of `i64`
+///
+/// The type trusts that values are already validated. Validation must occur at system
+/// boundaries before conversion into this type.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[repr(transparent)]
+pub struct RequestId(i64);
+
+impl RequestId {
+    /// Creates a RequestId from an i64 without validation
+    ///
+    /// # Safety
+    /// The caller must ensure the provided value upholds the request ID invariants:
+    /// - Value must be positive (> 0)
+    /// - Value must fit within the range of i64
+    pub fn from_i64_unchecked(value: impl Into<i64>) -> Self {
+        Self(value.into())
+    }
+
+    /// Converts the RequestId to its inner i64 value
+    pub fn into_i64(self) -> i64 {
+        self.0
+    }
+}
+
+impl std::ops::Deref for RequestId {
+    type Target = i64;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl std::fmt::Display for RequestId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl sqlx::Type<Postgres> for RequestId {
+    fn type_info() -> sqlx::postgres::PgTypeInfo {
+        <i64 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+impl sqlx::postgres::PgHasArrayType for RequestId {
+    fn array_type_info() -> sqlx::postgres::PgTypeInfo {
+        <i64 as sqlx::postgres::PgHasArrayType>::array_type_info()
+    }
+}
+
+impl<'r> sqlx::Decode<'r, Postgres> for RequestId {
+    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, BoxDynError> {
+        let id = <i64 as sqlx::Decode<'r, Postgres>>::decode(value)?;
+        // SAFETY: Database values are trusted to uphold invariants.
+        Ok(Self::from_i64_unchecked(id))
+    }
+}
+
+impl<'q> sqlx::Encode<'q, Postgres> for RequestId {
+    fn encode_by_ref(
+        &self,
+        buf: &mut <Postgres as Database>::ArgumentBuffer<'q>,
+    ) -> Result<IsNull, BoxDynError> {
+        <i64 as sqlx::Encode<'q, Postgres>>::encode_by_ref(&self.0, buf)
+    }
+}
+
+impl serde::Serialize for RequestId {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        self.0.serialize(serializer)
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for RequestId {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let id = i64::deserialize(deserializer)?;
+        // SAFETY: Deserialized values are trusted to uphold invariants.
+        Ok(Self::from_i64_unchecked(id))
+    }
+}
diff --git a/crates/core/metadata-db/src/redump_requests/sql.rs b/crates/core/metadata-db/src/redump_requests/sql.rs
new file mode 100644
index 000000000..8890857b5
--- /dev/null
+++ b/crates/core/metadata-db/src/redump_requests/sql.rs
@@ -0,0 +1,126 @@
+//! Internal SQL operations for redump request management
+
+use sqlx::{
+    Executor, Postgres,
+    types::chrono::{DateTime, Utc},
+};
+
+use super::request_id::RequestId;
+use crate::{DatasetName, DatasetNamespace, ManifestHash};
+
+/// A pending redump request from the database
+#[derive(Debug, Clone)]
+pub struct RedumpRequest {
+    pub id: RequestId,
+    pub dataset_namespace: String,
+    pub dataset_name: String,
+    pub manifest_hash: String,
+    pub start_block: i64,
+    pub end_block: i64,
+    pub created_at: DateTime<Utc>,
+}
+
+impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for RedumpRequest {
+    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
+        use sqlx::Row;
+        Ok(Self {
+            id: row.try_get("id")?,
+            dataset_namespace: row.try_get("dataset_namespace")?,
+            dataset_name: row.try_get("dataset_name")?,
+            manifest_hash: row.try_get("manifest_hash")?,
+            start_block: row.try_get("start_block")?,
+            end_block: row.try_get("end_block")?,
+            created_at: row.try_get("created_at")?,
+        })
+    }
+}
+
+/// Insert a new redump request
+///
+/// Returns the new request ID. Returns a unique constraint violation error
+/// if a request with the same dataset and block range already exists.
+pub async fn insert<'c, E>(
+    exe: E,
+    namespace: DatasetNamespace<'_>,
+    name: DatasetName<'_>,
+    manifest_hash: ManifestHash<'_>,
+    start_block: i64,
+    end_block: i64,
+) -> Result<RequestId, sqlx::Error>
+where
+    E: Executor<'c, Database = Postgres>,
+{
+    let query = indoc::indoc! {r#"
+        INSERT INTO redump_requests (dataset_namespace, dataset_name, manifest_hash, start_block, end_block)
+        VALUES ($1, $2, $3, $4, $5)
+        RETURNING id
+    "#};
+
+    sqlx::query_scalar(query)
+        .bind(namespace)
+        .bind(name)
+        .bind(manifest_hash)
+        .bind(start_block)
+        .bind(end_block)
+        .fetch_one(exe)
+        .await
+}
+
+/// Get all pending redump requests for a dataset
+///
+/// Returns requests ordered by creation time (oldest first).
+pub async fn get_pending_for_dataset<'c, E>(
+    exe: E,
+    namespace: DatasetNamespace<'_>,
+    name: DatasetName<'_>,
+    manifest_hash: ManifestHash<'_>,
+) -> Result<Vec<RedumpRequest>, sqlx::Error>
+where
+    E: Executor<'c, Database = Postgres>,
+{
+    let query = indoc::indoc! {r#"
+        SELECT id, dataset_namespace, dataset_name, manifest_hash, start_block, end_block, created_at
+        FROM redump_requests
+        WHERE dataset_namespace = $1 AND dataset_name = $2 AND manifest_hash = $3
+        ORDER BY created_at ASC
+    "#};
+
+    sqlx::query_as(query)
+        .bind(namespace)
+        .bind(name)
+        .bind(manifest_hash)
+        .fetch_all(exe)
+        .await
+}
+
+/// Delete a redump request by ID
+///
+/// Called after successful re-dump to remove the processed request.
+/// Returns true if the request was found and deleted, false if not found.
+pub async fn delete_by_id<'c, E>(exe: E, id: RequestId) -> Result<bool, sqlx::Error>
+where
+    E: Executor<'c, Database = Postgres>,
+{
+    let query = "DELETE FROM redump_requests WHERE id = $1";
+
+    let result = sqlx::query(query).bind(id).execute(exe).await?;
+    Ok(result.rows_affected() > 0)
+}
+
+/// Delete multiple redump requests by their IDs
+///
+/// Called after successful re-dump to remove all processed requests in batch.
+/// Returns the number of requests deleted.
+pub async fn delete_by_ids<'c, E>(exe: E, ids: &[RequestId]) -> Result<u64, sqlx::Error>
+where
+    E: Executor<'c, Database = Postgres>,
+{
+    if ids.is_empty() {
+        return Ok(0);
+    }
+
+    let query = "DELETE FROM redump_requests WHERE id = ANY($1)";
+
+    let result = sqlx::query(query).bind(ids).execute(exe).await?;
+    Ok(result.rows_affected())
+}
diff --git a/crates/core/worker-datasets-raw/src/dataset.rs b/crates/core/worker-datasets-raw/src/dataset.rs
index 3c2db603d..c1716a484 100644
--- a/crates/core/worker-datasets-raw/src/dataset.rs
+++ b/crates/core/worker-datasets-raw/src/dataset.rs
@@ -105,15 +105,17 @@ use common::{
     BlockNum, LogicalCatalog,
     catalog::{
         logical::LogicalTable,
-        physical::{Catalog, MissingRangesError, PhysicalTable},
+        physical::{CanonicalChainError, Catalog, MissingRangesError, PhysicalTable},
     },
-    metadata::segments::merge_ranges,
+    metadata::segments::{expand_to_segment_boundaries, merge_ranges},
     parquet::errors::ParquetError,
 };
 use datasets_common::{hash_reference::HashReference, table_name::TableName};
 use datasets_raw::client::{BlockStreamError, BlockStreamer, CleanupError, LatestBlockError};
 use futures::TryStreamExt as _;
-use metadata_db::{MetadataDb, NotificationMultiplexerHandle, physical_table_revision::LocationId};
+use metadata_db::{
+    MetadataDb, NotificationMultiplexerHandle, physical_table_revision::LocationId, redump_requests,
+};
 use monitoring::logging;
 use tokio::task::JoinHandle;
 use tracing::{Instrument, instrument};
@@ -317,6 +319,64 @@ pub async fn dump(
         compactors_by_table.insert(table_name.clone(), Arc::clone(compactor));
     }
 
+    // Check for pending redump requests and merge their ranges into the work set.
+    // Redump ranges are added to all tables regardless of existing coverage, forcing
+    // re-extraction of already-covered blocks.
+    let pending_redumps = redump_requests::get_pending_for_dataset(
+        &ctx.metadata_db,
+        dataset_reference.namespace(),
+        dataset_reference.name(),
+        dataset_reference.hash(),
+    )
+    .await
+    .map_err(Error::GetRedumpRequests)?;
+
+    let pending_redump_ids: Vec<redump_requests::RequestId> =
+        pending_redumps.iter().map(|r| r.id).collect();
+
+    if !pending_redumps.is_empty() {
+        tracing::info!(
+            count = pending_redumps.len(),
+            "found pending redump requests, merging ranges"
+        );
+
+        // For each table, expand the redump ranges to that table's segment boundaries
+        // and merge them into the table's missing ranges. Each table has its own
+        // segment layout, so the expansion must be done per-table.
+        for (table, _) in &tables {
+            let table_name = table.table_name();
+            let canonical_segments: Vec<_> = table
+                .canonical_chain()
+                .await
+                .map_err(Error::CanonicalChain)?
+ .map(|chain| chain.0) + .unwrap_or_default(); + + let ranges = missing_ranges_by_table + .get_mut(table_name) + .expect("table name present in missing_ranges_by_table"); + + for redump in &pending_redumps { + let raw_range = + (redump.start_block as BlockNum)..=(redump.end_block as BlockNum); + // Expand the requested range to the table's segment boundaries so + // that we re-extract complete segments rather than partial ranges. + let expanded_range = + expand_to_segment_boundaries(&canonical_segments, raw_range); + // Merge the expanded range with existing missing ranges so that + // even already-covered blocks are re-extracted. + let merged = merge_ranges( + ranges + .iter() + .cloned() + .chain(std::iter::once(expanded_range)) + .collect(), + ); + *ranges = merged; + } + } + } + // Use the union of missing table block ranges. let missing_dataset_ranges = { let ranges: Vec> = missing_ranges_by_table @@ -358,6 +418,24 @@ pub async fn dump( progress_reporter.clone(), ) .await?; + + // After a successful dump, delete any redump requests that were processed. + if !pending_redump_ids.is_empty() { + match redump_requests::delete_batch(&ctx.metadata_db, &pending_redump_ids).await { + Ok(deleted) => { + tracing::info!(deleted, "deleted processed redump requests"); + } + Err(err) => { + // Log and continue - the redump request will be processed again next + // iteration, which is safe (idempotent re-extraction). + tracing::warn!( + error = %err, + error_source = logging::error_source(&err), + "failed to delete processed redump requests; will retry next iteration" + ); + } + } + } } Ok(()) } @@ -510,6 +588,27 @@ pub enum Error { #[error("Failed to get missing block ranges for table")] MissingRanges(#[source] MissingRangesError), + /// Failed to get pending redump requests + /// + /// This occurs when querying the metadata database for pending redump requests fails. + /// + /// Common causes: + /// - Metadata database connectivity issues + /// - Database query timeout + #[error("Failed to get pending redump requests")] + GetRedumpRequests(#[source] metadata_db::Error), + + /// Failed to get canonical chain for expanding redump ranges to segment boundaries + /// + /// This occurs when fetching the canonical chain for a table fails during + /// a redump operation. 
+ /// + /// Common causes: + /// - Metadata database connectivity issues + /// - Corrupted file metadata + #[error("Failed to get canonical chain for redump range expansion")] + CanonicalChain(#[source] CanonicalChainError), + /// A partition task execution failed /// /// This occurs when one of the parallel partition tasks fails during execution diff --git a/crates/services/admin-api/src/handlers/datasets.rs b/crates/services/admin-api/src/handlers/datasets.rs index 9fdeef1e1..5e9ffd67e 100644 --- a/crates/services/admin-api/src/handlers/datasets.rs +++ b/crates/services/admin-api/src/handlers/datasets.rs @@ -9,5 +9,6 @@ pub mod get_manifest; pub mod list_all; pub mod list_jobs; pub mod list_versions; +pub mod redump; pub mod register; pub mod restore; diff --git a/crates/services/admin-api/src/handlers/datasets/redump.rs b/crates/services/admin-api/src/handlers/datasets/redump.rs new file mode 100644 index 000000000..fa56c672d --- /dev/null +++ b/crates/services/admin-api/src/handlers/datasets/redump.rs @@ -0,0 +1,315 @@ +use amp_datasets_registry::error::ResolveRevisionError; +use axum::{ + Json, + extract::{ + Path, State, + rejection::{JsonRejection, PathRejection}, + }, + http::StatusCode, +}; +use datasets_common::{name::Name, namespace::Namespace, reference::Reference, revision::Revision}; +use metadata_db::RedumpRequestId; +use monitoring::logging; +use worker::job::JobId; + +use crate::{ + ctx::Ctx, + handlers::error::{ErrorResponse, IntoErrorResponse}, + scheduler::{ListJobsByDatasetError, ScheduleJobError}, +}; + +/// Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/redump` endpoint +/// +/// Creates a redump request to re-extract blocks in the given range. +/// If no active dump job exists for the dataset, one is auto-started. +/// +/// ## Path Parameters +/// - `namespace`: Dataset namespace +/// - `name`: Dataset name +/// - `revision`: Revision (version, hash, "latest", or "dev") +/// +/// ## Request Body +/// - `start_block`: First block of the range to re-extract (inclusive) +/// - `end_block`: Last block of the range to re-extract (inclusive) +/// +/// ## Response +/// - **202 Accepted**: Redump request created +/// - **400 Bad Request**: Invalid path parameters or request body +/// - **404 Not Found**: Dataset or revision not found +/// - **409 Conflict**: A redump request for this range already exists +/// - **500 Internal Server Error**: Database or scheduler error +/// +/// ## Error Codes +/// - `INVALID_PATH`: Invalid path parameters (namespace, name, or revision) +/// - `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields) +/// - `DATASET_NOT_FOUND`: The specified dataset or revision does not exist +/// - `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash +/// - `DUPLICATE_REQUEST`: A redump request for this range already exists +/// - `LIST_JOBS_ERROR`: Failed to list jobs for dataset +/// - `SCHEDULER_ERROR`: Failed to schedule extraction job +/// - `DATABASE_ERROR`: Failed to insert redump request +/// +/// ## Behavior +/// 1. Resolves the revision to find the corresponding dataset +/// 2. Checks if an active dump job exists for the dataset +/// 3. Inserts the redump request into the database +/// 4. If no active job, schedules a new dump job for the dataset +/// 5. 
Returns the request ID and job ID (if a new job was started)
+#[tracing::instrument(skip_all, err)]
+#[cfg_attr(
+    feature = "utoipa",
+    utoipa::path(
+        post,
+        path = "/datasets/{namespace}/{name}/versions/{revision}/redump",
+        tag = "datasets",
+        operation_id = "redump_dataset",
+        params(
+            ("namespace" = String, Path, description = "Dataset namespace"),
+            ("name" = String, Path, description = "Dataset name"),
+            ("revision" = String, Path, description = "Revision (version, hash, latest, or dev)")
+        ),
+        request_body = RedumpRequest,
+        responses(
+            (status = 202, description = "Redump request created", body = RedumpResponse),
+            (status = 400, description = "Bad request (invalid parameters)", body = ErrorResponse),
+            (status = 404, description = "Dataset or revision not found", body = ErrorResponse),
+            (status = 409, description = "Duplicate redump request", body = ErrorResponse),
+            (status = 500, description = "Internal server error", body = ErrorResponse)
+        )
+    )
+)]
+pub async fn handler(
+    path: Result<Path<(Namespace, Name, Revision)>, PathRejection>,
+    State(ctx): State<Ctx>,
+    json: Result<Json<RedumpRequest>, JsonRejection>,
+) -> Result<(StatusCode, Json<RedumpResponse>), ErrorResponse> {
+    let reference = match path {
+        Ok(Path((namespace, name, revision))) => Reference::new(namespace, name, revision),
+        Err(err) => {
+            tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid path parameters");
+            return Err(Error::InvalidPath(err).into());
+        }
+    };
+
+    let RedumpRequest {
+        start_block,
+        end_block,
+    } = match json {
+        Ok(Json(request)) => request,
+        Err(err) => {
+            tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid request body");
+            return Err(Error::InvalidBody(err).into());
+        }
+    };
+
+    tracing::debug!(
+        dataset_reference = %reference,
+        start_block,
+        end_block,
+        "creating redump request"
+    );
+
+    let namespace = reference.namespace().clone();
+    let name = reference.name().clone();
+    let revision = reference.revision().clone();
+
+    // Resolve reference to hash reference
+    let reference = ctx
+        .datasets_registry
+        .resolve_revision(&reference)
+        .await
+        .map_err(Error::ResolveRevision)?
+ .ok_or_else(|| Error::NotFound { + namespace: namespace.clone(), + name: name.clone(), + revision: revision.clone(), + })?; + + // Check for active (non-terminal) jobs for the dataset + let existing_jobs = ctx + .scheduler + .list_jobs_by_dataset(reference.namespace(), reference.name(), reference.hash()) + .await + .map_err(Error::ListJobs)?; + + let has_active_job = existing_jobs.iter().any(|job| !job.status.is_terminal()); + + // Insert the redump request + let request_id = metadata_db::redump_requests::insert( + &ctx.metadata_db, + reference.namespace(), + reference.name(), + reference.hash(), + start_block, + end_block, + ) + .await + .map_err(|err| { + // Unique constraint violation -> duplicate request + if let metadata_db::Error::Database(ref db_err) = err + && let Some(pg_err) = db_err.as_database_error() + { + // PostgreSQL unique violation code: 23505 + if pg_err.code().as_deref() == Some("23505") { + return Error::DuplicateRequest { + start_block, + end_block, + }; + } + } + Error::Database(err) + })?; + + tracing::debug!( + dataset_reference = %reference, + request_id = %request_id, + "redump request inserted" + ); + + // If no active job exists, schedule a new one + let job_id = if !has_active_job { + let dataset = ctx + .dataset_store + .get_dataset(&reference) + .await + .map_err(Error::GetDataset)?; + + let jid = ctx + .scheduler + .schedule_dataset_sync_job( + reference.clone(), + dataset.kind().clone(), + datasets_common::end_block::EndBlock::default(), + 1, + None, + ) + .await + .map_err(|err| { + tracing::error!( + dataset_reference = %reference, + error = %err, + error_source = logging::error_source(&err), + "failed to schedule dump job for redump" + ); + Error::Scheduler(err) + })?; + + tracing::info!( + dataset_reference = %reference, + job_id = %jid, + "scheduled new dump job for redump" + ); + + Some(jid) + } else { + None + }; + + Ok(( + StatusCode::ACCEPTED, + Json(RedumpResponse { request_id, job_id }), + )) +} + +/// Request body for redump operation +#[derive(serde::Deserialize)] +#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] +pub struct RedumpRequest { + /// First block of the range to re-extract (inclusive) + pub start_block: u64, + /// Last block of the range to re-extract (inclusive) + pub end_block: u64, +} + +/// Response for redump operation +#[derive(serde::Serialize)] +#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] +pub struct RedumpResponse { + /// The ID of the created redump request + #[cfg_attr(feature = "utoipa", schema(value_type = i64))] + pub request_id: RedumpRequestId, + /// The ID of the scheduled dump job, if a new job was started. + /// `null` if an active job already exists for this dataset. 
+    #[cfg_attr(feature = "utoipa", schema(value_type = Option<i64>))]
+    pub job_id: Option<JobId>,
+}
+
+/// Errors that can occur when creating a redump request
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// Invalid path parameters
+    #[error("Invalid path parameters: {0}")]
+    InvalidPath(#[source] PathRejection),
+
+    /// Invalid request body
+    #[error("Invalid request body: {0}")]
+    InvalidBody(#[source] JsonRejection),
+
+    /// Dataset or revision not found
+    #[error("Dataset '{namespace}/{name}' at revision '{revision}' not found")]
+    NotFound {
+        namespace: Namespace,
+        name: Name,
+        revision: Revision,
+    },
+
+    /// Failed to resolve revision to manifest hash
+    #[error("Failed to resolve revision: {0}")]
+    ResolveRevision(#[source] ResolveRevisionError),
+
+    /// A redump request for this block range already exists
+    #[error("A redump request for blocks {start_block}..={end_block} already exists")]
+    DuplicateRequest { start_block: u64, end_block: u64 },
+
+    /// Failed to list jobs for dataset
+    #[error("Failed to list jobs for dataset: {0}")]
+    ListJobs(#[source] ListJobsByDatasetError),
+
+    /// Failed to load dataset from store
+    #[error("Failed to load dataset: {0}")]
+    GetDataset(#[source] common::dataset_store::GetDatasetError),
+
+    /// Failed to schedule dump job
+    #[error("Failed to schedule job: {0}")]
+    Scheduler(#[source] ScheduleJobError),
+
+    /// Database error when inserting redump request
+    #[error("Database error: {0}")]
+    Database(#[source] metadata_db::Error),
+}
+
+impl IntoErrorResponse for Error {
+    fn error_code(&self) -> &'static str {
+        match self {
+            Error::InvalidPath(_) => "INVALID_PATH",
+            Error::InvalidBody(_) => "INVALID_BODY",
+            Error::NotFound { .. } => "DATASET_NOT_FOUND",
+            Error::ResolveRevision(_) => "RESOLVE_REVISION_ERROR",
+            Error::DuplicateRequest { .. } => "DUPLICATE_REQUEST",
+            Error::ListJobs(_) => "LIST_JOBS_ERROR",
+            Error::GetDataset(_) => "GET_DATASET_ERROR",
+            Error::Scheduler(err) => match err {
+                ScheduleJobError::WorkerNotAvailable(_) => "WORKER_NOT_AVAILABLE",
+                _ => "SCHEDULER_ERROR",
+            },
+            Error::Database(_) => "DATABASE_ERROR",
+        }
+    }
+
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Error::InvalidPath(_) => StatusCode::BAD_REQUEST,
+            Error::InvalidBody(_) => StatusCode::BAD_REQUEST,
+            Error::NotFound { .. } => StatusCode::NOT_FOUND,
+            Error::ResolveRevision(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Error::DuplicateRequest { .. 
} => StatusCode::CONFLICT, + Error::ListJobs(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::GetDataset(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::Scheduler(err) => match err { + ScheduleJobError::WorkerNotAvailable(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + Error::Database(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index 75f865a48..3603bb9fe 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -46,6 +46,10 @@ pub fn router(ctx: Ctx) -> Router<()> { "/datasets/{namespace}/{name}/versions/{revision}/restore", post(datasets::restore::handler), ) + .route( + "/datasets/{namespace}/{name}/versions/{revision}/redump", + post(datasets::redump::handler), + ) .route( "/datasets/{namespace}/{name}/versions/{revision}/jobs", get(datasets::list_jobs::handler), @@ -120,6 +124,7 @@ pub fn router(ctx: Ctx) -> Router<()> { handlers::datasets::register::handler, handlers::datasets::deploy::handler, handlers::datasets::restore::handler, + handlers::datasets::redump::handler, handlers::datasets::delete::handler, handlers::datasets::delete_version::handler, // Manifest endpoints @@ -176,6 +181,8 @@ pub fn router(ctx: Ctx) -> Router<()> { handlers::datasets::deploy::DeployResponse, handlers::datasets::restore::RestoreResponse, handlers::datasets::restore::RestoredTableInfo, + handlers::datasets::redump::RedumpRequest, + handlers::datasets::redump::RedumpResponse, // Job schemas handlers::jobs::progress::JobProgressResponse, handlers::jobs::progress::TableProgress, From 7c0eb865f257505f84f95677c72178aaf7b98895 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 19 Feb 2026 09:45:37 -0700 Subject: [PATCH 2/2] gen openapi spec --- docs/openapi-specs/admin.spec.json | 145 +++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 863ea36a0..97cb65880 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -525,6 +525,107 @@ } } }, + "/datasets/{namespace}/{name}/versions/{revision}/redump": { + "post": { + "tags": [ + "datasets" + ], + "summary": "Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/redump` endpoint", + "description": "Creates a redump request to re-extract blocks in the given range.\nIf no active dump job exists for the dataset, one is auto-started.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, \"latest\", or \"dev\")\n\n## Request Body\n- `start_block`: First block of the range to re-extract (inclusive)\n- `end_block`: Last block of the range to re-extract (inclusive)\n\n## Response\n- **202 Accepted**: Redump request created\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset or revision not found\n- **409 Conflict**: A redump request for this range already exists\n- **500 Internal Server Error**: Database or scheduler error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `DUPLICATE_REQUEST`: A redump request for this range already exists\n- `LIST_JOBS_ERROR`: Failed to list 
jobs for dataset\n- `SCHEDULER_ERROR`: Failed to schedule extraction job\n- `DATABASE_ERROR`: Failed to insert redump request\n\n## Behavior\n1. Resolves the revision to find the corresponding dataset\n2. Checks if an active dump job exists for the dataset\n3. Inserts the redump request into the database\n4. If no active job, schedules a new dump job for the dataset\n5. Returns the request ID and job ID (if a new job was started)", + "operationId": "redump_dataset", + "parameters": [ + { + "name": "namespace", + "in": "path", + "description": "Dataset namespace", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "name", + "in": "path", + "description": "Dataset name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "revision", + "in": "path", + "description": "Revision (version, hash, latest, or dev)", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RedumpRequest" + } + } + }, + "required": true + }, + "responses": { + "202": { + "description": "Redump request created", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RedumpResponse" + } + } + } + }, + "400": { + "description": "Bad request (invalid parameters)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "404": { + "description": "Dataset or revision not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "409": { + "description": "Duplicate redump request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + } + } + } + }, "/datasets/{namespace}/{name}/versions/{revision}/restore": { "post": { "tags": [ @@ -2503,6 +2604,50 @@ } } }, + "RedumpRequest": { + "type": "object", + "description": "Request body for redump operation", + "required": [ + "start_block", + "end_block" + ], + "properties": { + "end_block": { + "type": "integer", + "format": "int64", + "description": "Last block of the range to re-extract (inclusive)", + "minimum": 0 + }, + "start_block": { + "type": "integer", + "format": "int64", + "description": "First block of the range to re-extract (inclusive)", + "minimum": 0 + } + } + }, + "RedumpResponse": { + "type": "object", + "description": "Response for redump operation", + "required": [ + "request_id" + ], + "properties": { + "job_id": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "The ID of the scheduled dump job, if a new job was started.\n`null` if an active job already exists for this dataset." + }, + "request_id": { + "type": "integer", + "format": "int64", + "description": "The ID of the created redump request" + } + } + }, "RegisterManifestResponse": { "type": "object", "description": "Response payload for manifest registration\n\nContains the computed hash of the registered manifest.",