Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,26 @@

<junit-jupiter.version>5.14.2</junit-jupiter.version>
<junit-platform.version>1.14.2</junit-platform.version>
<netty-handler.version>4.2.9.Final</netty-handler.version>
<netty-handler.version>4.1.124.Final</netty-handler.version>

<plugin.surefire.version>3.5.4</plugin.surefire.version>
<plugin.jacoco.version>0.8.14</plugin.jacoco.version>
<plugin.checkstyle.version>3.6.0</plugin.checkstyle.version>
<plugin.javadoc.version>3.12.0</plugin.javadoc.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>1.75.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
Expand Down
29 changes: 22 additions & 7 deletions src/test/java/com/influxdb/v3/client/integration/E2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ void correctSslCertificates() throws Exception {
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
.sslRootsFilePath(influxDBcertificateFile)
.build();
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
assertGetDataSuccess(influxDBClient);
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
assertGetDataSuccess(influxDBClient);
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
void disableServerCertificateValidation() {
void disableServerCertificateValidation() throws Exception {
String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem";

ClientConfig clientConfig = new ClientConfig.Builder()
Expand All @@ -130,8 +131,9 @@ void disableServerCertificateValidation() {
.build();

// Test succeeded with wrong certificate file because disableServerCertificateValidation is true
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
assertGetDataSuccess(influxDBClient);
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
assertGetDataSuccess(influxDBClient);
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
Expand Down Expand Up @@ -196,8 +198,9 @@ public void testQueryRows() throws Exception {
String uuid = UUID.randomUUID().toString();
String measurement = "host22";
List<Map<String, Object>> testDatas = new ArrayList<>();
long baseTimestamp = System.currentTimeMillis();
for (int i = 0; i <= 9; i++) {
long timestamp = System.currentTimeMillis();
long timestamp = baseTimestamp + i;
Map<String, Object> map = Map.of(
"measurement", measurement,
"tag", "tagValue",
Expand Down Expand Up @@ -230,6 +233,8 @@ public void testQueryRows() throws Exception {
testDatas.add(map);
}

Thread.sleep(2_000);

Map<String, Object> parameters = Map.of("testId", uuid);
// Result set much be ordered by time
String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement);
Expand Down Expand Up @@ -274,8 +279,9 @@ public void testQueryRowWithOptions() throws Exception {
String uuid = UUID.randomUUID().toString();
String measurement = "host21";
List<Map<String, Object>> testDatas = new ArrayList<>();
long baseTimestamp = System.currentTimeMillis();
for (int i = 0; i <= 9; i++) {
long timestamp = System.currentTimeMillis();
long timestamp = baseTimestamp + i;
Map<String, Object> map = Map.of(
"measurement", measurement,
"tag", "tagValue",
Expand Down Expand Up @@ -308,6 +314,8 @@ public void testQueryRowWithOptions() throws Exception {
testDatas.add(map);
}

Thread.sleep(2_000);

// Result set much be ordered by time
String sql = String.format("Select * from %s where \"testId\"='%s' order by time", measurement, uuid);
try (Stream<Map<String, Object>> stream = client.queryRows(sql, QueryOptions.defaultQueryOptions())) {
Expand Down Expand Up @@ -417,6 +425,12 @@ public void testNoAllocatorMemoryLeak() {
.setTimestamp(now));

client.writePoints(points);
try {
Thread.sleep(2_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
String query = "SELECT * FROM " + measurement;

try (Stream<PointValues> stream = client.queryPoints(query)) {
Expand Down Expand Up @@ -469,6 +483,7 @@ public void testMultipleQueries() throws Exception {
.setTimestamp(now));

client.writePoints(points);
Thread.sleep(2_000);
String query = "SELECT * FROM " + measurement;

for (int i = 0; i < 20; i++) {
Expand Down
52 changes: 52 additions & 0 deletions src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallOptions;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightProducer.CallContext;
import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.PointValues;
Expand Down Expand Up @@ -186,6 +191,53 @@ void setInboundMessageSizeLarge() throws Exception {
}
}

@Test
@Timeout(5)
void queryTimeout() throws Exception {
int freePort = findFreePort();
URI uri = URI.create("http://127.0.0.1:" + freePort);
try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1);
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() {
@Override
public void getStream(final CallContext context,
final Ticket ticket,
final ServerStreamListener listener) {
listener.start(vectorSchemaRoot);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
listener.completed();
}
})
) {
flightServer.start();

String host = String.format("http://%s:%d", uri.getHost(), uri.getPort());
ClientConfig clientConfig = new ClientConfig.Builder()
.host(host)
.database("test")
.queryTimeout(Duration.ofMillis(200))
.build();

try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
Throwable thrown = Assertions.catchThrowable(() -> {
try (Stream<PointValues> stream = influxDBClient.queryPoints(
"Select * from \"nothing\""
)) {
stream.count();
}
});

Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
FlightRuntimeException fre = (FlightRuntimeException) thrown;
Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code());
}
}
}

@Test
void defaultGrpcCallOptions() {
GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions();
Expand Down
Loading