Skip to content

Commit

Permalink
for #1205, Get connection sync to prevent dead lock for sharding-proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 19, 2018
1 parent f38351a commit 23db595
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
Expand All @@ -58,18 +59,19 @@ public final class BackendConnection implements AutoCloseable {
private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();

/**
* Get connection of current thread datasource.
* Get connections of current thread datasource.
*
* @param dataSourceName data source name
* @param connectionSize size of connections to be get
* @return connection
* @throws SQLException SQL exception
*/
public Connection getConnection(final String dataSourceName) throws SQLException {
public List<Connection> getConnections(final String dataSourceName, final int connectionSize) throws SQLException {
try {
ShardingEventBusInstance.getInstance().post(new GetConnectionStartEvent(dataSourceName));
Connection result = ruleRegistry.getBackendDataSource().getConnection(dataSourceName);
cachedConnections.add(result);
GetConnectionEvent finishEvent = new GetConnectionFinishEvent(DataSourceMetaDataFactory.newInstance(DatabaseType.MySQL, result.getMetaData().getURL()));
List<Connection> result = ruleRegistry.getBackendDataSource().getConnections(dataSourceName, connectionSize);
cachedConnections.addAll(result);
GetConnectionEvent finishEvent = new GetConnectionFinishEvent(DataSourceMetaDataFactory.newInstance(DatabaseType.MySQL, result.get(0).getMetaData().getURL()));
finishEvent.setExecuteSuccess();
ShardingEventBusInstance.getInstance().post(finishEvent);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

Expand Down Expand Up @@ -91,7 +93,27 @@ private JDBCBackendDataSourceFactory getBackendDataSourceFactory(final Transacti
* @throws SQLException SQL exception
*/
public Connection getConnection(final String dataSourceName) throws SQLException {
return getDataSourceMap().get(dataSourceName).getConnection();
return getConnections(dataSourceName, 1).get(0);
}

/**
* Get connections.
*
* @param dataSourceName data source name
* @param connectionSize size of connections to be get
* @return connections
* @throws SQLException SQL exception
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public List<Connection> getConnections(final String dataSourceName, final int connectionSize) throws SQLException {
List<Connection> result = new ArrayList<>(connectionSize);
DataSource dataSource = getDataSourceMap().get(dataSourceName);
synchronized (dataSource) {
for (int i = 0; i < connectionSize; i++) {
result.add(dataSource.getConnection());
}
}
return result;
}

private Map<String, DataSource> getDataSourceMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,10 +108,10 @@ public ExecuteResponse execute(final SQLRouteResult routeResult) throws SQLExcep
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups =
sqlExecutePrepareTemplate.getExecuteUnitGroups(routeResult.getRouteUnits(), new JDBCExecuteEngine.ConnectionStrictlySQLExecutePrepareCallback(isReturnGeneratedKeys));
sqlExecutePrepareTemplate.getExecuteUnitGroups(routeResult.getRouteUnits(), new ConnectionStrictlySQLExecutePrepareCallback(isReturnGeneratedKeys));
Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.executeGroup((Collection) sqlExecuteGroups,
new JDBCExecuteEngine.FirstConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys),
new JDBCExecuteEngine.ConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys));
new FirstConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys),
new ConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys));
ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next();
return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
? getExecuteQueryResponse(((ExecuteQueryResponseUnit) firstExecuteResponseUnit).getQueryResponsePackets(), executeResponseUnits) : new ExecuteUpdateResponse(executeResponseUnits);
Expand Down Expand Up @@ -177,7 +176,7 @@ private final class ConnectionStrictlySQLExecutePrepareCallback implements SQLEx

@Override
public List<Connection> getConnections(final String dataSourceName, final int connectionSize) throws SQLException {
return Collections.singletonList(getBackendConnection().getConnection(dataSourceName));
return getBackendConnection().getConnections(dataSourceName, connectionSize);
}

@Override
Expand Down
3 changes: 1 addition & 2 deletions sharding-proxy/src/main/resources/conf/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
#
# # CONNECTION_STRICTLY: Proxy will release connections after get the overall rows from the ResultSet.
# # Meanwhile, the cost of the memory will be increased.
# connection.mode: CONNECTION_STRICTLY
# max.connections.size.per.query: 4
# max.connections.size.per.query: 1
# acceptor.size: 16 # The default value is available processors count * 2.
# executor.size: 16 # Infinite by default.
# proxy.transaction.enabled: false
Expand Down

0 comments on commit 23db595

Please sign in to comment.