Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public TsBlock remove() {
localPlanNodeId,
tsBlock.getSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
// to
// corresponding LocalSinkChannel.
if (sinkChannel != null) {
sinkChannel.checkAndInvokeOnFinished();
Expand Down Expand Up @@ -257,6 +258,16 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
blockedOnMemory.addListener(
() -> {
synchronized (this) {
// If the queue has been closed or aborted before this listener executes,
// we must not add the TsBlock. The memory reserved for this TsBlock has
// already been freed by abort()/close() via bufferRetainedSizeInBytes.
// Adding it would cause a downstream NPE in MemoryPool.free() when
// the consumer calls remove(), because the upstream FI's memory mapping
// has already been deregistered.
if (closed) {
channelBlocked.set(null);
return;
}
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
Expand All @@ -266,8 +277,10 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
},
// Use directExecutor() here could lead to deadlock. Thread A holds lock of
// SharedTsBlockQueueA and tries to invoke the listener of
// SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of
// SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture)
// while
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener
// of
// SharedTsBlockQueueA
executorService);
return channelBlocked;
Expand Down Expand Up @@ -307,13 +320,18 @@ public void close() {
bufferRetainedSizeInBytes = 0;
}
if (sinkChannel != null) {
// attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we close
// LocalSourceHandle(with limit clause it's possible) before constructing the corresponding
// attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we
// close
// LocalSourceHandle(with limit clause it's possible) before constructing the
// corresponding
// LocalSinkChannel.
// If this close method is invoked by LocalSourceHandle, listener of LocalSourceHandle will
// remove the LocalSourceHandle from the map of MppDataExchangeManager and later when
// If this close method is invoked by LocalSourceHandle, listener of
// LocalSourceHandle will
// remove the LocalSourceHandle from the map of MppDataExchangeManager and later
// when
// LocalSinkChannel is initialized, it will construct a new SharedTsBlockQueue.
// It is still safe that we let the LocalSourceHandle close successfully in this case. Because
// It is still safe that we let the LocalSourceHandle close successfully in this
// case. Because
// the QueryTerminator will do the final cleaning logic.
sinkChannel.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.tsfile.external.commons.lang3.Validate;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -37,12 +40,89 @@
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

public class SharedTsBlockQueueTest {

/**
* Test that when add() goes into the async listener path (memory blocked) and the queue is
* aborted before the listener fires, the listener does NOT add the TsBlock to the closed queue.
* This reproduces the race condition that caused NPE in MemoryPool.free().
*/
@Test
public void testAsyncListenerAfterAbortDoesNotAddTsBlock() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L;
final TFragmentInstanceId fragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String planNodeId = "test";

// Use a SettableFuture to manually control when the blocked-on-memory future
// completes.
SettableFuture<Void> manualFuture = SettableFuture.create();

// Create a mock MemoryPool that returns the manually-controlled future
// (simulating blocked).
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);

// reserve() returns (manualFuture, false) — simulating memory blocked
Mockito.when(
mockMemoryPool.reserve(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
Mockito.anyLong()))
.thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
// tryCancel returns 0 — simulating future already completed (can't cancel)
Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);

// Use a direct executor so that when we complete manualFuture, the listener
// runs immediately.
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
fragmentInstanceId, planNodeId, mockLocalMemoryManager, newDirectExecutorService());
queue.getCanAddTsBlock().set(null);
queue.setMaxBytesCanReserve(Long.MAX_VALUE);

TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize);

// Step 1: add() goes into async path — listener is registered on manualFuture.
// reserve() returns (manualFuture, false), so the TsBlock is NOT yet added to
// the queue.
ListenableFuture<Void> addFuture;
synchronized (queue) {
addFuture = queue.add(mockTsBlock);
}
// The addFuture (channelBlocked) should not be done yet
Assert.assertFalse(addFuture.isDone());
// Queue should be empty — TsBlock is waiting for memory
Assert.assertTrue(queue.isEmpty());

// Step 2: Abort the queue (simulates upstream FI state change listener calling
// abort)
synchronized (queue) {
queue.abort();
}
Assert.assertTrue(queue.isClosed());

// Step 3: Now complete the manualFuture — this triggers the async listener.
// Before the fix, this would add the TsBlock to the closed queue.
// After the fix, the listener detects closed==true and returns without adding.
manualFuture.set(null);

// Verify: queue should still be empty (TsBlock was NOT added to the closed
// queue)
Assert.assertTrue(queue.isEmpty());
// The channelBlocked future should be completed (no hang)
Assert.assertTrue(addFuture.isDone());
}

@Test(timeout = 15000L)
public void concurrencyTest() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;

// Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
// Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per
// query.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 * mockTsBlockSize));
MemoryPool spyMemoryPool =
Expand Down
Loading