Skip to content

Commit

Permalink
Merge pull request #1 from MyCATApache/1.6
Browse files Browse the repository at this point in the history
update
  • Loading branch information
junwen12221 authored Jun 24, 2019
2 parents 233cd43 + e3a9281 commit 65f7860
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 8 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@

# [MyCAT](http://mycat.io/)

重要通告,腾讯没有正式通知Mycat社区就突然关闭Mycat原先官网群,对Mycat开源社区造成重大影响,我们希望腾讯能给出合理解释并尽快恢复官网群, 在此之前,Mycat启用新的官网群,qq群:332702697

Mycat志愿者开发群:332702697

[![GitHub issues](https://img.shields.io/github/issues/MyCATApache/Mycat-Server.svg)](https://github.com/MyCATApache/Mycat-Server/issues)
[![GitHub forks](https://img.shields.io/github/forks/MyCATApache/Mycat-Server.svg)](https://github.com/MyCATApache/Mycat-Server/network)
Expand Down
153 changes: 147 additions & 6 deletions src/main/java/io/mycat/backend/jdbc/JDBCHeartbeat.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.mycat.backend.jdbc;

import io.mycat.backend.datasource.PhysicalDBPool;
import io.mycat.backend.datasource.PhysicalDatasource;
import io.mycat.backend.heartbeat.MySQLHeartbeat;
import io.mycat.config.model.DataHostConfig;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -13,14 +17,17 @@
import io.mycat.statistic.HeartbeatRecorder;

public class JDBCHeartbeat extends DBHeartbeat{
public static final Logger LOGGER = LoggerFactory.getLogger(JDBCHeartbeat.class);

private final ReentrantLock lock;
private final JDBCDatasource source;
private final boolean heartbeatnull;
private Long lastSendTime = System.currentTimeMillis();
private Long lastReciveTime = System.currentTimeMillis();


private Logger logger = LoggerFactory.getLogger(this.getClass());

private final int maxRetryCount;

private Logger logger = LoggerFactory.getLogger(this.getClass());

public JDBCHeartbeat(JDBCDatasource source)
{
Expand All @@ -29,6 +36,8 @@ public JDBCHeartbeat(JDBCDatasource source)
this.status = INIT_STATUS;
this.heartbeatSQL = source.getHostConfig().getHearbeatSQL().trim();
this.heartbeatnull= heartbeatSQL.length()==0;
this.maxRetryCount = source.getHostConfig().getMaxRetryCount();

}

@Override
Expand Down Expand Up @@ -96,28 +105,160 @@ public void heartbeat()
try
{
isChecking.set(true);

try (Connection c = source.getConnection())
{
try (Statement s = c.createStatement())
{
s.execute(heartbeatSQL);
}
c.close();
}
status = OK_STATUS;
setResult(OK_STATUS);
if(logger.isDebugEnabled()){
logger.debug("JDBCHeartBeat con query sql: "+heartbeatSQL);
}

} catch (Exception ex)
{
logger.error("JDBCHeartBeat error",ex);
status = ERROR_STATUS;
// status = ERROR_STATUS;
setResult(ERROR_STATUS);
} finally
{
lock.unlock();
this.isChecking.set(false);
lastReciveTime = System.currentTimeMillis();
}
}

public void setResult(int result) {
switch (result) {
case OK_STATUS:
setOk();
break;
case ERROR_STATUS:
setError();
break;
case TIMEOUT_STATUS:
setTimeout();
break;
}
if (this.status != OK_STATUS) {
switchSourceIfNeed("heartbeat error");
}

}

private void setOk() {
switch (status) {
case DBHeartbeat.TIMEOUT_STATUS:
writeStatusMsg(source.getDbPool().getHostName(), source.getName() ,DBHeartbeat.INIT_STATUS);
this.status = DBHeartbeat.INIT_STATUS;
this.errorCount.set(0);
//前一个状态为超时 当前状态为正常状态 那就马上发送一个请求 来验证状态是否恢复为Ok
heartbeat();// timeout, heart beat again
break;
case DBHeartbeat.OK_STATUS:
this.errorCount.set(0);
break;
default:
writeStatusMsg(source.getDbPool().getHostName(), source.getName() ,DBHeartbeat.OK_STATUS);
this.status = OK_STATUS;
this.errorCount.set(0);;
}
}
//发生错误了,是否进行下一次心跳检测的策略 . 是否进行下一次心跳检测.
private void nextDector( int nextStatue) {

if (isStop.get()) {
writeStatusMsg(source.getDbPool().getHostName(), source.getName() ,DBHeartbeat.OK_STATUS);
this.status = nextStatue;
} else {
// should continues check error status
if(errorCount.get() < maxRetryCount) {
} else {
writeStatusMsg(source.getDbPool().getHostName(), source.getName() ,nextStatue);
this.status = nextStatue;
this.errorCount.set(0);
}
}
}


private void setError() {
errorCount.incrementAndGet() ;
nextDector( ERROR_STATUS);
}

private void setTimeout() {
errorCount.incrementAndGet() ;
nextDector( TIMEOUT_STATUS);
//status = DBHeartbeat.TIMEOUT_STATUS;
}

/**
* switch data source
*/
private void switchSourceIfNeed(String reason) {
int switchType = source.getHostConfig().getSwitchType();
String notSwitch = source.getHostConfig().getNotSwitch();
if (notSwitch.equals(DataHostConfig.FOVER_NOT_SWITCH_DS)
|| switchType == DataHostConfig.NOT_SWITCH_DS) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not switch datasource ,for switchType is "
+ DataHostConfig.NOT_SWITCH_DS);
return;
}
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("to switchSourceIfNeed function 进行读节点转换 "
);
}
PhysicalDBPool pool = this.source.getDbPool();
int curDatasourceHB = pool.getSource().getHeartbeat().getStatus();
// read node can't switch ,only write node can switch
if (pool.getWriteType() == PhysicalDBPool.WRITE_ONLYONE_NODE
&& !source.isReadNode()
&& curDatasourceHB != DBHeartbeat.OK_STATUS
&& pool.getSources().length > 1) {
synchronized (pool) {
// try to see if need switch datasource
curDatasourceHB = pool.getSource().getHeartbeat().getStatus();
if (curDatasourceHB != DBHeartbeat.INIT_STATUS && curDatasourceHB != DBHeartbeat.OK_STATUS) {
int curIndex = pool.getActivedIndex();
int nextId = pool.next(curIndex);
PhysicalDatasource[] allWriteNodes = pool.getSources();
while (true) {
if (nextId == curIndex) {
break;
}
PhysicalDatasource theSource = allWriteNodes[nextId];
DBHeartbeat theSourceHB = theSource.getHeartbeat();
int theSourceHBStatus = theSourceHB.getStatus();
if (theSourceHBStatus == DBHeartbeat.OK_STATUS) {
if (switchType == DataHostConfig.SYN_STATUS_SWITCH_DS) {
if (Integer.valueOf(0).equals( theSourceHB.getSlaveBehindMaster())) {
LOGGER.info("try to switch datasource ,slave is synchronized to master " + theSource.getConfig());
pool.switchSourceOrVoted(nextId, true, reason);
break;
} else {
LOGGER.warn("ignored datasource ,slave is not synchronized to master , slave behind master :"
+ theSourceHB.getSlaveBehindMaster()
+ " " + theSource.getConfig());
}
} else {
// normal switch
LOGGER.info("try to switch datasource ,not checked slave synchronize status " + theSource.getConfig());
pool.switchSourceOrVoted(nextId, true, reason);
break;
}

}
nextId = pool.next(nextId);
}
}
}
}
}
}

0 comments on commit 65f7860

Please sign in to comment.