Skip to content

Commit

Permalink
Merge pull request #14 from Netflix/dev
Browse files Browse the repository at this point in the history
new api for setTimeout and pop message optimization
  • Loading branch information
v1r3n authored Dec 1, 2016
2 parents 46ac14f + 55691d1 commit 4083821
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public interface DynoQueue {
*/
public boolean setUnackTimeout(String messageId, long timeout);


/**
* Updates the timeout for the message.
* @param messageId ID of the message to be acknowledged
* @param timeout time in milliseconds for which the message will remain invisible and not popped out of the queue.
* @return true if the message id was found and updated with new timeout. false otherwise.
*/
public boolean setTimeout(String messageId, long timeout);

/**
*
* @param messageId Remove the message from the queue
Expand Down
11 changes: 9 additions & 2 deletions dyno-queues-redis/build.gradle
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
apply plugin: 'project-report'

dependencies {

compile project(':dyno-queues-core')

compile "com.google.inject:guice:3.0"
compile "com.netflix.dyno:dyno-jedis:1.5.5+"
compile "com.netflix.dyno:dyno-jedis:1.5.6"
compile "com.netflix.archaius:archaius-core:0.5.6"
compile "com.netflix.servo:servo-core:0.5.5"
compile 'com.netflix.eureka:eureka-client:1.1.110'
compile 'com.fasterxml.jackson.core:jackson-databind:2.4.4'

testCompile 'org.rarefiedredis.redis:redis-java:0.0.17'
testCompile "junit:junit:4.11"
}
}

tasks.withType(Test) {
maxParallelForks = 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -86,8 +85,6 @@ public class RedisDynoQueue implements DynoQueue {

private LinkedBlockingQueue<String> prefetchedIds;

private int prefetchCount = 10_000;

private int retryCount = 2;

public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, ExecutorService dynoCallExecutor){
Expand All @@ -112,7 +109,7 @@ public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allSh
this.executorService = dynoCallExecutor;

Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> prefetchIds(), 0, 20, TimeUnit.MILLISECONDS);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> prefetchIds(), 0, 10, TimeUnit.MILLISECONDS);

logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName);

Expand Down Expand Up @@ -216,64 +213,54 @@ public List<Message> pop(int messageCount, int wait, TimeUnit unit) {

try {

return (List<Message>) execute(() -> {
List<Message> messages = (List<Message>)execute(() -> {

Set<String> ids = new HashSet<>();
if (logger.isDebugEnabled()) {
logger.debug("{} prefetchedIds.size={}", queueName, prefetchedIds.size());
}

if (prefetchedIds.size() < messageCount) {
prefetch.set(true);
prefetch.addAndGet(messageCount - prefetchedIds.size());
String id = prefetchedIds.poll(wait, unit);
if (id != null) {
ids.add(id);
}
}
prefetchedIds.drainTo(ids, messageCount);
if (ids.size() < messageCount) {
prefetch.set(true);
}

if (ids.isEmpty()) {
return Collections.emptyList();
}

return _pop(ids, messageCount);

});

return messages;

} finally {
sw.stop();
}

}

private AtomicBoolean prefetch = new AtomicBoolean(false);

@VisibleForTesting
void prefetch() {
prefetch.set(true);
prefetchIds();
}
AtomicInteger prefetch = new AtomicInteger(0);

private void prefetchIds() {

if (!prefetch.get()) {
if (prefetch.get() < 1) {
return;
}


int prefetchCount = prefetch.get();
Stopwatch sw = monitor.start(monitor.prefetch, prefetchCount);
try {

execute(() -> {
Set<String> ids = peekIds(0, prefetchCount);
prefetchedIds.addAll(ids);
prefetch.set(false);
return null;
});

Set<String> ids = peekIds(0, prefetchCount);
prefetchedIds.addAll(ids);
prefetch.addAndGet((-1 * ids.size()));
if(prefetch.get() < 0 || ids.isEmpty()) {
prefetch.set(0);
}
} finally {

sw.stop();
}

Expand Down Expand Up @@ -380,6 +367,39 @@ public boolean setUnackTimeout(String messageId, long timeout) {
sw.stop();
}
}

@Override
public boolean setTimeout(String messageId, long timeout) {

return execute(() -> {

String json = nonQuorumConn.hget(messageStoreKey, messageId);
if(json == null) {
return false;
}
Message message = om.readValue(json, Message.class);
message.setTimeout(timeout);

for (String shard : allShards) {

String queueShard = getQueueShardKey(queueName, shard);
Double score = quorumConn.zscore(queueShard, messageId);
if(score != null) {
double priorityd = message.getPriority() / 100;
double newScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue() + priorityd;
ZAddParams params = ZAddParams.zAddParams().xx();
long added = quorumConn.zadd(queueShard, newScore, messageId, params);
if(added == 1) {
json = om.writeValueAsString(message);
quorumConn.hset(messageStoreKey, message.getId(), json);
return true;
}
return false;
}
}
return false;
});
}

@Override
public boolean remove(String messageId) {
Expand Down Expand Up @@ -542,8 +562,6 @@ public void processUnacks() {
quorumConn.zadd(myQueueShard, score, member);
quorumConn.zrem(unackQueueName, member);
}

prefetchIds();
return null;
});

Expand Down Expand Up @@ -581,7 +599,7 @@ private <R> R executeWithRetry(ExecutorService es, Callable<R> r, int retryCount

try {

return es.submit(r).get(10, TimeUnit.SECONDS);
return es.submit(r).get(1, TimeUnit.MINUTES);

} catch (ExecutionException e) {

Expand All @@ -595,5 +613,5 @@ private <R> R executeWithRetry(ExecutorService es, Callable<R> r, int retryCount
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ public Long zadd(String key, double score, String member, ZAddParams params) {
if(existing == null) {
return 0L;
}
return redis.zadd(key, new ZsetPair(member, score));
redis.zadd(key, new ZsetPair(member, score));
return 1L;
}else {
return redis.zadd(key, new ZsetPair(member, score));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public void test(){
public Collection<Host> getHosts() {
List<Host> hosts = new LinkedList<>();
hosts.add(new Host("host1", 8102, "us-east-1a", Status.Up));
hosts.add(new Host("host2", 8102, "us-east-1b", Status.Up));
hosts.add(new Host("host3", 8102, "us-east-1d", Status.Up));
hosts.add(new Host("host1", 8102, "us-east-1b", Status.Up));
hosts.add(new Host("host1", 8102, "us-east-1d", Status.Up));

return hosts;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -111,6 +117,7 @@ public void testGetUnackTime() {

@Test
public void testTimeoutUpdate() {

rdq.clear();

String id = UUID.randomUUID().toString();
Expand All @@ -123,7 +130,6 @@ public void testTimeoutUpdate() {
assertEquals(0, popped.size());

Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
rdq.prefetch();

popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
Expand Down Expand Up @@ -167,8 +173,89 @@ public void testTimeoutUpdate() {
}

@Test
public void testAll() {
public void testConcurrency() throws InterruptedException, ExecutionException {

rdq.clear();

final int count = 100;
final AtomicInteger published = new AtomicInteger(0);

ScheduledExecutorService ses = Executors.newScheduledThreadPool(6);
Runnable publisher = new Runnable() {

@Override
public void run() {
List<Message> messages = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Message msg = new Message(UUID.randomUUID().toString(), "Hello World-" + i);
msg.setPriority(new Random().nextInt(98));
messages.add(msg);
}
if(published.get() >= count) {
return;
}

published.addAndGet(messages.size());
rdq.push(messages);

}
};

for(int p = 0; p < 3; p++) {
ses.scheduleWithFixedDelay(publisher, 1, 1, TimeUnit.MILLISECONDS);
}

CountDownLatch latch = new CountDownLatch(count);
List<Message> allMsgs = new LinkedList<>();
Runnable consumer = new Runnable() {

@Override
public void run() {
List<Message> popped = rdq.pop(15, 1, TimeUnit.SECONDS);
allMsgs.addAll(popped);
popped.stream().forEach(p -> latch.countDown());
}
};

for(int c = 0; c < 3; c++) {
ses.scheduleWithFixedDelay(consumer, 1, 1, TimeUnit.MILLISECONDS);
}

Uninterruptibles.awaitUninterruptibly(latch);
Set<Message> uniqueMessages = allMsgs.stream().collect(Collectors.toSet());

assertEquals(count, allMsgs.size());
assertEquals(count, uniqueMessages.size());
List<Message> more = rdq.pop(1, 1, TimeUnit.SECONDS);
assertEquals(0, more.size());
assertEquals(0, rdq.prefetch.get());

ses.shutdownNow();
}

@Test
public void testSetTimeout() {

rdq.clear();

Message msg = new Message("x001", "Hello World");
msg.setPriority(3);
msg.setTimeout(20_000);
rdq.push(Arrays.asList(msg));

List<Message> popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertTrue(popped.isEmpty());

boolean updated = rdq.setTimeout(msg.getId(), 1);
assertTrue(updated);
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertEquals(1, popped.size());
assertEquals(1, popped.get(0).getTimeout());
}

@Test
public void testAll() {

rdq.clear();

int count = 10;
Expand Down Expand Up @@ -244,7 +331,7 @@ public void testAll() {

}

@After
@Before
public void clear(){
rdq.clear();
assertTrue(dynoClient.hlen(messageKey) == 0);
Expand Down

0 comments on commit 4083821

Please sign in to comment.