Skip to content

Commit

Permalink
Removed synchronization and adjusted flow for multithreaded access
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 21, 2014
1 parent 35dfea1 commit 6a6f10e
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 76 deletions.
175 changes: 102 additions & 73 deletions lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisCommandInterruptedException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.output.VoidOutput;

import io.netty.buffer.ByteBuf;
Expand All @@ -34,12 +36,14 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC

private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
protected BlockingQueue<RedisCommand<K, V, ?>> queue;
protected BlockingQueue<RedisCommand<K, V, ?>> commandBuffer = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
protected ByteBuf buffer;
protected RedisStateMachine<K, V> rsm;
private Channel channel;
private boolean closed;
private RedisChannelHandler<K, V> redisChannelHandler;
private Object lock = new Object();
private final ReentrantLock writeLock = new ReentrantLock();
private final ReentrantLock readLock = new ReentrantLock();

/**
* Initialize a new instance that handles commands from the supplied queue.
Expand Down Expand Up @@ -72,19 +76,23 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
return;
}

synchronized (lock) {
if (buffer == null) {
logger.warn("CommandHandler is closed, incoming response will be discarded.");
return;
}
if (buffer == null) {
logger.warn("CommandHandler is closed, incoming response will be discarded.");
return;
}

try {
readLock.lock();
buffer.writeBytes(input);

if (logger.isDebugEnabled()) {
logger.debug("[" + channel.remoteAddress() + "] Received: "
if (logger.isTraceEnabled()) {
logger.trace("[" + ctx.channel().remoteAddress() + "] Received: "
+ buffer.toString(Charset.defaultCharset()).trim());
}

decode(ctx, buffer);
} finally {
readLock.unlock();
}

} finally {
Expand All @@ -105,16 +113,48 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
synchronized (queue) {
if (!queue.isEmpty()) {
RedisCommand<K, V, ?> command = queue.take();
command.setException(cause);
command.complete();
}
if (!queue.isEmpty()) {
RedisCommand<K, V, ?> command = queue.take();
command.setException(cause);
command.complete();
}
super.exceptionCaught(ctx, cause);
}

@Override
public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
try {

if (closed) {
throw new RedisException("Connection is closed");
}

if (channel != null) {
if (logger.isDebugEnabled()) {
logger.debug("writeAndFlush Command " + command);
}
channel.writeAndFlush(command);
} else {
try {
writeLock.lock();
if (logger.isDebugEnabled()) {
logger.debug("buffering Command " + command);
}
commandBuffer.put(command);
} finally {
writeLock.unlock();
}
}

} catch (NullPointerException e) {
throw new RedisException("Connection is closed");
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}

return command;
}

/**
*
* @see io.netty.channel.ChannelDuplexHandler#write(io.netty.channel.ChannelHandlerContext, java.lang.Object,
Expand All @@ -127,22 +167,17 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
final RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
if (logger.isDebugEnabled()) {
logger.debug("[" + channel.remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim());
}

synchronized (queue) {
if (logger.isTraceEnabled()) {
logger.trace("[" + ctx.channel().remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim());
}

if (cmd.getOutput() instanceof VoidOutput) {
queue.remove(cmd);
ctx.write(buf, promise);
cmd.complete();
} else {
if (!queue.contains(cmd)) {
queue.put(cmd);
}
ctx.write(buf, promise);
}
if (cmd.getOutput() instanceof VoidOutput) {
ctx.write(buf, promise);
cmd.complete();
} else {
queue.put(cmd);
ctx.write(buf, promise);
}

}
Expand All @@ -154,17 +189,27 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
public void channelActive(final ChannelHandlerContext ctx) throws Exception {

logger.debug("channelActive()");
this.channel = ctx.channel();
List<RedisCommand<K, V, ?>> tmp = new ArrayList<RedisCommand<K, V, ?>>(queue.size() + commandBuffer.size());

try {
writeLock.lock();

this.channel = ctx.channel();

List<RedisCommand<K, V, ?>> tmp = new ArrayList<RedisCommand<K, V, ?>>(queue.size());
tmp.addAll(queue);
queue.clear();

tmp.addAll(queue);
queue.clear();
if (redisChannelHandler != null) {
redisChannelHandler.activated();
}

if (redisChannelHandler != null) {
redisChannelHandler.activated();
} finally {
writeLock.unlock();
}

tmp.addAll(commandBuffer);
commandBuffer.clear();

for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {
logger.debug("Triggering command " + cmd);
Expand All @@ -174,6 +219,8 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {

tmp.clear();

logger.debug("channelActive() done");

}

/**
Expand All @@ -183,57 +230,34 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelInactive()");
try {
this.channel = null;
if (closed) {
for (RedisCommand<K, V, ?> cmd : queue) {
if (cmd.getOutput() != null) {
cmd.getOutput().setError("Connection closed");
}
cmd.complete();
}
queue.clear();
queue = null;
this.channel = null;

if (redisChannelHandler != null) {
redisChannelHandler.deactivated();
if (closed) {
for (RedisCommand<K, V, ?> cmd : queue) {
if (cmd.getOutput() != null) {
cmd.getOutput().setError("Connection closed");
}
cmd.complete();
}
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
throw e;
}
}
queue.clear();
queue = null;

@Override
public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
try {
commandBuffer.clear();
queue = null;

if (closed) {
throw new RedisException("Connection is closed");
if (redisChannelHandler != null) {
redisChannelHandler.deactivated();
}

if (channel != null) {
channel.writeAndFlush(command);
} else {
synchronized (queue) {
queue.put(command);
}
}
} catch (NullPointerException e) {
throw new RedisException("Connection is closed");
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}

return command;
logger.debug("channelInactive() done");
}

/**
* Close the connection.
*/
@Override
public synchronized void close() {
public void close() {
logger.debug("close()");

if (closed) {
Expand All @@ -242,8 +266,12 @@ public synchronized void close() {
}

if (buffer != null) {
synchronized (lock) {
try {
readLock.lock();
buffer.release();
} finally {
readLock.unlock();

}
buffer = null;
}
Expand All @@ -261,6 +289,7 @@ public synchronized void close() {
}

channel = null;

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void syncPoolPerformanceTest() throws Exception {
RedisConnectionPool<RedisConnection<String, String>> pool = client.pool();
RedisConnection<String, String> c1 = pool.allocateConnection();

c1.ping();
Stopwatch stopwatch = Stopwatch.createStarted();

for (int i = 0; i < 1000; i++) {
Expand All @@ -168,6 +169,7 @@ public void asyncPoolPerformanceTest() throws Exception {
RedisConnectionPool<RedisAsyncConnection<String, String>> pool = client.asyncPool();
RedisAsyncConnection<String, String> c1 = pool.allocateConnection();

c1.ping();
Stopwatch stopwatch = Stopwatch.createStarted();

for (int i = 0; i < 1000; i++) {
Expand Down
4 changes: 2 additions & 2 deletions lettuce/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# Root logger option
log4j.rootLogger=INFO, stdout
log4j.rootLogger=INFO, stdout, file

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p] [%t] (%c{1}:%L) %m%n

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=target/log.log
log4j.appender.file.Append=false
log4j.appender.file.bufferedIO=true
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p] [%t] (%c{1}:%L) %m%n

Expand Down
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<useFile>false</useFile>
<systemProperties>
<io.netty.eventLoopThreads>4</io.netty.eventLoopThreads>
</systemProperties>
</configuration>
</plugin>

Expand Down

0 comments on commit 6a6f10e

Please sign in to comment.