Merge pull request #1 from MyCATApache/Mycat-server-1.6.7.5-test
merge code
longzhihun authored Jan 17, 2020
2 parents 063caee + c6d7769 commit 9dec291
Showing 13 changed files with 400 additions and 277 deletions.
16 changes: 16 additions & 0 deletions src/main/java/io/mycat/backend/jdbc/JDBCDatasource.java
@@ -6,7 +6,9 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.collect.Lists;

import io.mycat.MycatServer;
@@ -19,6 +21,8 @@
import io.mycat.net.NIOProcessor;

public class JDBCDatasource extends PhysicalDatasource {

private DruidDataSource dataSource;

static {
// Load the JDBC drivers that may be needed
@@ -43,6 +47,18 @@ public class JDBCDatasource extends PhysicalDatasource {

public JDBCDatasource(DBHostConfig config, DataHostConfig hostConfig, boolean isReadNode) {
super(config, hostConfig, isReadNode);
DBHostConfig curConfig = getConfig();
this.dataSource = new DruidDataSource();
dataSource.setUrl(curConfig.getUrl());
dataSource.setUsername(curConfig.getUser());
dataSource.setPassword(curConfig.getPassword());
dataSource.setMaxWait(TimeUnit.SECONDS.toMillis(1));
dataSource.setMaxActive(curConfig.getMaxCon());
dataSource.setMinIdle(curConfig.getMinCon());
}

public Connection getDruidConnection() throws SQLException {
return this.dataSource.getConnection();
}

@Override
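An editorial sketch (not part of this commit) of how the new getDruidConnection() can be consumed; the class name, method name, and probe query below are illustrative assumptions:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import io.mycat.backend.jdbc.JDBCDatasource;

public final class DruidConnectionProbe {

    // Illustrative only: borrow a pooled connection from the Druid-backed
    // datasource added above, run a query, and report whether it returned rows.
    public static boolean returnsRows(JDBCDatasource ds, String sql) {
        try (Connection con = ds.getDruidConnection();
             PreparedStatement ps = con.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            return rs.next();
        } catch (SQLException e) {
            return false;
        }
    }
}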
77 changes: 77 additions & 0 deletions src/main/java/io/mycat/backend/mysql/nio/handler/JDBCFetchStoreNodeOfChildTableHandler.java
@@ -0,0 +1,77 @@
package io.mycat.backend.mysql.nio.handler;

import io.mycat.MycatServer;
import io.mycat.backend.datasource.PhysicalDBNode;
import io.mycat.backend.datasource.PhysicalDatasource;
import io.mycat.backend.jdbc.JDBCDatasource;
import io.mycat.cache.CachePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;

public class JDBCFetchStoreNodeOfChildTableHandler {

private static final Logger LOGGER = LoggerFactory
.getLogger(JDBCFetchStoreNodeOfChildTableHandler.class);

public String execute(String schema, String sql, ArrayList<String> dataNodes) {

String key = schema + ":" + sql;
CachePool cache = MycatServer.getInstance().getCacheService()
.getCachePool("ER_SQL2PARENTID");
String result = (String) cache.get(key);
if (result != null) {
return result;
}
Map<String, PhysicalDBNode> dbNodeMap = MycatServer.getInstance().getConfig().getDataNodes();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("find child node with sql:" + sql);
}
for (String dn : dataNodes) {

PhysicalDBNode physicalDBNode = dbNodeMap.get(dn);
PhysicalDatasource physicalDatasource = physicalDBNode.getDbPool().getSource();
if(physicalDatasource instanceof JDBCDatasource) {
JDBCDatasource jdbcDatasource = (JDBCDatasource) physicalDatasource;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = jdbcDatasource.getDruidConnection();
String useDB = "use " + physicalDBNode.getDatabase() + ";";
pstmt = con.prepareStatement(useDB);
pstmt.execute();
pstmt.close(); // release the USE statement before the variable is reused
pstmt = con.prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
return dn;
}
} catch (SQLException e) {
LOGGER.error("JDBC fetch of parent sharding node failed for sql:" + sql, e);
} finally {
// close resources in reverse order of acquisition
try {
if (rs != null) {
rs.close();
}
if (pstmt != null) {
pstmt.close();
}
if (con != null) {
con.close();
}
} catch (SQLException e) {
LOGGER.error("failed to close JDBC resources", e);
}
}
}
}
LOGGER.error("can't find (root) parent sharding node for sql:"+ sql);
return null;
}
}
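Editorial note: the handler reads the ER_SQL2PARENTID cache but, unlike the existing FetchStoreNodeOfChildTableHandler, never writes the resolved data node back, so every miss re-queries the backends. A hedged sketch of the missing step inside the rs.next() branch, assuming CachePool offers the same putIfAbsent(key, value) call the non-JDBC handler uses:

if (rs.next()) {
    // Assumption: CachePool.putIfAbsent(Object, Object) exists as in the NIO handler.
    cache.putIfAbsent(key, dn);
    return dn;
}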
10 changes: 8 additions & 2 deletions src/main/java/io/mycat/config/loader/xml/XMLSchemaLoader.java
@@ -341,6 +341,11 @@ private void loadTable(String schemaName, Map<String, TableConfig> tables, List
if (tableElement.hasAttribute("autoIncrement")) {
autoIncrement = Boolean.parseBoolean(tableElement.getAttribute("autoIncrement"));
}

boolean fetchStoreNodeByJdbc = false;
if (tableElement.hasAttribute("fetchStoreNodeByJdbc")) {
fetchStoreNodeByJdbc = Boolean.parseBoolean(tableElement.getAttribute("fetchStoreNodeByJdbc"));
}
//Record whether a limit needs to be added to the returned result set; it is added by default
boolean needAddLimit = true;
if (tableElement.hasAttribute("needAddLimit")) {
@@ -404,7 +409,7 @@ private void loadTable(String schemaName, Map<String, TableConfig> tables, List
autoIncrement, needAddLimit, tableType, dataNode,
getDbType(dataNode),
(tableRuleConfig != null) ? tableRuleConfig.getRule() : null,
ruleRequired, null, false, null, null, subTables);
ruleRequired, null, false, null, null, subTables, fetchStoreNodeByJdbc);
//The dataNode count is only available after the TableConfig has been constructed, so rule construction is deferred to this point @cjw
if ((tableRuleConfig != null) && (tableRuleConfig.getRule().getRuleAlgorithm() instanceof TableRuleAware)) {
AbstractPartitionAlgorithm newRuleAlgorithm = tableRuleConfig.getRule().getRuleAlgorithm();
@@ -530,6 +535,7 @@ private void processChildTables(Map<String, TableConfig> tables,
if (childTbElement.hasAttribute("needAddLimit")) {
needAddLimit = Boolean.parseBoolean(childTbElement.getAttribute("needAddLimit"));
}

String subTables = childTbElement.getAttribute("subTables");
//The child table's join key and the corresponding parent key; parent and child tables are linked through these
String joinKey = childTbElement.getAttribute("joinKey").toUpperCase();
@@ -538,7 +544,7 @@
autoIncrement, needAddLimit,
TableConfig.TYPE_GLOBAL_DEFAULT, dataNodes,
getDbType(dataNodes), null, false, parentTable, true,
joinKey, parentKey, subTables);
joinKey, parentKey, subTables, false);

if (tables.containsKey(table.getName())) {
throw new ConfigException("table " + table.getName() + " duplicated!");
8 changes: 7 additions & 1 deletion src/main/java/io/mycat/config/model/TableConfig.java
@@ -40,6 +40,7 @@ public class TableConfig {
private final String name;
private final String primaryKey;
private final boolean autoIncrement;
private final boolean fetchStoreNodeByJdbc;
private final boolean needAddLimit;
private final Set<String> dbTypes;
private final int tableType;
@@ -65,7 +66,7 @@ public class TableConfig {
public TableConfig(String tableName, String primaryKey, boolean autoIncrement, boolean needAddLimit, int tableType,
String dataNode, Set<String> dbType, RuleConfig rule, boolean ruleRequired,
TableConfig parentTC, boolean isChildTable, String joinKey,
String parentKey, String subTables) {
String parentKey, String subTables, boolean fetchStoreNodeByJdbc) {
if (tableName == null) {
throw new IllegalArgumentException("table name is null");
} else if (dataNode == null) {
@@ -74,6 +75,7 @@ public TableConfig(String tableName, String primaryKey, boolean autoIncrement, b
this.primaryKey = primaryKey;
this.autoIncrement = autoIncrement;
this.needAddLimit = needAddLimit;
this.fetchStoreNodeByJdbc = fetchStoreNodeByJdbc;
this.tableType = tableType;
this.dbTypes = dbType;
if (ruleRequired && rule == null) {
@@ -305,4 +307,8 @@ public Map<String, List<String>> getDataNodeTableStructureSQLMap() {
public void setDataNodeTableStructureSQLMap(Map<String, List<String>> dataNodeTableStructureSQLMap) {
this.dataNodeTableStructureSQLMap = dataNodeTableStructureSQLMap;
}

public boolean getFetchStoreNodeByJdbc() {
return this.fetchStoreNodeByJdbc;
}
}
@@ -236,7 +236,7 @@ private static void hashTest() throws IOException {
int count = 1024;
String sb = genDataNodesString(count);
TableConfig tableConf = new TableConfig("test", "id", true, false, -1, sb,
null, rule, true, null, false, null, null, null);
null, rule, true, null, false, null, null, null, false);

hash.setTableConfig(tableConf);
hash.reInit();
@@ -12,6 +12,7 @@
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement.ValuesClause;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import io.mycat.backend.mysql.nio.handler.FetchStoreNodeOfChildTableHandler;
import io.mycat.backend.mysql.nio.handler.JDBCFetchStoreNodeOfChildTableHandler;
import io.mycat.config.model.SchemaConfig;
import io.mycat.config.model.TableConfig;
import io.mycat.route.RouteResultset;
@@ -147,8 +148,16 @@ private RouteResultset parserChildTable(SchemaConfig schema, RouteResultset rrs,
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("find root parent's node sql "+ findRootTBSql);
}
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
String dn = fetchHandler.execute(schema.getName(),findRootTBSql, tc.getRootParent().getDataNodes());

String dn = null;
if (tc.getRootParent().getFetchStoreNodeByJdbc()) {
JDBCFetchStoreNodeOfChildTableHandler jdbcFetchHandler = new JDBCFetchStoreNodeOfChildTableHandler();
dn = jdbcFetchHandler.execute(schema.getName(),findRootTBSql, tc.getRootParent().getDataNodes());
} else {
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
dn = fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
}

if (dn == null) {
throw new SQLNonTransientException("can't find (root) parent sharding node for sql:"+ sql);
}
16 changes: 11 additions & 5 deletions src/main/java/io/mycat/route/util/RouterUtil.java
@@ -16,6 +16,7 @@
import java.util.Set;
import java.util.concurrent.Callable;

import io.mycat.backend.mysql.nio.handler.JDBCFetchStoreNodeOfChildTableHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -1927,8 +1928,6 @@ public static boolean processERChildTable(final SchemaConfig schema, final Strin
realVal = joinKeyVal.substring(1, joinKeyVal.length() - 1);
}



// try to route by ER parent partition key
//If this is a second-level child table (its parent has no further parent) and the sharding column is exactly the joinKey column, call routeByERParentKey
RouteResultset theRrs = RouterUtil.routeByERParentKey(sc, schema, ServerParse.INSERT, sql, rrs, tc, realVal);
@@ -1960,9 +1959,16 @@ public static boolean processERChildTable(final SchemaConfig schema, final Strin
getListeningExecutorService().submit(new Callable<String>() {
@Override
public String call() throws Exception {
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
// return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes(), sc);
if (tc.getRootParent().getFetchStoreNodeByJdbc()) {
JDBCFetchStoreNodeOfChildTableHandler jdbcFetchStoreNodeOfChildTableHandler =
new JDBCFetchStoreNodeOfChildTableHandler();
return jdbcFetchStoreNodeOfChildTableHandler
.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
} else {
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes(), sc);
}

}
});

1 change: 1 addition & 0 deletions src/main/resources/schema.dtd
@@ -35,6 +35,7 @@
<!ATTLIST table needAddLimit NMTOKEN #IMPLIED>
<!ATTLIST table type NMTOKEN #IMPLIED>
<!ATTLIST table splitTableNames NMTOKEN #IMPLIED>
<!ATTLIST table fetchStoreNodeByJdbc NMTOKEN #IMPLIED>


<!ELEMENT childTable (property*,(childTable*))>
10 changes: 6 additions & 4 deletions src/main/resources/schema.xml
@@ -5,7 +5,9 @@
<schema name="TESTDB" checkSQLschema="true" sqlMaxLimit="100" randomDataNode="dn1">
<!-- auto sharding by id (long) -->
<!-- splitTableNames: enables the table name attribute to list multiple tables separated by commas, i.e. those tables share this configuration -->
<table name="travelrecord,address" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" splitTableNames ="true"/>
<table name="customer" primaryKey="id" dataNode="dn1,dn2" rule="sharding-by-intfile" fetchStoreNodeByJdbc="true">
<childTable name="customer_addr" primaryKey="id" joinKey="customer_id" parentKey="id"> </childTable>
</table>
<!-- <table name="oc_call" primaryKey="ID" dataNode="dn1$0-743" rule="latest-month-calldate"
/> -->
</schema>
@@ -19,11 +21,11 @@
<dataNode name="jdbc_dn2" dataHost="jdbchost" database="db2" />
<dataNode name="jdbc_dn3" dataHost="jdbchost" database="db3" /> -->
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- can have multi write hosts -->
<writeHost host="hostM1" url="localhost:3306" user="root"
password="123456">
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306" user="root"
password="root">
</writeHost>
<!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
</dataHost>