Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ public void run() {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
memSerializeTime += subTaskTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
protected static long RETRY_INTERVAL_MS = 100L;
protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;

protected TVList workingListForFlush;

/**
* Release the TVList if there is no query on it. Otherwise, it should set the first query as the
* owner. TVList is released until all queries finish. If it throws memory-not-enough exception
Expand Down Expand Up @@ -198,7 +200,46 @@ public abstract void writeAlignedTablet(
public abstract IMeasurementSchema getSchema();

@Override
public abstract void sortTvListForFlush();
public void sortTvListForFlush() {
  TVList workingList = getWorkingTVList();

  // Fast path: an already-sorted list can be handed to the flush as-is,
  // since no reordering (and therefore no mutation) is required.
  if (workingList.isSorted()) {
    workingListForFlush = workingList;
    return;
  }

  /*
   * Concurrency background:
   *
   * A query that started earlier may have recorded the TVList's row count at
   * that time as its visible range. Later unseq writes can trigger a flush,
   * and the flush sorts the TVList, reordering its underlying indices array.
   *
   * If the concurrent query keeps using its previously recorded row count as
   * an upper bound while translating logical indices through the now-reordered
   * indices array, the resulting physical index can fall outside the range the
   * query originally saw, causing invalid accesses.
   *
   * Therefore, whenever active queries exist on the working TVList, the flush
   * must sort a clone (copied times and indices) instead of mutating the
   * structures those queries rely on.
   */
  boolean hasActiveQueries;
  workingList.lockQueryList();
  try {
    hasActiveQueries = !workingList.getQueryContextSet().isEmpty();
  } finally {
    workingList.unlockQueryList();
  }

  if (hasActiveQueries) {
    // Clone times/indices so the sort cannot disturb concurrent readers.
    workingListForFlush = workingList.cloneForFlushSort();
  } else {
    workingListForFlush = workingList;
  }
  workingListForFlush.sort();
}

@Override
public void releaseTemporaryTvListForFlush() {
// Drop the flush-time reference. If sortTvListForFlush() cloned the working
// TVList for concurrent-query safety, clearing this field makes the clone
// eligible for GC once the encoding task has finished with it.
workingListForFlush = null;
}

