Skip to content
71 changes: 48 additions & 23 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand Down Expand Up @@ -142,12 +143,12 @@ public class Client implements AutoCloseable {
private int retries;
private LZ4Factory lz4Factory = null;

private Client(Set<String> endpoints, Map<String,String> configuration,
private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) {
this(endpoints, configuration, sharedOperationExecutor, columnToMethodMatchingStrategy, null);
}

private Client(Set<String> endpoints, Map<String,String> configuration,
private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) {
// Simple initialization
this.configuration = ClientConfigProperties.parseConfigMap(configuration);
Expand All @@ -171,16 +172,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
// Transport
ImmutableList.Builder<Endpoint> tmpEndpoints = ImmutableList.builder();
boolean initSslContext = false;
for (String ep : endpoints) {
try {
HttpEndpoint endpoint = new HttpEndpoint(ep);
if (endpoint.isSecure()) {
for (Endpoint ep : endpoints) {
if (ep instanceof HttpEndpoint) {
HttpEndpoint httpEndpoint = (HttpEndpoint) ep;
if (httpEndpoint.isSecure()) {
initSslContext = true;
}
LOG.debug("Adding endpoint: {}", endpoint);
tmpEndpoints.add(endpoint);
} catch (Exception e) {
throw new ClientException("Failed to add endpoint " + ep, e);
LOG.debug("Adding endpoint: {}", httpEndpoint);
tmpEndpoints.add(httpEndpoint);
} else {
throw new ClientException("Unsupported endpoint type: " + ep.getClass().getName());
}
}

Expand Down Expand Up @@ -259,7 +260,7 @@ public void close() {


public static class Builder {
private Set<String> endpoints;
private Set<Endpoint> endpoints;

// Read-only configuration
private Map<String, String> configuration;
Expand Down Expand Up @@ -288,25 +289,41 @@ public Builder() {
* <ul>
* <li>{@code http://localhost:8123}</li>
* <li>{@code https://localhost:8443}</li>
* <li>{@code http://localhost:8123/clickhouse} (with path for reverse proxy scenarios)</li>
* </ul>
*
* @param endpoint - URL formatted string with protocol, host and port.
* @param endpoint - URL formatted string with protocol, host, port, and optional path.
*/
public Builder addEndpoint(String endpoint) {
try {
URL endpointURL = new URL(endpoint);

if (endpointURL.getProtocol().equalsIgnoreCase("https")) {
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), true);
} else if (endpointURL.getProtocol().equalsIgnoreCase("http")) {
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), false);
} else {
String protocolStr = endpointURL.getProtocol();
if (!protocolStr.equalsIgnoreCase("https") &&
!protocolStr.equalsIgnoreCase("http")) {
throw new IllegalArgumentException("Only HTTP and HTTPS protocols are supported");
}

boolean secure = protocolStr.equalsIgnoreCase("https");
String host = endpointURL.getHost();
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("Host cannot be empty in endpoint: " + endpoint);
}

int port = endpointURL.getPort();
if (port <= 0) {
throw new ValidationUtils.SettingsValidationException("port", "Valid port must be specified");
}

String path = endpointURL.getPath();
if (path == null || path.isEmpty()) {
path = "/";
}

return addEndpoint(Protocol.HTTP, host, port, secure, path);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Endpoint should be a valid URL string, but was " + endpoint, e);
}
return this;
}

/**
Expand All @@ -318,15 +335,23 @@ public Builder addEndpoint(String endpoint) {
* @param port - Endpoint port
*/
public Builder addEndpoint(Protocol protocol, String host, int port, boolean secure) {
return addEndpoint(protocol, host, port, secure, "/");
}

public Builder addEndpoint(Protocol protocol, String host, int port, boolean secure, String basePath) {
ValidationUtils.checkNonBlank(host, "host");
ValidationUtils.checkNotNull(protocol, "protocol");
ValidationUtils.checkRange(port, 1, ValidationUtils.TCP_PORT_NUMBER_MAX, "port");
if (secure) {
// TODO: if secure init SSL context
ValidationUtils.checkNotNull(basePath, "basePath");

if (protocol == Protocol.HTTP) {
HttpEndpoint httpEndpoint = new HttpEndpoint(host, port, secure, basePath);
this.endpoints.add(httpEndpoint);
} else {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
String endpoint = String.format("%s%s://%s:%d", protocol.toString().toLowerCase(), secure ? "s": "", host, port);
this.endpoints.add(endpoint);
return this;

}


Expand Down Expand Up @@ -2040,7 +2065,7 @@ protected int getOperationTimeout() {
*/
@Deprecated
public Set<String> getEndpoints() {
return endpoints.stream().map(Endpoint::getBaseURL).collect(Collectors.toSet());
return endpoints.stream().map(endpoint -> endpoint.getURI().toString()).collect(Collectors.toSet());
}

public String getUser() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
}
URI uri;
try {
URIBuilder uriBuilder = new URIBuilder(server.getBaseURL());
URIBuilder uriBuilder = new URIBuilder(server.getURI());
addQueryParams(uriBuilder, requestConfig);
uri = uriBuilder.normalizeSyntax().build();
} catch (URISyntaxException e) {
Expand Down Expand Up @@ -473,15 +473,15 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r

} catch (UnknownHostException e) {
closeQuietly(httpResponse);
LOG.warn("Host '{}' unknown", server.getBaseURL());
LOG.warn("Host '{}' unknown", server);
throw e;
} catch (ConnectException | NoRouteToHostException e) {
closeQuietly(httpResponse);
LOG.warn("Failed to connect to '{}': {}", server.getBaseURL(), e.getMessage());
LOG.warn("Failed to connect to '{}': {}", server, e.getMessage());
throw e;
} catch (Exception e) {
closeQuietly(httpResponse);
LOG.debug("Failed to execute request to '{}': {}", server.getBaseURL(), e.getMessage(), e);
LOG.debug("Failed to execute request to '{}': {}", server, e.getMessage(), e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
package com.clickhouse.client.api.transport;

import java.net.URI;

/**
* Interface defining the behavior of transport endpoint.
* It is transport responsibility to provide suitable implementation.
*/
public interface Endpoint {

String getBaseURL();
/**
* Returns URI without query parameters
* @return endpoint url
*/
URI getURI();

/**
* Returns hostname of target server
* @return dns hostname
*/
String getHost();

/**
* Returns port of target server
* @return port number
*/
int getPort();

}
Original file line number Diff line number Diff line change
@@ -1,40 +1,62 @@
package com.clickhouse.client.api.transport;

import java.net.MalformedURLException;
import com.clickhouse.client.api.ClientMisconfigurationException;

import java.net.URI;
import java.net.URL;

public class HttpEndpoint implements Endpoint {

private final URI uri; // contains complete connection URL + parameters

private final URL url; // only communication part

private final String baseURL;
private final URI uri; // only communication part

private final String info;

private final boolean secure;

public HttpEndpoint(String uri) throws MalformedURLException {
this.uri = URI.create(uri);
this.url = this.uri.toURL();
this.baseURL = url.toString();
this.info = baseURL;
this.secure = this.uri.getScheme().equalsIgnoreCase("https");
private final String host;

private final int port;

private final String path;

public HttpEndpoint(String host, int port, boolean secure, String path){
this.host = host;
this.port = port;
this.secure = secure;
if (path != null && !path.isEmpty()) {
// Ensure basePath starts with /
this.path = path.startsWith("/") ? path : "/" + path;
} else {
this.path = "/";
}

// Use URI constructor to properly handle encoding of path segments
// Encode path segments separately to preserve slashes
try {
this.uri = new URI(secure ? "https" : "http", null, host, port, this.path, null, null);
} catch (Exception e) {
throw new ClientMisconfigurationException("Failed to create endpoint URL", e);
}
this.info = uri.toString();
}

@Override
public String getBaseURL() {
return baseURL;
public URI getURI() {
return uri;
}

public URL getURL() {
return url;
@Override
public String getHost() {
return host;
}

public URI getURI() {
return uri;
@Override
public int getPort() {
return port;
}

public String getPath() {
return path;
}

public boolean isSecure() {
Expand All @@ -45,4 +67,14 @@ public boolean isSecure() {
public String toString() {
return info;
}

@Override
public boolean equals(Object obj) {
return obj instanceof HttpEndpoint && uri.equals(((HttpEndpoint)obj).uri);
}

@Override
public int hashCode() {
return uri.hashCode();
}
}
13 changes: 13 additions & 0 deletions client-v2/src/test/java/com/clickhouse/client/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.internal.ValidationUtils;
import com.clickhouse.client.api.metadata.DefaultColumnToMethodMatchingStrategy;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
Expand Down Expand Up @@ -45,6 +46,7 @@

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.testng.AssertJUnit.fail;

public class ClientTests extends BaseIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientTests.class);
Expand Down Expand Up @@ -443,6 +445,17 @@ public void testServerSettings() throws Exception {
}
}

@Test(groups = {"integration"})
public void testInvalidEndpoint() {

try {
new Client.Builder().addEndpoint("http://localhost/default");
fail("Exception expected");
} catch (ValidationUtils.SettingsValidationException e) {
Assert.assertTrue(e.getMessage().contains("port"));
}
}

public boolean isVersionMatch(String versionExpression, Client client) {
List<GenericRecord> serverVersion = client.queryAll("SELECT version()");
return ClickHouseVersion.of(serverVersion.get(0).getString(1)).check(versionExpression);
Expand Down
Loading
Loading