Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
 into dev-new-local
  • Loading branch information
cherrylzhao committed Sep 1, 2018
2 parents 6d64bdf + b94a8ee commit b0a5139
Show file tree
Hide file tree
Showing 38 changed files with 298 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,33 +156,33 @@ public <I, O> List<O> groupExecute(
Collection<I> firstInputs = firstInputGroup.next();
inputs.put(firstKey, Lists.newArrayList(firstInputGroup));
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
return getGroupResults(syncGroupExecute(firstKey, firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, List<List<I>>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (Entry<String, List<List<I>>> entry : inputs.entrySet()) {
result.addAll(asyncGroupExecute(entry.getKey(), entry.getValue(), callback));
result.addAll(asyncGroupExecute(entry.getValue(), callback));
}
return result;
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final String key, final List<List<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<List<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (final List<I> each : inputs) {
result.add(executorService.submit(new Callable<Collection<O>>() {

@Override
public Collection<O> call() throws SQLException {
return callback.execute(key, each);
return callback.execute(each);
}
}));
}
return result;
}

private <I, O> Collection<O> syncGroupExecute(final String key, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(key, inputs);
private <I, O> Collection<O> syncGroupExecute(final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(inputs);
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ public interface ShardingGroupExecuteCallback<I, O> {
/**
* Execute callback.
*
* @param key input key
* @param values input values
* @param inputs input values
* @return execute result
* @throws SQLException throw when execute failure
*/
Collection<O> execute(String key, Collection<I> values) throws SQLException;
Collection<O> execute(Collection<I> inputs) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql;
package io.shardingsphere.core.executor.sql.execute;

import com.google.common.eventbus.EventBus;
import io.shardingsphere.core.constant.SQLType;
Expand All @@ -24,8 +24,9 @@
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.event.executor.sql.SQLExecutionEvent;
import io.shardingsphere.core.event.executor.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
Expand Down Expand Up @@ -59,7 +60,7 @@ public final T execute(final StatementExecuteUnit executeUnit) throws SQLExcepti
}

@Override
public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws SQLException {
public final Collection<T> execute(final Collection<StatementExecuteUnit> executeUnits) throws SQLException {
Collection<T> result = new LinkedList<>();
for (StatementExecuteUnit each : executeUnits) {
result.add(execute0(each));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* </p>
*/

package io.shardingsphere.core.executor.sql;
package io.shardingsphere.core.executor.sql.execute;

import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.event.executor.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql.result;
package io.shardingsphere.core.executor.sql.execute.result;

import io.shardingsphere.core.merger.QueryResult;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql.result;
package io.shardingsphere.core.executor.sql.execute.result;

import io.shardingsphere.core.merger.QueryResult;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql.threadlocal;
package io.shardingsphere.core.executor.sql.execute.threadlocal;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql.threadlocal;
package io.shardingsphere.core.executor.sql.execute.threadlocal;

import io.shardingsphere.core.exception.ShardingException;
import lombok.AccessLevel;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.executor.sql.prepare;

import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.routing.SQLExecutionUnit;

import java.sql.Connection;
import java.sql.SQLException;

/**
* SQL execute prepare callback.
*
* @author zhangliang
*/
public interface SQLExecutePrepareCallback {

/**
* Get connection.
*
* @param dataSourceName data source name
* @return connection
* @throws SQLException SQL exception
*/
Connection getConnection(String dataSourceName) throws SQLException;

/**
* Create statement execute unit.
*
* @param connection connection
* @param sqlExecutionUnit SQL execution unit
* @return statement execute unit
* @throws SQLException SQL exception
*/
StatementExecuteUnit createStatementExecuteUnit(Connection connection, SQLExecutionUnit sqlExecutionUnit) throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.executor.sql.prepare;

import com.google.common.collect.Lists;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.routing.SQLExecutionUnit;
import io.shardingsphere.core.routing.SQLUnit;
import lombok.RequiredArgsConstructor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* SQL execute prepare template.
*
* @author zhaojun
* @author zhangliang
*/
@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

private final int maxConnectionsSizePerQuery;

/**
* Get statement execute units.
*
* @param sqlExecutionUnits units execution SQL units
* @param callback SQL execute prepare callback
* @return key is data source name, value is statement execute unit groups
* @throws SQLException SQL exception
*/
public Map<String, List<List<StatementExecuteUnit>>> getStatementExecuteUnits(final Collection<SQLExecutionUnit> sqlExecutionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(sqlExecutionUnits);
Map<String, List<List<StatementExecuteUnit>>> result = new HashMap<>(sqlUnitGroups.size(), 1);
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.put(entry.getKey(), partitionSQLUnits(entry.getKey(), entry.getValue(), callback));
}
return result;
}

private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<SQLExecutionUnit> sqlExecutionUnits) {
Map<String, List<SQLUnit>> result = new LinkedHashMap<>(sqlExecutionUnits.size(), 1);
for (SQLExecutionUnit each : sqlExecutionUnits) {
if (!result.containsKey(each.getDataSource())) {
result.put(each.getDataSource(), new LinkedList<SQLUnit>());
}
result.get(each.getDataSource()).add(each.getSqlUnit());
}
return result;
}

private List<List<StatementExecuteUnit>> partitionSQLUnits(final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<List<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1);
for (List<SQLUnit> each : Lists.partition(sqlUnits, desiredPartitionSize)) {
// TODO get connection sync to prevent dead lock
result.add(getStatementExecuteUnitGroup(callback.getConnection(dataSourceName), dataSourceName, each, callback));
}
return result;
}

private List<StatementExecuteUnit> getStatementExecuteUnitGroup(
final Connection connection, final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
List<StatementExecuteUnit> result = new LinkedList<>();
for (SQLUnit each : sqlUnitGroup) {
result.add(callback.createStatementExecuteUnit(connection, new SQLExecutionUnit(dataSourceName, each)));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
import io.shardingsphere.core.metadata.table.ColumnMetaData;
import io.shardingsphere.core.metadata.table.TableMetaData;
import io.shardingsphere.core.rule.DataNode;
import io.shardingsphere.core.rule.ShardingDataSourceNames;
import io.shardingsphere.core.rule.ShardingRule;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -71,33 +72,34 @@ public TableMetaData load(final String logicTableName, final ShardingRule shardi
return actualTableMetaDataList.iterator().next();
}

private List<TableMetaData> load(final Map<String, List<String>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
return executeEngine.groupExecute(partitionDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<String, TableMetaData>() {
private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
return executeEngine.groupExecute(partitionDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {

@Override
public Collection<TableMetaData> execute(final String dataSourceName, final Collection<String> actualTableNames) throws SQLException {
public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes) throws SQLException {
String dataSourceName = dataNodes.iterator().next().getDataSourceName();
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, actualTableNames);
return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);
}
});
}

private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<String> actualTableNames) throws SQLException {
private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<DataNode> dataNodes) throws SQLException {
Collection<TableMetaData> result = new LinkedList<>();
try (Connection connection = connectionManager.getConnection(dataSourceName)) {
for (String each : actualTableNames) {
result.add(new TableMetaData(isTableExist(connection, catalog, each) ? getColumnMetaDataList(connection, catalog, each) : Collections.<ColumnMetaData>emptyList()));
for (DataNode each : dataNodes) {
result.add(new TableMetaData(isTableExist(connection, catalog, each.getTableName()) ? getColumnMetaDataList(connection, catalog, each.getTableName()) : Collections.<ColumnMetaData>emptyList()));
}
}
return result;
}

private Map<String, List<List<String>>> partitionDataNodeGroups(final Map<String, List<String>> dataNodeGroups) {
Map<String, List<List<String>>> result = new HashMap<>(dataNodeGroups.size(), 1);
for (Entry<String, List<String>> entry : dataNodeGroups.entrySet()) {
int desiredPartitionSize = entry.getValue().size() / maxConnectionsSizePerQuery;
result.put(entry.getKey(), Lists.partition(entry.getValue(), 0 == desiredPartitionSize ? 1 : desiredPartitionSize));
private Map<String, List<List<DataNode>>> partitionDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
Map<String, List<List<DataNode>>> result = new HashMap<>(dataNodeGroups.size(), 1);
for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) {
int desiredPartitionSize = Math.max(entry.getValue().size() / maxConnectionsSizePerQuery, 1);
result.put(entry.getKey(), Lists.partition(entry.getValue(), desiredPartitionSize));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -49,20 +45,4 @@ public final class SQLRouteResult {
public SQLRouteResult(final SQLStatement sqlStatement) {
this(sqlStatement, null);
}

/**
* Get SQL units grouped by data source name.
*
* @return SQL units grouped by data source name.
*/
public Map<String, List<SQLUnit>> getSQLUnitGroups() {
Map<String, List<SQLUnit>> result = new LinkedHashMap<>(executionUnits.size(), 1);
for (SQLExecutionUnit each : executionUnits) {
if (!result.containsKey(each.getDataSource())) {
result.put(each.getDataSource(), new LinkedList<SQLUnit>());
}
result.get(each.getDataSource()).add(each.getSqlUnit());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ private List<DataNode> generateDataNodes(final List<String> actualDataNodes, fin
*
* @return data node groups, key is data source name, value is tables belong to this data source
*/
public Map<String, List<String>> getDataNodeGroups() {
Map<String, List<String>> result = new LinkedHashMap<>(actualDataNodes.size(), 1);
public Map<String, List<DataNode>> getDataNodeGroups() {
Map<String, List<DataNode>> result = new LinkedHashMap<>(actualDataNodes.size(), 1);
for (DataNode each : actualDataNodes) {
String dataSourceName = each.getDataSourceName();
if (!result.containsKey(dataSourceName)) {
result.put(dataSourceName, new LinkedList<String>());
result.put(dataSourceName, new LinkedList<DataNode>());
}
result.get(dataSourceName).add(each.getTableName());
result.get(dataSourceName).add(each);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.shardingsphere.core.event.ShardingEventType;
import io.shardingsphere.core.event.executor.overall.OverallExecutionEvent;
import io.shardingsphere.core.event.executor.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

Expand Down
Loading

0 comments on commit b0a5139

Please sign in to comment.