Skip to content

Commit

Permalink
Add expression filtering capability to the pullBlockIfNotFound method…
Browse files Browse the repository at this point in the history
… of pull consumer (apache#8024)
  • Loading branch information
RongtongJin authored and dingshuangxi888 committed May 7, 2024
1 parent 8b31dfa commit 0d79e45
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

/**
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation
* {@link DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
Expand Down Expand Up @@ -375,6 +375,20 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
}

@Override
public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums, pullCallback);
}

@Override
public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
long offset,
int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums);
}

@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param mq from which message queue
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* null or * expression,meaning subscribe all
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
Expand Down Expand Up @@ -121,7 +120,7 @@ void pull(final MessageQueue mq, final String subExpression, final long offset,
InterruptedException;

/**
* Pulling the messages in a async. way. Support message selection
* Pulling the messages in a async way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
Expand Down Expand Up @@ -150,6 +149,23 @@ void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, fina
final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;

/**
* Pulling the messages through callback function,if no message arrival,blocking. Support message selection
*/
void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;

/**
* Pulling the messages,if no message arrival,blocking some time. Support message selection
*
* @return The resulting {@code PullRequest}
*/
PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
final long offset, final int maxNums) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;

/**
* Update the offset
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,21 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}


public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
this.isRunning();
Expand Down

0 comments on commit 0d79e45

Please sign in to comment.