diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 6977167f0e629..ffcbb97ffd1bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -272,6 +272,7 @@ public void run() { times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE]; } writableMemChunk.encode(ioTaskQueue, encodeInfo, times); + writableMemChunk.releaseTemporaryTvListForFlush(); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index f33182fc680ef..7a3233d58b638 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; + protected TVList workingListForFlush; + /** * Release the TVList if there is no query on it. Otherwise, it should set the first query as the * owner. TVList is released until all queries finish. If it throws memory-not-enough exception @@ -198,7 +200,46 @@ public abstract void writeAlignedTablet( public abstract IMeasurementSchema getSchema(); @Override - public abstract void sortTvListForFlush(); + public void sortTvListForFlush() { + TVList workingList = getWorkingTVList(); + if (workingList.isSorted()) { + workingListForFlush = workingList; + return; + } + + /* + * Concurrency background: + * + * A query may start earlier and record the current row count (rows) of the TVList as its visible range. + * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList. + * + * During sorting, the underlying indices array of the TVList may be reordered. + * If the query continues to use the previously recorded rows as its upper bound, + * it may convert a logical index to a physical index via the updated indices array. + * + * In this case, the converted physical index may exceed the previously visible + * rows range, leading to invalid access or unexpected behavior. + * + * To avoid this issue, when there are active queries on the working TVList, we must + * clone the times and indices before sorting, so that the flush sort does not mutate + * the data structures that concurrent queries rely on. + */ + boolean needCloneTimesAndIndicesInWorkingTVList; + workingList.lockQueryList(); + try { + needCloneTimesAndIndicesInWorkingTVList = !workingList.getQueryContextSet().isEmpty(); + } finally { + workingList.unlockQueryList(); + } + workingListForFlush = + needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + workingListForFlush.sort(); + } + + @Override + public void releaseTemporaryTvListForFlush() { + workingListForFlush = null; + } @Override public abstract int delete(long lowerBound, long upperBound); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 5fa0008ce75ae..86878ea8dc9fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -68,6 +68,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { private Map measurementIndexMap; private List dataTypes; private final List schemaList; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private AlignedTVList list; private List sortedList; private long sortedRowCount = 0; @@ -499,13 +500,6 @@ public long getMinTime() { return minTime; } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = list.delete(lowerBound, upperBound); @@ -557,7 +551,10 @@ public void encodeWorkingAlignedTVList( long maxNumberOfPointsInChunk, int maxNumberOfPointsInPage) { BitMap allValueColDeletedMap; - allValueColDeletedMap = ignoreAllNullRows ? list.getAllValueColDeletedMap() : null; + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; + + allValueColDeletedMap = + ignoreAllNullRows ? alignedWorkingListForFlush.getAllValueColDeletedMap() : null; boolean[] timeDuplicateInfo = null; @@ -569,8 +566,10 @@ public void encodeWorkingAlignedTVList( int pointNumInPage = 0; int pointNumInChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - long time = list.getTime(sortedRowIndex); + for (int sortedRowIndex = 0; + sortedRowIndex < alignedWorkingListForFlush.rowCount(); + sortedRowIndex++) { + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (pointNumInPage == 0) { pageRange.add(sortedRowIndex); } @@ -591,15 +590,17 @@ public void encodeWorkingAlignedTVList( } int nextRowIndex = sortedRowIndex + 1; - while (nextRowIndex < list.rowCount() + while (nextRowIndex < alignedWorkingListForFlush.rowCount() && ((allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex))) - || list.isTimeDeleted(nextRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(nextRowIndex))) + || alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) { nextRowIndex++; } - if (nextRowIndex != list.rowCount() && time == list.getTime(nextRowIndex)) { + if (nextRowIndex != alignedWorkingListForFlush.rowCount() + && time == alignedWorkingListForFlush.getTime(nextRowIndex)) { if (Objects.isNull(timeDuplicateInfo)) { - timeDuplicateInfo = new boolean[list.rowCount()]; + timeDuplicateInfo = new boolean[alignedWorkingListForFlush.rowCount()]; } timeDuplicateInfo[sortedRowIndex] = true; } @@ -607,7 +608,7 @@ public void encodeWorkingAlignedTVList( } if (pointNumInPage != 0) { - pageRange.add(list.rowCount() - 1); + pageRange.add(alignedWorkingListForFlush.rowCount() - 1); } if (pointNumInChunk != 0) { chunkRange.add(pageRange); @@ -623,7 +624,8 @@ private void handleEncoding( boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { - List dataTypes = list.getTsDataTypes(); + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; + List dataTypes = alignedWorkingListForFlush.getTsDataTypes(); Pair[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List pageRange : chunkRange) { AlignedChunkWriterImpl alignedChunkWriter = @@ -641,16 +643,18 @@ private void handleEncoding( sortedRowIndex++) { // skip empty row if (allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) { continue; } // skip time duplicated rows - long time = list.getTime(sortedRowIndex); + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (Objects.nonNull(timeDuplicateInfo)) { - if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { + if (!alignedWorkingListForFlush.isNullValue( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex), columnIndex)) { lastValidPointIndexForTimeDupCheck[columnIndex].left = time; lastValidPointIndexForTimeDupCheck[columnIndex].right = - list.getValueIndex(sortedRowIndex); + alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } if (timeDuplicateInfo[sortedRowIndex]) { continue; @@ -671,41 +675,55 @@ private void handleEncoding( && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; } else { - originRowIndex = list.getValueIndex(sortedRowIndex); + originRowIndex = alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } - boolean isNull = list.isNullValue(originRowIndex, columnIndex); + boolean isNull = alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex); switch (tsDataType) { case BOOLEAN: alignedChunkWriter.writeByColumn( time, - !isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex), + !isNull + && alignedWorkingListForFlush.getBooleanByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT32: case DATE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getIntByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT64: case TIMESTAMP: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getLongByValueIndex( + originRowIndex, columnIndex), isNull); break; case FLOAT: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getFloatByValueIndex( + originRowIndex, columnIndex), isNull); break; case DOUBLE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getDoubleByValueIndex( + originRowIndex, columnIndex), isNull); break; case TEXT: @@ -714,7 +732,10 @@ private void handleEncoding( case OBJECT: alignedChunkWriter.writeByColumn( time, - isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex), + isNull + ? null + : alignedWorkingListForFlush.getBinaryByValueIndex( + originRowIndex, columnIndex), isNull); break; default: @@ -724,19 +745,21 @@ private void handleEncoding( alignedChunkWriter.nextColumn(); } - long[] times = new long[Math.min(maxNumberOfPointsInPage, list.rowCount())]; + long[] times = + new long[Math.min(maxNumberOfPointsInPage, alignedWorkingListForFlush.rowCount())]; int pointsInPage = 0; for (int sortedRowIndex = pageRange.get(pageNum * 2); sortedRowIndex <= pageRange.get(pageNum * 2 + 1); sortedRowIndex++) { // skip empty row if (((allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) - || (list.isTimeDeleted(sortedRowIndex)))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) + || (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) { continue; } if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) { - times[pointsInPage++] = list.getTime(sortedRowIndex); + times[pointsInPage++] = alignedWorkingListForFlush.getTime(sortedRowIndex); } } alignedChunkWriter.write(times, pointsInPage, 0); @@ -752,8 +775,7 @@ private void handleEncoding( } @Override - public synchronized void encode( - BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + public void encode(BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { encodeInfo.maxNumberOfPointsInChunk = Math.min( encodeInfo.maxNumberOfPointsInChunk, @@ -770,7 +792,7 @@ public synchronized void encode( // create MergeSortAlignedTVListIterator. List alignedTvLists = new ArrayList<>(sortedList); - alignedTvLists.add(list); + alignedTvLists.add((AlignedTVList) workingListForFlush); List columnIndexList = buildColumnIndexList(schemaList); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 635e06da4b783..af55d7df9d117 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -98,6 +98,8 @@ void writeAlignedTablet( */ void sortTvListForFlush(); + void releaseTemporaryTvListForFlush(); + default long getMaxTime() { return Long.MAX_VALUE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 0256d5b16e52b..571a371c84957 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -59,6 +59,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { private IMeasurementSchema schema; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private TVList list; private List sortedList; private long sortedRowCount = 0; @@ -262,13 +263,6 @@ public void putAlignedTablet( throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public TVList getWorkingTVList() { return list; @@ -395,50 +389,53 @@ public void encodeWorkingTVList( ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); long dataSizeInCurrentChunk = 0; int pointNumInCurrentChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - if (list.isNullValue(list.getValueIndex(sortedRowIndex))) { + for (int sortedRowIndex = 0; + sortedRowIndex < workingListForFlush.rowCount(); + sortedRowIndex++) { + if (workingListForFlush.isNullValue(workingListForFlush.getValueIndex(sortedRowIndex))) { continue; } - long time = list.getTime(sortedRowIndex); + long time = workingListForFlush.getTime(sortedRowIndex); // skip duplicated data - if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { + if ((sortedRowIndex + 1 < workingListForFlush.rowCount() + && (time == workingListForFlush.getTime(sortedRowIndex + 1)))) { continue; } // store last point for SDT - if (sortedRowIndex + 1 == list.rowCount()) { + if (sortedRowIndex + 1 == workingListForFlush.rowCount()) { chunkWriterImpl.setLastPoint(true); } switch (tsDataType) { case BOOLEAN: - chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getBoolean(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 1L; break; case INT32: case DATE: - chunkWriterImpl.write(time, list.getInt(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getInt(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case INT64: case TIMESTAMP: - chunkWriterImpl.write(time, list.getLong(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getLong(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case FLOAT: - chunkWriterImpl.write(time, list.getFloat(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getFloat(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case DOUBLE: - chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getDouble(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case TEXT: case BLOB: case STRING: - Binary value = list.getBinary(sortedRowIndex); + Binary value = workingListForFlush.getBinary(sortedRowIndex); chunkWriterImpl.write(time, value); dataSizeInCurrentChunk += 8L + getBinarySize(value); break; @@ -473,8 +470,7 @@ public void encodeWorkingTVList( } @Override - public synchronized void encode( - BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + public void encode(BlockingQueue ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { if (TVLIST_SORT_THRESHOLD == 0) { encodeWorkingTVList( ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk, encodeInfo.targetChunkSize); @@ -488,7 +484,7 @@ public synchronized void encode( // create MultiTvListIterator. It need not handle float/double precision here. List tvLists = new ArrayList<>(sortedList); - tvLists.add(list); + tvLists.add(workingListForFlush); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 8c732d4f08e5e..2c87fdb32abec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -165,6 +165,18 @@ public TVList getTvListByColumnIndex( return alignedTvList; } + @Override + public synchronized AlignedTVList cloneForFlushSort() { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneList.timeDeletedCnt = this.timeDeletedCnt; + cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize; + cloneList.values = this.values; + cloneList.bitMaps = this.bitMaps; + cloneList.timeColDeletedMap = this.timeColDeletedMap; + return cloneList; + } + @Override public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index dc4ff5529d45b..b274bc8ef6586 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -63,6 +63,15 @@ public static BinaryTVList newList() { } } + @Override + public synchronized TVList cloneForFlushSort() { + BinaryTVList cloneList = BinaryTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BinaryTVList clone() { BinaryTVList cloneList = BinaryTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index b8eb0e508bfe8..5e3461acb2c8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -62,6 +62,15 @@ public static BooleanTVList newList() { } } + @Override + public synchronized TVList cloneForFlushSort() { + BooleanTVList cloneList = BooleanTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BooleanTVList clone() { BooleanTVList cloneList = BooleanTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index f61995ef0628a..753ca2020a89f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -63,6 +63,15 @@ public static DoubleTVList newList() { } } + @Override + public synchronized TVList cloneForFlushSort() { + DoubleTVList cloneList = DoubleTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized DoubleTVList clone() { DoubleTVList cloneList = DoubleTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 3623fa49a3efe..517dc208211fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -63,6 +63,15 @@ public static FloatTVList newList() { } } + @Override + public synchronized TVList cloneForFlushSort() { + FloatTVList cloneList = FloatTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized FloatTVList clone() { FloatTVList cloneList = FloatTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 0146ecf0e6b25..a51e785d414fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -62,6 +62,15 @@ public static IntTVList newList(TSDataType dataType) { } } + @Override + public synchronized TVList cloneForFlushSort() { + IntTVList cloneList = IntTVList.newList(dataType); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized IntTVList clone() { IntTVList cloneList = IntTVList.newList(dataType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 7b4bd8d82d264..544d2cb7dd0eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -62,6 +62,15 @@ public static LongTVList newList() { } } + @Override + public synchronized TVList cloneForFlushSort() { + LongTVList cloneList = LongTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized LongTVList clone() { LongTVList cloneList = LongTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index d61611c4cc5f3..b3bedd62e8fdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -520,6 +520,8 @@ public long getVersion() { protected abstract void expandValues(); + public abstract TVList cloneForFlushSort(); + @Override public abstract TVList clone(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 7e77abca29e18..169dd11042fa0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -205,6 +205,54 @@ public void testWriteDuringPrepareTVListAndActualQueryExecution() } } + @Test + public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() + throws QueryProcessException, IOException, IllegalPathException { + + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + List measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + for (int i = 1000; i < 2000; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + for (int i = 100; i < 200; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", TSDataType.INT32), 150, 160)); + ResourceByPathUtils resourcesByPathUtils = + ResourceByPathUtils.getResourceInstance( + new AlignedFullPath( + new StringArrayDeviceID("root.test.d1"), + Arrays.asList("s1", "s2", "s3"), + measurementSchemas)); + ReadOnlyMemChunk readOnlyMemChunk = + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null); + + for (int i = 1; i <= 50; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "").sortTvListForFlush(); + + readOnlyMemChunk.sortTvLists(); + + MemPointIterator memPointIterator = readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null); + while (memPointIterator.hasNextBatch()) { + memPointIterator.nextBatch(); + } + } + @Test public void memSeriesToStringTest() throws IOException { TSDataType dataType = TSDataType.INT32;