Skip to content

NaiveRedis PubSub 使用说明

heimuheimu edited this page Nov 27, 2019 · 4 revisions

Log4J 配置

# Redis 消息发布者信息日志
log4j.logger.NAIVEREDIS_PUBLISHER_LOG=INFO, NAIVEREDIS_PUBLISHER_LOG
log4j.additivity.NAIVEREDIS_PUBLISHER_LOG=false
log4j.appender.NAIVEREDIS_PUBLISHER_LOG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.NAIVEREDIS_PUBLISHER_LOG.file=${log.output.directory}/naiveredis/publisher.log
log4j.appender.NAIVEREDIS_PUBLISHER_LOG.encoding=UTF-8
log4j.appender.NAIVEREDIS_PUBLISHER_LOG.DatePattern=_yyyy-MM-dd
log4j.appender.NAIVEREDIS_PUBLISHER_LOG.layout=org.apache.log4j.PatternLayout
log4j.appender.NAIVEREDIS_PUBLISHER_LOG.layout.ConversionPattern=%d{ISO8601} %-5p [%F:%L] : %m%n

# Redis 消息订阅者信息日志
log4j.logger.NAIVEREDIS_SUBSCRIBER_LOG=INFO, NAIVEREDIS_SUBSCRIBER_LOG
log4j.additivity.NAIVEREDIS_SUBSCRIBER_LOG=false
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG.file=${log.output.directory}/naiveredis/subscriber.log
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG.encoding=UTF-8
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG.DatePattern=_yyyy-MM-dd
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG.layout=org.apache.log4j.PatternLayout
log4j.appender.NAIVEREDIS_SUBSCRIBER_LOG.layout.ConversionPattern=%d{ISO8601} %-5p [%F:%L] : %m%n

# Redis 消息订阅者消费过慢信息日志
log4j.logger.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG=INFO, REDIS_SUBSCRIBER_SLOW_CONSUME_LOG
log4j.additivity.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG=false
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG.file=${log.output.directory}/naiveredis/subscriber_slow_consume.log
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG.encoding=UTF-8
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG.DatePattern=_yyyy-MM-dd
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG.layout=org.apache.log4j.PatternLayout
log4j.appender.REDIS_SUBSCRIBER_SLOW_CONSUME_LOG.layout.ConversionPattern=%d{ISO8601} %-5p [%F:%L] : %m%n

Spring 配置

自动重连 Redis 消息发布客户端

    <!-- 自动重连 Redis 消息发布客户端,Redis 操作超时时间默认为 5 秒,Redis 操作过慢最小时间默认为 50 毫秒,心跳检测时间默认为 30 秒,Java 对象与字节数组转换器将会使用 RedisPublishClient 实现指定的默认转换器 -->
    <bean id="autoReconnectRedisPublishClient" class="com.heimuheimu.naiveredis.spring.AutoReconnectRedisPublishClientFactory" destroy-method="close">
        <constructor-arg index="0" value="127.0.0.1:6379" /> <!-- Redis 地址 -->
        <constructor-arg index="1">
            <!-- 在 Redis 消息发布客户端不可用时进行实时通知 -->
            <bean class="com.heimuheimu.naiveredis.pubsub.NoticeableAutoReconnectRedisPublishClientListener">
                <constructor-arg index="0" value="your-project-name" /> <!-- 当前项目名称 -->
                <constructor-arg index="1" ref="notifierList" /> <!-- 报警器列表,报警器的信息可查看 naivemonitor 项目 -->
            </bean>
        </constructor-arg>
    </bean>

自动重连 Redis 消息订阅客户端

    <!-- 自动重连 Redis 消息订阅客户端,PING 命令发送时间间隔为 30 秒,Java 对象与字节数组转换器将会使用 RedisSubscribeClient 实现指定的默认转换器,单个 Redis 消息订阅者消费过慢最小时间为 50 毫秒 -->
    <bean id="autoReconnectRedisSubscribeClient" class="com.heimuheimu.naiveredis.spring.AutoReconnectRedisSubscribeClientFactory" destroy-method="close">
        <constructor-arg index="0" value="127.0.0.1:6379" /> <!-- Redis 地址 -->
        <constructor-arg index="1">
            <util:list>
                <!-- Redis channel 消息订阅者列表,允许为 {@code null} 或空,但不允许与 patternSubscriberList 同时为空 -->
            </util:list>
        </constructor-arg>
        <constructor-arg index="2">
            <util:list>
                <!-- Redis pattern 消息订阅者列表,允许为 {@code null} 或空,但不允许与 channelSubscriberList 同时为空 -->
            </util:list>
        </constructor-arg>
        <constructor-arg index="3">
            <!-- 在 Redis 消息订阅客户端不可用时进行实时通知 -->
            <bean class="com.heimuheimu.naiveredis.pubsub.NoticeableAutoReconnectRedisSubscribeClientListener">
                <constructor-arg index="0" value="your-project-name" /> <!-- 当前项目名称 -->
                <constructor-arg index="1" ref="notifierList" /> <!-- 报警器列表,报警器的信息可查看 naivemonitor 项目 -->
            </bean>
        </constructor-arg>
    </bean>

Falcon 监控 Spring 配置

    <!-- 监控数据采集器列表 -->
    <util:list id="falconDataCollectorList">
        <!-- Redis 消息发布数据采集器 -->
        <bean class="com.heimuheimu.naiveredis.monitor.falcon.PublisherDataCollector" />
        <!-- Redis 消息订阅数据采集器 -->
        <bean class="com.heimuheimu.naiveredis.monitor.falcon.SubscriberDataCollector" />
    </util:list>
    
    <!-- Falcon 监控数据上报器 -->
    <bean id="falconReporter" class="com.heimuheimu.naivemonitor.falcon.FalconReporter" init-method="init" destroy-method="close">
        <constructor-arg index="0" value="http://127.0.0.1:1988/v1/push" /> <!-- Falcon 监控数据推送地址 -->
        <constructor-arg index="1" ref="falconDataCollectorList" />
    </bean>

代码示例

Redis 消息发布

    public class RedisMessagePublisher {
        
        @Autowired
        private AutoReconnectRedisPublishClient publishClient;
        
        public void publish() {
            // 往 test-channel 频道发送一条字符串消息,消息内容为:"test message"
            client.publish("test-channel", "test message");
        }
    }

Redis 消息订阅

channel 消息订阅者

    private class RedisChannelSubscribe implements NaiveRedisChannelSubscriber {

        @Override
        public List<String> getChannelList() {
            // 当前订阅者订阅的频道为 "test-channel"
            return Arrays.asList("test-channel");
        }

        @Override
        public <T> void consume(String channel, T message) {
            // 当接收到 "test-channel" 频道的消息时,将执行此方法
            System.out.println("channel: " + channel + ", message: " + message);
        }
    }

pattern 消息订阅者

    private static class SimplePatternSubscribe implements NaiveRedisPatternSubscriber {
    
        @Override
        public List<String> getPatternList() {
            // 当前订阅者订阅的 pattern 为 "test-*",将会接受所有以 "test-" 开头的频道消息
            return Arrays.asList("test-*");
        }

        @Override
        public <T> void consume(String pattern, String channel, T message) {
            // 当接收到以 "test-" 开头的频道的消息时,将执行此方法
            System.out.println("pattern: " + pattern + ", channel: " + channel + ", message: " + message);
        }
    }
Clone this wiki locally