diff --git a/distribution/catalog.go b/distribution/catalog.go new file mode 100644 index 00000000..6dcacccb --- /dev/null +++ b/distribution/catalog.go @@ -0,0 +1,624 @@ +package distribution + +import ( + "bytes" + "context" + "encoding/binary" + "sort" + "strconv" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +const ( + catalogMetaPrefix = "!dist|meta|" + catalogRoutePrefix = "!dist|route|" + catalogVersionStorageKey = catalogMetaPrefix + "version" + + catalogUint64Bytes = 8 + catalogVersionRecordSize = 1 + catalogUint64Bytes + + catalogVersionCodecVersion byte = 1 + catalogRouteCodecVersion byte = 1 + + catalogScanPageSize = 256 +) + +var ( + ErrCatalogStoreRequired = errors.New("catalog store is required") + ErrCatalogVersionMismatch = errors.New("catalog version mismatch") + ErrCatalogVersionOverflow = errors.New("catalog version overflow") + ErrCatalogRouteIDRequired = errors.New("catalog route id is required") + ErrCatalogGroupIDRequired = errors.New("catalog group id is required") + ErrCatalogDuplicateRouteID = errors.New("catalog route id must be unique") + ErrCatalogInvalidRouteRange = errors.New("catalog route range is invalid") + ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid") + ErrCatalogInvalidRouteRecord = errors.New("catalog route record is invalid") + ErrCatalogInvalidRouteState = errors.New("catalog route state is invalid") + ErrCatalogInvalidRouteKey = errors.New("catalog route key is invalid") + ErrCatalogRouteKeyIDMismatch = errors.New("catalog route key and record route id mismatch") +) + +// RouteState describes the control-plane state of a route. +type RouteState byte + +const ( + // RouteStateActive is a normal serving route. + RouteStateActive RouteState = iota + // RouteStateWriteFenced blocks writes during cutover. + RouteStateWriteFenced + // RouteStateMigratingSource means range data is being copied out. + RouteStateMigratingSource + // RouteStateMigratingTarget means range data is being copied in. + RouteStateMigratingTarget +) + +func (s RouteState) valid() bool { + switch s { + case RouteStateActive, RouteStateWriteFenced, RouteStateMigratingSource, RouteStateMigratingTarget: + return true + default: + return false + } +} + +// RouteDescriptor is the durable representation of a route. +type RouteDescriptor struct { + RouteID uint64 + Start []byte + End []byte + GroupID uint64 + State RouteState + ParentRouteID uint64 +} + +// CatalogSnapshot is a point-in-time snapshot of the route catalog. +type CatalogSnapshot struct { + Version uint64 + Routes []RouteDescriptor +} + +// CatalogStore provides persistence helpers for route catalog state. +type CatalogStore struct { + store store.MVCCStore +} + +// NewCatalogStore creates a route catalog persistence helper. +func NewCatalogStore(st store.MVCCStore) *CatalogStore { + return &CatalogStore{store: st} +} + +// CatalogVersionKey returns the reserved key used for catalog version storage. +func CatalogVersionKey() []byte { + return []byte(catalogVersionStorageKey) +} + +// CatalogRouteKey returns the reserved key used for a route descriptor. +func CatalogRouteKey(routeID uint64) []byte { + key := make([]byte, len(catalogRoutePrefix)+catalogUint64Bytes) + copy(key, []byte(catalogRoutePrefix)) + binary.BigEndian.PutUint64(key[len(catalogRoutePrefix):], routeID) + return key +} + +// IsCatalogRouteKey reports whether key belongs to the route catalog keyspace. 
+func IsCatalogRouteKey(key []byte) bool { + return bytes.HasPrefix(key, []byte(catalogRoutePrefix)) +} + +// CatalogRouteIDFromKey parses the route ID from a catalog route key. +func CatalogRouteIDFromKey(key []byte) (uint64, bool) { + if !IsCatalogRouteKey(key) { + return 0, false + } + suffix := key[len(catalogRoutePrefix):] + if len(suffix) != catalogUint64Bytes { + return 0, false + } + return binary.BigEndian.Uint64(suffix), true +} + +// EncodeCatalogVersion serializes a catalog version record. +func EncodeCatalogVersion(version uint64) []byte { + out := make([]byte, catalogVersionRecordSize) + out[0] = catalogVersionCodecVersion + binary.BigEndian.PutUint64(out[1:], version) + return out +} + +// DecodeCatalogVersion deserializes a catalog version record. +func DecodeCatalogVersion(raw []byte) (uint64, error) { + if len(raw) != catalogVersionRecordSize { + return 0, errors.WithStack(ErrCatalogInvalidVersionRecord) + } + if raw[0] != catalogVersionCodecVersion { + return 0, errors.Wrapf(ErrCatalogInvalidVersionRecord, "unsupported version %d", raw[0]) + } + return binary.BigEndian.Uint64(raw[1:]), nil +} + +// EncodeRouteDescriptor serializes a route descriptor record. +func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error) { + if err := validateRouteDescriptor(route); err != nil { + return nil, err + } + + var buf bytes.Buffer + buf.WriteByte(catalogRouteCodecVersion) + _ = binary.Write(&buf, binary.BigEndian, route.RouteID) + _ = binary.Write(&buf, binary.BigEndian, route.GroupID) + buf.WriteByte(byte(route.State)) + _ = binary.Write(&buf, binary.BigEndian, route.ParentRouteID) + _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.Start))) + if len(route.Start) > 0 { + buf.Write(route.Start) + } + if route.End == nil { + buf.WriteByte(0) + return buf.Bytes(), nil + } + + buf.WriteByte(1) + _ = binary.Write(&buf, binary.BigEndian, uint64(len(route.End))) + buf.Write(route.End) + return buf.Bytes(), nil +} + +// DecodeRouteDescriptor deserializes a route descriptor record. +func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error) { + if len(raw) < 1 { + return RouteDescriptor{}, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if raw[0] != catalogRouteCodecVersion { + return RouteDescriptor{}, errors.Wrapf(ErrCatalogInvalidRouteRecord, "unsupported version %d", raw[0]) + } + + r := bytes.NewReader(raw[1:]) + route, err := decodeRouteDescriptorHeader(r) + if err != nil { + return RouteDescriptor{}, err + } + route.End, err = decodeRouteDescriptorEnd(r) + if err != nil { + return RouteDescriptor{}, err + } + if err := validateRouteDescriptor(route); err != nil { + return RouteDescriptor{}, err + } + return route, nil +} + +// Snapshot reads a consistent route catalog snapshot at the store's latest +// known commit timestamp. +func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error) { + if err := ensureCatalogStore(s); err != nil { + return CatalogSnapshot{}, err + } + if ctx == nil { + ctx = context.Background() + } + + readTS := s.store.LastCommitTS() + version, err := s.versionAt(ctx, readTS) + if err != nil { + return CatalogSnapshot{}, err + } + routes, err := s.routesAt(ctx, readTS) + if err != nil { + return CatalogSnapshot{}, err + } + return CatalogSnapshot{Version: version, Routes: routes}, nil +} + +// Save updates the route catalog using optimistic version checks and bumps the +// catalog version by exactly one on success. 
+func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error) { + if err := ensureCatalogStore(s); err != nil { + return CatalogSnapshot{}, err + } + if ctx == nil { + ctx = context.Background() + } + + plan, err := s.prepareSave(ctx, expectedVersion, routes) + if err != nil { + return CatalogSnapshot{}, err + } + mutations, err := s.buildSaveMutations(ctx, plan) + if err != nil { + return CatalogSnapshot{}, err + } + if err := s.applySaveMutations(ctx, plan, mutations); err != nil { + return CatalogSnapshot{}, err + } + + return CatalogSnapshot{ + Version: plan.nextVersion, + Routes: cloneRouteDescriptors(plan.routes), + }, nil +} + +func ensureCatalogStore(s *CatalogStore) error { + if s == nil || s.store == nil { + return errors.WithStack(ErrCatalogStoreRequired) + } + return nil +} + +func normalizeRoutes(routes []RouteDescriptor) ([]RouteDescriptor, error) { + if len(routes) == 0 { + return []RouteDescriptor{}, nil + } + out := make([]RouteDescriptor, 0, len(routes)) + seen := make(map[uint64]struct{}, len(routes)) + for _, route := range routes { + if err := validateRouteDescriptor(route); err != nil { + return nil, err + } + if _, exists := seen[route.RouteID]; exists { + return nil, errors.WithStack(ErrCatalogDuplicateRouteID) + } + seen[route.RouteID] = struct{}{} + out = append(out, cloneRouteDescriptor(route)) + } + sort.Slice(out, func(i, j int) bool { + return out[i].RouteID < out[j].RouteID + }) + return out, nil +} + +func validateRouteDescriptor(route RouteDescriptor) error { + if route.RouteID == 0 { + return errors.WithStack(ErrCatalogRouteIDRequired) + } + if route.GroupID == 0 { + return errors.WithStack(ErrCatalogGroupIDRequired) + } + if !route.State.valid() { + return errors.WithStack(ErrCatalogInvalidRouteState) + } + if route.End != nil && bytes.Compare(route.Start, route.End) >= 0 { + return errors.WithStack(ErrCatalogInvalidRouteRange) + } + return nil +} + +func cloneRouteDescriptor(route RouteDescriptor) RouteDescriptor { + return RouteDescriptor{ + RouteID: route.RouteID, + Start: cloneBytes(route.Start), + End: cloneBytes(route.End), + GroupID: route.GroupID, + State: route.State, + ParentRouteID: route.ParentRouteID, + } +} + +func cloneRouteDescriptors(routes []RouteDescriptor) []RouteDescriptor { + out := make([]RouteDescriptor, len(routes)) + for i := range routes { + out[i] = cloneRouteDescriptor(routes[i]) + } + return out +} + +func readU64LenBytes(r *bytes.Reader, rawLen uint64) ([]byte, error) { + n, err := u64ToInt(rawLen) + if err != nil { + return nil, err + } + if n > r.Len() { + return nil, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if n == 0 { + return []byte{}, nil + } + out := make([]byte, n) + if _, err := r.Read(out); err != nil { + return nil, errors.WithStack(err) + } + return out, nil +} + +func u64ToInt(v uint64) (int, error) { + if strconv.IntSize == 32 && v > uint64(^uint32(0)>>1) { + return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + if strconv.IntSize == 64 && v > (^uint64(0)>>1) { + return 0, errors.WithStack(ErrCatalogInvalidRouteRecord) + } + return int(v), nil +} + +func (s *CatalogStore) versionAt(ctx context.Context, ts uint64) (uint64, error) { + raw, err := s.store.GetAt(ctx, CatalogVersionKey(), ts) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + return 0, nil + } + return 0, errors.WithStack(err) + } + v, err := DecodeCatalogVersion(raw) + if err != nil { + return 0, err + } + return v, nil +} + +func (s *CatalogStore) 
routesAt(ctx context.Context, ts uint64) ([]RouteDescriptor, error) { + entries, err := s.scanRouteEntriesAt(ctx, ts) + if err != nil { + return nil, err + } + out := make([]RouteDescriptor, 0, len(entries)) + seen := make(map[uint64]struct{}, len(entries)) + for _, kvp := range entries { + routeID, ok := CatalogRouteIDFromKey(kvp.Key) + if !ok { + return nil, errors.WithStack(ErrCatalogInvalidRouteKey) + } + route, err := DecodeRouteDescriptor(kvp.Value) + if err != nil { + return nil, err + } + if route.RouteID != routeID { + return nil, errors.WithStack(ErrCatalogRouteKeyIDMismatch) + } + if _, exists := seen[route.RouteID]; exists { + return nil, errors.WithStack(ErrCatalogDuplicateRouteID) + } + seen[route.RouteID] = struct{}{} + out = append(out, route) + } + sort.Slice(out, func(i, j int) bool { + return out[i].RouteID < out[j].RouteID + }) + return out, nil +} + +func (s *CatalogStore) routeKeysAt(ctx context.Context, ts uint64) ([][]byte, error) { + entries, err := s.scanRouteEntriesAt(ctx, ts) + if err != nil { + return nil, err + } + out := make([][]byte, 0, len(entries)) + for _, kvp := range entries { + out = append(out, cloneBytes(kvp.Key)) + } + return out, nil +} + +func (s *CatalogStore) scanRouteEntriesAt(ctx context.Context, ts uint64) ([]*store.KVPair, error) { + prefix := []byte(catalogRoutePrefix) + upper := prefixScanEnd(prefix) + cursor := cloneBytes(prefix) + out := make([]*store.KVPair, 0) + + for { + page, err := s.store.ScanAt(ctx, cursor, upper, catalogScanPageSize, ts) + if err != nil { + return nil, errors.WithStack(err) + } + if len(page) == 0 { + break + } + + var ( + lastKey []byte + done bool + ) + out, lastKey, done = appendRoutePage(out, page, prefix) + if done { + return out, nil + } + if lastKey == nil || len(page) < catalogScanPageSize { + break + } + nextCursor, inRange := nextCursorWithinUpper(lastKey, upper) + if !inRange { + break + } + cursor = nextCursor + } + return out, nil +} + +type savePlan struct { + readTS uint64 + commitTS uint64 + nextVersion uint64 + routes []RouteDescriptor +} + +func (s *CatalogStore) prepareSave(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (savePlan, error) { + normalized, err := normalizeRoutes(routes) + if err != nil { + return savePlan{}, err + } + + readTS := s.store.LastCommitTS() + currentVersion, err := s.versionAt(ctx, readTS) + if err != nil { + return savePlan{}, err + } + if currentVersion != expectedVersion { + return savePlan{}, errors.WithStack(ErrCatalogVersionMismatch) + } + + nextVersion := expectedVersion + 1 + if nextVersion == 0 { + return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) + } + commitTS := readTS + 1 + if commitTS == 0 { + return savePlan{}, errors.WithStack(ErrCatalogVersionOverflow) + } + + return savePlan{ + readTS: readTS, + commitTS: commitTS, + nextVersion: nextVersion, + routes: normalized, + }, nil +} + +func (s *CatalogStore) buildSaveMutations(ctx context.Context, plan savePlan) ([]*store.KVPairMutation, error) { + existingKeys, err := s.routeKeysAt(ctx, plan.readTS) + if err != nil { + return nil, err + } + + mutations := make([]*store.KVPairMutation, 0, len(existingKeys)+len(plan.routes)+1) + mutations = appendDeleteMutations(mutations, existingKeys) + mutations, err = appendRouteMutations(mutations, plan.routes) + if err != nil { + return nil, err + } + mutations = append(mutations, &store.KVPairMutation{ + Op: store.OpTypePut, + Key: CatalogVersionKey(), + Value: EncodeCatalogVersion(plan.nextVersion), + }) + return mutations, nil +} 
+ +func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mutations []*store.KVPairMutation) error { + if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, plan.commitTS); err != nil { + if errors.Is(err, store.ErrWriteConflict) { + return errors.WithStack(ErrCatalogVersionMismatch) + } + return errors.WithStack(err) + } + return nil +} + +func appendDeleteMutations(out []*store.KVPairMutation, keys [][]byte) []*store.KVPairMutation { + for _, key := range keys { + out = append(out, &store.KVPairMutation{ + Op: store.OpTypeDelete, + Key: key, + }) + } + return out +} + +func appendRouteMutations(out []*store.KVPairMutation, routes []RouteDescriptor) ([]*store.KVPairMutation, error) { + for _, route := range routes { + encoded, err := EncodeRouteDescriptor(route) + if err != nil { + return nil, err + } + out = append(out, &store.KVPairMutation{ + Op: store.OpTypePut, + Key: CatalogRouteKey(route.RouteID), + Value: encoded, + }) + } + return out, nil +} + +func decodeRouteDescriptorHeader(r *bytes.Reader) (RouteDescriptor, error) { + var routeID uint64 + var groupID uint64 + var parentRouteID uint64 + var startLen uint64 + + if err := binary.Read(r, binary.BigEndian, &routeID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &groupID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + stateRaw, err := r.ReadByte() + if err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &parentRouteID); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + if err := binary.Read(r, binary.BigEndian, &startLen); err != nil { + return RouteDescriptor{}, errors.WithStack(err) + } + + start, err := readU64LenBytes(r, startLen) + if err != nil { + return RouteDescriptor{}, err + } + return RouteDescriptor{ + RouteID: routeID, + GroupID: groupID, + State: RouteState(stateRaw), + ParentRouteID: parentRouteID, + Start: start, + }, nil +} + +func decodeRouteDescriptorEnd(r *bytes.Reader) ([]byte, error) { + endFlag, err := r.ReadByte() + if err != nil { + return nil, errors.WithStack(err) + } + if endFlag == 0 { + return nil, nil + } + if endFlag != 1 { + return nil, errors.Wrapf(ErrCatalogInvalidRouteRecord, "invalid end flag %d", endFlag) + } + + var endLen uint64 + if err := binary.Read(r, binary.BigEndian, &endLen); err != nil { + return nil, errors.WithStack(err) + } + return readU64LenBytes(r, endLen) +} + +func appendRoutePage(out []*store.KVPair, page []*store.KVPair, prefix []byte) ([]*store.KVPair, []byte, bool) { + var lastKey []byte + for _, kvp := range page { + if kvp == nil { + continue + } + if !bytes.HasPrefix(kvp.Key, prefix) { + return out, lastKey, true + } + out = append(out, &store.KVPair{ + Key: cloneBytes(kvp.Key), + Value: cloneBytes(kvp.Value), + }) + lastKey = kvp.Key + } + return out, lastKey, false +} + +func nextCursorWithinUpper(lastKey []byte, upper []byte) ([]byte, bool) { + if len(lastKey) == 0 { + return nil, false + } + nextCursor := nextScanCursor(lastKey) + if upper != nil && bytes.Compare(nextCursor, upper) >= 0 { + return nil, false + } + return nextCursor, true +} + +func prefixScanEnd(prefix []byte) []byte { + if len(prefix) == 0 { + return nil + } + out := cloneBytes(prefix) + for i := len(out) - 1; i >= 0; i-- { + if out[i] == ^byte(0) { + continue + } + out[i]++ + return out[:i+1] + } + return nil +} + +func nextScanCursor(lastKey []byte) []byte { + next := make([]byte, len(lastKey)+1) + 
copy(next, lastKey) + return next +} diff --git a/distribution/catalog_test.go b/distribution/catalog_test.go new file mode 100644 index 00000000..8023ba2f --- /dev/null +++ b/distribution/catalog_test.go @@ -0,0 +1,262 @@ +package distribution + +import ( + "bytes" + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +func TestCatalogVersionCodecRoundTrip(t *testing.T) { + raw := EncodeCatalogVersion(42) + got, err := DecodeCatalogVersion(raw) + if err != nil { + t.Fatalf("decode version: %v", err) + } + if got != 42 { + t.Fatalf("expected 42, got %d", got) + } +} + +func TestCatalogVersionCodecRejectsInvalidPayload(t *testing.T) { + if _, err := DecodeCatalogVersion(nil); !errors.Is(err, ErrCatalogInvalidVersionRecord) { + t.Fatalf("expected ErrCatalogInvalidVersionRecord, got %v", err) + } + if _, err := DecodeCatalogVersion([]byte{99, 0, 0, 0, 0, 0, 0, 0, 1}); !errors.Is(err, ErrCatalogInvalidVersionRecord) { + t.Fatalf("expected ErrCatalogInvalidVersionRecord, got %v", err) + } +} + +func TestRouteDescriptorCodecRoundTrip(t *testing.T) { + route := RouteDescriptor{ + RouteID: 7, + Start: []byte("a"), + End: []byte("m"), + GroupID: 3, + State: RouteStateWriteFenced, + ParentRouteID: 2, + } + raw, err := EncodeRouteDescriptor(route) + if err != nil { + t.Fatalf("encode route: %v", err) + } + got, err := DecodeRouteDescriptor(raw) + if err != nil { + t.Fatalf("decode route: %v", err) + } + assertRouteEqual(t, route, got) +} + +func TestRouteDescriptorCodecRoundTripNilEnd(t *testing.T) { + route := RouteDescriptor{ + RouteID: 9, + Start: []byte("m"), + End: nil, + GroupID: 2, + State: RouteStateActive, + ParentRouteID: 0, + } + raw, err := EncodeRouteDescriptor(route) + if err != nil { + t.Fatalf("encode route: %v", err) + } + got, err := DecodeRouteDescriptor(raw) + if err != nil { + t.Fatalf("decode route: %v", err) + } + assertRouteEqual(t, route, got) +} + +func TestCatalogRouteKeyHelpers(t *testing.T) { + key := CatalogRouteKey(11) + if !IsCatalogRouteKey(key) { + t.Fatal("expected route key prefix match") + } + id, ok := CatalogRouteIDFromKey(key) + if !ok { + t.Fatal("expected route id parse to succeed") + } + if id != 11 { + t.Fatalf("expected route id 11, got %d", id) + } + if _, ok := CatalogRouteIDFromKey([]byte("not-a-route-key")); ok { + t.Fatal("expected parse failure for non-route key") + } +} + +func TestCatalogStoreSnapshotEmpty(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + snapshot, err := cs.Snapshot(context.Background()) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 0 { + t.Fatalf("expected empty version 0, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 0 { + t.Fatalf("expected no routes, got %d", len(snapshot.Routes)) + } +} + +func TestCatalogStoreSaveAndSnapshot(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + + saved, err := cs.Save(ctx, 0, []RouteDescriptor{ + { + RouteID: 2, + Start: []byte("m"), + End: nil, + GroupID: 2, + State: RouteStateActive, + ParentRouteID: 1, + }, + { + RouteID: 1, + Start: []byte(""), + End: []byte("m"), + GroupID: 1, + State: RouteStateActive, + ParentRouteID: 0, + }, + }) + if err != nil { + t.Fatalf("save: %v", err) + } + if saved.Version != 1 { + t.Fatalf("expected version 1, got %d", saved.Version) + } + if len(saved.Routes) != 2 { + t.Fatalf("expected 2 routes, got %d", len(saved.Routes)) + } + if saved.Routes[0].RouteID != 1 || saved.Routes[1].RouteID != 2 { + 
t.Fatalf("expected sorted route ids [1,2], got [%d,%d]", saved.Routes[0].RouteID, saved.Routes[1].RouteID) + } + + snapshot, err := cs.Snapshot(ctx) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 1 { + t.Fatalf("expected version 1, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 2 { + t.Fatalf("expected 2 routes, got %d", len(snapshot.Routes)) + } + assertRouteEqual(t, saved.Routes[0], snapshot.Routes[0]) + assertRouteEqual(t, saved.Routes[1], snapshot.Routes[1]) +} + +func TestCatalogStoreSaveRejectsVersionMismatch(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + routes := []RouteDescriptor{ + { + RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive, + }, + } + + if _, err := cs.Save(ctx, 0, routes); err != nil { + t.Fatalf("first save: %v", err) + } + if _, err := cs.Save(ctx, 0, routes); !errors.Is(err, ErrCatalogVersionMismatch) { + t.Fatalf("expected ErrCatalogVersionMismatch, got %v", err) + } +} + +func TestCatalogStoreSaveRejectsDuplicateRouteIDs(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 1, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogDuplicateRouteID) { + t.Fatalf("expected ErrCatalogDuplicateRouteID, got %v", err) + } +} + +func TestCatalogStoreSaveRejectsInvalidRouteRange(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte("z"), End: []byte("a"), GroupID: 1, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogInvalidRouteRange) { + t.Fatalf("expected ErrCatalogInvalidRouteRange, got %v", err) + } +} + +func TestCatalogStoreSaveDeletesRemovedRoutes(t *testing.T) { + cs := NewCatalogStore(store.NewMVCCStore()) + ctx := context.Background() + + _, err := cs.Save(ctx, 0, []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if err != nil { + t.Fatalf("first save: %v", err) + } + + _, err = cs.Save(ctx, 1, []RouteDescriptor{ + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }) + if err != nil { + t.Fatalf("second save: %v", err) + } + + snapshot, err := cs.Snapshot(ctx) + if err != nil { + t.Fatalf("snapshot: %v", err) + } + if snapshot.Version != 2 { + t.Fatalf("expected version 2, got %d", snapshot.Version) + } + if len(snapshot.Routes) != 1 { + t.Fatalf("expected 1 route after delete, got %d", len(snapshot.Routes)) + } + if snapshot.Routes[0].RouteID != 2 { + t.Fatalf("expected remaining route id 2, got %d", snapshot.Routes[0].RouteID) + } +} + +func TestCatalogStoreSaveRejectsVersionOverflow(t *testing.T) { + st := store.NewMVCCStore() + ctx := context.Background() + // Prepare current catalog version == max uint64. 
+ if err := st.PutAt(ctx, CatalogVersionKey(), EncodeCatalogVersion(^uint64(0)), 1, 0); err != nil { + t.Fatalf("prepare version key: %v", err) + } + cs := NewCatalogStore(st) + _, err := cs.Save(ctx, ^uint64(0), []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}, + }) + if !errors.Is(err, ErrCatalogVersionOverflow) { + t.Fatalf("expected ErrCatalogVersionOverflow, got %v", err) + } +} + +func assertRouteEqual(t *testing.T, want, got RouteDescriptor) { + t.Helper() + if want.RouteID != got.RouteID { + t.Fatalf("route id mismatch: want %d, got %d", want.RouteID, got.RouteID) + } + if want.GroupID != got.GroupID { + t.Fatalf("group id mismatch: want %d, got %d", want.GroupID, got.GroupID) + } + if want.ParentRouteID != got.ParentRouteID { + t.Fatalf("parent route id mismatch: want %d, got %d", want.ParentRouteID, got.ParentRouteID) + } + if want.State != got.State { + t.Fatalf("state mismatch: want %d, got %d", want.State, got.State) + } + if !bytes.Equal(want.Start, got.Start) { + t.Fatalf("start mismatch: want %q, got %q", want.Start, got.Start) + } + if !bytes.Equal(want.End, got.End) { + t.Fatalf("end mismatch: want %q, got %q", want.End, got.End) + } +} diff --git a/distribution/engine.go b/distribution/engine.go index 323e7891..54de0c1b 100644 --- a/distribution/engine.go +++ b/distribution/engine.go @@ -5,6 +5,8 @@ import ( "sort" "sync" "sync/atomic" + + "github.com/cockroachdb/errors" ) // Route represents a mapping from a key range to a raft group. @@ -12,12 +14,17 @@ import ( // End is exclusive. A nil End denotes an unbounded interval extending to // positive infinity. type Route struct { + // RouteID is the durable identifier assigned by route catalog. + // Zero means ephemeral/non-catalog routes. + RouteID uint64 // Start marks the inclusive beginning of the range. Start []byte // End marks the exclusive end of the range. nil means unbounded. End []byte // GroupID identifies the raft group for the range starting at Start. GroupID uint64 + // State tracks control-plane state for this route. + State RouteState // Load tracks the number of accesses served by this range. Load uint64 } @@ -26,11 +33,20 @@ type Route struct { type Engine struct { mu sync.RWMutex routes []Route + catalogVersion uint64 ts uint64 hotspotThreshold uint64 } const defaultGroupID uint64 = 1 +const minRouteCountForOrderValidation = 2 + +var ( + ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale") + ErrEngineSnapshotDuplicateID = errors.New("engine snapshot has duplicate route id") + ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes") + ErrEngineSnapshotRouteOrder = errors.New("engine snapshot has invalid route order") +) // NewEngine creates an Engine with no hotspot splitting. func NewEngine() *Engine { @@ -52,12 +68,47 @@ func NewEngineWithThreshold(threshold uint64) *Engine { return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold} } +// Version returns current route catalog version applied to the engine. +func (e *Engine) Version() uint64 { + e.mu.RLock() + defer e.mu.RUnlock() + return e.catalogVersion +} + +// ApplySnapshot atomically replaces all in-memory routes with the provided +// catalog snapshot when the snapshot version is newer. 
+func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error { + e.mu.Lock() + defer e.mu.Unlock() + + if snapshot.Version < e.catalogVersion { + return errors.WithStack(ErrEngineSnapshotVersionStale) + } + if snapshot.Version == e.catalogVersion { + return nil + } + + routes, err := routesFromCatalog(snapshot.Routes) + if err != nil { + return err + } + + e.routes = routes + e.catalogVersion = snapshot.Version + return nil +} + // UpdateRoute registers or updates a route for the given key range. // Routes are stored sorted by Start. func (e *Engine) UpdateRoute(start, end []byte, group uint64) { e.mu.Lock() defer e.mu.Unlock() - e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group}) + e.routes = append(e.routes, Route{ + Start: start, + End: end, + GroupID: group, + State: RouteStateActive, + }) sort.Slice(e.routes, func(i, j int) bool { return bytes.Compare(e.routes[i].Start, e.routes[j].Start) < 0 }) @@ -116,7 +167,14 @@ func (e *Engine) Stats() []Route { defer e.mu.RUnlock() stats := make([]Route, len(e.routes)) for i, r := range e.routes { - stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: atomic.LoadUint64(&e.routes[i].Load)} + stats[i] = Route{ + RouteID: r.RouteID, + Start: cloneBytes(r.Start), + End: cloneBytes(r.End), + GroupID: r.GroupID, + State: r.State, + Load: atomic.LoadUint64(&e.routes[i].Load), + } } return stats } @@ -143,9 +201,11 @@ func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route { } // Route intersects with scan range result = append(result, Route{ + RouteID: r.RouteID, Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, + State: r.State, Load: atomic.LoadUint64(&r.Load), }) } @@ -182,14 +242,68 @@ func (e *Engine) splitRange(idx int) { e.routes[idx].Load = 0 return } - left := Route{Start: r.Start, End: mid, GroupID: r.GroupID} - right := Route{Start: mid, End: r.End, GroupID: r.GroupID} + left := Route{Start: r.Start, End: mid, GroupID: r.GroupID, State: RouteStateActive} + right := Route{Start: mid, End: r.End, GroupID: r.GroupID, State: RouteStateActive} // replace the range at idx with left and right in an idiomatic manner e.routes = append(e.routes[:idx+1], e.routes[idx:]...) 
e.routes[idx] = left e.routes[idx+1] = right } +func routesFromCatalog(routes []RouteDescriptor) ([]Route, error) { + if len(routes) == 0 { + return []Route{}, nil + } + + out := make([]Route, 0, len(routes)) + seen := make(map[uint64]struct{}, len(routes)) + for _, rd := range routes { + if err := validateRouteDescriptor(rd); err != nil { + return nil, err + } + if _, exists := seen[rd.RouteID]; exists { + return nil, errors.WithStack(ErrEngineSnapshotDuplicateID) + } + seen[rd.RouteID] = struct{}{} + out = append(out, Route{ + RouteID: rd.RouteID, + Start: cloneBytes(rd.Start), + End: cloneBytes(rd.End), + GroupID: rd.GroupID, + State: rd.State, + }) + } + + sort.Slice(out, func(i, j int) bool { + return bytes.Compare(out[i].Start, out[j].Start) < 0 + }) + if err := validateRouteOrder(out); err != nil { + return nil, err + } + return out, nil +} + +func validateRouteOrder(routes []Route) error { + if len(routes) < minRouteCountForOrderValidation { + return nil + } + for i := 1; i < len(routes); i++ { + prev := routes[i-1] + curr := routes[i] + + if bytes.Compare(prev.Start, curr.Start) >= 0 { + return errors.WithStack(ErrEngineSnapshotRouteOrder) + } + if prev.End == nil { + return errors.WithStack(ErrEngineSnapshotRouteOrder) + } + if bytes.Compare(prev.End, curr.Start) > 0 { + return errors.WithStack(ErrEngineSnapshotRouteOverlap) + } + } + return nil +} + func cloneBytes(b []byte) []byte { if b == nil { return nil diff --git a/distribution/engine_test.go b/distribution/engine_test.go index 8025de3d..fd09a4be 100644 --- a/distribution/engine_test.go +++ b/distribution/engine_test.go @@ -4,6 +4,8 @@ import ( "bytes" "sync" "testing" + + "github.com/cockroachdb/errors" ) func TestEngineRouteLookup(t *testing.T) { @@ -231,3 +233,104 @@ func TestEngineGetIntersectingRoutes(t *testing.T) { }) } } + +func TestEngineApplySnapshot_ReplacesRoutesAndVersion(t *testing.T) { + e := NewEngine() + e.UpdateRoute([]byte("a"), []byte("z"), 1) + + if got := e.Version(); got != 0 { + t.Fatalf("expected initial version 0, got %d", got) + } + + err := e.ApplySnapshot(CatalogSnapshot{ + Version: 1, + Routes: []RouteDescriptor{ + {RouteID: 10, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 11, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateWriteFenced}, + }, + }) + if err != nil { + t.Fatalf("apply snapshot: %v", err) + } + + if got := e.Version(); got != 1 { + t.Fatalf("expected version 1, got %d", got) + } + + stats := e.Stats() + if len(stats) != 2 { + t.Fatalf("expected 2 routes, got %d", len(stats)) + } + if stats[0].RouteID != 10 || stats[0].State != RouteStateActive { + t.Fatalf("unexpected first route metadata: %+v", stats[0]) + } + if stats[1].RouteID != 11 || stats[1].State != RouteStateWriteFenced { + t.Fatalf("unexpected second route metadata: %+v", stats[1]) + } +} + +func TestEngineApplySnapshot_RejectsOldVersion(t *testing.T) { + e := NewEngine() + + if err := e.ApplySnapshot(CatalogSnapshot{ + Version: 2, + Routes: []RouteDescriptor{ + {RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}, + }, + }); err != nil { + t.Fatalf("first apply snapshot: %v", err) + } + + err := e.ApplySnapshot(CatalogSnapshot{ + Version: 1, + Routes: []RouteDescriptor{ + {RouteID: 2, Start: []byte(""), End: nil, GroupID: 9, State: RouteStateActive}, + }, + }) + if !errors.Is(err, ErrEngineSnapshotVersionStale) { + t.Fatalf("expected ErrEngineSnapshotVersionStale, got %v", err) + } + + route, ok := e.GetRoute([]byte("k")) + if !ok { + 
t.Fatal("expected route after stale apply") + } + if route.GroupID != 1 || route.RouteID != 1 { + t.Fatalf("expected route to remain unchanged, got %+v", route) + } +} + +func TestEngineApplySnapshot_LookupBehavior(t *testing.T) { + e := NewEngine() + err := e.ApplySnapshot(CatalogSnapshot{ + Version: 1, + Routes: []RouteDescriptor{ + {RouteID: 1, Start: []byte("a"), End: []byte("m"), GroupID: 1, State: RouteStateActive}, + {RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive}, + }, + }) + if err != nil { + t.Fatalf("apply snapshot: %v", err) + } + + cases := []struct { + key []byte + group uint64 + expect bool + }{ + {[]byte("0"), 0, false}, + {[]byte("a"), 1, true}, + {[]byte("b"), 1, true}, + {[]byte("m"), 2, true}, + {[]byte("x"), 2, true}, + } + for _, c := range cases { + r, ok := e.GetRoute(c.key) + if ok != c.expect { + t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok) + } + if ok && r.GroupID != c.group { + t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID) + } + } +} diff --git a/docs/hotspot_shard_split_design.md b/docs/hotspot_shard_split_design.md new file mode 100644 index 00000000..3f9ff754 --- /dev/null +++ b/docs/hotspot_shard_split_design.md @@ -0,0 +1,327 @@ +# Hotspot Shard Split Design for Elastickv + +## 1. Background + +Elastickv already has shard boundaries, but it does not yet have the control-plane needed for safe automatic hotspot splitting. + +Current implementation status (as of February 17, 2026): + +- `distribution/engine.go` has per-range access counters and threshold-based `splitRange`. +- However, `RecordAccess` is not wired into real request paths. +- The route table is in-memory only and is lost on restart. +- Route updates are append-based via `UpdateRoute`, with no conflict detection, deduplication, or versioning. +- There is no data movement workflow to relocate part of a split range to another Raft group. + +So practical “hotspot splitting” is currently not connected end-to-end. + +## 2. Goals and Non-goals + +### 2.1 Goals + +1. Detect hot ranges automatically and execute range splits. +2. Move split children to other Raft groups to distribute load. +3. Preserve consistency during split (no lost writes, no MVCC/Txn breakage). +4. Support resumable job-based operations after failures. + +### 2.2 Non-goals + +1. Automatic Raft group creation or membership orchestration (operate within existing `--raftGroups`). +2. Automatic merge (inverse of split) in this design. +3. Deep LSM/Pebble optimization in the first phase (in-memory MVCC is the baseline first). + +## 3. Requirements + +### 3.1 Functional requirements + +1. Collect read/write load by range. +2. Select split candidates with thresholds and hysteresis. +3. Choose split keys from observed key distribution (not midpoint-only). +4. Execute phased backfill and cutover. +5. Propagate post-split routing to all nodes. + +### 3.2 Consistency requirements + +1. No write loss at cutover boundaries. +2. Correct migration of internal keys including `!txn|...` and `!lst|...`. +3. Ability to reject writes delivered to the old group using stale routes. + +### 3.3 Operational requirements + +1. Toggle auto-split ON/OFF. +2. Provide manual split API. +3. Expose job status, failure reason, and migration throughput. + +## 4. High-level Architecture + +New components: + +1. `Split Controller` +2. `Hotspot Detector` +3. `Route Catalog` (durable + versioned) +4. `Range Migrator` +5. 
`Route Watcher` + +Responsibilities: + +| Component | Responsibility | Placement | +|---|---|---| +| Hotspot Detector | Load aggregation and split candidate extraction | Every node + leader aggregation | +| Split Controller | Split state machine and retries | Default-group leader | +| Route Catalog | Durable route table and split jobs | Internal metadata in default group | +| Range Migrator | Backfill and delta copy | Between source/target leaders | +| Route Watcher | Route update distribution and local refresh | All nodes | + +## 5. Data Model + +`RouteDescriptor`: + +- `route_id uint64` +- `start []byte` (inclusive) +- `end []byte` (exclusive, nil=+inf) +- `group_id uint64` +- `version uint64` (catalog generation) +- `state` (`ACTIVE`, `WRITE_FENCED`, `MIGRATING_SOURCE`, `MIGRATING_TARGET`) +- `parent_route_id uint64` (lineage) + +`SplitJob`: + +- `job_id string` +- `source_route_id uint64` +- `target_group_id uint64` +- `split_key []byte` +- `snapshot_ts uint64` +- `fence_ts uint64` +- `phase` (`PLANNED`, `BACKFILL`, `FENCE`, `DELTA_COPY`, `CUTOVER`, `CLEANUP`, `DONE`, `ABORTED`) +- `cursor []byte` (resume cursor) +- `retry_count uint32` +- `last_error string` + +`AccessWindow`: + +- `route_id uint64` +- `window_start_unix_ms uint64` +- `read_ops uint64` +- `write_ops uint64` +- `p95_latency_us uint64` (future) +- `top_keys_sample` (for split-key selection) + +## 6. Hotspot Detection + +### 6.1 Instrumentation points + +Read path: + +- `kv/ShardStore.GetAt` +- `kv/ShardStore.ScanAt` + +Write path: + +- `kv/ShardedCoordinator.groupMutations` (increment per key) + +### 6.2 Scoring + +Range score: + +`score = write_ops * Ww + read_ops * Wr` + +Initial parameters: + +- `Ww=4`, `Wr=1` +- `threshold=50_000 ops/min` +- Candidate after 3 consecutive windows over threshold +- Cooldown 10 minutes after split + +### 6.3 Split-key selection + +1. Default to p50 of sampled key distribution. +2. If single-key skew is high (`top_key_share >= 0.8`), isolate the hot key. +3. For `end=nil` ranges, derive split key from observed keys. + +## 7. Split Execution Flow + +```mermaid +flowchart LR + A[Detect hotspot] --> B[Create SplitJob: PLANNED] + B --> C[BACKFILL at snapshot_ts] + C --> D[FENCE writes on moving range] + D --> E[DELTA_COPY snapshot_ts+1..fence_ts] + E --> F[CUTOVER route version+1] + F --> G[CLEANUP source old copies] + G --> H[DONE] +``` + +### 7.1 BACKFILL + +1. Record `snapshot_ts = source.LastCommitTS()`. +2. Export MVCC versions for the moving range from source. +3. Import into target idempotently. +4. Persist `cursor` continuously for resumability. + +### 7.2 FENCE + +1. Mark moving range as `WRITE_FENCED`. +2. In `ShardedCoordinator.Dispatch`, reject writes in fenced range with retryable errors. +3. Use `raft.Barrier` to confirm fence visibility. + +### 7.3 DELTA_COPY + +1. Acquire `fence_ts = source.LastCommitTS()`. +2. Copy delta versions in `(snapshot_ts, fence_ts]`. +3. Mark cutover-ready when complete. + +### 7.4 CUTOVER + +1. Replace parent range with two active child ranges. +2. Switch moving child `group_id` to target. +3. Increment route catalog `version`. + +### 7.5 CLEANUP + +1. Keep old copies on source for a grace period (read safety). +2. Garbage-collect moved data after grace period. + +## 8. Routing Consistency + +### 8.1 Route version in requests + +- Add `route_version` to `pb.Request` (`proto/internal.proto`). +- Coordinator stamps current route version into each request. + +### 8.2 Ownership validation + +- Inject `groupID` + route resolver into `kvFSM`. 
+- Validate ownership for each mutation key (via `routeKey`) before apply. +- Return `ErrWrongShard` on mismatch. + +### 8.3 Stale-route handling + +- Old-group leader rejects stale-route writes. +- Client retries after refreshing routes. + +## 9. Key Coverage for Migration + +Range membership must be based on logical route key, not raw storage key. + +Keys to include: + +1. User keys +2. List keys (`!lst|meta|...`, `!lst|itm|...`) +3. Txn keys (`!txn|lock|...`, `!txn|int|...`, `!txn|cmt|...`, `!txn|rb|...`) + +Required changes: + +- Export route-key extraction from `kv/txn_keys.go` for migrator reuse. +- Add MVCC version export/import APIs in `store`. + +## 10. API Changes + +### 10.1 `proto/distribution.proto` + +Add RPCs: + +1. `ReportAccess(ReportAccessRequest) returns (ReportAccessResponse)` +2. `ListRoutes(ListRoutesRequest) returns (ListRoutesResponse)` +3. `WatchRoutes(WatchRoutesRequest) returns (stream WatchRoutesResponse)` +4. `SplitRange(SplitRangeRequest) returns (SplitRangeResponse)` +5. `GetSplitJob(GetSplitJobRequest) returns (GetSplitJobResponse)` + +### 10.2 `proto/internal.proto` + +Add RPCs: + +1. `ExportRangeVersions(ExportRangeVersionsRequest) returns (stream ExportRangeVersionsResponse)` +2. `ImportRangeVersions(ImportRangeVersionsRequest) returns (ImportRangeVersionsResponse)` + +## 11. Implementation Touchpoints + +1. `distribution/engine.go` +- Extend route data with state/version. +- Replace append-only updates with CAS update APIs. + +2. `kv/shard_store.go` +- Add read-path access recording. +- Define behavior for moving/fenced ranges. + +3. `kv/sharded_coordinator.go` +- Add write-path access recording. +- Return retryable errors for `WRITE_FENCED` writes. + +4. `kv/fsm.go` +- Add route-ownership checks. +- Return `ErrWrongShard` when ownership mismatches. + +5. `adapter/distribution_server.go` +- Implement split/job/watch/report APIs. + +6. `store/mvcc_store.go` and `store/lsm_store.go` +- Implement range version export/import. + +## 12. Phased Rollout + +### Milestone 1: Control plane + +1. Durable route catalog +2. Route version + watcher +3. Manual split API (same-group split, no migration) + +### Milestone 2: Migration plane + +1. MVCC range export/import +2. Job phases: BACKFILL/FENCE/DELTA/CUTOVER +3. Manual split with target-group relocation + +### Milestone 3: Automation + +1. Access aggregation +2. Hotspot detector +3. Auto-split scheduler (cooldown/hysteresis) + +### Milestone 4: Hardening + +1. Stale-route reject path +2. Cleanup GC +3. Jepsen hotspot workloads + +## 13. Test Strategy + +### 13.1 Unit tests + +1. Hotspot scoring and split-key selection +2. Route catalog CAS/version transitions +3. Split state-machine phase transitions +4. Ownership validation + +### 13.2 Integration tests + +1. Consistency under concurrent read/write during split +2. Retryable behavior during FENCE +3. Job resume after leader change/restart +4. Split with cross-shard transactions + +### 13.3 Jepsen + +1. Hotspot load + partition nemesis +2. Hotspot load + kill nemesis +3. Linearizability checks during split + +## 14. Risks and Mitigations + +1. Risk: Long write pause during split. +- Mitigation: Chunked delta copy, narrow fence to moving child only. + +2. Risk: Misrouting via stale route cache. +- Mitigation: Leader-side ownership validation + route version checks. + +3. Risk: Internal key omission causing unresolved transactions. +- Mitigation: Logical route-key based filtering + dedicated txn/list migration tests. + +4. 
Risk: Cleanup too early impacting old readers. +- Mitigation: Grace period + route-version observability + staged GC. + +## 15. Acceptance Criteria + +1. Manual split divides one range into two and moves one child to another group. +2. No write loss during split. +3. Stale-route writes are rejected after cutover. +4. Split jobs resume after failure and eventually reach `DONE`. +5. Existing tests + new split tests + extended Jepsen scenarios pass. diff --git a/docs/hotspot_split_milestone1_pr_plan.md b/docs/hotspot_split_milestone1_pr_plan.md new file mode 100644 index 00000000..77a750c4 --- /dev/null +++ b/docs/hotspot_split_milestone1_pr_plan.md @@ -0,0 +1,244 @@ +# Hotspot Split Milestone 1 PR Plan + +## Objective + +Milestone 1 delivers the control plane only: + +1. Durable route catalog. +2. Route versioning and watcher-based refresh. +3. Manual split API (same-group split, no cross-group migration). + +## Out of Scope + +1. Data migration to a different Raft group. +2. Automatic hotspot detection and auto-split scheduling. +3. Stale-route write rejection in FSM (`ErrWrongShard`) and route-version stamping on every internal request. + +## Implementation Strategy + +Use the existing default Raft group as the source of truth for route metadata. + +1. Persist route catalog entries as reserved internal keys in replicated MVCC state. +2. Keep `distribution.Engine` as the in-memory read path used by routers/coordinators. +3. Run a route watcher that refreshes `Engine` from durable catalog version changes. + +This keeps rollout small and avoids introducing a new external metadata service. + +## PR Breakdown + +## PR1: Route Catalog Model and Persistence + +### Goal + +Introduce a durable route catalog format and storage operations. + +### Main files + +1. `distribution/catalog.go` (new) +2. `distribution/catalog_test.go` (new) +3. `kv/` metadata helper file for reserved distribution keys (new) +4. `store/` tests if key encoding helpers are shared there + +### Tasks + +1. Define `RouteDescriptor` and `CatalogSnapshot` structs for durable representation. +2. Define reserved key prefixes, for example: + - `!dist|meta|version` + - `!dist|route|` +3. Implement encoding/decoding helpers (stable binary or protobuf-based). +4. Implement catalog read/write helpers with optimistic version checks. +5. Add unit tests for: + - encode/decode round-trip + - version mismatch behavior + - deterministic ordering in snapshots + +### Done criteria + +1. Catalog structs and key format are stable and documented. +2. Versioned read/write helpers are covered by tests. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... ./kv/... +``` + +## PR2: Engine Refactor for Versioned Snapshot Apply + +### Goal + +Make `distribution.Engine` apply a full route snapshot atomically with catalog version. + +### Main files + +1. `distribution/engine.go` +2. `distribution/engine_test.go` + +### Tasks + +1. Add engine metadata: + - current route catalog version + - route IDs and states needed for control-plane operations +2. Add APIs: + - `ApplySnapshot(snapshot CatalogSnapshot) error` + - `Version() uint64` +3. Keep existing `GetRoute` and scan behavior unchanged for data path callers. +4. Add tests: + - snapshot apply replaces old routes atomically + - old version snapshot is rejected + - lookup behavior remains correct after snapshot apply + +### Done criteria + +1. Engine can be fully refreshed from durable catalog. +2. Existing routing tests remain green. 
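+
+A minimal usage sketch of the PR2 surface follows. Only `Engine.ApplySnapshot`, `Engine.Version`, and `ErrEngineSnapshotVersionStale` come from this change; the package and wrapper names below are illustrative and not part of the PR:
+
+```go
+package distexample
+
+import (
+	"github.com/bootjp/elastickv/distribution"
+	"github.com/cockroachdb/errors"
+)
+
+// refreshFromCatalog applies snap to the engine. ApplySnapshot treats an
+// equal version as a no-op and rejects older versions, so losing a race to a
+// concurrent refresher is not an error for the caller.
+func refreshFromCatalog(e *distribution.Engine, snap distribution.CatalogSnapshot) error {
+	if err := e.ApplySnapshot(snap); err != nil {
+		if errors.Is(err, distribution.ErrEngineSnapshotVersionStale) {
+			return nil // a newer snapshot was already applied; keep current routes
+		}
+		return err
+	}
+	return nil
+}
+```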
+ +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... +``` + +## PR3: Distribution Admin Service (ListRoutes and SplitRange) + +### Goal + +Add manual control-plane APIs for route introspection and same-group split. + +### Main files + +1. `proto/distribution.proto` +2. Generated files: + - `proto/distribution.pb.go` + - `proto/distribution_grpc.pb.go` +3. `adapter/distribution_server.go` +4. `adapter/distribution_server_test.go` + +### Tasks + +1. Add RPCs: + - `ListRoutes` + - `SplitRange` +2. Add message fields needed for route version and route state. +3. Implement `ListRoutes` by reading the durable catalog. +4. Implement `SplitRange` for same-group split only: + - validate range ownership and boundaries + - create two child ranges in same `group_id` + - bump catalog version atomically +5. Return clear validation errors for: + - unknown route + - invalid split key + - split key at boundaries + - version conflict + +### Done criteria + +1. Manual split updates durable catalog atomically. +2. APIs are covered with success and failure-path tests. + +### Validation + +```bash +cd proto && make gen +cd .. +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./adapter/... ./proto/... +``` + +## PR4: Route Watcher and Runtime Refresh + +### Goal + +Make every node refresh in-memory routes when durable catalog version changes. + +### Main files + +1. `distribution/watcher.go` (new) +2. `distribution/watcher_test.go` (new) +3. `main.go` +4. `cmd/server/demo.go` if demo mode should support this path + +### Tasks + +1. Add a watcher loop with periodic poll (initially simple and robust): + - read durable catalog version + - if version changed, fetch full snapshot + - call `engine.ApplySnapshot(...)` +2. Add lifecycle hooks in server startup and graceful stop. +3. Add tests: + - watcher applies new version + - no-op when version unchanged + - retry on transient read error + +### Done criteria + +1. Route updates made by one leader become visible in all nodes’ engines. +2. Watcher failure does not crash serving path; it retries. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./distribution/... ./... +``` + +## PR5: End-to-End Milestone 1 Tests and Docs + +### Goal + +Add integration tests for manual split and update operator docs. + +### Main files + +1. `kv/sharded_integration_test.go` +2. `adapter/grpc_test.go` and/or dedicated split integration tests +3. `README.md` +4. `docs/hotspot_shard_split_design.md` + +### Tasks + +1. Add integration scenario: + - start with one range + - write keys on both sides of planned split point + - call `SplitRange` + - verify routing + reads/writes continue correctly after refresh +2. Add restart scenario: + - apply split + - restart node(s) + - confirm routes reload from durable catalog +3. Document manual split API request/response examples. + +### Done criteria + +1. Milestone 1 behavior is reproducible via tests and docs. +2. Restart does not lose route table changes. + +### Validation + +```bash +GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./... +GOCACHE=$(pwd)/.cache GOLANGCI_LINT_CACHE=$(pwd)/.golangci-cache golangci-lint run ./... --timeout=5m +``` + +## Merge Order + +1. PR1 +2. PR2 +3. PR3 +4. PR4 +5. PR5 + +Each PR should be independently testable and mergeable. + +## Operational Rollout Checklist (Milestone 1) + +1. Deploy binaries with watcher enabled but manual split unused. +2. Validate `ListRoutes` output on staging. +3. 
Execute one manual same-group split in staging. +4. Confirm all nodes converge to the same route catalog version. +5. Restart one node and verify route version is preserved. + +## Exit Criteria for Milestone 1 + +1. Route catalog is durable across restart. +2. Manual split API works for same-group split. +3. Route changes propagate to all nodes via watcher refresh. +4. Full test and lint pass in CI.
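+
+## Appendix: Watcher Refresh Sketch (Illustrative)
+
+The loop below is a non-normative sketch of the PR4 refresh path built only from APIs introduced in this change (`CatalogStore.Snapshot`, `Engine.Version`, `Engine.ApplySnapshot`). The package name, poll interval, and error handling are placeholders; the real `distribution/watcher.go` may differ.
+
+```go
+package distexample
+
+import (
+	"context"
+	"time"
+
+	"github.com/bootjp/elastickv/distribution"
+)
+
+// pollCatalog periodically reads the durable catalog snapshot and applies it
+// to the in-memory engine when the version has advanced. Transient read
+// errors are ignored so the serving path keeps using the last good routes.
+func pollCatalog(ctx context.Context, cs *distribution.CatalogStore, e *distribution.Engine, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			snap, err := cs.Snapshot(ctx)
+			if err != nil {
+				continue // transient read error: retry on the next tick
+			}
+			if snap.Version <= e.Version() {
+				continue // no catalog change since the last refresh
+			}
+			// A stale-version error only means a concurrent refresher won.
+			_ = e.ApplySnapshot(snap)
+		}
+	}
+}
+```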