Fix OOM in recursive diff by limiting goroutine concurrency#81
mason-sharp merged 1 commit into main from
Conversation
📝 Walkthrough: Added a buffered semaphore (diffSem) limiting concurrent recursive diff goroutines.
🚥 Pre-merge checks: ✅ 3 passed
🧹 Nitpick comments (2)
tests/integration/table_diff_memory_test.go (2)
102-105: Consider adding `runtime.GC()` before the final `ReadMemStats` and renaming `peakHeapInUse`.

Two small issues with the memory measurement:

- No GC before final read: without a `runtime.GC()` call before `ReadMemStats(&memAfter)`, `HeapInuse` may include unreachable objects that inflate the number. Adding a GC pass before the final read makes the assertion more deterministic and less susceptible to CI flakiness.
- Misleading name: `peakHeapInUse` is actually "heap in use at measurement time," not the peak during the test. Go's `MemStats` doesn't track peak heap. A more accurate name would be `heapInUseAfter`.

Suggested change:

```diff
 // Capture memory after diff
+runtime.GC()
 var memAfter runtime.MemStats
 runtime.ReadMemStats(&memAfter)

 // Calculate memory usage
 heapAllocDelta := int64(memAfter.TotalAlloc) - int64(memBefore.TotalAlloc)
-peakHeapInUse := memAfter.HeapInuse
-peakSys := memAfter.Sys
+heapInUseAfter := memAfter.HeapInuse
+sysAfter := memAfter.Sys
```

(Update references at lines 148-153 and 177-179 accordingly.)
Also applies to: 138-145
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/table_diff_memory_test.go` around lines 102 - 105, Add a runtime.GC() call immediately before the final runtime.ReadMemStats(&memAfter) to force a GC pass so HeapInuse reflects live objects only, and rename the variable peakHeapInUse to heapInUseAfter (and update all references) because it represents heap in use at measurement time rather than a true peak; update any assertions or log messages that reference peakHeapInUse to use heapInUseAfter and ensure you read memAfter via runtime.ReadMemStats after the added runtime.GC() call.
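For concreteness, the measurement discipline this comment suggests can be sketched as a minimal runnable example; `measureHeap` and `heapGrowth` are hypothetical helpers for illustration, not code from the PR.

```go
package main

import (
	"fmt"
	"runtime"
)

// measureHeap forces a GC pass before reading MemStats so HeapInuse
// reflects live objects only, making assertions more deterministic.
func measureHeap() runtime.MemStats {
	runtime.GC() // drop unreachable objects before measuring
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m
}

// heapGrowth reports how much HeapInuse grows while n bytes are live.
// Note the value is "heap in use at measurement time", not a peak:
// MemStats has no peak-heap field.
func heapGrowth(n int) uint64 {
	before := measureHeap()
	data := make([]byte, n)
	for i := range data {
		data[i] = byte(i) // touch pages so they are really committed
	}
	after := measureHeap()
	runtime.KeepAlive(data) // keep data live through the second read
	if after.HeapInuse <= before.HeapInuse {
		return 0
	}
	return after.HeapInuse - before.HeapInuse
}

func main() {
	// Growth should be roughly the 8 MiB we kept alive.
	fmt.Println(heapGrowth(8<<20) > 4<<20)
}
```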
120-129: Pre-initialized `DiffResult` is overwritten by `ExecuteTask`.

`ExecuteTask()` unconditionally replaces `t.DiffResult` (table_diff.go, line 1301), so this block has no effect. It can be removed to avoid confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/table_diff_memory_test.go` around lines 120 - 129, The test pre-initializes tdTask.DiffResult but ExecuteTask() unconditionally overwrites t.DiffResult (see ExecuteTask in table_diff.go), so remove the entire tdTask.DiffResult initialization block from tests/integration/table_diff_memory_test.go to avoid dead/redundant setup; if the test needs specific DiffResult values instead, set them after calling ExecuteTask or modify ExecuteTask to accept initial state, otherwise simply delete the block.
force-pushed from 5086517 to a43c7c0 (Compare)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/consistency/diff/table_diff.go`:
- Around line 1558-1566: The goroutine spawn uses an unconditional send to
t.diffSem which can block forever if ctx is cancelled, leaving diffWg.Wait()
hung; change the semaphore acquisition in the anonymous goroutine that calls
t.recursiveDiff (and the analogous recursive spawn site) to a context-aware
select: attempt to send on t.diffSem in one case and return (ensuring the
associated diffWg.Done is invoked) if ctx.Done() fires in the other case, so the
goroutine doesn't block on semaphore acquisition when cancelled and ensures the
wait group is decremented or the caller returns promptly.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- internal/consistency/diff/table_diff.go
- tests/integration/table_diff_memory_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/table_diff_memory_test.go
The recursive diff phase had no concurrency control — every mismatched range spawned a new goroutine, and each recursion level spawned more. With large numbers of mismatches, this caused exponential goroutine fan-out, each fetching rows from Postgres into memory simultaneously, leading to OOM.

Add a channel-based semaphore (diffSem) that limits concurrent recursive diff goroutines to maxConcurrent (NumCPU * ConcurrencyFactor), the same limit used by the initial hash phase. The semaphore is acquired inside the goroutine body to avoid parent-child deadlock.

Measured on 100k mismatched rows (4 CPUs):
- Before: 2+ GB heap, 5 GB system memory, ~24M heap objects
- After: 372 MB heap, 1 GB system memory, 5 goroutines

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
force-pushed from a43c7c0 to 76f9598 (Compare)
🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)
1552-1577: Optional: bound goroutine creation count, not just the running count.

The current loop spawns all `len(mismatchedTasks)` goroutines upfront. At most `maxConcurrent` run; the rest block on `t.diffSem`. Blocked goroutines don't hold DB connections or row data, so memory is much better than before. However, for very large mismatch counts (e.g., tens of thousands of initial mismatched ranges), goroutine stacks alone (~2–8 KB each) can add up.

The initial hash phase (lines 1458–1485) already uses the canonical bounded worker-pool pattern — a fixed pool of `maxConcurrent` goroutines draining a channel. Applying the same pattern here would cap the number of created diff goroutines to `maxConcurrent`, not just the number running:

♻️ Sketch of worker-pool approach

```diff
-	for _, task := range mismatchedTasks {
-		if t.shouldStopDueToLimit() {
-			diffBar.Increment()
-			continue
-		}
-		diffWg.Add(1)
-		go func(task RecursiveDiffTask) {
-			select {
-			case t.diffSem <- struct{}{}:
-				defer func() { <-t.diffSem }()
-			case <-ctx.Done():
-				diffWg.Done()
-				diffBar.Increment()
-				return
-			}
-			defer diffBar.Increment()
-			t.recursiveDiff(ctx, task, &diffWg)
-		}(task)
-	}
+	diffTaskQueue := make(chan RecursiveDiffTask, len(mismatchedTasks))
+	for i := 0; i < maxConcurrent; i++ {
+		diffWg.Add(1)
+		go func() {
+			defer diffWg.Done()
+			for task := range diffTaskQueue {
+				if t.shouldStopDueToLimit() {
+					diffBar.Increment()
+					continue
+				}
+				// Each sub-task is a unit of work; recursiveDiff spawns
+				// its own children via t.diffSem as before.
+				innerWg := &sync.WaitGroup{}
+				innerWg.Add(1)
+				t.recursiveDiff(ctx, task, innerWg)
+				innerWg.Wait()
+				diffBar.Increment()
+			}
+		}()
+	}
+	for _, task := range mismatchedTasks {
+		diffTaskQueue <- task
+	}
+	close(diffTaskQueue)
```

Note: this sketch omits the recursive sub-range goroutines spawned inside `recursiveDiff`; those would still use `t.diffSem` as-is.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/consistency/diff/table_diff.go` around lines 1552 - 1577, The loop currently starts a goroutine per entry in mismatchedTasks which can create thousands of goroutines; instead implement a bounded worker-pool: create a tasks channel, start maxConcurrent worker goroutines (using the same t.diffSem or without it) that range over the channel and for each task call t.recursiveDiff(ctx, task, &diffWg) while managing diffWg and diffBar (increment diffBar and call diffWg.Add/Done appropriately inside workers), close the channel after enqueuing mismatchedTasks, and wait for all workers to finish; reference mismatchedTasks, recursiveDiff, t.diffSem, diffWg, and diffBar when making the change.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- internal/consistency/diff/table_diff.go
- tests/integration/table_diff_memory_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/table_diff_memory_test.go
Reviewed together with Ibrar online
The recursive diff phase had no concurrency control — every mismatched range spawned a new goroutine, and each recursion level spawned more. With large numbers of mismatches, this caused exponential goroutine fan-out, each fetching rows from Postgres into memory simultaneously, leading to OOM.
Add a channel-based semaphore (diffSem) that limits concurrent recursive diff goroutines to maxConcurrent (NumCPU * ConcurrencyFactor), the same limit used by the initial hash phase. The semaphore is acquired inside the goroutine body to avoid parent-child deadlock.
Measured on 100k mismatched rows (4 CPUs):
- Before: 2+ GB heap, 5 GB system memory, ~24M heap objects
- After: 372 MB heap, 1 GB system memory, 5 goroutines