From 75bbe1ae9b36b7fad6072afafb3decfc535b1567 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 10 Feb 2026 10:21:51 +0800 Subject: [PATCH 1/8] Fixed concurrency issues caused by write and flush sorting during query execution --- .../memtable/AbstractWritableMemChunk.java | 21 +++++++- .../memtable/AlignedWritableMemChunk.java | 11 ++-- .../dataregion/memtable/WritableMemChunk.java | 10 +--- .../db/utils/datastructure/AlignedTVList.java | 12 +++++ .../db/utils/datastructure/BinaryTVList.java | 9 ++++ .../db/utils/datastructure/BooleanTVList.java | 9 ++++ .../db/utils/datastructure/DoubleTVList.java | 9 ++++ .../db/utils/datastructure/FloatTVList.java | 9 ++++ .../db/utils/datastructure/IntTVList.java | 9 ++++ .../db/utils/datastructure/LongTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/TVList.java | 2 + .../memtable/PrimitiveMemTableTest.java | 50 +++++++++++++++++++ 12 files changed, 144 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index f33182fc680ef..fe494004529aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; + protected TVList listForFlushSort; + /** * Release the TVList if there is no query on it. Otherwise, it should set the first query as the * owner. TVList is released until all queries finish. If it throws memory-not-enough exception @@ -198,7 +200,24 @@ public abstract void writeAlignedTablet( public abstract IMeasurementSchema getSchema(); @Override - public abstract void sortTvListForFlush(); + public synchronized void sortTvListForFlush() { + TVList workingList = getWorkingTVList(); + if (workingList.isSorted()) { + listForFlushSort = workingList; + return; + } + + boolean needCloneTimesAndIndicesInWorkingTVList; + workingList.lockQueryList(); + try { + needCloneTimesAndIndicesInWorkingTVList = !workingList.getQueryContextSet().isEmpty(); + } finally { + workingList.unlockQueryList(); + } + listForFlushSort = + needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + listForFlushSort.sort(); + } @Override public abstract int delete(long lowerBound, long upperBound); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 5fa0008ce75ae..498698f19ef91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -499,13 +499,6 @@ public long getMinTime() { return minTime; } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = list.delete(lowerBound, upperBound); @@ -557,6 +550,8 @@ public void encodeWorkingAlignedTVList( long maxNumberOfPointsInChunk, int maxNumberOfPointsInPage) { BitMap allValueColDeletedMap; + AlignedTVList list = (AlignedTVList) listForFlushSort; + allValueColDeletedMap = ignoreAllNullRows ? list.getAllValueColDeletedMap() : null; boolean[] timeDuplicateInfo = null; @@ -623,6 +618,7 @@ private void handleEncoding( boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { + AlignedTVList list = (AlignedTVList) listForFlushSort; List dataTypes = list.getTsDataTypes(); Pair[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List pageRange : chunkRange) { @@ -754,6 +750,7 @@ private void handleEncoding( @Override public synchronized void encode( BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + AlignedTVList list = (AlignedTVList) listForFlushSort; encodeInfo.maxNumberOfPointsInChunk = Math.min( encodeInfo.maxNumberOfPointsInChunk, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 0256d5b16e52b..0411690adb115 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -262,13 +262,6 @@ public void putAlignedTablet( throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public TVList getWorkingTVList() { return list; @@ -391,6 +384,7 @@ public String toString() { public void encodeWorkingTVList( BlockingQueue ioTaskQueue, long maxNumberOfPointsInChunk, long targetChunkSize) { + TVList list = listForFlushSort; TSDataType tsDataType = schema.getType(); ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); long dataSizeInCurrentChunk = 0; @@ -488,7 +482,7 @@ public synchronized void encode( // create MultiTvListIterator. It need not handle float/double precision here. List tvLists = new ArrayList<>(sortedList); - tvLists.add(list); + tvLists.add(listForFlushSort); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 8c732d4f08e5e..fdbebb70960a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -165,6 +165,18 @@ public TVList getTvListByColumnIndex( return alignedTvList; } + @Override + public AlignedTVList cloneForFlushSort() { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneList.timeDeletedCnt = this.timeDeletedCnt; + cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize; + cloneList.values = this.values; + cloneList.bitMaps = this.bitMaps; + cloneList.timeColDeletedMap = this.timeColDeletedMap; + return cloneList; + } + @Override public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index dc4ff5529d45b..7899565b75c9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -63,6 +63,15 @@ public static BinaryTVList newList() { } } + @Override + public TVList cloneForFlushSort() { + BinaryTVList cloneList = BinaryTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BinaryTVList clone() { BinaryTVList cloneList = BinaryTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index b8eb0e508bfe8..a4cc03401bb86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -62,6 +62,15 @@ public static BooleanTVList newList() { } } + @Override + public TVList cloneForFlushSort() { + BooleanTVList cloneList = BooleanTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BooleanTVList clone() { BooleanTVList cloneList = BooleanTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index f61995ef0628a..9897e11db56c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -63,6 +63,15 @@ public static DoubleTVList newList() { } } + @Override + public TVList cloneForFlushSort() { + DoubleTVList cloneList = DoubleTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized DoubleTVList clone() { DoubleTVList cloneList = DoubleTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 3623fa49a3efe..857668cd3a89d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -63,6 +63,15 @@ public static FloatTVList newList() { } } + @Override + public TVList cloneForFlushSort() { + FloatTVList cloneList = FloatTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized FloatTVList clone() { FloatTVList cloneList = FloatTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 0146ecf0e6b25..172f092046823 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -62,6 +62,15 @@ public static IntTVList newList(TSDataType dataType) { } } + @Override + public TVList cloneForFlushSort() { + IntTVList cloneList = IntTVList.newList(dataType); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized IntTVList clone() { IntTVList cloneList = IntTVList.newList(dataType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 7b4bd8d82d264..74789b1842f8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -62,6 +62,15 @@ public static LongTVList newList() { } } + @Override + public TVList cloneForFlushSort() { + LongTVList cloneList = LongTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized LongTVList clone() { LongTVList cloneList = LongTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index d61611c4cc5f3..b3bedd62e8fdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -520,6 +520,8 @@ public long getVersion() { protected abstract void expandValues(); + public abstract TVList cloneForFlushSort(); + @Override public abstract TVList clone(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index b16e20d4f857b..0412981433afb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -205,6 +205,56 @@ public void testWriteDuringPrepareTVListAndActualQueryExecution() } } + @Test + public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() + throws QueryProcessException, IOException, IllegalPathException { + + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + List measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + for (int i = 1000; i < 2000; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + for (int i = 100; i < 200; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", TSDataType.INT32), 150, 160)); + ResourceByPathUtils resourcesByPathUtils = + ResourceByPathUtils.getResourceInstance( + new AlignedFullPath( + new StringArrayDeviceID("root.test.d1"), + Arrays.asList("s1", "s2", "s3"), + measurementSchemas)); + ReadOnlyMemChunk readOnlyMemChunk = + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(1), memTable, null, Long.MAX_VALUE, null); + + for (int i = 1; i <= 50; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1").sortTvListForFlush(); + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s2").sortTvListForFlush(); + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s3").sortTvListForFlush(); + + readOnlyMemChunk.sortTvLists(); + + MemPointIterator memPointIterator = readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null); + while (memPointIterator.hasNextBatch()) { + memPointIterator.nextBatch(); + } + } + @Test public void memSeriesToStringTest() throws IOException { TSDataType dataType = TSDataType.INT32; From dc5b75e2c89ecc29ed0528179a5827a3d76e046f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 10 Feb 2026 10:27:20 +0800 Subject: [PATCH 2/8] spotless --- .../dataregion/memtable/PrimitiveMemTableTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 0412981433afb..25d8bbfd76c18 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -243,9 +243,15 @@ public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() memTable.writeAlignedRow( new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); } - memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1").sortTvListForFlush(); - memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s2").sortTvListForFlush(); - memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s3").sortTvListForFlush(); + memTable + .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1") + .sortTvListForFlush(); + memTable + .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s2") + .sortTvListForFlush(); + memTable + .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s3") + .sortTvListForFlush(); readOnlyMemChunk.sortTvLists(); From a0d71bdbdb21203970750e260dc917ae22224698 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 10 Feb 2026 10:42:43 +0800 Subject: [PATCH 3/8] add comments --- .../memtable/AbstractWritableMemChunk.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index fe494004529aa..3fceeba02f90c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -207,6 +207,23 @@ public synchronized void sortTvListForFlush() { return; } + /* + * Concurrency background: + * + * A query may start earlier and record the current row count (rows) of the TVList as its visible range. + * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList. + * + * During sorting, the underlying indices array of the TVList may be reordered. + * If the query continues to use the previously recorded rows as its upper bound, + * it may convert a logical index to a physical index via the updated indices array. + * + * In this case, the converted physical index may exceed the previously visible + * rows range, leading to invalid access or unexpected behavior. + * + * To avoid this issue, when there are active queries on the working TVList, we must + * clone the times and indices before sorting, so that the flush sort does not mutate + * the data structures that concurrent queries rely on. + */ boolean needCloneTimesAndIndicesInWorkingTVList; workingList.lockQueryList(); try { From 955b4ea58390d65de69003af76ffa556ff374790 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 10 Feb 2026 12:19:31 +0800 Subject: [PATCH 4/8] fix ut --- .../dataregion/memtable/PrimitiveMemTableTest.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 25d8bbfd76c18..39a4f3e01f428 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -243,15 +243,7 @@ public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() memTable.writeAlignedRow( new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); } - memTable - .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1") - .sortTvListForFlush(); - memTable - .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s2") - .sortTvListForFlush(); - memTable - .getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s3") - .sortTvListForFlush(); + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "").sortTvListForFlush(); readOnlyMemChunk.sortTvLists(); From b12f007e6741acd440fd303ec05cee1d9c30a0a3 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 11 Feb 2026 14:14:52 +0800 Subject: [PATCH 5/8] fix --- .../dataregion/flush/MemTableFlushTask.java | 1 + .../memtable/AbstractWritableMemChunk.java | 14 +++++--- .../memtable/AlignedWritableMemChunk.java | 34 +++++++++++-------- .../memtable/IWritableMemChunk.java | 2 ++ .../dataregion/memtable/WritableMemChunk.java | 32 +++++++++-------- .../db/utils/datastructure/AlignedTVList.java | 2 +- .../db/utils/datastructure/BinaryTVList.java | 2 +- .../db/utils/datastructure/BooleanTVList.java | 2 +- .../db/utils/datastructure/DoubleTVList.java | 2 +- .../db/utils/datastructure/FloatTVList.java | 2 +- .../db/utils/datastructure/IntTVList.java | 2 +- .../db/utils/datastructure/LongTVList.java | 2 +- 12 files changed, 55 insertions(+), 42 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 6977167f0e629..ffcbb97ffd1bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -272,6 +272,7 @@ public void run() { times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE]; } writableMemChunk.encode(ioTaskQueue, encodeInfo, times); + writableMemChunk.releaseTemporaryTvListForFlush(); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 3fceeba02f90c..9180c1391e795 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -47,7 +47,7 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; - protected TVList listForFlushSort; + protected TVList workingListForFlush; /** * Release the TVList if there is no query on it. Otherwise, it should set the first query as the @@ -200,10 +200,10 @@ public abstract void writeAlignedTablet( public abstract IMeasurementSchema getSchema(); @Override - public synchronized void sortTvListForFlush() { + public void sortTvListForFlush() { TVList workingList = getWorkingTVList(); if (workingList.isSorted()) { - listForFlushSort = workingList; + workingListForFlush = workingList; return; } @@ -231,9 +231,13 @@ public synchronized void sortTvListForFlush() { } finally { workingList.unlockQueryList(); } - listForFlushSort = + workingListForFlush = needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; - listForFlushSort.sort(); + workingListForFlush.sort(); + } + + public void releaseTemporaryTvListForFlush() { + workingListForFlush = null; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 498698f19ef91..12a8329130357 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -68,6 +68,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { private Map measurementIndexMap; private List dataTypes; private final List schemaList; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private AlignedTVList list; private List sortedList; private long sortedRowCount = 0; @@ -550,9 +551,10 @@ public void encodeWorkingAlignedTVList( long maxNumberOfPointsInChunk, int maxNumberOfPointsInPage) { BitMap allValueColDeletedMap; - AlignedTVList list = (AlignedTVList) listForFlushSort; + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; - allValueColDeletedMap = ignoreAllNullRows ? list.getAllValueColDeletedMap() : null; + allValueColDeletedMap = + ignoreAllNullRows ? alignedWorkingListForFlush.getAllValueColDeletedMap() : null; boolean[] timeDuplicateInfo = null; @@ -564,8 +566,10 @@ public void encodeWorkingAlignedTVList( int pointNumInPage = 0; int pointNumInChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - long time = list.getTime(sortedRowIndex); + for (int sortedRowIndex = 0; + sortedRowIndex < alignedWorkingListForFlush.rowCount(); + sortedRowIndex++) { + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (pointNumInPage == 0) { pageRange.add(sortedRowIndex); } @@ -586,15 +590,17 @@ public void encodeWorkingAlignedTVList( } int nextRowIndex = sortedRowIndex + 1; - while (nextRowIndex < list.rowCount() + while (nextRowIndex < alignedWorkingListForFlush.rowCount() && ((allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex))) - || list.isTimeDeleted(nextRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(nextRowIndex))) + || alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) { nextRowIndex++; } - if (nextRowIndex != list.rowCount() && time == list.getTime(nextRowIndex)) { + if (nextRowIndex != alignedWorkingListForFlush.rowCount() + && time == alignedWorkingListForFlush.getTime(nextRowIndex)) { if (Objects.isNull(timeDuplicateInfo)) { - timeDuplicateInfo = new boolean[list.rowCount()]; + timeDuplicateInfo = new boolean[alignedWorkingListForFlush.rowCount()]; } timeDuplicateInfo[sortedRowIndex] = true; } @@ -602,7 +608,7 @@ public void encodeWorkingAlignedTVList( } if (pointNumInPage != 0) { - pageRange.add(list.rowCount() - 1); + pageRange.add(alignedWorkingListForFlush.rowCount() - 1); } if (pointNumInChunk != 0) { chunkRange.add(pageRange); @@ -618,7 +624,7 @@ private void handleEncoding( boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { - AlignedTVList list = (AlignedTVList) listForFlushSort; + AlignedTVList list = (AlignedTVList) workingListForFlush; List dataTypes = list.getTsDataTypes(); Pair[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List pageRange : chunkRange) { @@ -748,9 +754,7 @@ private void handleEncoding( } @Override - public synchronized void encode( - BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { - AlignedTVList list = (AlignedTVList) listForFlushSort; + public void encode(BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { encodeInfo.maxNumberOfPointsInChunk = Math.min( encodeInfo.maxNumberOfPointsInChunk, @@ -767,7 +771,7 @@ public synchronized void encode( // create MergeSortAlignedTVListIterator. List alignedTvLists = new ArrayList<>(sortedList); - alignedTvLists.add(list); + alignedTvLists.add((AlignedTVList) workingListForFlush); List columnIndexList = buildColumnIndexList(schemaList); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 635e06da4b783..af55d7df9d117 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -98,6 +98,8 @@ void writeAlignedTablet( */ void sortTvListForFlush(); + void releaseTemporaryTvListForFlush(); + default long getMaxTime() { return Long.MAX_VALUE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 0411690adb115..571a371c84957 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -59,6 +59,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { private IMeasurementSchema schema; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private TVList list; private List sortedList; private long sortedRowCount = 0; @@ -384,55 +385,57 @@ public String toString() { public void encodeWorkingTVList( BlockingQueue ioTaskQueue, long maxNumberOfPointsInChunk, long targetChunkSize) { - TVList list = listForFlushSort; TSDataType tsDataType = schema.getType(); ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); long dataSizeInCurrentChunk = 0; int pointNumInCurrentChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - if (list.isNullValue(list.getValueIndex(sortedRowIndex))) { + for (int sortedRowIndex = 0; + sortedRowIndex < workingListForFlush.rowCount(); + sortedRowIndex++) { + if (workingListForFlush.isNullValue(workingListForFlush.getValueIndex(sortedRowIndex))) { continue; } - long time = list.getTime(sortedRowIndex); + long time = workingListForFlush.getTime(sortedRowIndex); // skip duplicated data - if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { + if ((sortedRowIndex + 1 < workingListForFlush.rowCount() + && (time == workingListForFlush.getTime(sortedRowIndex + 1)))) { continue; } // store last point for SDT - if (sortedRowIndex + 1 == list.rowCount()) { + if (sortedRowIndex + 1 == workingListForFlush.rowCount()) { chunkWriterImpl.setLastPoint(true); } switch (tsDataType) { case BOOLEAN: - chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getBoolean(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 1L; break; case INT32: case DATE: - chunkWriterImpl.write(time, list.getInt(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getInt(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case INT64: case TIMESTAMP: - chunkWriterImpl.write(time, list.getLong(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getLong(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case FLOAT: - chunkWriterImpl.write(time, list.getFloat(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getFloat(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case DOUBLE: - chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getDouble(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case TEXT: case BLOB: case STRING: - Binary value = list.getBinary(sortedRowIndex); + Binary value = workingListForFlush.getBinary(sortedRowIndex); chunkWriterImpl.write(time, value); dataSizeInCurrentChunk += 8L + getBinarySize(value); break; @@ -467,8 +470,7 @@ public void encodeWorkingTVList( } @Override - public synchronized void encode( - BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + public void encode(BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { if (TVLIST_SORT_THRESHOLD == 0) { encodeWorkingTVList( ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk, encodeInfo.targetChunkSize); @@ -482,7 +484,7 @@ public synchronized void encode( // create MultiTvListIterator. It need not handle float/double precision here. List tvLists = new ArrayList<>(sortedList); - tvLists.add(listForFlushSort); + tvLists.add(workingListForFlush); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index fdbebb70960a1..2c87fdb32abec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -166,7 +166,7 @@ public TVList getTvListByColumnIndex( } @Override - public AlignedTVList cloneForFlushSort() { + public synchronized AlignedTVList cloneForFlushSort() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); cloneAs(cloneList); cloneList.timeDeletedCnt = this.timeDeletedCnt; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index 7899565b75c9e..b274bc8ef6586 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -64,7 +64,7 @@ public static BinaryTVList newList() { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { BinaryTVList cloneList = BinaryTVList.newList(); cloneAs(cloneList); cloneList.bitMap = this.bitMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index a4cc03401bb86..5e3461acb2c8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -63,7 +63,7 @@ public static BooleanTVList newList() { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { BooleanTVList cloneList = BooleanTVList.newList(); cloneAs(cloneList); cloneList.bitMap = this.bitMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index 9897e11db56c0..753ca2020a89f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -64,7 +64,7 @@ public static DoubleTVList newList() { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { DoubleTVList cloneList = DoubleTVList.newList(); cloneAs(cloneList); cloneList.bitMap = this.bitMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 857668cd3a89d..517dc208211fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -64,7 +64,7 @@ public static FloatTVList newList() { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { FloatTVList cloneList = FloatTVList.newList(); cloneAs(cloneList); cloneList.bitMap = this.bitMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 172f092046823..a51e785d414fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -63,7 +63,7 @@ public static IntTVList newList(TSDataType dataType) { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { IntTVList cloneList = IntTVList.newList(dataType); cloneAs(cloneList); cloneList.bitMap = this.bitMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 74789b1842f8d..544d2cb7dd0eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -63,7 +63,7 @@ public static LongTVList newList() { } @Override - public TVList cloneForFlushSort() { + public synchronized TVList cloneForFlushSort() { LongTVList cloneList = LongTVList.newList(); cloneAs(cloneList); cloneList.bitMap = this.bitMap; From bb206ebcd19616613f352ba1958b391c040794ba Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 11 Feb 2026 14:16:59 +0800 Subject: [PATCH 6/8] rename --- .../memtable/AlignedWritableMemChunk.java | 57 +++++++++++++------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 12a8329130357..86878ea8dc9fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -624,8 +624,8 @@ private void handleEncoding( boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { - AlignedTVList list = (AlignedTVList) workingListForFlush; - List dataTypes = list.getTsDataTypes(); + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; + List dataTypes = alignedWorkingListForFlush.getTsDataTypes(); Pair[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List pageRange : chunkRange) { AlignedChunkWriterImpl alignedChunkWriter = @@ -643,16 +643,18 @@ private void handleEncoding( sortedRowIndex++) { // skip empty row if (allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) { continue; } // skip time duplicated rows - long time = list.getTime(sortedRowIndex); + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (Objects.nonNull(timeDuplicateInfo)) { - if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { + if (!alignedWorkingListForFlush.isNullValue( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex), columnIndex)) { lastValidPointIndexForTimeDupCheck[columnIndex].left = time; lastValidPointIndexForTimeDupCheck[columnIndex].right = - list.getValueIndex(sortedRowIndex); + alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } if (timeDuplicateInfo[sortedRowIndex]) { continue; @@ -673,41 +675,55 @@ private void handleEncoding( && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; } else { - originRowIndex = list.getValueIndex(sortedRowIndex); + originRowIndex = alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } - boolean isNull = list.isNullValue(originRowIndex, columnIndex); + boolean isNull = alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex); switch (tsDataType) { case BOOLEAN: alignedChunkWriter.writeByColumn( time, - !isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex), + !isNull + && alignedWorkingListForFlush.getBooleanByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT32: case DATE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getIntByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT64: case TIMESTAMP: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getLongByValueIndex( + originRowIndex, columnIndex), isNull); break; case FLOAT: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getFloatByValueIndex( + originRowIndex, columnIndex), isNull); break; case DOUBLE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getDoubleByValueIndex( + originRowIndex, columnIndex), isNull); break; case TEXT: @@ -716,7 +732,10 @@ private void handleEncoding( case OBJECT: alignedChunkWriter.writeByColumn( time, - isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex), + isNull + ? null + : alignedWorkingListForFlush.getBinaryByValueIndex( + originRowIndex, columnIndex), isNull); break; default: @@ -726,19 +745,21 @@ private void handleEncoding( alignedChunkWriter.nextColumn(); } - long[] times = new long[Math.min(maxNumberOfPointsInPage, list.rowCount())]; + long[] times = + new long[Math.min(maxNumberOfPointsInPage, alignedWorkingListForFlush.rowCount())]; int pointsInPage = 0; for (int sortedRowIndex = pageRange.get(pageNum * 2); sortedRowIndex <= pageRange.get(pageNum * 2 + 1); sortedRowIndex++) { // skip empty row if (((allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) - || (list.isTimeDeleted(sortedRowIndex)))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) + || (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) { continue; } if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) { - times[pointsInPage++] = list.getTime(sortedRowIndex); + times[pointsInPage++] = alignedWorkingListForFlush.getTime(sortedRowIndex); } } alignedChunkWriter.write(times, pointsInPage, 0); From 670547593e5fc05ade85b1680aaf844ba8175eb3 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 11 Feb 2026 14:26:10 +0800 Subject: [PATCH 7/8] fix conflict --- .../dataregion/memtable/PrimitiveMemTableTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index bd6cab509d81b..169dd11042fa0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -237,7 +237,7 @@ public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() measurementSchemas)); ReadOnlyMemChunk readOnlyMemChunk = resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( - new QueryContext(1), memTable, null, Long.MAX_VALUE, null); + new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null); for (int i = 1; i <= 50; i++) { memTable.writeAlignedRow( From 192d2b83902230ba9ce9d5f23b89e55fa9426151 Mon Sep 17 00:00:00 2001 From: shuwenwei <55970239+shuwenwei@users.noreply.github.com> Date: Wed, 11 Feb 2026 16:03:58 +0800 Subject: [PATCH 8/8] Update iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../dataregion/memtable/AbstractWritableMemChunk.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 9180c1391e795..7a3233d58b638 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -236,6 +236,7 @@ public void sortTvListForFlush() { workingListForFlush.sort(); } + @Override public void releaseTemporaryTvListForFlush() { workingListForFlush = null; }