Skip to content

Commit

Permalink
Add unsafePopWithMsgIdAllShards() API
Browse files Browse the repository at this point in the history
Similar to popWithMsgId(), but attempts to pop from any shard.
Hence, prefixed with unsafe.

Testing: Added a test to the DynoQueueDemo.
  • Loading branch information
smukil committed Sep 23, 2019
1 parent 75f9c4b commit 6975b46
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,14 @@ public interface DynoQueue extends Closeable {
*
*/
public List<Message> unsafePopAllShards(int messageCount, int wait, TimeUnit unit);


/**
* Same as popWithMsgId(), but allows popping from any shard.
*
* @param messageId ID of message to pop
* @return Returns a "Message" object if pop was successful. 'null' otherwise.
*/
public Message unsafePopWithMsgIdAllShards(String messageId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,20 @@ private void runSimpleV1Demo(DynoJedisClient dyno) throws IOException {
logger.info("Get MSG ID that contains '3' in the queue -> " + V1Queue.getMsgWithPredicate("3", true));

List<Message> specific_pops = new ArrayList<>();
// We'd only be able to pop from the local shard, so try to pop the first payload ID we see in the local shard.
// We'd only be able to pop from the local shard with popWithMsgId(), so try to pop the first payload ID we see in the local shard.
// Until then pop all messages not in the local shard with unsafePopWithMsgIdAllShards().
for (int i = 0; i < payloads.size(); ++i) {
Message popWithMsgId = V1Queue.popWithMsgId(payloads.get(i).getId());
if (popWithMsgId != null) {
specific_pops.add(popWithMsgId);
break;
} else {
// If we were unable to pop using popWithMsgId(), that means the message ID does not exist in the local shard.
// Ensure that we can pop with unsafePopWithMsgIdAllShards().
Message unsafeSpecificPop = V1Queue.unsafePopWithMsgIdAllShards(payloads.get(i).getId());
assert(unsafeSpecificPop != null);
boolean ack = V1Queue.ack(unsafeSpecificPop.getId());
assert(ack);
}
}

Expand Down Expand Up @@ -156,6 +164,8 @@ private void runSimpleV1Demo(DynoJedisClient dyno) throws IOException {
List<Message> pop_all_msgs = V1Queue.unsafePopAllShards(7, 1000, TimeUnit.MILLISECONDS);
for (Message msg : pop_all_msgs) {
logger.info("Message popped (ID : payload) -> " + msg.getId() + " : " + msg.getPayload());
boolean ack = V1Queue.ack(msg.getId());
assert(ack);
}

V1Queue.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,29 @@ public List<Message> pop(int messageCount, int wait, TimeUnit unit) {

@Override
public Message popWithMsgId(String messageId) {
return popWithMsgIdHelper(messageId, shardName);
}

@Override
public Message unsafePopWithMsgIdAllShards(String messageId) {
for (String shard : allShards) {
Message msg = popWithMsgIdHelper(messageId, shard);
if (msg != null) return msg;
}
return null;
}

public Message popWithMsgIdHelper(String messageId, String targetShard) {

return execute("popWithMsgId", localQueueShard, () -> {
return execute("popWithMsgId", targetShard, () -> {
String queueShardName = getQueueShardKey(queueName, targetShard);
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
String unackShardName = getUnackKey(queueName, shardName);
String unackShardName = getUnackKey(queueName, targetShard);

ZAddParams zParams = ZAddParams.zAddParams().nx();

try {
long exists = nonQuorumConn.zrank(localQueueShard, messageId);
long exists = nonQuorumConn.zrank(queueShardName, messageId);
// If an exception wasn't thrown, the element has to exist.
assert(exists >= 0);
} catch (NullPointerException e) {
Expand Down Expand Up @@ -386,7 +400,7 @@ public Message popWithMsgId(String messageId) {
return null;
}

long removed = quorumConn.zrem(localQueueShard, messageId);
long removed = quorumConn.zrem(queueShardName, messageId);
if (removed == 0) {
if (logger.isDebugEnabled()) {
logger.debug("cannot remove {} from the queue shard ", queueName, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
public Message popWithMsgId(String messageId) {
throw new UnsupportedOperationException();
}

@Override
public Message unsafePopWithMsgIdAllShards(String messageId) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> peek(int messageCount) {
return me.peek(messageCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ public synchronized List<Message> pop(int messageCount, int wait, TimeUnit unit)
public Message popWithMsgId(String messageId) {
throw new UnsupportedOperationException();
}

@Override
public Message unsafePopWithMsgIdAllShards(String messageId) {
throw new UnsupportedOperationException();
}

private List<Message> _pop(List<String> batch) throws Exception {

double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
Expand Down

0 comments on commit 6975b46

Please sign in to comment.