From 7dbdac4099cf32af2b8d5ea1e45c0d298572cae6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 10 Feb 2026 09:29:48 +0800 Subject: [PATCH] Pipe: Enabled retry locally for air gap receiver & temporary unavailable exception (#17188) * fix * some-part * Update IoTDBAirGapReceiver.java * fix * mipl * logger --- .../protocol/airgap/IoTDBAirGapReceiver.java | 60 ++++++++++++------- .../iotdb/commons/conf/CommonConfig.java | 52 ++++++++++++---- .../iotdb/commons/pipe/config/PipeConfig.java | 8 +++ .../commons/pipe/config/PipeDescriptor.java | 11 +++- 4 files changed, 99 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 610b9e5fe1a0a..0ff1834e8cd0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -133,27 +133,7 @@ private void receive() throws IOException { .setVersion(ReadWriteIOUtils.readByte(byteBuffer)) .setType(ReadWriteIOUtils.readShort(byteBuffer)) .setBody(byteBuffer.slice()); - final TPipeTransferResp resp = agent.receive(req); - - final TSStatus status = resp.getStatus(); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - ok(); - } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() - || status.getCode() - == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) { - LOGGER.info( - "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.", - receiverId, - resp.getStatus()); - ok(); - } else { - LOGGER.warn( - "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}", - receiverId, - resp.getStatus(), - req); - fail(); - } + handleReq(req, System.currentTimeMillis()); } catch (final PipeConnectionException e) { LOGGER.info( "Pipe air gap receiver {}: Socket {} closed when listening to data. Because: {}", @@ -171,6 +151,44 @@ private void receive() throws IOException { } } + private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long startTime) + throws IOException { + final TPipeTransferResp resp = agent.receive(req); + + final TSStatus status = resp.getStatus(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + ok(); + } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || status.getCode() + == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) { + LOGGER.info( + "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.", + receiverId, + resp.getStatus()); + ok(); + } else if (status.getCode() + == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) { + try { + Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + LOGGER.info( + "Temporary unavailable exception encountered at air gap receiver, will retry locally."); + if (System.currentTimeMillis() - startTime + < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) { + handleReq(req, startTime); + } + } else { + LOGGER.warn( + "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}", + receiverId, + resp.getStatus(), + req); + fail(); + } + } + private void ok() throws IOException { final OutputStream outputStream = socket.getOutputStream(); outputStream.write(AirGapOneByteResponse.OK); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 62c389df75894..ddaa5f8ab0700 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy; import org.apache.iotdb.commons.enums.PipeRateAverage; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; import org.apache.iotdb.rpc.RpcUtils; @@ -275,7 +276,7 @@ public class CommonConfig { private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5; private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20; private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30; - private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500; + private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500; private int pipeAsyncConnectorSelectorNumber = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); private int pipeAsyncConnectorMaxClientNumber = @@ -301,6 +302,9 @@ public class CommonConfig { private boolean pipeAirGapReceiverEnabled = false; private int pipeAirGapReceiverPort = 9780; + private long pipeAirGapRetryLocalIntervalMs = 1000L; + private long pipeAirGapRetryMaxMs = -1; + private long pipeReceiverLoginPeriodicVerificationIntervalMs = -1; private double pipeReceiverActualToEstimatedMemoryRatio = 3; @@ -1126,21 +1130,20 @@ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() { return pipeAsyncSinkForcedRetryTotalEventQueueSize; } - public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall( - long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) { - if (this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall - == pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) { + public void setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall( + long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) { + if (this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall + == pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) { return; } - this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = - pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall; + this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = pipeAsyncSinkMaxRetryExecutionTimeMsPerCall; logger.info( - "pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall is set to {}.", - pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall); + "pipeAsyncSinkMaxRetryExecutionTimeMsPerCall is set to {}.", + pipeAsyncSinkMaxRetryExecutionTimeMsPerCall); } public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() { - return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall; + return pipeAsyncSinkMaxRetryExecutionTimeMsPerCall; } public int getPipeAsyncSinkSelectorNumber() { @@ -1558,6 +1561,35 @@ public int getPipeAirGapReceiverPort() { return pipeAirGapReceiverPort; } + public long getPipeAirGapRetryLocalIntervalMs() { + return pipeAirGapRetryLocalIntervalMs; + } + + public void setPipeAirGapRetryLocalIntervalMs(long pipeAirGapRetryLocalIntervalMs) { + if (pipeAirGapRetryLocalIntervalMs == this.pipeAirGapRetryLocalIntervalMs) { + return; + } + this.pipeAirGapRetryLocalIntervalMs = pipeAirGapRetryLocalIntervalMs; + logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.", pipeAirGapRetryLocalIntervalMs); + } + + // < 0 : 0.8 * transfer timeout to avoid timeout + // = 0 : Disable retry + // > 0 : Explicit configuration + public long getPipeAirGapRetryMaxMs() { + return pipeAirGapRetryMaxMs >= 0 + ? pipeAirGapRetryMaxMs + : (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() * 0.8); + } + + public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) { + if (pipeAirGapRetryMaxMs == this.pipeAirGapRetryMaxMs) { + return; + } + this.pipeAirGapRetryMaxMs = pipeAirGapRetryMaxMs; + logger.info("pipeAirGapRetryMaxMs is set to {}.", pipeAirGapRetryMaxMs); + } + public void setPipeReceiverLoginPeriodicVerificationIntervalMs( long pipeReceiverLoginPeriodicVerificationIntervalMs) { if (this.pipeReceiverLoginPeriodicVerificationIntervalMs diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index fbf4cf191acf7..88dfaf509480c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -335,6 +335,14 @@ public int getPipeAirGapReceiverPort() { return COMMON_CONFIG.getPipeAirGapReceiverPort(); } + public long getPipeAirGapRetryLocalIntervalMs() { + return COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs(); + } + + public long getPipeAirGapRetryMaxMs() { + return COMMON_CONFIG.getPipeAirGapRetryMaxMs(); + } + /////////////////////////////// Receiver /////////////////////////////// public long getPipeReceiverLoginPeriodicVerificationIntervalMs() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 5322bfcd7ba4f..0722a01633174 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -101,6 +101,15 @@ public static void loadPipeStaticConfig(CommonConfig config, TrimProperties prop properties.getProperty( "pipe_air_gap_receiver_port", Integer.toString(config.getPipeAirGapReceiverPort())))); + config.setPipeAirGapRetryLocalIntervalMs( + Long.parseLong( + properties.getProperty( + "pipe_air_gap_retry_local_interval_ms", + Long.toString(config.getPipeAirGapRetryLocalIntervalMs())))); + config.setPipeAirGapRetryMaxMs( + Long.parseLong( + properties.getProperty( + "pipe_air_gap_retry_max_ms", Long.toString(config.getPipeAirGapRetryMaxMs())))); config.setPipeMetaReportMaxLogNumPerRound( Double.parseDouble( @@ -387,7 +396,7 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr properties.getProperty( "pipe_connector_rpc_thrift_compression_enabled", String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled()))))); - config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall( + config.setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall( Long.parseLong( Optional.ofNullable( properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))