Skip to content

Commit

Permalink
Merge pull request #1 from MyCATApache/master
Browse files Browse the repository at this point in the history
2
  • Loading branch information
magicdoom committed Dec 17, 2015
2 parents 0fa1fae + cde521e commit df8ca5d
Show file tree
Hide file tree
Showing 16 changed files with 1,043 additions and 158 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@
<artifactId>log4j-core</artifactId>
<version>2.3</version>
</dependency>

<!-- joda日期处理工具 -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.8.2</version>
</dependency>

</dependencies>


Expand Down
142 changes: 0 additions & 142 deletions src/main/java/demo/ZkCreate.java

This file was deleted.

6 changes: 6 additions & 0 deletions src/main/java/io/mycat/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ public void onReadData(int got) throws IOException {
// handle this package
readBuffer.position(offset);
handle(readBuffer, offset, length);

// maybe handle stmt_close
if(isClosed()) {
return ;
}

// offset to next position
offset += length;
// reached end
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/mycat/server/FrontendPrepareHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface FrontendPrepareHandler {

void execute(byte[] data);

void close();
void close(byte[] data);

void clear();
}
17 changes: 15 additions & 2 deletions src/main/java/io/mycat/server/MySQLFrontConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.mycat.server.sqlhandler.SavepointHandler;
import io.mycat.server.sqlhandler.SelectHandler;
import io.mycat.server.sqlhandler.ServerLoadDataInfileHandler;
import io.mycat.server.sqlhandler.ServerPrepareHandler;
import io.mycat.server.sqlhandler.SetHandler;
import io.mycat.server.sqlhandler.ShowHandler;
import io.mycat.server.sqlhandler.StartHandler;
Expand Down Expand Up @@ -79,7 +80,7 @@ public MySQLFrontConnection(SocketChannel channel) throws IOException {
this.port = localAddr.getPort();
this.localPort = remoteAddr.getPort();
loadDataInfileHandler = new ServerLoadDataInfileHandler(this);

prepareHandler = new ServerPrepareHandler(this);
}

public void sendAuthPackge() throws IOException {
Expand Down Expand Up @@ -235,7 +236,12 @@ public void query(byte[] data) {
"Unknown charset '" + charset + "'");
return;
}

query(sql);

}

public void query(String sql) {
if (sql == null || sql.length() == 0) {
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
return;
Expand Down Expand Up @@ -415,7 +421,7 @@ public void stmtExecute(byte[] data) {

public void stmtClose(byte[] data) {
if (prepareHandler != null) {
prepareHandler.close();
prepareHandler.close(data);
} else {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
"Prepare unsupported!");
Expand Down Expand Up @@ -508,11 +514,18 @@ public void close(String reason) {
if (getLoadDataInfileHandler() != null) {
getLoadDataInfileHandler().clear();
}
if(getPrepareHandler() != null) {
getPrepareHandler().clear();
}
}

public LoadDataInfileHandler getLoadDataInfileHandler() {
return loadDataInfileHandler;
}

public FrontendPrepareHandler getPrepareHandler() {
return prepareHandler;
}

public void ping() {
write(OkPacket.OK);
Expand Down
28 changes: 25 additions & 3 deletions src/main/java/io/mycat/server/NonBlockingSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class NonBlockingSession{
private final MultiNodeCoordinator multiNodeCoordinator;
private final CommitNodeHandler commitHandler;
private volatile String xaTXID;

private boolean prepared;

public NonBlockingSession(MySQLFrontConnection source) {
this.source = source;
Expand Down Expand Up @@ -117,6 +119,9 @@ public void execute(RouteResultset rrs, int type) {

if (nodes.length == 1) {
singleNodeHandler = new SingleNodeHandler(rrs, this);
if(this.isPrepared()) {
singleNodeHandler.setPrepared(true);
}
try {
singleNodeHandler.execute();
} catch (Exception e) {
Expand All @@ -125,18 +130,25 @@ public void execute(RouteResultset rrs, int type) {
}
} else {
boolean autocommit = source.isAutocommit();
SystemConfig sysConfig = MycatServer.getInstance().getConfig()
.getSystem();
// SystemConfig sysConfig = MycatServer.getInstance().getConfig()
// .getSystem();
multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
this);

if(this.isPrepared()) {
multiNodeHandler.setPrepared(true);
}
try {
multiNodeHandler.execute();
} catch (Exception e) {
LOGGER.warn("{} {}", source, rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
}

if(this.isPrepared()) {
this.setPrepared(false);
}

}

public void commit() {
Expand Down Expand Up @@ -374,4 +386,14 @@ public String getXaTXID() {
return xaTXID;
}


public boolean isPrepared() {
return prepared;
}


public void setPrepared(boolean prepared) {
this.prepared = prepared;
}

}
25 changes: 22 additions & 3 deletions src/main/java/io/mycat/server/executors/MultiNodeQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.MySQLFrontConnection;
import io.mycat.server.NonBlockingSession;
import io.mycat.server.packet.BinaryRowDataPacket;
import io.mycat.server.config.node.MycatConfig;
import io.mycat.server.packet.FieldPacket;
import io.mycat.server.packet.OkPacket;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements
private volatile boolean fieldsReturned;
private int okCount;
private final boolean isCallProcedure;
private boolean prepared;
private List<FieldPacket> fieldPackets = new ArrayList<FieldPacket>();

public MultiNodeQueryHandler(int sqlType, RouteResultset rrs,
boolean autocommit, NonBlockingSession session) {
Expand Down Expand Up @@ -329,8 +332,15 @@ public void outputMergeResult(final MySQLFrontConnection source,
// }
for (int i = start; i < end; i++) {
RowDataPacket row = results.get(i);
row.packetId = ++packetId;
row.write(bufferArray);
if(prepared) {
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, row);
binRowDataPk.packetId = ++packetId;
binRowDataPk.write(bufferArray);
} else {
row.packetId = ++packetId;
row.write(bufferArray);
}
}

eof[3] = ++packetId;
Expand Down Expand Up @@ -416,6 +426,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
if (needMerg) {
FieldPacket fieldPkg = new FieldPacket();
fieldPkg.read(field);
fieldPackets.add(fieldPkg);
String fieldName = new String(fieldPkg.name).toUpperCase();
if (columToIndx != null
&& !columToIndx.containsKey(fieldName)) {
Expand All @@ -429,7 +440,6 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
fieldPkg.packetId = ++packetId;
shouldSkip = true;
fieldPkg.write(bufferArray);

}

columToIndx.put(fieldName,
Expand All @@ -439,6 +449,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
// find primary key index
FieldPacket fieldPkg = new FieldPacket();
fieldPkg.read(field);
fieldPackets.add(fieldPkg);
String fieldName = new String(fieldPkg.name);
if (primaryKey.equalsIgnoreCase(fieldName)) {
primaryKeyIndex = i;
Expand Down Expand Up @@ -525,4 +536,12 @@ public void requestDataResponse(byte[] data, BackendConnection conn) {
(MySQLBackendConnection) conn);
}

public boolean isPrepared() {
return prepared;
}

public void setPrepared(boolean prepared) {
this.prepared = prepared;
}

}
Loading

0 comments on commit df8ca5d

Please sign in to comment.