Skip to content

Commit

Permalink
feat: add user in spark
Browse files Browse the repository at this point in the history
  • Loading branch information
4paradigm committed Dec 19, 2023
1 parent 4e69eb0 commit 38d71f9
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
if (timeout != null) {
option.setRequestTimeout(Integer.parseInt(timeout));
}
String user = options.get("user");

Check warning on line 65 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java#L65

Added line #L65 was not covered by tests
if (user != null) {
option.setUser(user);

Check warning on line 67 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java#L67

Added line #L67 was not covered by tests
}
String password = options.get("password");

Check warning on line 69 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java#L69

Added line #L69 was not covered by tests
if (password != null) {
option.setPassword(password);

Check warning on line 71 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java#L71

Added line #L71 was not covered by tests
}
String debug = options.get("debug");
if (debug != null) {
option.setEnableDebug(Boolean.valueOf(debug));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

// Must serializable
public class OpenmldbReadConfig implements Serializable {
public final String dbName, tableName, zkCluster, zkPath;
public final String dbName, tableName, zkCluster, zkPath, user, password;

public OpenmldbReadConfig(String dbName, String tableName, SdkOption option) {
this.dbName = dbName;
this.tableName = tableName;
this.zkCluster = option.getZkCluster();
this.zkPath = option.getZkPath();
this.user = option.getUser();
this.password = option.getPassword();

Check warning on line 33 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/read/OpenmldbReadConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/read/OpenmldbReadConfig.java#L32-L33

Added lines #L32 - L33 were not covered by tests
// TODO(hw): other configs in SdkOption
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setLight(true);
option.setUser(config.user);
option.setPassword(config.password);

Check warning on line 49 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java#L48-L49

Added lines #L48 - L49 were not covered by tests
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setLight(true);
option.setUser(config.user);
option.setPassword(config.password);

Check warning on line 49 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java#L48-L49

Added lines #L48 - L49 were not covered by tests
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@

// Must serializable
public class OpenmldbWriteConfig implements Serializable {
public final String dbName, tableName, zkCluster, zkPath, writerType;
public final String dbName, tableName, zkCluster, zkPath, writerType, user, password;

public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) {
this.dbName = dbName;
this.tableName = tableName;
this.zkCluster = option.getZkCluster();
this.zkPath = option.getZkPath();
this.writerType = writerType;
this.user = option.getUser();
this.password = option.getPassword();

Check warning on line 35 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java#L34-L35

Added lines #L34 - L35 were not covered by tests
// TODO(hw): other configs in SdkOption
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReade
option.setZkCluster(config.zkCluster)
option.setZkPath(config.zkPath)
option.setLight(true)
option.setUser(config.user)
option.setPassword(config.password)
val executor = new SqlClusterExecutor(option)
val dbName: String = config.dbName
val tableName: String = config.tableName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class SyncToolConfig {
// public static int CHANNEL_KEEP_ALIVE_TIME;
public static String ZK_CLUSTER;
public static String ZK_ROOT_PATH;

public static String USER;
public static String PASSWORD;
public static String ZK_CERT;
public static String SYNC_TASK_PROGRESS_PATH;

Expand Down Expand Up @@ -87,6 +90,8 @@ private static void parseFromProperties(Properties prop) {
if (ZK_ROOT_PATH.isEmpty()) {
throw new RuntimeException("zookeeper.root_path should not be empty");
}
USER = prop.getProperty("user", "root");
PASSWORD = prop.getProperty("password", "");
ZK_CERT = prop.getProperty("zookeeper.cert", "");

HADOOP_CONF_DIR = prop.getProperty("hadoop.conf.dir", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public SyncToolImpl(String endpoint) throws SqlException, InterruptedException {
option.setZkCluster(SyncToolConfig.ZK_CLUSTER);
option.setZkPath(SyncToolConfig.ZK_ROOT_PATH);
option.setZkCert(SyncToolConfig.ZK_CERT);
option.setUser(SyncToolConfig.USER);
option.setPassword(SyncToolConfig.PASSWORD);
this.router = new SqlClusterExecutor(option);
this.zkCollectorPath = SyncToolConfig.ZK_ROOT_PATH + "/sync_tool/collector";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public static int getZkMaxConnectWaitTime() {
return getInt("zookeeper.max_connect_waitTime");
}

public static String getUser() { return getString("user"); }

Check warning on line 124 in java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java#L124

Added line #L124 was not covered by tests

public static String getPassword() { return getString("password"); }

Check warning on line 126 in java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java#L126

Added line #L126 was not covered by tests

public static String getSparkMaster() {
return getString("spark.master");
}
Expand Down Expand Up @@ -283,6 +287,14 @@ private void init() throws ConfigException {
props.setProperty("zookeeper.session_timeout", "5000");
}

if (props.getProperty("user") == null) {
props.setProperty("user", "root");

Check warning on line 291 in java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java#L291

Added line #L291 was not covered by tests
}

if (props.getProperty("password") == null) {
props.setProperty("password", "");

Check warning on line 295 in java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java#L295

Added line #L295 was not covered by tests
}

if (getZkSessionTimeout() <= 0) {
throw new ConfigException("zookeeper.session_timeout", "should be larger than 0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ object JobInfoManager {
private val option = new SdkOption
option.setZkCluster(TaskManagerConfig.getZkCluster)
option.setZkPath(TaskManagerConfig.getZkRootPath)
option.setUser(TaskManagerConfig.getUser)

Check warning on line 48 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala#L48

Added line #L48 was not covered by tests
if (!TaskManagerConfig.getPassword.isEmpty) {
option.setPassword(TaskManagerConfig.getPassword)

Check warning on line 50 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala#L50

Added line #L50 was not covered by tests
}
val sqlExecutor = new SqlClusterExecutor(option)
sqlExecutor.executeSQL("", "set @@execute_mode='online';")

Expand Down

0 comments on commit 38d71f9

Please sign in to comment.