Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,7 @@ private void receive() throws IOException {
.setVersion(ReadWriteIOUtils.readByte(byteBuffer))
.setType(ReadWriteIOUtils.readShort(byteBuffer))
.setBody(byteBuffer.slice());
final TPipeTransferResp resp = agent.receive(req);

final TSStatus status = resp.getStatus();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
ok();
} else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| status.getCode()
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
LOGGER.info(
"Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
receiverId,
resp.getStatus());
ok();
} else {
LOGGER.warn(
"Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
receiverId,
resp.getStatus(),
req);
fail();
}
handleReq(req, System.currentTimeMillis());
} catch (final PipeConnectionException e) {
LOGGER.info(
"Pipe air gap receiver {}: Socket {} closed when listening to data. Because: {}",
Expand All @@ -171,6 +151,44 @@ private void receive() throws IOException {
}
}

private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long startTime)
throws IOException {
final TPipeTransferResp resp = agent.receive(req);

final TSStatus status = resp.getStatus();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
ok();
} else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| status.getCode()
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
LOGGER.info(
"Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
receiverId,
resp.getStatus());
ok();
} else if (status.getCode()
== TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
try {
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
LOGGER.info(
"Temporary unavailable exception encountered at air gap receiver, will retry locally.");
if (System.currentTimeMillis() - startTime
< PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
handleReq(req, startTime);
}
} else {
LOGGER.warn(
"Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
receiverId,
resp.getStatus(),
req);
fail();
}
}

private void ok() throws IOException {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(AirGapOneByteResponse.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -275,7 +276,7 @@ public class CommonConfig {
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
Expand All @@ -301,6 +302,9 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;

private long pipeAirGapRetryLocalIntervalMs = 1000L;
private long pipeAirGapRetryMaxMs = -1;

private long pipeReceiverLoginPeriodicVerificationIntervalMs = -1;
private double pipeReceiverActualToEstimatedMemoryRatio = 3;

Expand Down Expand Up @@ -1126,21 +1130,20 @@ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
return pipeAsyncSinkForcedRetryTotalEventQueueSize;
}

public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
if (this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall
== pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
public void setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
if (this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall
== pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
return;
}
this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
logger.info(
"pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall is set to {}.",
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
"pipeAsyncSinkMaxRetryExecutionTimeMsPerCall is set to {}.",
pipeAsyncSinkMaxRetryExecutionTimeMsPerCall);
}

public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
return pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
}

public int getPipeAsyncSinkSelectorNumber() {
Expand Down Expand Up @@ -1558,6 +1561,35 @@ public int getPipeAirGapReceiverPort() {
return pipeAirGapReceiverPort;
}

public long getPipeAirGapRetryLocalIntervalMs() {
return pipeAirGapRetryLocalIntervalMs;
}

public void setPipeAirGapRetryLocalIntervalMs(long pipeAirGapRetryLocalIntervalMs) {
if (pipeAirGapRetryLocalIntervalMs == this.pipeAirGapRetryLocalIntervalMs) {
return;
}
this.pipeAirGapRetryLocalIntervalMs = pipeAirGapRetryLocalIntervalMs;
logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.", pipeAirGapRetryLocalIntervalMs);
}

// < 0 : 0.8 * transfer timeout to avoid timeout
// = 0 : Disable retry
// > 0 : Explicit configuration
public long getPipeAirGapRetryMaxMs() {
return pipeAirGapRetryMaxMs >= 0
? pipeAirGapRetryMaxMs
: (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() * 0.8);
}

public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) {
if (pipeAirGapRetryMaxMs == this.pipeAirGapRetryMaxMs) {
return;
}
this.pipeAirGapRetryMaxMs = pipeAirGapRetryMaxMs;
logger.info("pipeAirGapRetryMaxMs is set to {}.", pipeAirGapRetryMaxMs);
}

public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
long pipeReceiverLoginPeriodicVerificationIntervalMs) {
if (this.pipeReceiverLoginPeriodicVerificationIntervalMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,14 @@ public int getPipeAirGapReceiverPort() {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}

public long getPipeAirGapRetryLocalIntervalMs() {
return COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs();
}

public long getPipeAirGapRetryMaxMs() {
return COMMON_CONFIG.getPipeAirGapRetryMaxMs();
}

/////////////////////////////// Receiver ///////////////////////////////

public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public static void loadPipeStaticConfig(CommonConfig config, TrimProperties prop
properties.getProperty(
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
config.setPipeAirGapRetryLocalIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_air_gap_retry_local_interval_ms",
Long.toString(config.getPipeAirGapRetryLocalIntervalMs()))));
config.setPipeAirGapRetryMaxMs(
Long.parseLong(
properties.getProperty(
"pipe_air_gap_retry_max_ms", Long.toString(config.getPipeAirGapRetryMaxMs()))));

config.setPipeMetaReportMaxLogNumPerRound(
Double.parseDouble(
Expand Down Expand Up @@ -387,7 +396,7 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
config.setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
Expand Down
Loading