From 7194e73c47dac4956216192e5a28ca8491884325 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Wed, 11 Feb 2026 10:56:41 +0800 Subject: [PATCH] Fix race condition in SharedTsBlockQueue async listener causing NPE in MemoryPool.free() When SharedTsBlockQueue.add() encounters memory pressure, it registers an async listener on a MemoryReservationFuture to add the TsBlock later. If the upstream FragmentInstance finishes and calls abort()/close() before the listener executes, the following race occurs: 1. abort() sets closed=true, clears the queue, frees bufferRetainedSizeInBytes 2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's memory mapping 3. The async listener fires and adds the TsBlock to the closed queue 4. The downstream consumer calls remove() -> MemoryPool.free() with the upstream FI's IDs, but the mapping no longer exists -> NPE Fix: Check the `closed` flag inside the async listener before adding the TsBlock. When closed, skip the add (memory was already freed by abort/close) and complete channelBlocked to prevent hangs. Also add a unit test that reproduces this race condition by using a manually-controlled SettableFuture to simulate the blocked-on-memory path. --- .../exchange/SharedTsBlockQueue.java | 34 ++++++-- .../exchange/SharedTsBlockQueueTest.java | 82 ++++++++++++++++++- 2 files changed, 107 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index 4aea7cf22dd54..e49efabb9869c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -205,7 +205,8 @@ public TsBlock remove() { localPlanNodeId, tsBlock.getSizeInBytes()); bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); - // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to + // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event + // to // corresponding LocalSinkChannel. if (sinkChannel != null) { sinkChannel.checkAndInvokeOnFinished(); @@ -257,6 +258,16 @@ public ListenableFuture add(TsBlock tsBlock) { blockedOnMemory.addListener( () -> { synchronized (this) { + // If the queue has been closed or aborted before this listener executes, + // we must not add the TsBlock. The memory reserved for this TsBlock has + // already been freed by abort()/close() via bufferRetainedSizeInBytes. + // Adding it would cause a downstream NPE in MemoryPool.free() when + // the consumer calls remove(), because the upstream FI's memory mapping + // has already been deregistered. + if (closed) { + channelBlocked.set(null); + return; + } queue.add(tsBlock); if (!blocked.isDone()) { blocked.set(null); @@ -266,8 +277,10 @@ public ListenableFuture add(TsBlock tsBlock) { }, // Use directExecutor() here could lead to deadlock. Thread A holds lock of // SharedTsBlockQueueA and tries to invoke the listener of - // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while - // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of + // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) + // while + // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener + // of // SharedTsBlockQueueA executorService); return channelBlocked; @@ -307,13 +320,18 @@ public void close() { bufferRetainedSizeInBytes = 0; } if (sinkChannel != null) { - // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we close - // LocalSourceHandle(with limit clause it's possible) before constructing the corresponding + // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we + // close + // LocalSourceHandle(with limit clause it's possible) before constructing the + // corresponding // LocalSinkChannel. - // If this close method is invoked by LocalSourceHandle, listener of LocalSourceHandle will - // remove the LocalSourceHandle from the map of MppDataExchangeManager and later when + // If this close method is invoked by LocalSourceHandle, listener of + // LocalSourceHandle will + // remove the LocalSourceHandle from the map of MppDataExchangeManager and later + // when // LocalSinkChannel is initialized, it will construct a new SharedTsBlockQueue. - // It is still safe that we let the LocalSourceHandle close successfully in this case. Because + // It is still safe that we let the LocalSourceHandle close successfully in this + // case. Because // the QueryTerminator will do the final cleaning logic. sinkChannel.close(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java index b96f849faf6b9..00c653499b0c4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java @@ -25,7 +25,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.tsfile.external.commons.lang3.Validate; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -37,12 +40,89 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; public class SharedTsBlockQueueTest { + + /** + * Test that when add() goes into the async listener path (memory blocked) and the queue is + * aborted before the listener fires, the listener does NOT add the TsBlock to the closed queue. + * This reproduces the race condition that caused NPE in MemoryPool.free(). + */ + @Test + public void testAsyncListenerAfterAbortDoesNotAddTsBlock() { + final String queryId = "q0"; + final long mockTsBlockSize = 1024L; + final TFragmentInstanceId fragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); + final String planNodeId = "test"; + + // Use a SettableFuture to manually control when the blocked-on-memory future + // completes. + SettableFuture manualFuture = SettableFuture.create(); + + // Create a mock MemoryPool that returns the manually-controlled future + // (simulating blocked). + LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); + MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); + Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); + + // reserve() returns (manualFuture, false) — simulating memory blocked + Mockito.when( + mockMemoryPool.reserve( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyLong(), + Mockito.anyLong())) + .thenReturn(new Pair<>(manualFuture, Boolean.FALSE)); + // tryCancel returns 0 — simulating future already completed (can't cancel) + Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L); + + // Use a direct executor so that when we complete manualFuture, the listener + // runs immediately. + SharedTsBlockQueue queue = + new SharedTsBlockQueue( + fragmentInstanceId, planNodeId, mockLocalMemoryManager, newDirectExecutorService()); + queue.getCanAddTsBlock().set(null); + queue.setMaxBytesCanReserve(Long.MAX_VALUE); + + TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize); + + // Step 1: add() goes into async path — listener is registered on manualFuture. + // reserve() returns (manualFuture, false), so the TsBlock is NOT yet added to + // the queue. + ListenableFuture addFuture; + synchronized (queue) { + addFuture = queue.add(mockTsBlock); + } + // The addFuture (channelBlocked) should not be done yet + Assert.assertFalse(addFuture.isDone()); + // Queue should be empty — TsBlock is waiting for memory + Assert.assertTrue(queue.isEmpty()); + + // Step 2: Abort the queue (simulates upstream FI state change listener calling + // abort) + synchronized (queue) { + queue.abort(); + } + Assert.assertTrue(queue.isClosed()); + + // Step 3: Now complete the manualFuture — this triggers the async listener. + // Before the fix, this would add the TsBlock to the closed queue. + // After the fix, the listener detects closed==true and returns without adding. + manualFuture.set(null); + + // Verify: queue should still be empty (TsBlock was NOT added to the closed + // queue) + Assert.assertTrue(queue.isEmpty()); + // The channelBlocked future should be completed (no hang) + Assert.assertTrue(addFuture.isDone()); + } + @Test(timeout = 15000L) public void concurrencyTest() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; - // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query. + // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per + // query. LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 * mockTsBlockSize)); MemoryPool spyMemoryPool =