From 3b539d0c1c6ec9caba37a09410ea9eae1d35b233 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 01:12:53 +0900 Subject: [PATCH 1/7] Add hotspot shard split design document --- docs/hotspot_shard_split_design.md | 327 +++++++++++++++++++++++++++++ 1 file changed, 327 insertions(+) create mode 100644 docs/hotspot_shard_split_design.md diff --git a/docs/hotspot_shard_split_design.md b/docs/hotspot_shard_split_design.md new file mode 100644 index 00000000..e3e0bdc5 --- /dev/null +++ b/docs/hotspot_shard_split_design.md @@ -0,0 +1,327 @@ +# Hotspot Shard Split Design for Elastickv + +## 1. 背景 + +Elastickv には既にシャード境界の概念がありますが、ホットスポットに対して安全に自動分割するための制御面が不足しています。 + +現状(2026-02-17 時点)の実装状況: + +- `distribution/engine.go` にアクセスカウントと閾値超過時の `splitRange` はある。 +- ただし `RecordAccess` は実リクエスト経路から呼ばれていない。 +- ルートテーブルはメモリ常駐で、ノード再起動で消える。 +- ルート更新は `UpdateRoute` の append ベースで、重複/競合検出やバージョン管理がない。 +- 分割後に別 Raft Group へデータ移送する仕組みがない。 + +結果として、実際の「ホットスポット分割」は未接続状態です。 + +## 2. 目的と非目的 + +### 2.1 目的 + +1. ホットレンジを自動検知し、範囲分割を実行できるようにする。 +2. 分割片を別 Raft Group に移送し、負荷を分散できるようにする。 +3. 分割中も整合性(書き込み喪失なし、MVCC/Txn 破壊なし)を維持する。 +4. 失敗時に再開可能なジョブ方式で運用できるようにする。 + +### 2.2 非目的 + +1. Raft Group の自動生成・自動メンバー変更(既存 `--raftGroups` 内で完結)。 +2. 自動マージ(split の逆操作)は本設計の対象外。 +3. LSM/Pebble の最適化実装は後段(まず in-memory MVCC を基準)。 + +## 3. 要件 + +### 3.1 機能要件 + +1. Read/Write 負荷をレンジ単位で収集できる。 +2. 閾値・ヒステリシスに基づいて分割候補を決定できる。 +3. 分割キーをアクセス分布から決定できる(単純 midpoint 依存を脱却)。 +4. バックフィルとカットオーバーを段階的に実行できる。 +5. 分割後のルーティングを全ノードに反映できる。 + +### 3.2 整合性要件 + +1. カットオーバー境界で書き込みを取りこぼさない。 +2. `!txn|...` と `!lst|...` を含む内部キーを正しく移送する。 +3. stale route で旧グループに届いた書き込みを reject できる。 + +### 3.3 運用要件 + +1. 自動 split を ON/OFF できる。 +2. 手動 split API を提供する。 +3. ジョブ状態、失敗理由、処理速度を観測可能にする。 + +## 4. 全体アーキテクチャ + +追加コンポーネント: + +1. `Split Controller` +2. `Hotspot Detector` +3. `Route Catalog`(永続化 + バージョン管理) +4. `Range Migrator` +5. `Route Watcher` + +責務分担: + +| コンポーネント | 主責務 | 配置 | +|---|---|---| +| Hotspot Detector | 負荷集計と split 候補抽出 | 各ノード + リーダー集約 | +| Split Controller | split ジョブ状態機械、再試行 | 既定グループの leader | +| Route Catalog | ルート表と split ジョブの永続化 | 既定グループの内部メタデータ | +| Range Migrator | バックフィル/差分コピー | source/target leader 間 | +| Route Watcher | ルート表更新の配布とローカル反映 | 全ノード | + +## 5. データモデル + +`RouteDescriptor`: + +- `route_id uint64` +- `start []byte`(inclusive) +- `end []byte`(exclusive, nil=+inf) +- `group_id uint64` +- `version uint64`(catalog 全体世代) +- `state` (`ACTIVE`, `WRITE_FENCED`, `MIGRATING_SOURCE`, `MIGRATING_TARGET`) +- `parent_route_id uint64`(split 由来のトレース) + +`SplitJob`: + +- `job_id string` +- `source_route_id uint64` +- `target_group_id uint64` +- `split_key []byte` +- `snapshot_ts uint64` +- `fence_ts uint64` +- `phase` (`PLANNED`, `BACKFILL`, `FENCE`, `DELTA_COPY`, `CUTOVER`, `CLEANUP`, `DONE`, `ABORTED`) +- `cursor []byte`(再開用) +- `retry_count uint32` +- `last_error string` + +`AccessWindow`: + +- `route_id uint64` +- `window_start_unix_ms uint64` +- `read_ops uint64` +- `write_ops uint64` +- `p95_latency_us uint64`(将来) +- `top_keys_sample`(分割キー選定用) + +## 6. ホットスポット検知設計 + +### 6.1 計測点 + +Read: + +- `kv/ShardStore.GetAt` +- `kv/ShardStore.ScanAt` + +Write: + +- `kv/ShardedCoordinator.groupMutations`(キー単位で加算) + +### 6.2 判定式 + +レンジ score: + +`score = write_ops * Ww + read_ops * Wr` + +初期値: + +- `Ww=4`, `Wr=1` +- `threshold=50_000 ops/min` +- 3 ウィンドウ連続超過で候補化 +- split 実行後 cooldown 10 分 + +### 6.3 分割キー決定 + +1. サンプルキー分布から p50 を基本 split key とする。 +2. 単一キー偏り (`top_key_share >= 0.8`) の場合は hot key 分離を優先。 +3. `end=nil`(無限上限)でも観測キーに基づいて split key を算出可能にする。 + +## 7. 
Split 実行フロー + +```mermaid +flowchart LR + A[Detect hotspot] --> B[Create SplitJob: PLANNED] + B --> C[BACKFILL at snapshot_ts] + C --> D[FENCE writes on moving range] + D --> E[DELTA_COPY snapshot_ts+1..fence_ts] + E --> F[CUTOVER route version+1] + F --> G[CLEANUP source old copies] + G --> H[DONE] +``` + +### 7.1 BACKFILL + +1. `snapshot_ts = source.LastCommitTS()` を記録。 +2. source から moving range の MVCC バージョンをエクスポート。 +3. target へ idempotent import。 +4. `cursor` をジョブに保存しながら進める。 + +### 7.2 FENCE + +1. moving range を `WRITE_FENCED` に更新。 +2. `ShardedCoordinator.Dispatch` で対象キー書き込みを `retryable` エラーで拒否。 +3. `raft.Barrier` で fence 反映点を確定。 + +### 7.3 DELTA_COPY + +1. `fence_ts = source.LastCommitTS()` を取得。 +2. `(snapshot_ts, fence_ts]` の差分バージョンをコピー。 +3. 反映完了後に cutover 可能。 + +### 7.4 CUTOVER + +1. 親レンジを削除し、子レンジ 2 本を `ACTIVE` で公開。 +2. moving child の `group_id` を target に切替。 +3. route catalog `version` をインクリメント。 + +### 7.5 CLEANUP + +1. 一定 grace 期間は source に旧データを残す(読み取り保険)。 +2. 期間後に source 側の moving child データを GC。 + +## 8. ルーティング整合性 + +### 8.1 Route Version 付与 + +- `pb.Request`(`proto/internal.proto`)に `route_version` を追加。 +- Coordinator は現在の route version を request に埋める。 + +### 8.2 所有権検証 + +- `kvFSM` に `groupID` と route resolver を注入。 +- Apply 前に mutation key の `routeKey` 所有グループを検証。 +- 不一致時は `ErrWrongShard` を返す。 + +### 8.3 stale route 対策 + +- stale ノードが旧ルートで送信しても、旧グループ leader が reject する。 +- クライアントは再試行時に最新ルートを取得する。 + +## 9. 移送対象キーの定義 + +moving range 判定は raw key ではなく logical route key で行う。 + +対象: + +1. user key 本体 +2. list key(`!lst|meta|...`, `!lst|itm|...`) +3. txn key(`!txn|lock|...`, `!txn|int|...`, `!txn|cmt|...`, `!txn|rb|...`) + +必要変更: + +- `kv/txn_keys.go` の route key 抽出をエクスポートし、migrator から再利用可能にする。 +- `store` 側に MVCC バージョン export/import API を追加する。 + +## 10. API 変更案 + +### 10.1 `proto/distribution.proto` + +追加 RPC: + +1. `ReportAccess(ReportAccessRequest) returns (ReportAccessResponse)` +2. `ListRoutes(ListRoutesRequest) returns (ListRoutesResponse)` +3. `WatchRoutes(WatchRoutesRequest) returns (stream WatchRoutesResponse)` +4. `SplitRange(SplitRangeRequest) returns (SplitRangeResponse)` +5. `GetSplitJob(GetSplitJobRequest) returns (GetSplitJobResponse)` + +### 10.2 `proto/internal.proto` + +追加 RPC: + +1. `ExportRangeVersions(ExportRangeVersionsRequest) returns (stream ExportRangeVersionsResponse)` +2. `ImportRangeVersions(ImportRangeVersionsRequest) returns (ImportRangeVersionsResponse)` + +## 11. 変更対象(実装単位) + +1. `distribution/engine.go` +- route version/route state を持つ構造へ拡張 +- append 更新ではなく CAS 更新 API 追加 + +2. `kv/shard_store.go` +- read path の access 記録追加 +- moving/fenced レンジ時の挙動整理 + +3. `kv/sharded_coordinator.go` +- write path の access 記録追加 +- `WRITE_FENCED` への retryable error 返却 + +4. `kv/fsm.go` +- route ownership 検証追加 +- `ErrWrongShard` を返す分岐追加 + +5. `adapter/distribution_server.go` +- split/job/watch/report RPC 実装 + +6. `store/mvcc_store.go`(+ `store/lsm_store.go`) +- range export/import API 実装 + +## 12. 段階導入計画 + +### Milestone 1: 制御面 + +1. Route catalog の永続化 +2. route version + watcher +3. 手動 split API(データ移送なし・同一 group 内分割) + +### Milestone 2: データ移送 + +1. MVCC range export/import +2. BACKFILL/FENCE/DELTA/CUTOVER のジョブ化 +3. 手動 split with target group + +### Milestone 3: 自動化 + +1. access 集計 +2. hotspot detector +3. auto split scheduler(cooldown/hysteresis) + +### Milestone 4: 堅牢化 + +1. stale route reject +2. cleanup GC +3. Jepsen workload 追加 + +## 13. テスト戦略 + +### 13.1 Unit + +1. hotspot 判定と split key 選定 +2. route catalog CAS/バージョン遷移 +3. split state machine の phase 遷移 +4. 
ownership 検証 + +### 13.2 Integration + +1. split 中の並行 write/read の整合性 +2. FENCE 中の retryable エラー確認 +3. リーダー交代・ノード再起動後のジョブ再開 +4. cross-shard txn と split 併用 + +### 13.3 Jepsen + +1. hotspot 負荷 + partition nemesis +2. hotspot 負荷 + kill nemesis +3. split 実行中の linearizability 検証 + +## 14. リスクと緩和策 + +1. リスク: split 中の write 停止時間が長い。 +- 緩和: chunked delta copy、fence 範囲を moving child のみに限定。 + +2. リスク: stale route による誤配送。 +- 緩和: leader 側 ownership 検証 + route version。 + +3. リスク: 内部キー取りこぼしで txn 解決不能。 +- 緩和: logical route key ベースで export 対象を決定し、txn/list の専用テストを追加。 + +4. リスク: cleanup が早すぎると古い reader に影響。 +- 緩和: grace 期間 + route version 監視 + 段階 GC。 + +## 15. 受け入れ基準 + +1. 手動 split で range が 2 分割され、片側を別 group へ移送できる。 +2. split 中に write ロスが発生しない。 +3. split 後に stale route write が reject される。 +4. 障害復旧後に split job が再開され最終的に `DONE` になる。 +5. 既存テスト + 新規 split テスト + Jepsen 拡張シナリオが通る。 From 3ce44fffdd26a64328c176b2fa8c6406a85e3846 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 01:22:14 +0900 Subject: [PATCH 2/7] Update document for hotspot shard split design --- docs/hotspot_shard_split_design.md | 324 +++++++++++------------ docs/hotspot_split_milestone1_pr_plan.md | 244 +++++++++++++++++ 2 files changed, 406 insertions(+), 162 deletions(-) create mode 100644 docs/hotspot_split_milestone1_pr_plan.md diff --git a/docs/hotspot_shard_split_design.md b/docs/hotspot_shard_split_design.md index e3e0bdc5..3f9ff754 100644 --- a/docs/hotspot_shard_split_design.md +++ b/docs/hotspot_shard_split_design.md @@ -1,87 +1,87 @@ # Hotspot Shard Split Design for Elastickv -## 1. 背景 +## 1. Background -Elastickv には既にシャード境界の概念がありますが、ホットスポットに対して安全に自動分割するための制御面が不足しています。 +Elastickv already has shard boundaries, but it does not yet have the control-plane needed for safe automatic hotspot splitting. -現状(2026-02-17 時点)の実装状況: +Current implementation status (as of February 17, 2026): -- `distribution/engine.go` にアクセスカウントと閾値超過時の `splitRange` はある。 -- ただし `RecordAccess` は実リクエスト経路から呼ばれていない。 -- ルートテーブルはメモリ常駐で、ノード再起動で消える。 -- ルート更新は `UpdateRoute` の append ベースで、重複/競合検出やバージョン管理がない。 -- 分割後に別 Raft Group へデータ移送する仕組みがない。 +- `distribution/engine.go` has per-range access counters and threshold-based `splitRange`. +- However, `RecordAccess` is not wired into real request paths. +- The route table is in-memory only and is lost on restart. +- Route updates are append-based via `UpdateRoute`, with no conflict detection, deduplication, or versioning. +- There is no data movement workflow to relocate part of a split range to another Raft group. -結果として、実際の「ホットスポット分割」は未接続状態です。 +So practical “hotspot splitting” is currently not connected end-to-end. -## 2. 目的と非目的 +## 2. Goals and Non-goals -### 2.1 目的 +### 2.1 Goals -1. ホットレンジを自動検知し、範囲分割を実行できるようにする。 -2. 分割片を別 Raft Group に移送し、負荷を分散できるようにする。 -3. 分割中も整合性(書き込み喪失なし、MVCC/Txn 破壊なし)を維持する。 -4. 失敗時に再開可能なジョブ方式で運用できるようにする。 +1. Detect hot ranges automatically and execute range splits. +2. Move split children to other Raft groups to distribute load. +3. Preserve consistency during split (no lost writes, no MVCC/Txn breakage). +4. Support resumable job-based operations after failures. -### 2.2 非目的 +### 2.2 Non-goals -1. Raft Group の自動生成・自動メンバー変更(既存 `--raftGroups` 内で完結)。 -2. 自動マージ(split の逆操作)は本設計の対象外。 -3. LSM/Pebble の最適化実装は後段(まず in-memory MVCC を基準)。 +1. Automatic Raft group creation or membership orchestration (operate within existing `--raftGroups`). +2. Automatic merge (inverse of split) in this design. +3. Deep LSM/Pebble optimization in the first phase (in-memory MVCC is the baseline first). -## 3. 
要件 +## 3. Requirements -### 3.1 機能要件 +### 3.1 Functional requirements -1. Read/Write 負荷をレンジ単位で収集できる。 -2. 閾値・ヒステリシスに基づいて分割候補を決定できる。 -3. 分割キーをアクセス分布から決定できる(単純 midpoint 依存を脱却)。 -4. バックフィルとカットオーバーを段階的に実行できる。 -5. 分割後のルーティングを全ノードに反映できる。 +1. Collect read/write load by range. +2. Select split candidates with thresholds and hysteresis. +3. Choose split keys from observed key distribution (not midpoint-only). +4. Execute phased backfill and cutover. +5. Propagate post-split routing to all nodes. -### 3.2 整合性要件 +### 3.2 Consistency requirements -1. カットオーバー境界で書き込みを取りこぼさない。 -2. `!txn|...` と `!lst|...` を含む内部キーを正しく移送する。 -3. stale route で旧グループに届いた書き込みを reject できる。 +1. No write loss at cutover boundaries. +2. Correct migration of internal keys including `!txn|...` and `!lst|...`. +3. Ability to reject writes delivered to the old group using stale routes. -### 3.3 運用要件 +### 3.3 Operational requirements -1. 自動 split を ON/OFF できる。 -2. 手動 split API を提供する。 -3. ジョブ状態、失敗理由、処理速度を観測可能にする。 +1. Toggle auto-split ON/OFF. +2. Provide manual split API. +3. Expose job status, failure reason, and migration throughput. -## 4. 全体アーキテクチャ +## 4. High-level Architecture -追加コンポーネント: +New components: 1. `Split Controller` 2. `Hotspot Detector` -3. `Route Catalog`(永続化 + バージョン管理) +3. `Route Catalog` (durable + versioned) 4. `Range Migrator` 5. `Route Watcher` -責務分担: +Responsibilities: -| コンポーネント | 主責務 | 配置 | +| Component | Responsibility | Placement | |---|---|---| -| Hotspot Detector | 負荷集計と split 候補抽出 | 各ノード + リーダー集約 | -| Split Controller | split ジョブ状態機械、再試行 | 既定グループの leader | -| Route Catalog | ルート表と split ジョブの永続化 | 既定グループの内部メタデータ | -| Range Migrator | バックフィル/差分コピー | source/target leader 間 | -| Route Watcher | ルート表更新の配布とローカル反映 | 全ノード | +| Hotspot Detector | Load aggregation and split candidate extraction | Every node + leader aggregation | +| Split Controller | Split state machine and retries | Default-group leader | +| Route Catalog | Durable route table and split jobs | Internal metadata in default group | +| Range Migrator | Backfill and delta copy | Between source/target leaders | +| Route Watcher | Route update distribution and local refresh | All nodes | -## 5. データモデル +## 5. Data Model `RouteDescriptor`: - `route_id uint64` -- `start []byte`(inclusive) -- `end []byte`(exclusive, nil=+inf) +- `start []byte` (inclusive) +- `end []byte` (exclusive, nil=+inf) - `group_id uint64` -- `version uint64`(catalog 全体世代) +- `version uint64` (catalog generation) - `state` (`ACTIVE`, `WRITE_FENCED`, `MIGRATING_SOURCE`, `MIGRATING_TARGET`) -- `parent_route_id uint64`(split 由来のトレース) +- `parent_route_id uint64` (lineage) `SplitJob`: @@ -92,7 +92,7 @@ Elastickv には既にシャード境界の概念がありますが、ホット - `snapshot_ts uint64` - `fence_ts uint64` - `phase` (`PLANNED`, `BACKFILL`, `FENCE`, `DELTA_COPY`, `CUTOVER`, `CLEANUP`, `DONE`, `ABORTED`) -- `cursor []byte`(再開用) +- `cursor []byte` (resume cursor) - `retry_count uint32` - `last_error string` @@ -102,42 +102,42 @@ Elastickv には既にシャード境界の概念がありますが、ホット - `window_start_unix_ms uint64` - `read_ops uint64` - `write_ops uint64` -- `p95_latency_us uint64`(将来) -- `top_keys_sample`(分割キー選定用) +- `p95_latency_us uint64` (future) +- `top_keys_sample` (for split-key selection) -## 6. ホットスポット検知設計 +## 6. 
Hotspot Detection -### 6.1 計測点 +### 6.1 Instrumentation points -Read: +Read path: - `kv/ShardStore.GetAt` - `kv/ShardStore.ScanAt` -Write: +Write path: -- `kv/ShardedCoordinator.groupMutations`(キー単位で加算) +- `kv/ShardedCoordinator.groupMutations` (increment per key) -### 6.2 判定式 +### 6.2 Scoring -レンジ score: +Range score: `score = write_ops * Ww + read_ops * Wr` -初期値: +Initial parameters: - `Ww=4`, `Wr=1` - `threshold=50_000 ops/min` -- 3 ウィンドウ連続超過で候補化 -- split 実行後 cooldown 10 分 +- Candidate after 3 consecutive windows over threshold +- Cooldown 10 minutes after split -### 6.3 分割キー決定 +### 6.3 Split-key selection -1. サンプルキー分布から p50 を基本 split key とする。 -2. 単一キー偏り (`top_key_share >= 0.8`) の場合は hot key 分離を優先。 -3. `end=nil`(無限上限)でも観測キーに基づいて split key を算出可能にする。 +1. Default to p50 of sampled key distribution. +2. If single-key skew is high (`top_key_share >= 0.8`), isolate the hot key. +3. For `end=nil` ranges, derive split key from observed keys. -## 7. Split 実行フロー +## 7. Split Execution Flow ```mermaid flowchart LR @@ -152,72 +152,72 @@ flowchart LR ### 7.1 BACKFILL -1. `snapshot_ts = source.LastCommitTS()` を記録。 -2. source から moving range の MVCC バージョンをエクスポート。 -3. target へ idempotent import。 -4. `cursor` をジョブに保存しながら進める。 +1. Record `snapshot_ts = source.LastCommitTS()`. +2. Export MVCC versions for the moving range from source. +3. Import into target idempotently. +4. Persist `cursor` continuously for resumability. ### 7.2 FENCE -1. moving range を `WRITE_FENCED` に更新。 -2. `ShardedCoordinator.Dispatch` で対象キー書き込みを `retryable` エラーで拒否。 -3. `raft.Barrier` で fence 反映点を確定。 +1. Mark moving range as `WRITE_FENCED`. +2. In `ShardedCoordinator.Dispatch`, reject writes in fenced range with retryable errors. +3. Use `raft.Barrier` to confirm fence visibility. ### 7.3 DELTA_COPY -1. `fence_ts = source.LastCommitTS()` を取得。 -2. `(snapshot_ts, fence_ts]` の差分バージョンをコピー。 -3. 反映完了後に cutover 可能。 +1. Acquire `fence_ts = source.LastCommitTS()`. +2. Copy delta versions in `(snapshot_ts, fence_ts]`. +3. Mark cutover-ready when complete. ### 7.4 CUTOVER -1. 親レンジを削除し、子レンジ 2 本を `ACTIVE` で公開。 -2. moving child の `group_id` を target に切替。 -3. route catalog `version` をインクリメント。 +1. Replace parent range with two active child ranges. +2. Switch moving child `group_id` to target. +3. Increment route catalog `version`. ### 7.5 CLEANUP -1. 一定 grace 期間は source に旧データを残す(読み取り保険)。 -2. 期間後に source 側の moving child データを GC。 +1. Keep old copies on source for a grace period (read safety). +2. Garbage-collect moved data after grace period. -## 8. ルーティング整合性 +## 8. Routing Consistency -### 8.1 Route Version 付与 +### 8.1 Route version in requests -- `pb.Request`(`proto/internal.proto`)に `route_version` を追加。 -- Coordinator は現在の route version を request に埋める。 +- Add `route_version` to `pb.Request` (`proto/internal.proto`). +- Coordinator stamps current route version into each request. -### 8.2 所有権検証 +### 8.2 Ownership validation -- `kvFSM` に `groupID` と route resolver を注入。 -- Apply 前に mutation key の `routeKey` 所有グループを検証。 -- 不一致時は `ErrWrongShard` を返す。 +- Inject `groupID` + route resolver into `kvFSM`. +- Validate ownership for each mutation key (via `routeKey`) before apply. +- Return `ErrWrongShard` on mismatch. -### 8.3 stale route 対策 +### 8.3 Stale-route handling -- stale ノードが旧ルートで送信しても、旧グループ leader が reject する。 -- クライアントは再試行時に最新ルートを取得する。 +- Old-group leader rejects stale-route writes. +- Client retries after refreshing routes. -## 9. 移送対象キーの定義 +## 9. 
Key Coverage for Migration -moving range 判定は raw key ではなく logical route key で行う。 +Range membership must be based on logical route key, not raw storage key. -対象: +Keys to include: -1. user key 本体 -2. list key(`!lst|meta|...`, `!lst|itm|...`) -3. txn key(`!txn|lock|...`, `!txn|int|...`, `!txn|cmt|...`, `!txn|rb|...`) +1. User keys +2. List keys (`!lst|meta|...`, `!lst|itm|...`) +3. Txn keys (`!txn|lock|...`, `!txn|int|...`, `!txn|cmt|...`, `!txn|rb|...`) -必要変更: +Required changes: -- `kv/txn_keys.go` の route key 抽出をエクスポートし、migrator から再利用可能にする。 -- `store` 側に MVCC バージョン export/import API を追加する。 +- Export route-key extraction from `kv/txn_keys.go` for migrator reuse. +- Add MVCC version export/import APIs in `store`. -## 10. API 変更案 +## 10. API Changes ### 10.1 `proto/distribution.proto` -追加 RPC: +Add RPCs: 1. `ReportAccess(ReportAccessRequest) returns (ReportAccessResponse)` 2. `ListRoutes(ListRoutesRequest) returns (ListRoutesResponse)` @@ -227,101 +227,101 @@ moving range 判定は raw key ではなく logical route key で行う。 ### 10.2 `proto/internal.proto` -追加 RPC: +Add RPCs: 1. `ExportRangeVersions(ExportRangeVersionsRequest) returns (stream ExportRangeVersionsResponse)` 2. `ImportRangeVersions(ImportRangeVersionsRequest) returns (ImportRangeVersionsResponse)` -## 11. 変更対象(実装単位) +## 11. Implementation Touchpoints 1. `distribution/engine.go` -- route version/route state を持つ構造へ拡張 -- append 更新ではなく CAS 更新 API 追加 +- Extend route data with state/version. +- Replace append-only updates with CAS update APIs. 2. `kv/shard_store.go` -- read path の access 記録追加 -- moving/fenced レンジ時の挙動整理 +- Add read-path access recording. +- Define behavior for moving/fenced ranges. 3. `kv/sharded_coordinator.go` -- write path の access 記録追加 -- `WRITE_FENCED` への retryable error 返却 +- Add write-path access recording. +- Return retryable errors for `WRITE_FENCED` writes. 4. `kv/fsm.go` -- route ownership 検証追加 -- `ErrWrongShard` を返す分岐追加 +- Add route-ownership checks. +- Return `ErrWrongShard` when ownership mismatches. 5. `adapter/distribution_server.go` -- split/job/watch/report RPC 実装 +- Implement split/job/watch/report APIs. -6. `store/mvcc_store.go`(+ `store/lsm_store.go`) -- range export/import API 実装 +6. `store/mvcc_store.go` and `store/lsm_store.go` +- Implement range version export/import. -## 12. 段階導入計画 +## 12. Phased Rollout -### Milestone 1: 制御面 +### Milestone 1: Control plane -1. Route catalog の永続化 -2. route version + watcher -3. 手動 split API(データ移送なし・同一 group 内分割) +1. Durable route catalog +2. Route version + watcher +3. Manual split API (same-group split, no migration) -### Milestone 2: データ移送 +### Milestone 2: Migration plane 1. MVCC range export/import -2. BACKFILL/FENCE/DELTA/CUTOVER のジョブ化 -3. 手動 split with target group +2. Job phases: BACKFILL/FENCE/DELTA/CUTOVER +3. Manual split with target-group relocation -### Milestone 3: 自動化 +### Milestone 3: Automation -1. access 集計 -2. hotspot detector -3. auto split scheduler(cooldown/hysteresis) +1. Access aggregation +2. Hotspot detector +3. Auto-split scheduler (cooldown/hysteresis) -### Milestone 4: 堅牢化 +### Milestone 4: Hardening -1. stale route reject -2. cleanup GC -3. Jepsen workload 追加 +1. Stale-route reject path +2. Cleanup GC +3. Jepsen hotspot workloads -## 13. テスト戦略 +## 13. Test Strategy -### 13.1 Unit +### 13.1 Unit tests -1. hotspot 判定と split key 選定 -2. route catalog CAS/バージョン遷移 -3. split state machine の phase 遷移 -4. ownership 検証 +1. Hotspot scoring and split-key selection +2. Route catalog CAS/version transitions +3. Split state-machine phase transitions +4. 
Ownership validation -### 13.2 Integration +### 13.2 Integration tests -1. split 中の並行 write/read の整合性 -2. FENCE 中の retryable エラー確認 -3. リーダー交代・ノード再起動後のジョブ再開 -4. cross-shard txn と split 併用 +1. Consistency under concurrent read/write during split +2. Retryable behavior during FENCE +3. Job resume after leader change/restart +4. Split with cross-shard transactions ### 13.3 Jepsen -1. hotspot 負荷 + partition nemesis -2. hotspot 負荷 + kill nemesis -3. split 実行中の linearizability 検証 +1. Hotspot load + partition nemesis +2. Hotspot load + kill nemesis +3. Linearizability checks during split -## 14. リスクと緩和策 +## 14. Risks and Mitigations -1. リスク: split 中の write 停止時間が長い。 -- 緩和: chunked delta copy、fence 範囲を moving child のみに限定。 +1. Risk: Long write pause during split. +- Mitigation: Chunked delta copy, narrow fence to moving child only. -2. リスク: stale route による誤配送。 -- 緩和: leader 側 ownership 検証 + route version。 +2. Risk: Misrouting via stale route cache. +- Mitigation: Leader-side ownership validation + route version checks. -3. リスク: 内部キー取りこぼしで txn 解決不能。 -- 緩和: logical route key ベースで export 対象を決定し、txn/list の専用テストを追加。 +3. Risk: Internal key omission causing unresolved transactions. +- Mitigation: Logical route-key based filtering + dedicated txn/list migration tests. -4. リスク: cleanup が早すぎると古い reader に影響。 -- 緩和: grace 期間 + route version 監視 + 段階 GC。 +4. Risk: Cleanup too early impacting old readers. +- Mitigation: Grace period + route-version observability + staged GC. -## 15. 受け入れ基準 +## 15. Acceptance Criteria -1. 手動 split で range が 2 分割され、片側を別 group へ移送できる。 -2. split 中に write ロスが発生しない。 -3. split 後に stale route write が reject される。 -4. 障害復旧後に split job が再開され最終的に `DONE` になる。 -5. 既存テスト + 新規 split テスト + Jepsen 拡張シナリオが通る。 +1. Manual split divides one range into two and moves one child to another group. +2. No write loss during split. +3. Stale-route writes are rejected after cutover. +4. Split jobs resume after failure and eventually reach `DONE`. +5. Existing tests + new split tests + extended Jepsen scenarios pass. diff --git a/docs/hotspot_split_milestone1_pr_plan.md b/docs/hotspot_split_milestone1_pr_plan.md new file mode 100644 index 00000000..77a750c4 --- /dev/null +++ b/docs/hotspot_split_milestone1_pr_plan.md @@ -0,0 +1,244 @@ +# Hotspot Split Milestone 1 PR Plan + +## Objective + +Milestone 1 delivers the control plane only: + +1. Durable route catalog. +2. Route versioning and watcher-based refresh. +3. Manual split API (same-group split, no cross-group migration). + +## Out of Scope + +1. Data migration to a different Raft group. +2. Automatic hotspot detection and auto-split scheduling. +3. Stale-route write rejection in FSM (`ErrWrongShard`) and route-version stamping on every internal request. + +## Implementation Strategy + +Use the existing default Raft group as the source of truth for route metadata. + +1. Persist route catalog entries as reserved internal keys in replicated MVCC state. +2. Keep `distribution.Engine` as the in-memory read path used by routers/coordinators. +3. Run a route watcher that refreshes `Engine` from durable catalog version changes. + +This keeps rollout small and avoids introducing a new external metadata service. + +## PR Breakdown + +## PR1: Route Catalog Model and Persistence + +### Goal + +Introduce a durable route catalog format and storage operations. + +### Main files + +1. `distribution/catalog.go` (new) +2. `distribution/catalog_test.go` (new) +3. `kv/` metadata helper file for reserved distribution keys (new) +4. 
`store/` tests if key encoding helpers are shared there + +### Tasks + +1. Define `RouteDescriptor` and `CatalogSnapshot` structs for durable representation. +2. Define reserved key prefixes, for example: + - `!dist|meta|version` + - `!dist|route|` +3. Implement encoding/decoding helpers (stable binary or protobuf-based). +4. Implement catalog read/write helpers with optimistic version checks. +5. Add unit tests for: + - encode/decode round-trip + - version mismatch behavior + - deterministic ordering in snapshots + +### Done criteria + +1. Catalog structs and key format are stable and documented. +2. Versioned read/write helpers are covered by tests. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... ./kv/... +``` + +## PR2: Engine Refactor for Versioned Snapshot Apply + +### Goal + +Make `distribution.Engine` apply a full route snapshot atomically with catalog version. + +### Main files + +1. `distribution/engine.go` +2. `distribution/engine_test.go` + +### Tasks + +1. Add engine metadata: + - current route catalog version + - route IDs and states needed for control-plane operations +2. Add APIs: + - `ApplySnapshot(snapshot CatalogSnapshot) error` + - `Version() uint64` +3. Keep existing `GetRoute` and scan behavior unchanged for data path callers. +4. Add tests: + - snapshot apply replaces old routes atomically + - old version snapshot is rejected + - lookup behavior remains correct after snapshot apply + +### Done criteria + +1. Engine can be fully refreshed from durable catalog. +2. Existing routing tests remain green. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... +``` + +## PR3: Distribution Admin Service (ListRoutes and SplitRange) + +### Goal + +Add manual control-plane APIs for route introspection and same-group split. + +### Main files + +1. `proto/distribution.proto` +2. Generated files: + - `proto/distribution.pb.go` + - `proto/distribution_grpc.pb.go` +3. `adapter/distribution_server.go` +4. `adapter/distribution_server_test.go` + +### Tasks + +1. Add RPCs: + - `ListRoutes` + - `SplitRange` +2. Add message fields needed for route version and route state. +3. Implement `ListRoutes` by reading the durable catalog. +4. Implement `SplitRange` for same-group split only: + - validate range ownership and boundaries + - create two child ranges in same `group_id` + - bump catalog version atomically +5. Return clear validation errors for: + - unknown route + - invalid split key + - split key at boundaries + - version conflict + +### Done criteria + +1. Manual split updates durable catalog atomically. +2. APIs are covered with success and failure-path tests. + +### Validation + +```bash +cd proto && make gen +cd .. +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./adapter/... ./proto/... +``` + +## PR4: Route Watcher and Runtime Refresh + +### Goal + +Make every node refresh in-memory routes when durable catalog version changes. + +### Main files + +1. `distribution/watcher.go` (new) +2. `distribution/watcher_test.go` (new) +3. `main.go` +4. `cmd/server/demo.go` if demo mode should support this path + +### Tasks + +1. Add a watcher loop with periodic poll (initially simple and robust): + - read durable catalog version + - if version changed, fetch full snapshot + - call `engine.ApplySnapshot(...)` +2. Add lifecycle hooks in server startup and graceful stop. +3. 
Add tests: + - watcher applies new version + - no-op when version unchanged + - retry on transient read error + +### Done criteria + +1. Route updates made by one leader become visible in all nodes’ engines. +2. Watcher failure does not crash serving path; it retries. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... ./... +``` + +## PR5: End-to-End Milestone 1 Tests and Docs + +### Goal + +Add integration tests for manual split and update operator docs. + +### Main files + +1. `kv/sharded_integration_test.go` +2. `adapter/grpc_test.go` and/or dedicated split integration tests +3. `README.md` +4. `docs/hotspot_shard_split_design.md` + +### Tasks + +1. Add integration scenario: + - start with one range + - write keys on both sides of planned split point + - call `SplitRange` + - verify routing + reads/writes continue correctly after refresh +2. Add restart scenario: + - apply split + - restart node(s) + - confirm routes reload from durable catalog +3. Document manual split API request/response examples. + +### Done criteria + +1. Milestone 1 behavior is reproducible via tests and docs. +2. Restart does not lose route table changes. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./... +GOCACHE=$(pwd)/.cache GOLANGCI_LINT_CACHE=$(pwd)/.golangci-cache golangci-lint run ./... --timeout=5m +``` + +## Merge Order + +1. PR1 +2. PR2 +3. PR3 +4. PR4 +5. PR5 + +Each PR should be independently testable and mergeable. + +## Operational Rollout Checklist (Milestone 1) + +1. Deploy binaries with watcher enabled but manual split unused. +2. Validate `ListRoutes` output on staging. +3. Execute one manual same-group split in staging. +4. Confirm all nodes converge to the same route catalog version. +5. Restart one node and verify route version is preserved. + +## Exit Criteria for Milestone 1 + +1. Route catalog is durable across restart. +2. Manual split API works for same-group split. +3. Route changes propagate to all nodes via watcher refresh. +4. Full test and lint pass in CI. 
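+
+## Appendix: Illustrative Watcher Refresh Loop (PR4)
+
+A minimal sketch of the PR4 poll-based refresh loop, in Go. It assumes the PR2 `Engine` APIs (`ApplySnapshot`, `Version`) land with the shapes named in this plan; the `routeEngine` interface, `RouteWatcher` struct, the `CatalogStore.Snapshot` read helper, and the logging choices are assumed shapes for illustration, not the final implementation.
+
+```go
+package distribution
+
+import (
+	"context"
+	"log/slog"
+	"time"
+)
+
+// routeEngine is the subset of Engine the watcher needs (assumed PR2 shape).
+type routeEngine interface {
+	Version() uint64
+	ApplySnapshot(snapshot CatalogSnapshot) error
+}
+
+// RouteWatcher periodically refreshes the in-memory route engine from the
+// durable catalog stored in the default Raft group.
+type RouteWatcher struct {
+	catalog  *CatalogStore
+	engine   routeEngine
+	interval time.Duration
+}
+
+// Run polls the durable catalog until ctx is cancelled and applies snapshots
+// whose version is newer than the engine's current version. Transient errors
+// are logged and retried on the next tick; they never stop the serving path.
+func (w *RouteWatcher) Run(ctx context.Context) {
+	ticker := time.NewTicker(w.interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			snapshot, err := w.catalog.Snapshot(ctx)
+			if err != nil {
+				slog.Warn("route catalog read failed; retrying next tick", "error", err)
+				continue
+			}
+			if snapshot.Version <= w.engine.Version() {
+				continue // no-op when the durable catalog version has not advanced
+			}
+			if err := w.engine.ApplySnapshot(snapshot); err != nil {
+				slog.Warn("route snapshot apply failed; retrying next tick", "error", err)
+			}
+		}
+	}
+}
+```
+
+Comparing versions before `ApplySnapshot` keeps the steady state cheap; a later milestone can replace polling with the streaming `WatchRoutes` RPC described in the design document.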
From 7324839f80c3b5a794451c98ee937959d6c717c3 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 01:36:15 +0900 Subject: [PATCH 3/7] distribution: add versioned route catalog persistence helpers - introduce RouteDescriptor, CatalogSnapshot, and RouteState - add reserved catalog key helpers and route/version codecs - implement CatalogStore Snapshot/Save with optimistic version checks - add catalog tests for round-trip, validation, mismatch, and overflow cases --- distribution/catalog.go | 624 +++++++++++++++++++++++++++++++++++ distribution/catalog_test.go | 262 +++++++++++++++ 2 files changed, 886 insertions(+) create mode 100644 distribution/catalog.go create mode 100644 distribution/catalog_test.go diff --git a/distribution/catalog.go b/distribution/catalog.go new file mode 100644 index 00000000..6dcacccb --- /dev/null +++ b/distribution/catalog.go @@ -0,0 +1,624 @@ +package distribution + +import ( + "bytes" + "context" + "encoding/binary" + "sort" + "strconv" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +const ( + catalogMetaPrefix = "!dist|meta|" + catalogRoutePrefix = "!dist|route|" + catalogVersionStorageKey = catalogMetaPrefix + "version" + + catalogUint64Bytes = 8 + catalogVersionRecordSize = 1 + catalogUint64Bytes + + catalogVersionCodecVersion byte = 1 + catalogRouteCodecVersion byte = 1 + + catalogScanPageSize = 256 +) + +var ( + ErrCatalogStoreRequired = errors.New("catalog store is required") + ErrCatalogVersionMismatch = errors.New("catalog version mismatch") + ErrCatalogVersionOverflow = errors.New("catalog version overflow") + ErrCatalogRouteIDRequired = errors.New("catalog route id is required") + ErrCatalogGroupIDRequired = errors.New("catalog group id is required") + ErrCatalogDuplicateRouteID = errors.New("catalog route id must be unique") + ErrCatalogInvalidRouteRange = errors.New("catalog route range is invalid") + ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid") + ErrCatalogInvalidRouteRecord = errors.New("catalog route record is invalid") + ErrCatalogInvalidRouteState = errors.New("catalog route state is invalid") + ErrCatalogInvalidRouteKey = errors.New("catalog route key is invalid") + ErrCatalogRouteKeyIDMismatch = errors.New("catalog route key and record route id mismatch") +) + +// RouteState describes the control-plane state of a route. +type RouteState byte + +const ( + // RouteStateActive is a normal serving route. + RouteStateActive RouteState = iota + // RouteStateWriteFenced blocks writes during cutover. + RouteStateWriteFenced + // RouteStateMigratingSource means range data is being copied out. + RouteStateMigratingSource + // RouteStateMigratingTarget means range data is being copied in. + RouteStateMigratingTarget +) + +func (s RouteState) valid() bool { + switch s { + case RouteStateActive, RouteStateWriteFenced, RouteStateMigratingSource, RouteStateMigratingTarget: + return true + default: + return false + } +} + +// RouteDescriptor is the durable representation of a route. +type RouteDescriptor struct { + RouteID uint64 + Start []byte + End []byte + GroupID uint64 + State RouteState + ParentRouteID uint64 +} + +// CatalogSnapshot is a point-in-time snapshot of the route catalog. +type CatalogSnapshot struct { + Version uint64 + Routes []RouteDescriptor +} + +// CatalogStore provides persistence helpers for route catalog state. +type CatalogStore struct { + store store.MVCCStore +} + +// NewCatalogStore creates a route catalog persistence helper. 
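+// The catalog is persisted under the reserved "!dist|meta|" and
+// "!dist|route|" key prefixes of the provided MVCC store.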
+func NewCatalogStore(st store.MVCCStore) *CatalogStore { + return &CatalogStore{store: st} +} + +// CatalogVersionKey returns the reserved key used for catalog version storage. +func CatalogVersionKey() []byte { + return []byte(catalogVersionStorageKey) +} + +// CatalogRouteKey returns the reserved key used for a route descriptor. +func CatalogRouteKey(routeID uint64) []byte { + key := make([]byte, len(catalogRoutePrefix)+catalogUint64Bytes) + copy(key, []byte(catalogRoutePrefix)) + binary.BigEndian.PutUint64(key[len(catalogRoutePrefix):], routeID) + return key +} + +// IsCatalogRouteKey reports whether key belongs to the route catalog keyspace. +func IsCatalogRouteKey(key []byte) bool { + return bytes.HasPrefix(key, []byte(catalogRoutePrefix)) +} + +// CatalogRouteIDFromKey parses the route ID from a catalog route key. +func CatalogRouteIDFromKey(key []byte) (uint64, bool) { + if !IsCatalogRouteKey(key) { + return 0, false + } + suffix := key[len(catalogRoutePrefix):] + if len(suffix) != catalogUint64Bytes { + return 0, false + } + return binary.BigEndian.Uint64(suffix), true +} + +// EncodeCatalogVersion serializes a catalog version record. +func EncodeCatalogVersion(version uint64) []byte { + out := make([]byte, catalogVersionRecordSize) + out[0] = catalogVersionCodecVersion + binary.BigEndian.PutUint64(out[1:], version) + return out +} + +// DecodeCatalogVersion deserializes a catalog version record. +func DecodeCatalogVersion(raw []byte) (uint64, error) { + if len(raw) != catalogVersionRecordSize { + return 0, errors.WithStack(ErrCatalogInvalidVersionRecord) + } + if raw[0] != catalogVersionCodecVersion { + return 0, errors.Wrapf(ErrCatalogInvalidVersionRecord, "unsupported version %d", raw[0]) + } + return binary.BigEndian.Uint64(raw[1:]), nil +} + +// EncodeRouteDescriptor serializes a route descriptor record. +func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error) { + if err := validateRouteDescriptor(route); err != nil { + return nil, err + } + + var buf bytes.Buffer + buf.WriteByte(catalogRouteCodecVersion) + _ = binary.Write(&buf, binary.BigEndian, route.RouteID) + _ = binary.Write(&buf, binary.BigEndian, route.GroupID) + buf.WriteByte(byte(route.State)) + _ = binary.Write(&buf, binary.BigEndian, route.ParentRouteID) + _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.Start))) + if len(route.Start) > 0 { + buf.Write(route.Start) + } + if route.End == nil { + buf.WriteByte(0) + return buf.Bytes(), nil + } + + buf.WriteByte(1) + _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.End))) + buf.Write(route.End) + return buf.Bytes(), nil +} + +// DecodeRouteDescriptor deserializes a route descriptor record. +func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error) { + if len(raw) < 1 { + return RouteDescriptor{}, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if raw[0] != catalogRouteCodecVersion { + return RouteDescriptor{}, errors.Wrapf(ErrCatalogInvalidRouteRecord, "unsupported version %d", raw[0]) + } + + r := bytes.NewReader(raw[1:]) + route, err := decodeRouteDescriptorHeader(r) + if err != nil { + return RouteDescriptor{}, err + } + route.End, err = decodeRouteDescriptorEnd(r) + if err != nil { + return RouteDescriptor{}, err + } + if err := validateRouteDescriptor(route); err != nil { + return RouteDescriptor{}, err + } + return route, nil +} + +// Snapshot reads a consistent route catalog snapshot at the store's latest +// known commit timestamp. 
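+// An empty catalog yields version 0 with no routes.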
+func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error) { + if err := ensureCatalogStore(s); err != nil { + return CatalogSnapshot{}, err + } + if ctx == nil { + ctx = context.Background() + } + + readTS := s.store.LastCommitTS() + version, err := s.versionAt(ctx, readTS) + if err != nil { + return CatalogSnapshot{}, err + } + routes, err := s.routesAt(ctx, readTS) + if err != nil { + return CatalogSnapshot{}, err + } + return CatalogSnapshot{Version: version, Routes: routes}, nil +} + +// Save updates the route catalog using optimistic version checks and bumps the +// catalog version by exactly one on success. +func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error) { + if err := ensureCatalogStore(s); err != nil { + return CatalogSnapshot{}, err + } + if ctx == nil { + ctx = context.Background() + } + + plan, err := s.prepareSave(ctx, expectedVersion, routes) + if err != nil { + return CatalogSnapshot{}, err + } + mutations, err := s.buildSaveMutations(ctx, plan) + if err != nil { + return CatalogSnapshot{}, err + } + if err := s.applySaveMutations(ctx, plan, mutations); err != nil { + return CatalogSnapshot{}, err + } + + return CatalogSnapshot{ + Version: plan.nextVersion, + Routes: cloneRouteDescriptors(plan.routes), + }, nil +} + +func ensureCatalogStore(s *CatalogStore) error { + if s == nil || s.store == nil { + return errors.WithStack(ErrCatalogStoreRequired) + } + return nil +} + +func normalizeRoutes(routes []RouteDescriptor) ([]RouteDescriptor, error) { + if len(routes) == 0 { + return []RouteDescriptor{}, nil + } + out := make([]RouteDescriptor, 0, len(routes)) + seen := make(map[uint64]struct{}, len(routes)) + for _, route := range routes { + if err := validateRouteDescriptor(route); err != nil { + return nil, err + } + if _, exists := seen[route.RouteID]; exists { + return nil, errors.WithStack(ErrCatalogDuplicateRouteID) + } + seen[route.RouteID] = struct{}{} + out = append(out, cloneRouteDescriptor(route)) + } + sort.Slice(out, func(i, j int) bool { + return out[i].RouteID < out[j].RouteID + }) + return out, nil +} + +func validateRouteDescriptor(route RouteDescriptor) error { + if route.RouteID == 0 { + return errors.WithStack(ErrCatalogRouteIDRequired) + } + if route.GroupID == 0 { + return errors.WithStack(ErrCatalogGroupIDRequired) + } + if !route.State.valid() { + return errors.WithStack(ErrCatalogInvalidRouteState) + } + if route.End != nil && bytes.Compare(route.Start, route.End) >= 0 { + return errors.WithStack(ErrCatalogInvalidRouteRange) + } + return nil +} + +func cloneRouteDescriptor(route RouteDescriptor) RouteDescriptor { + return RouteDescriptor{ + RouteID: route.RouteID, + Start: cloneBytes(route.Start), + End: cloneBytes(route.End), + GroupID: route.GroupID, + State: route.State, + ParentRouteID: route.ParentRouteID, + } +} + +func cloneRouteDescriptors(routes []RouteDescriptor) []RouteDescriptor { + out := make([]RouteDescriptor, len(routes)) + for i := range routes { + out[i] = cloneRouteDescriptor(routes[i]) + } + return out +} + +func readU64LenBytes(r *bytes.Reader, rawLen uint64) ([]byte, error) { + n, err := u64ToInt(rawLen) + if err != nil { + return nil, err + } + if n > r.Len() { + return nil, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if n == 0 { + return []byte{}, nil + } + out := make([]byte, n) + if _, err := r.Read(out); err != nil { + return nil, errors.WithStack(err) + } + return out, nil +} + +func u64ToInt(v uint64) (int, error) { + if 
strconv.IntSize == 32 && v > uint64(^uint32(0)>>1) { + return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if strconv.IntSize == 64 && v > (^uint64(0)>>1) { + return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + return int(v), nil +} + +func (s *CatalogStore) versionAt(ctx context.Context, ts uint64) (uint64, error) { + raw, err := s.store.GetAt(ctx, CatalogVersionKey(), ts) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + return 0, nil + } + return 0, errors.WithStack(err) + } + v, err := DecodeCatalogVersion(raw) + if err != nil { + return 0, err + } + return v, nil +} + +func (s *CatalogStore) routesAt(ctx context.Context, ts uint64) ([]RouteDescriptor, error) { + entries, err := s.scanRouteEntriesAt(ctx, ts) + if err != nil { + return nil, err + } + out := make([]RouteDescriptor, 0, len(entries)) + seen := make(map[uint64]struct{}, len(entries)) + for _, kvp := range entries { + routeID, ok := CatalogRouteIDFromKey(kvp.Key) + if !ok { + return nil, errors.WithStack(ErrCatalogInvalidRouteKey) + } + route, err := DecodeRouteDescriptor(kvp.Value) + if err != nil { + return nil, err + } + if route.RouteID != routeID { + return nil, errors.WithStack(ErrCatalogRouteKeyIDMismatch) + } + if _, exists := seen[route.RouteID]; exists { + return nil, errors.WithStack(ErrCatalogDuplicateRouteID) + } + seen[route.RouteID] = struct{}{} + out = append(out, route) + } + sort.Slice(out, func(i, j int) bool { + return out[i].RouteID < out[j].RouteID + }) + return out, nil +} + +func (s *CatalogStore) routeKeysAt(ctx context.Context, ts uint64) ([][]byte, error) { + entries, err := s.scanRouteEntriesAt(ctx, ts) + if err != nil { + return nil, err + } + out := make([][]byte, 0, len(entries)) + for _, kvp := range entries { + out = append(out, cloneBytes(kvp.Key)) + } + return out, nil +} + +func (s *CatalogStore) scanRouteEntriesAt(ctx context.Context, ts uint64) ([]*store.KVPair, error) { + prefix := []byte(catalogRoutePrefix) + upper := prefixScanEnd(prefix) + cursor := cloneBytes(prefix) + out := make([]*store.KVPair, 0) + + for { + page, err := s.store.ScanAt(ctx, cursor, upper, catalogScanPageSize, ts) + if err != nil { + return nil, errors.WithStack(err) + } + if len(page) == 0 { + break + } + + var ( + lastKey []byte + done bool + ) + out, lastKey, done = appendRoutePage(out, page, prefix) + if done { + return out, nil + } + if lastKey == nil || len(page) < catalogScanPageSize { + break + } + nextCursor, inRange := nextCursorWithinUpper(lastKey, upper) + if !inRange { + break + } + cursor = nextCursor + } + return out, nil +} + +type savePlan struct { + readTS uint64 + commitTS uint64 + nextVersion uint64 + routes []RouteDescriptor +} + +func (s *CatalogStore) prepareSave(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (savePlan, error) { + normalized, err := normalizeRoutes(routes) + if err != nil { + return savePlan{}, err + } + + readTS := s.store.LastCommitTS() + currentVersion, err := s.versionAt(ctx, readTS) + if err != nil { + return savePlan{}, err + } + if currentVersion != expectedVersion { + return savePlan{}, errors.WithStack(ErrCatalogVersionMismatch) + } + + nextVersion := expectedVersion + 1 + if nextVersion == 0 { + return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) + } + commitTS := readTS + 1 + if commitTS == 0 { + return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) + } + + return savePlan{ + readTS: readTS, + commitTS: commitTS, + nextVersion: nextVersion, + routes: normalized, + }, nil +} + +func (s 
*CatalogStore) buildSaveMutations(ctx context.Context, plan savePlan) ([]*store.KVPairMutation, error) { + existingKeys, err := s.routeKeysAt(ctx, plan.readTS) + if err != nil { + return nil, err + } + + mutations := make([]*store.KVPairMutation, 0, len(existingKeys)+len(plan.routes)+1) + mutations = appendDeleteMutations(mutations, existingKeys) + mutations, err = appendRouteMutations(mutations, plan.routes) + if err != nil { + return nil, err + } + mutations = append(mutations, &store.KVPairMutation{ + Op: store.OpTypePut, + Key: CatalogVersionKey(), + Value: EncodeCatalogVersion(plan.nextVersion), + }) + return mutations, nil +} + +func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mutations []*store.KVPairMutation) error { + if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, plan.commitTS); err != nil { + if errors.Is(err, store.ErrWriteConflict) { + return errors.WithStack(ErrCatalogVersionMismatch) + } + return errors.WithStack(err) + } + return nil +} + +func appendDeleteMutations(out []*store.KVPairMutation, keys [][]byte) []*store.KVPairMutation { + for _, key := range keys { + out = append(out, &store.KVPairMutation{ + Op: store.OpTypeDelete, + Key: key, + }) + } + return out +} + +func appendRouteMutations(out []*store.KVPairMutation, routes []RouteDescriptor) ([]*store.KVPairMutation, error) { + for _, route := range routes { + encoded, err := EncodeRouteDescriptor(route) + if err != nil { + return nil, err + } + out = append(out, &store.KVPairMutation{ + Op: store.OpTypePut, + Key: CatalogRouteKey(route.RouteID), + Value: encoded, + }) + } + return out, nil +} + +func decodeRouteDescriptorHeader(r *bytes.Reader) (RouteDescriptor, error) { + var routeID uint64 + var groupID uint64 + var parentRouteID uint64 + var startLen uint64 + + if err := binary.Read(r, binary.BigEndian, &routeID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &groupID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + stateRaw, err := r.ReadByte() + if err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &parentRouteID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &startLen); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + + start, err := readU64LenBytes(r, startLen) + if err != nil { + return RouteDescriptor{}, err + } + return RouteDescriptor{ + RouteID: routeID, + GroupID: groupID, + State: RouteState(stateRaw), + ParentRouteID: parentRouteID, + Start: start, + }, nil +} + +func decodeRouteDescriptorEnd(r *bytes.Reader) ([]byte, error) { + endFlag, err := r.ReadByte() + if err != nil { + return nil, errors.WithStack(err) + } + if endFlag == 0 { + return nil, nil + } + if endFlag != 1 { + return nil, errors.Wrapf(ErrCatalogInvalidRouteRecord, "invalid end flag %d", endFlag) + } + + var endLen uint64 + if err := binary.Read(r, binary.BigEndian, &endLen); err != nil { + return nil, errors.WithStack(err) + } + return readU64LenBytes(r, endLen) +} + +func appendRoutePage(out []*store.KVPair, page []*store.KVPair, prefix []byte) ([]*store.KVPair, []byte, bool) { + var lastKey []byte + for _, kvp := range page { + if kvp == nil { + continue + } + if !bytes.HasPrefix(kvp.Key, prefix) { + return out, lastKey, true + } + out = append(out, &store.KVPair{ + Key: cloneBytes(kvp.Key), + Value: cloneBytes(kvp.Value), + }) + lastKey = kvp.Key + } + 
return out, lastKey, false +} + +func nextCursorWithinUpper(lastKey []byte, upper []byte) ([]byte, bool) { + if len(lastKey) == 0 { + return nil, false + } + nextCursor := nextScanCursor(lastKey) + if upper != nil && bytes.Compare(nextCursor, upper) >= 0 { + return nil, false + } + return nextCursor, true +} + +func prefixScanEnd(prefix []byte) []byte { + if len(prefix) == 0 { + return nil + } + out := cloneBytes(prefix) + for i := len(out) - 1; i >= 0; i-- { + if out[i] == ^byte(0) { + continue + } + out[i]++ + return out[:i+1] + } + return nil +} + +func nextScanCursor(lastKey []byte) []byte { + next := make([]byte, len(lastKey)+1) + copy(next, lastKey) + return next +} diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go new file mode 100644 index 00000000..8023ba2f --- /dev/null +++ b/distribution/catalog_test.go @@ -0,0 +1,262 @@ +package distribution + +import ( + "bytes" + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +func TestCatalogVersionCodecRoundTrip(t *testing.T) { + raw := EncodeCatalogVersion(42) + got, err := DecodeCatalogVersion(raw) + if err != nil { + t.Fatalf("decode version: %v", err) + } + if got != 42 { + t.Fatalf("expected 42, got %d", got) + } +} + +func TestCatalogVersionCodecRejectsInvalidPayload(t *testing.T) { + if _, err := DecodeCatalogVersion(nil); !errors.Is(err, ErrCatalogInvalidVersionRecord) { + t.Fatalf("expected ErrCatalogInvalidVersionRecord, got %v", err) + } + if _, err := DecodeCatalogVersion([]byte{99, 0, 0, 0, 0, 0, 0, 0, 1}); !errors.Is(err, ErrCatalogInvalidVersionRecord) { + t.Fatalf("expected ErrCatalogInvalidVersionRecord, got %v", err) + } +} + +func TestRouteDescriptorCodecRoundTrip(t *testing.T) { + route := RouteDescriptor{ + RouteID: 7, + Start: []byte("a"), + End: []byte("m"), + GroupID: 3, + State: RouteStateWriteFenced, + ParentRouteID: 2, + } + raw, err := EncodeRouteDescriptor(route) + if err != nil { + t.Fatalf("encode route: %v", err) + } + got, err := DecodeRouteDescriptor(raw) + if err != nil { + t.Fatalf("decode route: %v", err) + } + assertRouteEqual(t, route, got) +} + +func TestRouteDescriptorCodecRoundTripNilEnd(t *testing.T) { + route := RouteDescriptor{ + RouteID: 9, + Start: []byte("m"), + End: nil, + GroupID: 2, + State: RouteStateActive, + ParentRouteID: 0, + } + raw, err := EncodeRouteDescriptor(route) + if err != nil { + t.Fatalf("encode route: %v", err) + } + got, err := DecodeRouteDescriptor(raw) + if err != nil { + t.Fatalf("decode route: %v", err) + } + assertRouteEqual(t, route, got) +} + +func TestCatalogRouteKeyHelpers(t *testing.T) { + key := CatalogRouteKey(11) + if !IsCatalogRouteKey(key) { + t.Fatal("expected route key prefix match") + } + id, ok := CatalogRouteIDFromKey(key) + if !ok { + t.Fatal("expected route id parse to succeed") + } + if id != 11 { + t.Fatalf("expected route id 11, got %d", id) + } + if _, ok := CatalogRouteIDFromKey([]byte("not-a-route-key")); ok { + t.Fatal("expected parse failure for non-route key") + } +} + +func TestCatalogStoreSnapshotEmpty(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + snapshot, err := cs.Snapshot(context.Background()) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 0 { + t.Fatalf("expected empty version 0, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 0 { + t.Fatalf("expected no routes, got %d", len(snapshot.Routes)) + } +} + +func TestCatalogStoreSaveAndSnapshot(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) 
+ ctx := context.Background() + + saved, err := cs.Save(ctx, 0, []RouteDescriptor{ + { + RouteID: 2, + Start: []byte("m"), + End: nil, + GroupID: 2, + State: RouteStateActive, + ParentRouteID: 1, + }, + { + RouteID: 1, + Start: []byte(""), + End: []byte("m"), + GroupID: 1, + State: RouteStateActive, + ParentRouteID: 0, + }, + }) + if err != nil { + t.Fatalf("save: %v", err) + } + if saved.Version != 1 { + t.Fatalf("expected version 1, got %d", saved.Version) + } + if len(saved.Routes) != 2 { + t.Fatalf("expected 2 routes, got %d", len(saved.Routes)) + } + if saved.Routes[0].RouteID != 1 || saved.Routes[1].RouteID != 2 { + t.Fatalf("expected sorted route ids [1,2], got [%d,%d]", saved.Routes[0].RouteID, saved.Routes[1].RouteID) + } + + snapshot, err := cs.Snapshot(ctx) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 1 { + t.Fatalf("expected version 1, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 2 { + t.Fatalf("expected 2 routes, got %d", len(snapshot.Routes)) + } + assertRouteEqual(t, saved.Routes[0], snapshot.Routes[0]) + assertRouteEqual(t, saved.Routes[1], snapshot.Routes[1]) +} + +func TestCatalogStoreSaveRejectsVersionMismatch(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + routes := []RouteDescriptor{ + { + RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive, + }, + } + + if _, err := cs.Save(ctx, 0, routes); err != nil { + t.Fatalf("first save: %v", err) + } + if _, err := cs.Save(ctx, 0, routes); !errors.Is(err, ErrCatalogVersionMismatch) { + t.Fatalf("expected ErrCatalogVersionMismatch, got %v", err) + } +} + +func TestCatalogStoreSaveRejectsDuplicateRouteIDs(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 1, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogDuplicateRouteID) { + t.Fatalf("expected ErrCatalogDuplicateRouteID, got %v", err) + } +} + +func TestCatalogStoreSaveRejectsInvalidRouteRange(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte("z"), End: []byte("a"), GroupID: 1, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogInvalidRouteRange) { + t.Fatalf("expected ErrCatalogInvalidRouteRange, got %v", err) + } +} + +func TestCatalogStoreSaveDeletesRemovedRoutes(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if err != nil { + t.Fatalf("first save: %v", err) + } + + _, err = cs.Save(ctx, 1, []RouteDescriptor{ + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if err != nil { + t.Fatalf("second save: %v", err) + } + + snapshot, err := cs.Snapshot(ctx) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 2 { + t.Fatalf("expected version 2, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 1 { + t.Fatalf("expected 1 route after delete, got %d", len(snapshot.Routes)) + } + if snapshot.Routes[0].RouteID != 2 { + t.Fatalf("expected remaining route id 2, 
got %d", snapshot.Routes[0].RouteID) + } +} + +func TestCatalogStoreSaveRejectsVersionOverflow(t *testing.T) { + st := store.NewMVCCStore() + ctx := context.Background() + // Prepare current catalog version == max uint64. + if err := st.PutAt(ctx, CatalogVersionKey(), EncodeCatalogVersion(^uint64(0)), 1, 0); err != nil { + t.Fatalf("prepare version key: %v", err) + } + cs := NewCatalogStore(st) + _, err := cs.Save(ctx, ^uint64(0), []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogVersionOverflow) { + t.Fatalf("expected ErrCatalogVersionOverflow, got %v", err) + } +} + +func assertRouteEqual(t *testing.T, want, got RouteDescriptor) { + t.Helper() + if want.RouteID != got.RouteID { + t.Fatalf("route id mismatch: want %d, got %d", want.RouteID, got.RouteID) + } + if want.GroupID != got.GroupID { + t.Fatalf("group id mismatch: want %d, got %d", want.GroupID, got.GroupID) + } + if want.ParentRouteID != got.ParentRouteID { + t.Fatalf("parent route id mismatch: want %d, got %d", want.ParentRouteID, got.ParentRouteID) + } + if want.State != got.State { + t.Fatalf("state mismatch: want %d, got %d", want.State, got.State) + } + if !bytes.Equal(want.Start, got.Start) { + t.Fatalf("start mismatch: want %q, got %q", want.Start, got.Start) + } + if !bytes.Equal(want.End, got.End) { + t.Fatalf("end mismatch: want %q, got %q", want.End, got.End) + } +} From df66633125f0d9f5e4e0584324f0f4c4db70b2af Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 01:46:33 +0900 Subject: [PATCH 4/7] distribution: address PR #316 review feedback - fix gosec G115 by simplifying uint64->int overflow guard in u64ToInt - add commit timestamp overflow test for CatalogStore.Save - align hotspot split design doc with implementation: - remove per-route version from RouteDescriptor - document global catalog snapshot versioning and catalog-level CAS model --- distribution/catalog.go | 7 ++----- distribution/catalog_test.go | 22 ++++++++++++++++++++++ docs/hotspot_shard_split_design.md | 12 +++++++++++- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/distribution/catalog.go b/distribution/catalog.go index 6dcacccb..581825e3 100644 --- a/distribution/catalog.go +++ b/distribution/catalog.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "sort" - "strconv" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" @@ -322,10 +321,8 @@ func readU64LenBytes(r *bytes.Reader, rawLen uint64) ([]byte, error) { } func u64ToInt(v uint64) (int, error) { - if strconv.IntSize == 32 && v > uint64(^uint32(0)>>1) { - return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) - } - if strconv.IntSize == 64 && v > (^uint64(0)>>1) { + const maxInt = int(^uint(0) >> 1) + if v > uint64(maxInt) { return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) } return int(v), nil diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go index 8023ba2f..e506e084 100644 --- a/distribution/catalog_test.go +++ b/distribution/catalog_test.go @@ -239,6 +239,28 @@ func TestCatalogStoreSaveRejectsVersionOverflow(t *testing.T) { } } +func TestCatalogStoreSaveRejectsCommitTSOverflow(t *testing.T) { + st := store.NewMVCCStore() + ctx := context.Background() + + // Set catalog version at a high commit TS. 
+ if err := st.PutAt(ctx, CatalogVersionKey(), EncodeCatalogVersion(10), ^uint64(0)-1, 0); err != nil { + t.Fatalf("prepare version key: %v", err) + } + // Push LastCommitTS to max uint64 so prepareSave overflows readTS+1. + if err := st.PutAt(ctx, []byte("!dist|meta|sentinel"), []byte("x"), ^uint64(0), 0); err != nil { + t.Fatalf("prepare sentinel key: %v", err) + } + + cs := NewCatalogStore(st) + _, err := cs.Save(ctx, 10, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogVersionOverflow) { + t.Fatalf("expected ErrCatalogVersionOverflow, got %v", err) + } +} + func assertRouteEqual(t *testing.T, want, got RouteDescriptor) { t.Helper() if want.RouteID != got.RouteID { diff --git a/docs/hotspot_shard_split_design.md b/docs/hotspot_shard_split_design.md index 3f9ff754..6a2497f1 100644 --- a/docs/hotspot_shard_split_design.md +++ b/docs/hotspot_shard_split_design.md @@ -79,10 +79,20 @@ Responsibilities: - `start []byte` (inclusive) - `end []byte` (exclusive, nil=+inf) - `group_id uint64` -- `version uint64` (catalog generation) - `state` (`ACTIVE`, `WRITE_FENCED`, `MIGRATING_SOURCE`, `MIGRATING_TARGET`) - `parent_route_id uint64` (lineage) +`CatalogSnapshot`: + +- `version uint64` (global catalog generation) +- `routes []RouteDescriptor` + +Versioning model note: + +- The current implementation uses a single global catalog version. +- Per-route version fields are not used. +- Store-level OCC with per-record version checks is not the mechanism here; consistency is coordinated by leader-side sequencing and catalog-level compare-and-swap. + `SplitJob`: - `job_id string` From b17ebe72bcb64e3d97e1f738a8eebbc5e898e81f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 01:56:17 +0900 Subject: [PATCH 5/7] distribution: make catalog save commit timestamps monotonic - recompute catalog commit TS at apply time to avoid stale planned timestamps - ensure commit TS is greater than both planned minimum and current store LastCommitTS - keep overflow checks for commit TS calculation - add regression test for concurrent LastCommitTS advancement between prepare and apply --- distribution/catalog.go | 27 ++++++++++++++++++++----- distribution/catalog_test.go | 38 ++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/distribution/catalog.go b/distribution/catalog.go index 581825e3..0b38a169 100644 --- a/distribution/catalog.go +++ b/distribution/catalog.go @@ -423,7 +423,7 @@ func (s *CatalogStore) scanRouteEntriesAt(ctx context.Context, ts uint64) ([]*st type savePlan struct { readTS uint64 - commitTS uint64 + minCommitTS uint64 nextVersion uint64 routes []RouteDescriptor } @@ -447,14 +447,14 @@ func (s *CatalogStore) prepareSave(ctx context.Context, expectedVersion uint64, if nextVersion == 0 { return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) } - commitTS := readTS + 1 - if commitTS == 0 { + minCommitTS := readTS + 1 + if minCommitTS == 0 { return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) } return savePlan{ readTS: readTS, - commitTS: commitTS, + minCommitTS: minCommitTS, nextVersion: nextVersion, routes: normalized, }, nil @@ -481,7 +481,11 @@ func (s *CatalogStore) buildSaveMutations(ctx context.Context, plan savePlan) ([ } func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mutations []*store.KVPairMutation) error { - if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, plan.commitTS); 
err != nil { + commitTS, err := s.commitTSForApply(plan.minCommitTS) + if err != nil { + return err + } + if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, commitTS); err != nil { if errors.Is(err, store.ErrWriteConflict) { return errors.WithStack(ErrCatalogVersionMismatch) } @@ -490,6 +494,19 @@ func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mu return nil } +func (s *CatalogStore) commitTSForApply(minCommitTS uint64) (uint64, error) { + currentLast := s.store.LastCommitTS() + candidateFromStore := currentLast + 1 + if candidateFromStore == 0 { + return 0, errors.WithStack(ErrCatalogVersionOverflow) + } + + if candidateFromStore > minCommitTS { + return candidateFromStore, nil + } + return minCommitTS, nil +} + func appendDeleteMutations(out []*store.KVPairMutation, keys [][]byte) []*store.KVPairMutation { for _, key := range keys { out = append(out, &store.KVPairMutation{ diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go index e506e084..57947913 100644 --- a/distribution/catalog_test.go +++ b/distribution/catalog_test.go @@ -261,6 +261,44 @@ func TestCatalogStoreSaveRejectsCommitTSOverflow(t *testing.T) { } } +func TestCatalogStoreApplySaveMutations_UsesMonotonicCommitTS(t *testing.T) { + st := store.NewMVCCStore() + ctx := context.Background() + cs := NewCatalogStore(st) + + plan, err := cs.prepareSave(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}, + }) + if err != nil { + t.Fatalf("prepareSave: %v", err) + } + + // Simulate unrelated writes advancing the global LastCommitTS after planning. + advancedTS := plan.minCommitTS + 50 + if err := st.PutAt(ctx, []byte("unrelated"), []byte("v"), advancedTS, 0); err != nil { + t.Fatalf("advance LastCommitTS: %v", err) + } + + mutations, err := cs.buildSaveMutations(ctx, plan) + if err != nil { + t.Fatalf("buildSaveMutations: %v", err) + } + if err := cs.applySaveMutations(ctx, plan, mutations); err != nil { + t.Fatalf("applySaveMutations: %v", err) + } + + ts, exists, err := st.LatestCommitTS(ctx, CatalogVersionKey()) + if err != nil { + t.Fatalf("LatestCommitTS(version key): %v", err) + } + if !exists { + t.Fatal("expected catalog version key to exist") + } + if ts <= advancedTS { + t.Fatalf("expected catalog commit TS to be > %d, got %d", advancedTS, ts) + } +} + func assertRouteEqual(t *testing.T, want, got RouteDescriptor) { t.Helper() if want.RouteID != got.RouteID { From dae9d27938ba8a8791bb1bf174e882d7282d137f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 18 Feb 2026 22:08:35 +0900 Subject: [PATCH 6/7] distribution: sort catalog routes by range start to align engine ordering --- distribution/catalog.go | 34 ++++++++++++++++++++----- distribution/catalog_test.go | 48 ++++++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/distribution/catalog.go b/distribution/catalog.go index 0b38a169..b3b776b3 100644 --- a/distribution/catalog.go +++ b/distribution/catalog.go @@ -261,9 +261,7 @@ func normalizeRoutes(routes []RouteDescriptor) ([]RouteDescriptor, error) { seen[route.RouteID] = struct{}{} out = append(out, cloneRouteDescriptor(route)) } - sort.Slice(out, func(i, j int) bool { - return out[i].RouteID < out[j].RouteID - }) + sortRouteDescriptors(out) return out, nil } @@ -302,6 +300,32 @@ func cloneRouteDescriptors(routes []RouteDescriptor) []RouteDescriptor { return out } +func sortRouteDescriptors(routes []RouteDescriptor) { + sort.Slice(routes, func(i, 
j int) bool { + return routeDescriptorLess(routes[i], routes[j]) + }) +} + +func routeDescriptorLess(left, right RouteDescriptor) bool { + if c := bytes.Compare(left.Start, right.Start); c != 0 { + return c < 0 + } + + if left.End == nil && right.End != nil { + return false + } + if left.End != nil && right.End == nil { + return true + } + if left.End != nil && right.End != nil { + if c := bytes.Compare(left.End, right.End); c != 0 { + return c < 0 + } + } + + return left.RouteID < right.RouteID +} + func readU64LenBytes(r *bytes.Reader, rawLen uint64) ([]byte, error) { n, err := u64ToInt(rawLen) if err != nil { @@ -368,9 +392,7 @@ func (s *CatalogStore) routesAt(ctx context.Context, ts uint64) ([]RouteDescript seen[route.RouteID] = struct{}{} out = append(out, route) } - sort.Slice(out, func(i, j int) bool { - return out[i].RouteID < out[j].RouteID - }) + sortRouteDescriptors(out) return out, nil } diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go index 57947913..3ff2ab49 100644 --- a/distribution/catalog_test.go +++ b/distribution/catalog_test.go @@ -131,8 +131,8 @@ func TestCatalogStoreSaveAndSnapshot(t *testing.T) { if len(saved.Routes) != 2 { t.Fatalf("expected 2 routes, got %d", len(saved.Routes)) } - if saved.Routes[0].RouteID != 1 || saved.Routes[1].RouteID != 2 { - t.Fatalf("expected sorted route ids [1,2], got [%d,%d]", saved.Routes[0].RouteID, saved.Routes[1].RouteID) + if !bytes.Equal(saved.Routes[0].Start, []byte("")) || !bytes.Equal(saved.Routes[1].Start, []byte("m")) { + t.Fatalf("expected routes sorted by start key, got starts [%q,%q]", saved.Routes[0].Start, saved.Routes[1].Start) } snapshot, err := cs.Snapshot(ctx) @@ -149,6 +149,50 @@ func TestCatalogStoreSaveAndSnapshot(t *testing.T) { assertRouteEqual(t, saved.Routes[1], snapshot.Routes[1]) } +func TestCatalogStoreSaveAndSnapshotSortsRoutesByStart(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + + saved, err := cs.Save(ctx, 0, []RouteDescriptor{ + { + RouteID: 10, + Start: []byte("m"), + End: nil, + GroupID: 2, + State: RouteStateActive, + ParentRouteID: 0, + }, + { + RouteID: 20, + Start: []byte(""), + End: []byte("m"), + GroupID: 1, + State: RouteStateActive, + ParentRouteID: 0, + }, + }) + if err != nil { + t.Fatalf("save: %v", err) + } + if len(saved.Routes) != 2 { + t.Fatalf("expected 2 routes, got %d", len(saved.Routes)) + } + if saved.Routes[0].RouteID != 20 || saved.Routes[1].RouteID != 10 { + t.Fatalf("expected route order by start key [20,10], got [%d,%d]", saved.Routes[0].RouteID, saved.Routes[1].RouteID) + } + + snapshot, err := cs.Snapshot(ctx) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if len(snapshot.Routes) != 2 { + t.Fatalf("expected 2 routes in snapshot, got %d", len(snapshot.Routes)) + } + if snapshot.Routes[0].RouteID != 20 || snapshot.Routes[1].RouteID != 10 { + t.Fatalf("expected snapshot route order by start key [20,10], got [%d,%d]", snapshot.Routes[0].RouteID, snapshot.Routes[1].RouteID) + } +} + func TestCatalogStoreSaveRejectsVersionMismatch(t *testing.T) { cs := NewCatalogStore(store.NewMVCCStore()) ctx := context.Background() From 778db63cc95c25cf659b963f2b94c9eec9892c9d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 19 Feb 2026 00:14:27 +0900 Subject: [PATCH 7/7] Refactor route encoding for efficiency and clarity --- distribution/catalog.go | 106 ++++++++++++++++++++++------------- distribution/catalog_test.go | 60 ++++++++++++++++++++ 2 files changed, 127 insertions(+), 39 deletions(-) 
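
Note (illustrative, not part of the patch): the diff below swaps the `bytes.Buffer`/`binary.Write` encoder for a preallocated, append-based layout, adds a strict decoder that rejects trailing bytes, and skips rewriting unchanged route records on save. The standalone Go sketch here mirrors only the byte layout of the refactored `EncodeRouteDescriptor`, using simplified, hypothetical names (`record`, `encode`, `appendU64`); the authoritative field order, constants, and error handling remain those in `distribution/catalog.go`.

```go
// Sketch of the append-based RouteDescriptor layout adopted by the refactor:
// codec version byte, RouteID, GroupID, state byte, ParentRouteID,
// length-prefixed Start, end-presence flag, then length-prefixed End if set.
package main

import (
	"encoding/binary"
	"fmt"
)

type record struct {
	RouteID, GroupID, ParentRouteID uint64
	State                           byte
	Start, End                      []byte // End == nil means +inf
}

func appendU64(dst []byte, v uint64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], v)
	return append(dst, b[:]...)
}

func encode(r record) []byte {
	// Precompute the exact encoded size so the output needs one allocation.
	size := 1 + 8 + 8 + 1 + 8 + 8 + len(r.Start) + 1
	if r.End != nil {
		size += 8 + len(r.End)
	}
	out := make([]byte, 0, size)
	out = append(out, 1) // codec version
	out = appendU64(out, r.RouteID)
	out = appendU64(out, r.GroupID)
	out = append(out, r.State)
	out = appendU64(out, r.ParentRouteID)
	out = appendU64(out, uint64(len(r.Start)))
	out = append(out, r.Start...)
	if r.End == nil {
		return append(out, 0) // no upper bound
	}
	out = append(out, 1)
	out = appendU64(out, uint64(len(r.End)))
	return append(out, r.End...)
}

func main() {
	raw := encode(record{RouteID: 1, GroupID: 2, Start: []byte("a"), End: []byte("m")})
	// The precomputed size makes the backing allocation exact.
	fmt.Println(len(raw) == cap(raw), len(raw)) // true 45
}
```

Because the record size is computed up front, the encoder fills a single preallocated slice instead of growing a `bytes.Buffer`, which appears to be the efficiency gain the commit subject refers to.
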
diff --git a/distribution/catalog.go b/distribution/catalog.go index b3b776b3..93e3da00 100644 --- a/distribution/catalog.go +++ b/distribution/catalog.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "math" "sort" "github.com/bootjp/elastickv/store" @@ -143,25 +144,23 @@ func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error) { return nil, err } - var buf bytes.Buffer - buf.WriteByte(catalogRouteCodecVersion) - _ = binary.Write(&buf, binary.BigEndian, route.RouteID) - _ = binary.Write(&buf, binary.BigEndian, route.GroupID) - buf.WriteByte(byte(route.State)) - _ = binary.Write(&buf, binary.BigEndian, route.ParentRouteID) - _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.Start))) - if len(route.Start) > 0 { - buf.Write(route.Start) - } + out := make([]byte, 0, routeDescriptorEncodedSize(route)) + out = append(out, catalogRouteCodecVersion) + out = appendU64(out, route.RouteID) + out = appendU64(out, route.GroupID) + out = append(out, byte(route.State)) + out = appendU64(out, route.ParentRouteID) + out = appendU64(out, uint64(len(route.Start))) + out = append(out, route.Start...) if route.End == nil { - buf.WriteByte(0) - return buf.Bytes(), nil + out = append(out, 0) + return out, nil } - buf.WriteByte(1) - _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.End))) - buf.Write(route.End) - return buf.Bytes(), nil + out = append(out, 1) + out = appendU64(out, uint64(len(route.End))) + out = append(out, route.End...) + return out, nil } // DecodeRouteDescriptor deserializes a route descriptor record. @@ -182,6 +181,9 @@ func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error) { if err != nil { return RouteDescriptor{}, err } + if r.Len() != 0 { + return RouteDescriptor{}, errors.WithStack(ErrCatalogInvalidRouteRecord) + } if err := validateRouteDescriptor(route); err != nil { return RouteDescriptor{}, err } @@ -345,8 +347,7 @@ func readU64LenBytes(r *bytes.Reader, rawLen uint64) ([]byte, error) { } func u64ToInt(v uint64) (int, error) { - const maxInt = int(^uint(0) >> 1) - if v > uint64(maxInt) { + if v > uint64(math.MaxInt) { return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) } return int(v), nil @@ -396,18 +397,6 @@ func (s *CatalogStore) routesAt(ctx context.Context, ts uint64) ([]RouteDescript return out, nil } -func (s *CatalogStore) routeKeysAt(ctx context.Context, ts uint64) ([][]byte, error) { - entries, err := s.scanRouteEntriesAt(ctx, ts) - if err != nil { - return nil, err - } - out := make([][]byte, 0, len(entries)) - for _, kvp := range entries { - out = append(out, cloneBytes(kvp.Key)) - } - return out, nil -} - func (s *CatalogStore) scanRouteEntriesAt(ctx context.Context, ts uint64) ([]*store.KVPair, error) { prefix := []byte(catalogRoutePrefix) upper := prefixScanEnd(prefix) @@ -483,14 +472,14 @@ func (s *CatalogStore) prepareSave(ctx context.Context, expectedVersion uint64, } func (s *CatalogStore) buildSaveMutations(ctx context.Context, plan savePlan) ([]*store.KVPairMutation, error) { - existingKeys, err := s.routeKeysAt(ctx, plan.readTS) + existingRoutes, err := s.routesAt(ctx, plan.readTS) if err != nil { return nil, err } - mutations := make([]*store.KVPairMutation, 0, len(existingKeys)+len(plan.routes)+1) - mutations = appendDeleteMutations(mutations, existingKeys) - mutations, err = appendRouteMutations(mutations, plan.routes) + mutations := make([]*store.KVPairMutation, 0, len(existingRoutes)+len(plan.routes)+1) + mutations = appendDeleteRouteMutations(mutations, existingRoutes, plan.routes) + mutations, err = 
appendUpsertRouteMutations(mutations, existingRoutes, plan.routes) if err != nil { return nil, err } @@ -529,18 +518,34 @@ func (s *CatalogStore) commitTSForApply(minCommitTS uint64) (uint64, error) { return minCommitTS, nil } -func appendDeleteMutations(out []*store.KVPairMutation, keys [][]byte) []*store.KVPairMutation { - for _, key := range keys { +func appendDeleteRouteMutations(out []*store.KVPairMutation, existing []RouteDescriptor, desired []RouteDescriptor) []*store.KVPairMutation { + desiredByID := make(map[uint64]struct{}, len(desired)) + for _, route := range desired { + desiredByID[route.RouteID] = struct{}{} + } + + for _, route := range existing { + if _, ok := desiredByID[route.RouteID]; ok { + continue + } out = append(out, &store.KVPairMutation{ Op: store.OpTypeDelete, - Key: key, + Key: CatalogRouteKey(route.RouteID), }) } return out } -func appendRouteMutations(out []*store.KVPairMutation, routes []RouteDescriptor) ([]*store.KVPairMutation, error) { - for _, route := range routes { +func appendUpsertRouteMutations(out []*store.KVPairMutation, existing []RouteDescriptor, desired []RouteDescriptor) ([]*store.KVPairMutation, error) { + existingByID := make(map[uint64]RouteDescriptor, len(existing)) + for _, route := range existing { + existingByID[route.RouteID] = route + } + + for _, route := range desired { + if existingRoute, ok := existingByID[route.RouteID]; ok && routeDescriptorEqual(existingRoute, route) { + continue + } encoded, err := EncodeRouteDescriptor(route) if err != nil { return nil, err @@ -554,6 +559,29 @@ func appendRouteMutations(out []*store.KVPairMutation, routes []RouteDescriptor) return out, nil } +func routeDescriptorEqual(left, right RouteDescriptor) bool { + return left.RouteID == right.RouteID && + bytes.Equal(left.Start, right.Start) && + bytes.Equal(left.End, right.End) && + left.GroupID == right.GroupID && + left.State == right.State && + left.ParentRouteID == right.ParentRouteID +} + +func appendU64(dst []byte, v uint64) []byte { + var encoded [catalogUint64Bytes]byte + binary.BigEndian.PutUint64(encoded[:], v) + return append(dst, encoded[:]...) 
+} + +func routeDescriptorEncodedSize(route RouteDescriptor) int { + size := 1 + catalogUint64Bytes + catalogUint64Bytes + 1 + catalogUint64Bytes + catalogUint64Bytes + len(route.Start) + 1 + if route.End != nil { + size += catalogUint64Bytes + len(route.End) + } + return size +} + func decodeRouteDescriptorHeader(r *bytes.Reader) (RouteDescriptor, error) { var routeID uint64 var groupID uint64 diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go index 3ff2ab49..3923a7bf 100644 --- a/distribution/catalog_test.go +++ b/distribution/catalog_test.go @@ -69,6 +69,27 @@ func TestRouteDescriptorCodecRoundTripNilEnd(t *testing.T) { assertRouteEqual(t, route, got) } +func TestRouteDescriptorCodecRejectsTrailingBytes(t *testing.T) { + route := RouteDescriptor{ + RouteID: 1, + Start: []byte("a"), + End: []byte("m"), + GroupID: 1, + State: RouteStateActive, + ParentRouteID: 0, + } + raw, err := EncodeRouteDescriptor(route) + if err != nil { + t.Fatalf("encode route: %v", err) + } + raw = append(raw, 0xff) + + _, err = DecodeRouteDescriptor(raw) + if !errors.Is(err, ErrCatalogInvalidRouteRecord) { + t.Fatalf("expected ErrCatalogInvalidRouteRecord, got %v", err) + } +} + func TestCatalogRouteKeyHelpers(t *testing.T) { key := CatalogRouteKey(11) if !IsCatalogRouteKey(key) { @@ -267,6 +288,45 @@ func TestCatalogStoreSaveDeletesRemovedRoutes(t *testing.T) { } } +func TestCatalogStoreSaveDoesNotRewriteUnchangedRoutes(t *testing.T) { + st := store.NewMVCCStore() + cs := NewCatalogStore(st) + ctx := context.Background() + + routes := []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + } + _, err := cs.Save(ctx, 0, routes) + if err != nil { + t.Fatalf("first save: %v", err) + } + + route1FirstTS, exists, err := st.LatestCommitTS(ctx, CatalogRouteKey(1)) + if err != nil { + t.Fatalf("route 1 latest ts (first): %v", err) + } + if !exists { + t.Fatal("expected route 1 key to exist") + } + + _, err = cs.Save(ctx, 1, routes) + if err != nil { + t.Fatalf("second save: %v", err) + } + + route1SecondTS, exists, err := st.LatestCommitTS(ctx, CatalogRouteKey(1)) + if err != nil { + t.Fatalf("route 1 latest ts (second): %v", err) + } + if !exists { + t.Fatal("expected route 1 key to exist after second save") + } + if route1SecondTS != route1FirstTS { + t.Fatalf("expected unchanged route key commit ts to remain %d, got %d", route1FirstTS, route1SecondTS) + } +} + func TestCatalogStoreSaveRejectsVersionOverflow(t *testing.T) { st := store.NewMVCCStore() ctx := context.Background()