diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 35403c4dc6d05..353a5bab723ec 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -506,4 +506,35 @@ public void testSourcePermission() { fail(e.getMessage()); } } + + @Test + public void testIllegalPassword() throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create user `thulab` 'passwd123456'", + "create role `admin`", + "grant role `admin` to `thulab`", + "grant WRITE, READ, SYSTEM, SECURITY on root.** to role `admin`"), + null); + + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + + try { + statement.execute( + String.format( + "create pipe a2b" + + " with source (" + + "'user'='admin'" + + ", 'password'='passwd')" + + " with sink (" + + "'node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + fail(); + } catch (final Exception e) { + Assert.assertEquals("", e.getMessage()); + } + } + } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index c4e7edd63d0ce..c13f87ae5794f 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -371,7 +371,7 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame return new PipeParameters(thisMap); } - private static class KeyReducer { + public static class KeyReducer { private static final Set FIRST_PREFIXES = new HashSet<>(); private static final Set SECOND_PREFIXES = new HashSet<>(); @@ -399,7 +399,7 @@ static String shallowReduce(String key) { return key; } - static String reduce(String key) { + public static String reduce(String key) { if (key == null) { return null; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index e51d1a43299b5..f455edb26b8b1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1332,10 +1332,11 @@ public DataSet queryPermission(final AuthorPlan authorPlan) { } @Override - public TPermissionInfoResp login(String username, String password) { + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return permissionManager.login(username, password); + return permissionManager.login(username, password, useEncryptedPassword); } else { TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp(); resp.setStatus(status); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index fe83b8bd0c189..02c82164595df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -501,7 +501,8 @@ TDataPartitionTableResp getOrCreateDataPartition( DataSet queryPermission(final AuthorPlan authorPlan); /** login. */ - TPermissionInfoResp login(String username, String password); + TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword); /** Check User Privileges. */ TPermissionInfoResp checkUserPrivileges(String username, PrivilegeUnion union); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index 06be2818593ae..ee39bfc2b19e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -112,8 +112,9 @@ protected ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } - public TPermissionInfoResp login(String username, String password) { - return authorInfo.login(username, password); + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + return authorInfo.login(username, password, useEncryptedPassword); } public String login4Pipe(final String userName, final String password) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index d9f6e07be2b2b..81e82b96adee8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -1225,7 +1225,7 @@ protected boolean shouldLogin() { @Override protected TSStatus login() { - return configManager.login(username, password).getStatus(); + return configManager.login(username, password, false).getStatus(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java index 60512887703ff..852be86bcd28c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java @@ -57,6 +57,8 @@ import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.file.Paths; import java.util.Collections; @@ -108,6 +110,19 @@ public void customize( PipeConfigNodeRemainingTimeMetrics.getInstance().register(this); } + @Override + protected void login(final @Nonnull String password) { + if (ConfigNode.getInstance() + .getConfigManager() + .getPermissionManager() + .login(userName, password, true) + .getStatus() + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(String.format("Failed to check password for pipe %s.", pipeName)); + } + } + @Override protected AbstractPipeListeningQueue getListeningQueue() { return PipeConfigNodeAgent.runtime().listener(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java index 743e0a3f09a56..d7404009825ae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java @@ -124,8 +124,9 @@ public void setAuthorQueryPlanExecutor(IAuthorPlanExecutor authorPlanExecutor) { this.authorPlanExecutor = authorPlanExecutor; } - public TPermissionInfoResp login(String username, String password) { - return authorPlanExecutor.login(username, password); + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + return authorPlanExecutor.login(username, password, useEncryptedPassword); } public String login4Pipe(final String username, final String password) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java index 5e8843418c170..8f216df3250ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java @@ -74,13 +74,14 @@ public AuthorPlanExecutor(IAuthorizer authorizer) { } @Override - public TPermissionInfoResp login(String username, String password) { + public TPermissionInfoResp login( + String username, final String password, final boolean useEncryptedPassword) { boolean status; String loginMessage = null; TSStatus tsStatus = new TSStatus(); TPermissionInfoResp result = new TPermissionInfoResp(); try { - status = authorizer.login(username, password); + status = authorizer.login(username, password, useEncryptedPassword); if (status) { // Bring this user's permission information back to the datanode for caching if (authorizer instanceof OpenIdAuthorizer) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java index 24f0d8ceb0a7a..724ebf43f73fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java @@ -33,7 +33,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; public interface IAuthorPlanExecutor { - TPermissionInfoResp login(String username, String password); + TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword); String login4Pipe(final String username, final String password); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 26fed0e1c4a3a..5d6aa8da9f5df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -744,7 +744,10 @@ public TAuthorizerResp queryRPermission(final TAuthorizerRelationalReq req) { @Override public TPermissionInfoResp login(TLoginReq req) { - return configManager.login(req.getUserrname(), req.getPassword()); + return configManager.login( + req.getUserrname(), + req.getPassword(), + req.isSetUseEncryptedPassword() && req.isUseEncryptedPassword()); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java index 151307e280d8c..34a9a411133fe 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java @@ -664,7 +664,7 @@ public void createUserWithRawPassword() { new ArrayList<>()); status = authorInfo.authorNonQuery(authorPlan); assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - TPermissionInfoResp result = authorInfo.login("testuser", "password123456"); + TPermissionInfoResp result = authorInfo.login("testuser", "password123456", false); assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.getStatus().getCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java index 81db578ace16a..21c3bc787ee2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java @@ -142,8 +142,14 @@ public static Optional getUserId(String username) { return Optional.ofNullable(user == null ? null : user.getUserId()); } - public static TSStatus checkUser(String userName, String password) { - TSStatus status = authorityFetcher.get().checkUser(userName, password); + public static TSStatus checkUser(final String userName, final String password) { + return checkUser(userName, password, false); + } + + public static TSStatus checkUser( + final String userName, final String password, final boolean useEncryptedPassword) { + final TSStatus status = + authorityFetcher.get().checkUser(userName, password, useEncryptedPassword); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java index 2894ce8bb478e..325476174c667 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java @@ -543,23 +543,31 @@ public void setAcceptCache(boolean acceptCache) { } @Override - public TSStatus checkUser(String username, String password) { + public TSStatus checkUser( + final String username, final String password, final boolean useEncryptedPassword) { checkCacheAvailable(); - User user = iAuthorCache.getUserCache(username); + final User user = iAuthorCache.getUserCache(username); if (user != null) { if (user.isOpenIdUser()) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); - } else if (password != null && AuthUtils.validatePassword(password, user.getPassword())) { - return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); - } else if (password != null - && AuthUtils.validatePassword( - password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.MD5)) { - return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } else if (password != null) { + if (useEncryptedPassword) { + return password.equals(user.getPassword()) + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); + } else { + return AuthUtils.validatePassword(password, user.getPassword()) + || AuthUtils.validatePassword( + password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.MD5) + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); + } } else { return RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); } } else { - TLoginReq req = new TLoginReq(username, password); + TLoginReq req = + new TLoginReq(username, password).setUseEncryptedPassword(useEncryptedPassword); TPermissionInfoResp status = null; try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java index 3dc95fa41dc17..302679878955b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java @@ -36,7 +36,8 @@ public interface IAuthorityFetcher { - TSStatus checkUser(String username, String password); + TSStatus checkUser( + final String username, final String password, final boolean useEncryptedPassword); boolean checkRole(String username, String roleName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 904b335ec5874..867e2487a5087 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -953,7 +953,7 @@ protected TSStatus login() { } long userId = AuthorityChecker.getUserId(username).orElse(-1L); - Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password); + Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password, false); if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) { return RpcUtils.getStatus( TSStatusCode.ILLEGAL_PASSWORD.getStatusCode(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 9d476c542941b..322cd5fe299eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -83,6 +83,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_ID; @@ -91,6 +92,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_CLI_HOSTNAME; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_ID; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; @@ -156,6 +158,8 @@ public void customize( SINK_IOTDB_USER_KEY, CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY); + String passwordString = + parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY); String cliHostnameString = parameters.getStringByKeys(CONNECTOR_IOTDB_CLI_HOSTNAME, SINK_IOTDB_CLI_HOSTNAME); userEntity = new UserEntity(Long.parseLong(userIdString), usernameString, cliHostnameString); @@ -191,6 +195,24 @@ public void customize( if (SESSION_MANAGER.getCurrSession() == null) { SESSION_MANAGER.registerSession(session); } + + // Check the password and its expiration + if (Objects.nonNull(passwordString) + && SESSION_MANAGER + .login( + session, + usernameString, + passwordString, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + environment.getRegionId() >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException( + String.format("Failed to check password for pipe %s.", environment.getPipeName())); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index d1ceb48e8aad0..fca1a7d242b12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.source.dataregion; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; @@ -39,6 +40,9 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; @@ -50,11 +54,15 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + +import java.time.ZoneId; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -128,8 +136,8 @@ public class IoTDBDataRegionSource extends IoTDBSource { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSource.class); - private PipeHistoricalDataRegionSource historicalExtractor; - private PipeRealtimeDataRegionSource realtimeExtractor; + private PipeHistoricalDataRegionSource historicalSource; + private PipeRealtimeDataRegionSource realtimeSource; private DataRegionWatermarkInjector watermarkInjector; @@ -297,8 +305,8 @@ public void validate(final PipeParameterValidator validator) throws Exception { constructHistoricalExtractor(); constructRealtimeExtractor(validator.getParameters()); - historicalExtractor.validate(validator); - realtimeExtractor.validate(validator); + historicalSource.validate(validator); + realtimeSource.validate(validator); } private void validatePattern(final TreePattern treePattern) { @@ -431,7 +439,7 @@ private void checkInvalidParameters(final PipeParameterValidator validator) { } private void constructHistoricalExtractor() { - historicalExtractor = new PipeHistoricalDataRegionTsFileAndDeletionSource(); + historicalSource = new PipeHistoricalDataRegionTsFileAndDeletionSource(); } private void constructRealtimeExtractor(final PipeParameters parameters) { @@ -439,7 +447,7 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { if (!parameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { - realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource(); + realtimeSource = new PipeRealtimeDataRegionHeartbeatSource(); LOGGER.info( "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.", EXTRACTOR_REALTIME_ENABLE_KEY, @@ -449,7 +457,7 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { // Use heartbeat only source if enable snapshot mode if (PipeTaskAgent.isSnapshotMode(parameters)) { - realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource(); + realtimeSource = new PipeRealtimeDataRegionHeartbeatSource(); LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime source."); return; } @@ -457,7 +465,7 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); LOGGER.info( "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by default.", EXTRACTOR_MODE_STREAMING_KEY, @@ -473,9 +481,9 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY), EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE); if (isStreamingMode) { - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); } else { - realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + realtimeSource = new PipeRealtimeDataRegionTsFileSource(); } return; } @@ -483,18 +491,18 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { case EXTRACTOR_REALTIME_MODE_FILE_VALUE: case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + realtimeSource = new PipeRealtimeDataRegionTsFileSource(); break; case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE: case EXTRACTOR_REALTIME_MODE_LOG_VALUE: case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); break; case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionLogSource(); + realtimeSource = new PipeRealtimeDataRegionLogSource(); break; default: - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); if (LOGGER.isWarnEnabled()) { LOGGER.warn( "Pipe: Unsupported source realtime mode: {}, create a hybrid source.", @@ -513,8 +521,8 @@ public void customize( super.customize(parameters, configuration); - historicalExtractor.customize(parameters, configuration); - realtimeExtractor.customize(parameters, configuration); + historicalSource.customize(parameters, configuration); + realtimeSource.customize(parameters, configuration); // Set watermark injector long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; @@ -546,6 +554,24 @@ public void customize( PipeDataNodeSinglePipeMetrics.getInstance().register(this); } + @Override + protected void login(final @Nonnull String password) { + if (SessionManager.getInstance() + .login( + new InternalClientSession("Source_login_session_" + regionId), + userName, + password, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + regionId >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(String.format("Failed to check password for pipe %s.", pipeName)); + } + } + @Override public void start() throws Exception { if (hasNoExtractionNeed || hasBeenStarted.get()) { @@ -557,8 +583,8 @@ public void start() throws Exception { "Pipe {}@{}: Starting historical source {} and realtime source {}.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName()); + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName()); super.start(); @@ -591,8 +617,8 @@ public void start() throws Exception { "Pipe {}@{}: Started historical source {} and realtime source {} successfully within {} ms.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName(), + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName(), System.currentTimeMillis() - startTime); return; } @@ -603,21 +629,21 @@ public void start() throws Exception { private void startHistoricalExtractorAndRealtimeExtractor( final AtomicReference exceptionHolder) { try { - // Start realtimeExtractor first to avoid losing data. This may cause some + // Start realtimeSource first to avoid losing data. This may cause some // retransmission, yet it is OK according to the idempotency of IoTDB. // Note: The order of historical collection is flushing data -> adding all tsFile events. // There can still be writing when tsFile events are added. If we start - // realtimeExtractor after the process, then this part of data will be lost. - realtimeExtractor.start(); - historicalExtractor.start(); + // realtimeSource after the process, then this part of data will be lost. + realtimeSource.start(); + historicalSource.start(); } catch (final Exception e) { exceptionHolder.set(e); LOGGER.warn( "Pipe {}@{}: Start historical source {} and realtime source {} error.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName(), + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName(), e); } } @@ -635,14 +661,14 @@ public Event supply() throws Exception { } Event event = null; - if (!historicalExtractor.hasConsumedAll()) { - event = historicalExtractor.supply(); + if (!historicalSource.hasConsumedAll()) { + event = historicalSource.supply(); } else { if (Objects.nonNull(watermarkInjector)) { event = watermarkInjector.inject(); } if (Objects.isNull(event)) { - event = realtimeExtractor.supply(); + event = realtimeSource.supply(); } } @@ -665,8 +691,8 @@ public void close() throws Exception { return; } - historicalExtractor.close(); - realtimeExtractor.close(); + historicalSource.close(); + realtimeSource.close(); if (Objects.nonNull(taskID)) { PipeDataRegionSourceMetrics.getInstance().deregister(taskID); } @@ -675,20 +701,20 @@ public void close() throws Exception { //////////////////////////// APIs provided for metric framework //////////////////////////// public int getHistoricalTsFileInsertionEventCount() { - return hasBeenStarted.get() && Objects.nonNull(historicalExtractor) - ? historicalExtractor.getPendingQueueSize() + return hasBeenStarted.get() && Objects.nonNull(historicalSource) + ? historicalSource.getPendingQueueSize() : 0; } public int getTabletInsertionEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getTabletInsertionEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getTabletInsertionEventCount() : 0; } public int getRealtimeTsFileInsertionEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getTsFileInsertionEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getTsFileInsertionEventCount() : 0; } public int getPipeHeartbeatEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getPipeHeartbeatEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getPipeHeartbeatEventCount() : 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java index b70ff4b7d8d0a..f59cb4fe0351e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java @@ -42,6 +42,9 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics; import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -62,8 +65,11 @@ import org.apache.tsfile.common.constant.TsFileConstant; +import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.file.Paths; +import java.time.ZoneId; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -146,6 +152,24 @@ public void start() throws Exception { super.start(); } + @Override + protected void login(final @Nonnull String password) { + if (SessionManager.getInstance() + .login( + new InternalClientSession("Source_login_session_" + regionId), + userName, + password, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + regionId >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(String.format("Failed to check password for pipe %s.", pipeName)); + } + } + @Override protected boolean needTransferSnapshot() { // Note: the schema region will transfer snapshot if there are table or tree planNode captured. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java index 975cbb09437bf..d6cdfa63a323c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java @@ -42,7 +42,7 @@ public InternalClientSession(String clientID) { @Override public String getClientAddress() { - return clientID; + return ""; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index ee00655211fe2..ac7cc236d9f6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -130,11 +130,26 @@ public BasicOpenSessionResp login( TSProtocolVersion tsProtocolVersion, IoTDBConstant.ClientVersion clientVersion, IClientSession.SqlDialect sqlDialect) { - BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); - - long userId = AuthorityChecker.getUserId(username).orElse(-1L); + return login( + session, username, password, zoneId, tsProtocolVersion, clientVersion, sqlDialect, false); + } - Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password); + // Only pipe can set useEncryptedPassword to true + public BasicOpenSessionResp login( + final IClientSession session, + final String username, + final String password, + final String zoneId, + final TSProtocolVersion tsProtocolVersion, + final IoTDBConstant.ClientVersion clientVersion, + final IClientSession.SqlDialect sqlDialect, + final boolean useEncryptedPassword) { + final BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); + + final long userId = AuthorityChecker.getUserId(username).orElse(-1L); + + Long timeToExpire = + DataNodeAuthUtils.checkPasswordExpiration(userId, password, useEncryptedPassword); if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) { openSessionResp .sessionId(-1) @@ -154,7 +169,8 @@ public BasicOpenSessionResp login( return openSessionResp; } - TSStatus loginStatus = AuthorityChecker.checkUser(username, password); + final TSStatus loginStatus = + AuthorityChecker.checkUser(username, password, useEncryptedPassword); if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // check the version compatibility if (!tsProtocolVersion.equals(CURRENT_RPC_VERSION)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index cb45757d3be0b..27882ac1ddac7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -372,6 +372,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -994,7 +995,7 @@ public SettableFuture createPipePlugin( future.setException( new IoTDBException( String.format("Failed to create pipe plugin %s. " + pathError, pluginName), - TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())); + TSStatusCode.SEMANTIC_ERROR.getStatusCode())); return future; } @@ -2175,7 +2176,7 @@ public SettableFuture createPipe( future.setException( new IoTDBException( String.format("Failed to create pipe %s, " + pathError, pipeName), - TSStatusCode.PIPE_ERROR.getStatusCode())); + TSStatusCode.SEMANTIC_ERROR.getStatusCode())); return future; } @@ -2353,6 +2354,8 @@ public SettableFuture alterPipe(final AlterPipeStatement alter return future; } + boolean hasSourcePassword = false; + boolean hasSinkPassword = false; // Construct temporary pipe static meta for validation final String pipeName = alterPipeStatement.getPipeName(); final Map sourceAttributes; @@ -2368,6 +2371,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), new PipeParameters(alterPipeStatement.getSourceAttributes())); } + hasSourcePassword = containsPassword(alterPipeStatement.getSourceAttributes()); if (alterPipeStatement.isReplaceAllSourceAttributes()) { sourceAttributes = alterPipeStatement.getSourceAttributes(); } else { @@ -2407,6 +2411,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter } if (!alterPipeStatement.getSinkAttributes().isEmpty()) { + hasSinkPassword = containsPassword(alterPipeStatement.getSinkAttributes()); if (alterPipeStatement.isReplaceAllSinkAttributes()) { sinkAttributes = alterPipeStatement.getSinkAttributes(); } else { @@ -2426,8 +2431,22 @@ public SettableFuture alterPipe(final AlterPipeStatement alter sinkAttributes = pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute(); } + final Map checkedSource = new HashMap<>(sourceAttributes); + if (!hasSourcePassword) { + checkedSource.remove(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY); + checkedSource.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + checkedSource.remove( + PipeParameters.KeyReducer.reduce(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + final Map checkedSink = new HashMap<>(sinkAttributes); + if (!hasSinkPassword) { + checkedSink.remove(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY); + checkedSink.remove(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + checkedSink.remove( + PipeParameters.KeyReducer.reduce(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } PipeDataNodeAgent.plugin() - .validate(pipeName, sourceAttributes, processorAttributes, sinkAttributes); + .validate(pipeName, checkedSource, processorAttributes, checkedSink); } catch (final Exception e) { future.setException( new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode())); @@ -2500,16 +2519,20 @@ private static void checkSourceType( } } - private static boolean onlyContainsUser( - final Map extractorOrConnectorAttributes) { - final PipeParameters extractorOrConnectorParameters = - new PipeParameters(extractorOrConnectorAttributes); - return extractorOrConnectorParameters.hasAnyAttributes( + private static boolean containsPassword(final Map sourceOrSinkAttributes) { + final PipeParameters sourceOrSinkParameters = new PipeParameters(sourceOrSinkAttributes); + return sourceOrSinkParameters.hasAnyAttributes( + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + } + + private static boolean onlyContainsUser(final Map sourceOrSinkAttributes) { + final PipeParameters sourceOrSinkParameters = new PipeParameters(sourceOrSinkAttributes); + return sourceOrSinkParameters.hasAnyAttributes( PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, PipeSinkConstant.SINK_IOTDB_USER_KEY, PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, PipeSinkConstant.SINK_IOTDB_USERNAME_KEY) - && !extractorOrConnectorParameters.hasAnyAttributes( + && !sourceOrSinkParameters.hasAnyAttributes( PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java index 187575c58da7b..6ee378282adb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java @@ -275,7 +275,8 @@ public static TSStatus deletePasswordHistory(long userId) { * @return the timestamp when the password will expire. Long.MAX if the password never expires. * Null if the password history cannot be found. */ - public static Long checkPasswordExpiration(long userId, String password) { + public static Long checkPasswordExpiration( + final long userId, final String password, final boolean useEncryptedPassword) { if (userId == -1) { return null; } @@ -335,7 +336,8 @@ public static Long checkPasswordExpiration(long userId, String password) { CommonDateTimeUtils.convertIoTDBTimeToMillis(tsBlock.getTimeByIndex(0)); // columns of last query: [timeseriesName, value, dataType] String oldPassword = tsBlock.getColumn(1).getBinary(0).toString(); - if (oldPassword.equals(AuthUtils.encryptPassword(password))) { + if (oldPassword.equals( + useEncryptedPassword ? password : AuthUtils.encryptPassword(password))) { if (lastPasswordTime + passwordExpirationDays * 1000 * 86400 <= lastPasswordTime) { // overflow or passwordExpirationDays <= 0 return Long.MAX_VALUE; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java index 9d0acaf14d898..194f33e8d67b3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java @@ -64,8 +64,8 @@ public void tearDown() throws Exception { @Test public void testLogin() throws AuthException { - Assert.assertTrue(authorizer.login("root", "root")); - Assert.assertThrows(AuthException.class, () -> authorizer.login("root", "error")); + Assert.assertTrue(authorizer.login("root", "root", false)); + Assert.assertThrows(AuthException.class, () -> authorizer.login("root", "error", false)); } @Test @@ -76,7 +76,7 @@ public void createAndDeleteUser() throws AuthException { } catch (AuthException e) { assertEquals("User user already exists", e.getMessage()); } - Assert.assertTrue(authorizer.login(userName, password)); + Assert.assertTrue(authorizer.login(userName, password, false)); authorizer.deleteUser(userName); try { authorizer.deleteUser(userName); @@ -230,7 +230,7 @@ public void testUserRole() throws AuthException { public void testUpdatePassword() throws AuthException { authorizer.createUser(userName, password); authorizer.updateUserPassword(userName, "newPassword123456"); - Assert.assertTrue(authorizer.login(userName, "newPassword123456")); + Assert.assertTrue(authorizer.login(userName, "newPassword123456", false)); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java index bd0c2d399e564..196cc80e5b6da 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java @@ -60,7 +60,7 @@ public void loginWithJWT() throws AuthException, ParseException { "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg"; OpenIdAuthorizer authorizer = new OpenIdAuthorizer(JSONObjectUtils.parse(OPEN_ID_PUBLIC_JWK)); - boolean login = authorizer.login(jwt, null); + boolean login = authorizer.login(jwt, null, false); assertTrue(login); } @@ -99,14 +99,16 @@ public void fetchMetadata() boolean login = openIdAuthorizer.login( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg", - ""); + "", + false); assertTrue(login); config.setOpenIdProviderUrl("https://auth.demo.pragmaticindustries.de/auth/realms/IoTDB/"); OpenIdAuthorizer openIdAuthorizer1 = new OpenIdAuthorizer(); login = openIdAuthorizer1.login( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg", - ""); + "", + false); assertTrue(login); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java index 98f70dc0ee609..6ba8c5336b193 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java @@ -107,12 +107,17 @@ private void checkAdmin(long userId, String errmsg) throws AuthException { } @Override - public boolean login(String username, String password) throws AuthException { + public boolean login( + final String username, final String password, final boolean useEncryptedPassword) + throws AuthException { User user = userManager.getEntity(username); if (user == null || password == null) { throw new AuthException( TSStatusCode.USER_NOT_EXIST, String.format("The user %s does not exist.", username)); } + if (useEncryptedPassword) { + return password.equals(user.getPassword()); + } if (AuthUtils.validatePassword( password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.SHA_256)) { return true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java index 445b29c0790dc..3ac33dbeddd41 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java @@ -42,7 +42,8 @@ public interface IAuthorizer extends SnapshotProcessor { * @param password The password of the user. * @return True if such user exists and the given password is correct, else return false. */ - boolean login(String username, String password) throws AuthException; + boolean login(String username, String password, final boolean useEncryptedPassword) + throws AuthException; /** * Login for a user in pipe. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java index 2da1acfaee97c..ee66ee5bced95 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java @@ -145,7 +145,8 @@ private static OIDCProviderMetadata fetchMetadata(String providerUrl) } @Override - public boolean login(String token, String password) throws AuthException { + public boolean login(String token, String password, final boolean useEncryptedPassword) + throws AuthException { if (password != null && !password.isEmpty()) { logger.error( "JWT Login failed as a non-empty Password was given username (token): {}", token); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index f16c6387cf570..e0868156ca9d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -98,7 +98,7 @@ protected PipeSubtaskExecutor( public final synchronized void register(final PipeSubtask subtask) { if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) { - LOGGER.warn("The subtask {} is already registered.", subtask.getTaskID()); + LOGGER.warn("The subtask {} is already registered.", getSafeSubtaskStr(subtask.getTaskID())); return; } @@ -107,32 +107,36 @@ public final synchronized void register(final PipeSubtask subtask) { subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, schedulerSupplier(this)); } + private static String getSafeSubtaskStr(final String subtaskID) { + return subtaskID.replaceAll("password=[^,}]*", "password=******"); + } + protected PipeSubtaskScheduler schedulerSupplier(final PipeSubtaskExecutor executor) { return new PipeSubtaskScheduler(executor); } public final synchronized void start(final String subTaskID) { if (!registeredIdSubtaskMapper.containsKey(subTaskID)) { - LOGGER.warn("The subtask {} is not registered.", subTaskID); + LOGGER.warn("The subtask {} is not registered.", getSafeSubtaskStr(subTaskID)); return; } final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID); if (subtask.isSubmittingSelf()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("The subtask {} is already running.", subTaskID); + LOGGER.debug("The subtask {} is already running.", getSafeSubtaskStr(subTaskID)); } } else { subtask.allowSubmittingSelf(); subtask.submitSelf(); ++runningSubtaskNumber; - LOGGER.info("The subtask {} is started to submit self.", subTaskID); + LOGGER.info("The subtask {} is started to submit self.", getSafeSubtaskStr(subTaskID)); } } public final synchronized void stop(final String subTaskID) { if (!registeredIdSubtaskMapper.containsKey(subTaskID)) { - LOGGER.warn("The subtask {} is not registered.", subTaskID); + LOGGER.warn("The subtask {} is not registered.", getSafeSubtaskStr(subTaskID)); return; } @@ -149,9 +153,9 @@ public final synchronized void deregister(final String subTaskID) { if (subtask != null) { try { subtask.close(); - LOGGER.info("The subtask {} is closed successfully.", subTaskID); + LOGGER.info("The subtask {} is closed successfully.", getSafeSubtaskStr(subTaskID)); } catch (final Exception e) { - LOGGER.error("Failed to close the subtask {}.", subTaskID, e); + LOGGER.error("Failed to close the subtask {}.", getSafeSubtaskStr(subTaskID), e); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java index 91a149c9f6690..7e0ba8ca4f2a7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java @@ -34,16 +34,20 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_SKIP_IF_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_SKIP_IF_KEY; @@ -199,8 +203,15 @@ public void customize( userEntity.setAuditLogOperation(AuditLogOperation.QUERY); skipIfNoPrivileges = getSkipIfNoPrivileges(parameters); + final String password = + parameters.getStringByKeys(EXTRACTOR_IOTDB_PASSWORD_KEY, SOURCE_IOTDB_PASSWORD_KEY); + if (Objects.nonNull(password)) { + login(password); + } } + protected abstract void login(final @Nonnull String password); + public static boolean getSkipIfNoPrivileges(final PipeParameters extractorParameters) { final String extractorSkipIfValue = extractorParameters diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index b6569d943fbd8..92312ee81a307 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -445,6 +445,7 @@ struct TAuthizedPatternTreeResp { struct TLoginReq { 1: required string userrname 2: required string password + 3: optional bool useEncryptedPassword } // reqtype : tree, relational, system