Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/io/cdm/net/AbstractConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,14 @@ public ByteBuffer writeToBuffer(byte[] src, ByteBuffer buffer) {
public void close(String reason) {
if (!isClosed.get()) {
closeSocket();
isClosed.set(true);
if (processor != null) {
processor.removeConnection(this);
}
this.cleanup();
isSupportCompress = false;

//修改Got packet 1156包乱序的错误
isClosed.set(true);
// ignore null information
if (Strings.isNullOrEmpty(reason)) {
return;
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/cdm/server/NonBlockingSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void execute(RouteResultset rrs, int type) {
}

try {
if(initCount > 1){
if((type == ServerParse.DELETE || type == ServerParse.INSERT
|| type == ServerParse.UPDATE)&&initCount >= 1&&target.get(nodes[0])==null){
checkDistriTransaxAndExecute(rrs,1,autocommit);
}else{
singleNodeHandler.execute();
Expand All @@ -159,7 +160,9 @@ public void execute(RouteResultset rrs, int type) {
multiNodeHandler.setPrepared(true);
}
try {
if(((type == ServerParse.DELETE || type == ServerParse.INSERT || type == ServerParse.UPDATE) && !rrs.isGlobalTable() && nodes.length > 1)||initCount > 1) {
//多个分片节点 是执行的DML 然后不是全局表 节点个数大于1 或者 target里面大于等于1 (保证除DQL语句只能执行一个执行全局表)
if ((type == ServerParse.DELETE || type == ServerParse.INSERT
|| type == ServerParse.UPDATE)&&((!rrs.isGlobalTable()&& nodes.length > 1)|| (initCount >= 1))) {
checkDistriTransaxAndExecute(rrs,2,autocommit);
} else {
multiNodeHandler.execute();
Expand All @@ -178,9 +181,16 @@ public void execute(RouteResultset rrs, int type) {
private void checkDistriTransaxAndExecute(RouteResultset rrs, int type,boolean autocommit) throws Exception {
switch(CDMServer.getInstance().getConfig().getSystem().getHandleDistributedTransactions()) {
case 1:
source.writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Distributed transaction is disabled!");
RouteResultsetNode preResultsetNode = null;
if(target.size()>0){
preResultsetNode = (RouteResultsetNode)target.keySet().toArray()[0];
}
String errorMes = "Distributed transaction is disabled!" + " error sql is "+
(preResultsetNode == null? rrs.getStatement():preResultsetNode.getStatement());
LOGGER.error(errorMes);
source.writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND,errorMes);
if(!autocommit){
source.setTxInterrupt("Distributed transaction is disabled!");
source.setTxInterrupt(errorMes);
}
break;
case 2:
Expand Down