@Override
public abstract int delete(long lowerBound, long upperBound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
private Map<String, Integer> measurementIndexMap;
private List<TSDataType> dataTypes;
private final List<IMeasurementSchema> schemaList;
// Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask
private AlignedTVList list;
private List<AlignedTVList> sortedList;
private long sortedRowCount = 0;
Expand Down Expand Up @@ -499,13 +500,6 @@ public long getMinTime() {
return minTime;
}

@Override
// Sorts the working TVList in place before flushing, skipping the sort when the
// list is already ordered. `synchronized` serializes this with other
// synchronized operations on this mem-chunk instance.
// NOTE(review): sorting in place mutates the shared indices array — presumably
// safe only if no concurrent query is reading this list; verify against callers.
public synchronized void sortTvListForFlush() {
if (!list.isSorted()) {
list.sort();
}
}

@Override
public int delete(long lowerBound, long upperBound) {
int deletedNumber = list.delete(lowerBound, upperBound);
Expand Down Expand Up @@ -557,7 +551,10 @@ public void encodeWorkingAlignedTVList(
long maxNumberOfPointsInChunk,
int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap;
allValueColDeletedMap = ignoreAllNullRows ? list.getAllValueColDeletedMap() : null;
AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush;

allValueColDeletedMap =
ignoreAllNullRows ? alignedWorkingListForFlush.getAllValueColDeletedMap() : null;

boolean[] timeDuplicateInfo = null;

Expand All @@ -569,8 +566,10 @@ public void encodeWorkingAlignedTVList(
int pointNumInPage = 0;
int pointNumInChunk = 0;

for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) {
long time = list.getTime(sortedRowIndex);
for (int sortedRowIndex = 0;
sortedRowIndex < alignedWorkingListForFlush.rowCount();
sortedRowIndex++) {
long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
if (pointNumInPage == 0) {
pageRange.add(sortedRowIndex);
}
Expand All @@ -591,23 +590,25 @@ public void encodeWorkingAlignedTVList(
}

int nextRowIndex = sortedRowIndex + 1;
while (nextRowIndex < list.rowCount()
while (nextRowIndex < alignedWorkingListForFlush.rowCount()
&& ((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex)))
|| list.isTimeDeleted(nextRowIndex))) {
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(nextRowIndex)))
|| alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) {
nextRowIndex++;
}
if (nextRowIndex != list.rowCount() && time == list.getTime(nextRowIndex)) {
if (nextRowIndex != alignedWorkingListForFlush.rowCount()
&& time == alignedWorkingListForFlush.getTime(nextRowIndex)) {
if (Objects.isNull(timeDuplicateInfo)) {
timeDuplicateInfo = new boolean[list.rowCount()];
timeDuplicateInfo = new boolean[alignedWorkingListForFlush.rowCount()];
}
timeDuplicateInfo[sortedRowIndex] = true;
}
sortedRowIndex = nextRowIndex - 1;
}

if (pointNumInPage != 0) {
pageRange.add(list.rowCount() - 1);
pageRange.add(alignedWorkingListForFlush.rowCount() - 1);
}
if (pointNumInChunk != 0) {
chunkRange.add(pageRange);
Expand All @@ -623,7 +624,8 @@ private void handleEncoding(
boolean[] timeDuplicateInfo,
BitMap allValueColDeletedMap,
int maxNumberOfPointsInPage) {
List<TSDataType> dataTypes = list.getTsDataTypes();
AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush;
List<TSDataType> dataTypes = alignedWorkingListForFlush.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()];
for (List<Integer> pageRange : chunkRange) {
AlignedChunkWriterImpl alignedChunkWriter =
Expand All @@ -641,16 +643,18 @@ private void handleEncoding(
sortedRowIndex++) {
// skip empty row
if (allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) {
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) {
continue;
}
// skip time duplicated rows
long time = list.getTime(sortedRowIndex);
long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
if (Objects.nonNull(timeDuplicateInfo)) {
if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) {
if (!alignedWorkingListForFlush.isNullValue(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex), columnIndex)) {
lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
lastValidPointIndexForTimeDupCheck[columnIndex].right =
list.getValueIndex(sortedRowIndex);
alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
continue;
Expand All @@ -671,41 +675,55 @@ private void handleEncoding(
&& (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right;
} else {
originRowIndex = list.getValueIndex(sortedRowIndex);
originRowIndex = alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
}

boolean isNull = list.isNullValue(originRowIndex, columnIndex);
boolean isNull = alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex);
switch (tsDataType) {
case BOOLEAN:
alignedChunkWriter.writeByColumn(
time,
!isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex),
!isNull
&& alignedWorkingListForFlush.getBooleanByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
case INT32:
case DATE:
alignedChunkWriter.writeByColumn(
time,
isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex),
isNull
? 0
: alignedWorkingListForFlush.getIntByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
case INT64:
case TIMESTAMP:
alignedChunkWriter.writeByColumn(
time,
isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex),
isNull
? 0
: alignedWorkingListForFlush.getLongByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
case FLOAT:
alignedChunkWriter.writeByColumn(
time,
isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex),
isNull
? 0
: alignedWorkingListForFlush.getFloatByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
case DOUBLE:
alignedChunkWriter.writeByColumn(
time,
isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex),
isNull
? 0
: alignedWorkingListForFlush.getDoubleByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
case TEXT:
Expand All @@ -714,7 +732,10 @@ private void handleEncoding(
case OBJECT:
alignedChunkWriter.writeByColumn(
time,
isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex),
isNull
? null
: alignedWorkingListForFlush.getBinaryByValueIndex(
originRowIndex, columnIndex),
isNull);
break;
default:
Expand All @@ -724,19 +745,21 @@ private void handleEncoding(
alignedChunkWriter.nextColumn();
}

long[] times = new long[Math.min(maxNumberOfPointsInPage, list.rowCount())];
long[] times =
new long[Math.min(maxNumberOfPointsInPage, alignedWorkingListForFlush.rowCount())];
int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex)))
|| (list.isTimeDeleted(sortedRowIndex)))) {
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
|| (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) {
continue;
}
if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) {
times[pointsInPage++] = list.getTime(sortedRowIndex);
times[pointsInPage++] = alignedWorkingListForFlush.getTime(sortedRowIndex);
}
}
alignedChunkWriter.write(times, pointsInPage, 0);
Expand All @@ -752,8 +775,7 @@ private void handleEncoding(
}

@Override
public synchronized void encode(
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) {
public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) {
encodeInfo.maxNumberOfPointsInChunk =
Math.min(
encodeInfo.maxNumberOfPointsInChunk,
Expand All @@ -770,7 +792,7 @@ public synchronized void encode(

// create MergeSortAlignedTVListIterator.
List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
alignedTvLists.add(list);
alignedTvLists.add((AlignedTVList) workingListForFlush);
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ void writeAlignedTablet(
*/
void sortTvListForFlush();

/**
 * Releases any temporary TVList reference held for the duration of a flush, so
 * that flush-only resources can be reclaimed after encoding completes.
 */
void releaseTemporaryTvListForFlush();

// Default upper bound for this chunk's timestamps: Long.MAX_VALUE, i.e. no
// constraint. Implementations tracking real data presumably override this —
// TODO confirm against concrete IWritableMemChunk implementors.
default long getMaxTime() {
return Long.MAX_VALUE;
}
Expand Down
Loading
Loading