Skip to content

Conversation

@JackieTien97
Copy link
Contributor

When SharedTsBlockQueue.add() encounters memory pressure, it registers an
async listener on a MemoryReservationFuture to add the TsBlock later. If
the upstream FragmentInstance finishes and calls abort()/close() before the
listener executes, the following race occurs:

  1. abort() sets closed=true, clears the queue, frees bufferRetainedSizeInBytes
  2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's
    memory mapping
  3. The async listener fires and adds the TsBlock to the closed queue
  4. The downstream consumer calls remove() -> MemoryPool.free() with the
    upstream FI's IDs, but the mapping no longer exists -> NPE
    Fix: Check the closed flag inside the async listener before adding the
    TsBlock. When closed, skip the add (memory was already freed by
    abort/close) and complete channelBlocked to prevent hangs.
    Also add a unit test that reproduces this race condition by using a
    manually-controlled SettableFuture to simulate the blocked-on-memory path.

…n MemoryPool.free()

When SharedTsBlockQueue.add() encounters memory pressure, it registers an
async listener on a MemoryReservationFuture to add the TsBlock later. If
the upstream FragmentInstance finishes and calls abort()/close() before the
listener executes, the following race occurs:
1. abort() sets closed=true, clears the queue, frees bufferRetainedSizeInBytes
2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's
   memory mapping
3. The async listener fires and adds the TsBlock to the closed queue
4. The downstream consumer calls remove() -> MemoryPool.free() with the
   upstream FI's IDs, but the mapping no longer exists -> NPE
Fix: Check the `closed` flag inside the async listener before adding the
TsBlock. When closed, skip the add (memory was already freed by
abort/close) and complete channelBlocked to prevent hangs.
Also add a unit test that reproduces this race condition by using a
manually-controlled SettableFuture to simulate the blocked-on-memory path.
@sonarqubecloud
Copy link

@codecov
Copy link

codecov bot commented Feb 11, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 39.50%. Comparing base (d9b692b) to head (7194e73).
⚠️ Report is 10 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #17196   +/-   ##
=========================================
  Coverage     39.50%   39.50%           
  Complexity      282      282           
=========================================
  Files          5101     5101           
  Lines        341768   341807   +39     
  Branches      43555    43518   -37     
=========================================
+ Hits         135007   135033   +26     
- Misses       206761   206774   +13     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@JackieTien97 JackieTien97 merged commit 7ee7a83 into master Feb 11, 2026
31 checks passed
@JackieTien97 JackieTien97 deleted the MemoryNPE branch February 11, 2026 06:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant