Skip to content

Commit

Permalink
Merge pull request #1 from MyCATApache/https/github.com/MyCATApache/M…
Browse files Browse the repository at this point in the history
…ycat-Server/tree/Mycat-server-1.6.76-test

pull latest
  • Loading branch information
funnyAnt authored Jul 25, 2020
2 parents c234a8c + 657165f commit f43520a
Show file tree
Hide file tree
Showing 16 changed files with 243 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mycat1.6权威指南

http://mycat.org.cn/document/mycat-definitive-guide.pdf

Mycat志愿者开发群:332702697
Mycat志愿者开发群:332702697,106088787

[![GitHub issues](https://img.shields.io/github/issues/MyCATApache/Mycat-Server.svg)](https://github.com/MyCATApache/Mycat-Server/issues)
[![GitHub forks](https://img.shields.io/github/forks/MyCATApache/Mycat-Server.svg)](https://github.com/MyCATApache/Mycat-Server/network)
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
Expand Down Expand Up @@ -116,7 +116,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.0</version>
<version>2.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/mycat/backend/BackendConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ public void execute(RouteResultsetNode node, ServerConnection source,

public boolean checkAlive();

public void disableRead();

public void enableRead();

}
11 changes: 11 additions & 0 deletions src/main/java/io/mycat/backend/jdbc/JDBCConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -994,5 +994,16 @@ public boolean checkAlive() {
}
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}

}
10 changes: 10 additions & 0 deletions src/main/java/io/mycat/backend/mysql/nio/MySQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -794,4 +794,14 @@ public boolean isModifiedSQLExecuted() {
public int getTxIsolation() {
return txIsolation;
}

@Override
public void disableRead() {
this.getSocketWR().disableRead();
}

@Override
public void enableRead() {
this.getSocketWR().enableRead();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,16 @@ public void query(String sql, int charsetIndex) {
LOGGER.debug("UnsupportedEncodingException :"+ e.getMessage());
}
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}
}
34 changes: 34 additions & 0 deletions src/main/java/io/mycat/config/model/SystemConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ public final class SystemConfig {

private int parallExecute;

private boolean enableWriteQueueFlowControl;// 写队列流量控制
private int writeQueueStopThreshold;// 写队列停止写入阈值
private int writeQueueRecoverThreshold;// 写队列恢复写入阈值

public String getDefaultSqlParser() {
return defaultSqlParser;
}
Expand Down Expand Up @@ -314,6 +318,11 @@ public SystemConfig() {
this.ignoreUnknownCommand = 0;
this.parallExecute = 0;
this.removeGraveAccent = 1;

// 流量控制相关
this.enableWriteQueueFlowControl = false;
this.writeQueueStopThreshold = 10 * 1024;
this.writeQueueRecoverThreshold = 512;
}

public void setMaxPreparedStmtCount(int maxPreparedStmtCount){
Expand Down Expand Up @@ -1060,4 +1069,29 @@ public int getRemoveGraveAccent() {
public void setRemoveGraveAccent(int removeGraveAccent) {
this.removeGraveAccent = removeGraveAccent;
}

public boolean isEnableWriteQueueFlowControl() {
return enableWriteQueueFlowControl;
}

public void setEnableWriteQueueFlowControl(boolean enableWriteQueueFlowControl) {
this.enableWriteQueueFlowControl = enableWriteQueueFlowControl;
}

public int getWriteQueueStopThreshold() {
return writeQueueStopThreshold;
}

public void setWriteQueueStopThreshold(int writeQueueStopThreshold) {
this.writeQueueStopThreshold = writeQueueStopThreshold;
}

public int getWriteQueueRecoverThreshold() {
return writeQueueRecoverThreshold;
}

public void setWriteQueueRecoverThreshold(int writeQueueRecoverThreshold) {
this.writeQueueRecoverThreshold = writeQueueRecoverThreshold;
}

}
6 changes: 6 additions & 0 deletions src/main/java/io/mycat/manager/ManagerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ public void handle(final byte[] data) {
handler.handle(data);
}

@Override
public void checkQueueFlow() {
// TODO Auto-generated method stub

}

}
12 changes: 12 additions & 0 deletions src/main/java/io/mycat/net/AIOSocketWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ public void doNextWriteCheck()
public boolean checkAlive() {
return channel.isOpen();
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}
}

class AIOWriteHandler implements CompletionHandler<Integer, AIOSocketWR> {
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/mycat/net/AbstractConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@

import com.google.common.base.Strings;

import io.mycat.MycatServer;
import io.mycat.backend.mysql.CharsetUtil;
import io.mycat.config.model.SystemConfig;
import io.mycat.util.CompressUtil;
import io.mycat.util.TimeUtil;

Expand Down Expand Up @@ -85,6 +87,10 @@ public abstract class AbstractConnection implements NIOConnection {

protected final SocketWR socketWR;

protected boolean enableFlowController;// writeQueue是否开启流控
protected final int writeQueueStopThreshold;// writeQueue停止阀值
protected final int writeQueueRecoverThreshold;// writeQueue恢复阀值

public AbstractConnection(NetworkChannel channel) {
this.channel = channel;
boolean isAIO = (channel instanceof AsynchronousChannel);
Expand All @@ -97,6 +103,11 @@ public AbstractConnection(NetworkChannel channel) {
this.startupTime = TimeUtil.currentTimeMillis();
this.lastReadTime = startupTime;
this.lastWriteTime = startupTime;

SystemConfig config = MycatServer.getInstance().getConfig().getSystem();
this.enableFlowController = config.isEnableWriteQueueFlowControl();
this.writeQueueStopThreshold = config.getWriteQueueStopThreshold();
this.writeQueueRecoverThreshold = config.getWriteQueueRecoverThreshold();
}

public String getCharset() {
Expand Down Expand Up @@ -623,4 +634,22 @@ public void onConnectfinish() {
public boolean checkAlive(){
return socketWR.checkAlive();
}

public boolean isEnableFlowController() {
return enableFlowController;
}

public int getWriteQueueStopThreshold() {
return writeQueueStopThreshold;
}

public int getWriteQueueRecoverThreshold() {
return writeQueueRecoverThreshold;
}

/**
* 检查写队列流量,必要时候进行流控
*/
abstract public void checkQueueFlow();

}
5 changes: 5 additions & 0 deletions src/main/java/io/mycat/net/BackendAIOConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void setProcessor(NIOProcessor processor) {
processor.addBackend(this);
}

@Override
public void checkQueueFlow() {

}

@Override
public String toString() {
return "BackendConnection [id=" + id + ", host=" + host + ", port="
Expand Down
88 changes: 88 additions & 0 deletions src/main/java/io/mycat/net/FrontendConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.MycatServer;
import io.mycat.backend.BackendConnection;
import io.mycat.backend.mysql.CharsetUtil;
import io.mycat.backend.mysql.MySQLMessage;
import io.mycat.config.Capabilities;
Expand All @@ -54,6 +58,7 @@
import io.mycat.net.mysql.HandshakeV10Packet;
import io.mycat.net.mysql.MySQLPacket;
import io.mycat.net.mysql.OkPacket;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.parser.ServerParse;
import io.mycat.util.CompressUtil;
import io.mycat.util.RandomUtil;
Expand All @@ -80,6 +85,7 @@ public abstract class FrontendConnection extends AbstractConnection {
protected LoadDataInfileHandler loadDataInfileHandler;
protected boolean isAccepted;
protected boolean isAuthenticated;
protected QueueFlowController flowController;
private boolean allowMultiStatements = false;

public FrontendConnection(NetworkChannel channel) throws IOException {
Expand All @@ -97,6 +103,10 @@ public FrontendConnection(NetworkChannel channel) throws IOException {
this.port = localAddr.getPort();
this.localPort = remoteAddr.getPort();
this.handler = new FrontendAuthenticator(this);

if (enableFlowController) {
this.flowController = new QueueFlowController(this);
}
}

public long getId() {
Expand Down Expand Up @@ -672,5 +682,83 @@ public void setOption(byte[] data) {
return;
}
}

public boolean isEnableFlowController() {
return enableFlowController;
}

public void setEnableFlowController(boolean enableFlowController) {
this.enableFlowController = enableFlowController;
}

public QueueFlowController getFlowController() {
return flowController;
}

/**
*
* 队列流量控制器,防止队列过大内存OOM,功能:
* 1)超过最大阀值,关闭NIO读事件,停止从网络读取mysql数据
* 2)队列恢复到可继续写的阀值,重启NIO读事件,继续写队列
*/
public class QueueFlowController {

private volatile boolean readIOStopped; // 读事件的IO是否已经停止
private Collection<BackendConnection> relationedBackendConns;// 关联的后端连接
private final FrontendConnection frontendConn;

public QueueFlowController(FrontendConnection c) {
this.readIOStopped = false;
this.relationedBackendConns = new ArrayList<BackendConnection>();
this.frontendConn = c;
}

/**
*恢复所有后端连接的读事件
*/
private void recoverIORead() {
if (readIOStopped) {
synchronized (relationedBackendConns) {
if (readIOStopped) {// 再次判断,防止并发多次执行
readIOStopped = false;
for (final BackendConnection conn : relationedBackendConns) {
conn.enableRead();
}
relationedBackendConns.clear();
LOGGER.info("The connection[{}] has removed flow control.", frontendConn.toString());
}
}
}
}

private void stopIORead(final Collection<BackendConnection> conns) {
if (null != conns && conns.size() > 0) {
synchronized (relationedBackendConns) {
if (!readIOStopped) {// 再次判断,防止并发多次执行
readIOStopped = true;
for (BackendConnection conn : conns) {
conn.disableRead();
this.relationedBackendConns.add(conn);
}
LOGGER.info("Now the connection[{}] is under flow control", frontendConn.toString());
}
}
}
}
/**
* 检查writeQueue的流量控制阈值
*
* @param connection
*/
public void check(Map<RouteResultsetNode, BackendConnection> target) {
int size = writeQueue.size();
if (!readIOStopped && size > writeQueueStopThreshold) {
stopIORead(target.values());
} else {
if (readIOStopped && size <= writeQueueRecoverThreshold) {
recoverIORead();
}
}
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/mycat/net/NIOProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ private void checkConSendQueue(AbstractConnection c) {
if (!c.writeQueue.isEmpty()) {
c.getSocketWR().doNextWriteCheck();
}
}

if (c.isEnableFlowController()) {
c.checkQueueFlow();
}
}

// 后端连接检查
private void backendCheck() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/mycat/net/SocketWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public abstract class SocketWR {
public abstract void asynRead() throws IOException;
public abstract void doNextWriteCheck() ;
public abstract boolean checkAlive();
public abstract void disableRead();
public abstract void enableRead();
}
Loading

0 comments on commit f43520a

Please sign in to comment.