微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

redis之mq实现发布订阅模式

 

概述

Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用redis实现发布/订阅消息队列。

  • 在Redis中,发布者没有将消息发送给特定订阅者的程序。相反,发布的消息被描述为通道,而不知道(如果有的话)可能有哪些订阅者。
  • 订阅者表示对一个或多个主题感兴趣,只接收感兴趣的消息,而不知道(如果有的话)发布者是什么。
  • 发布者和订阅者的这种解耦可以实现更大的可伸缩性和更动态的网络拓扑。

代码实现

redis实现mq的存储方式很多,可以使用list,zset及stream,这些数据的存储结构决定了怎么消费问题(消息是一次使用、允许多次使用、允许多端消息等),比如使用list,我们可以使用leftPush插入消息,使用rightPop消费消息,实现一条消息一次消息,可以参考与以示例代码

    @Test
    public void testMq() {
        for (int i = 0; i < 10; i++) {
            redistemplate.opsForList().leftPush("task-queue", "data" + i);
            log.info("插入了一个新的任务==>{}", "data" + i);
        }
        String taskId = redistemplate.opsForList().rightPop("task-queue").toString();
        log.info("处理成功,清除任务==>{}", taskId);
    }

 

1.配置代码RedisConfig.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Redistemplate;
import org.springframework.data.redis.core.StringRedistemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Autowired
    private Redistemplate redistemplate;

    /**
     * redistemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类,方便调试redis
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public Redistemplate<String, Object> redistemplate(RedisConnectionFactory redisConnectionFactory) {

        Redistemplate<String, Object> redistemplate = new Redistemplate<>();

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
        redistemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redistemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        //使用StringRedisSerializer来序列化和反序列化redis的ke
        redistemplate.setKeySerializer(new StringRedisSerializer());
        redistemplate.setHashKeySerializer(new StringRedisSerializer());

        //开启事务
        redistemplate.setEnableTransactionSupport(true);

        redistemplate.setConnectionFactory(redisConnectionFactory);

        return redistemplate;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, topic());

        return container;
    }

    @Bean
    MessagePublisher redisPublisher() {
        return new RedisMessagePublisher(redistemplate, topic());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("messageQueue");
    }
}
View Code

2.定义消息发布接口MessagePublisher.java

package demo.data.mqRedis.config;

public interface MessagePublisher {
    void publish(String message);
}

3.发布方实现RedisMessagePublisher.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Redistemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**
 * 消息发布方
 */
public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private Redistemplate<String, Object> redistemplate;

    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher(
            Redistemplate<String, Object> redistemplate, ChannelTopic topic) {
        this.redistemplate = redistemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redistemplate.convertAndSend(topic.getTopic(), message);
    }
}

4.消息接收方RedisMessageSubscriber.java

package demo.data.mqRedis.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
 * 消息订阅方
 */
@Service
@Slf4j
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<>();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        log.info("订阅方接收到了消息==>{}", message.toString());
    }
}

5.最后贴上application.yml配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

查看运行结果

1.编写测试用例试发布消息TestRedisMQ.java

package demo.data.mqRedis;

import demo.data.mqRedis.config.RedisMessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBoottest;
import org.springframework.test.context.junit4.springrunner;

import java.util.UUID;

@RunWith(springrunner.class)
@SpringBoottest
@Slf4j
public class TestRedisMQ {

    @Autowired
    RedisMessagePublisher redisMessagePublisher;

    @Test
    public void testMq() {
        String message = "Message " + UUID.randomUUID();
        redisMessagePublisher.publish(message);
    }
}

https://www.cnblogs.com/tqlin/p/11468257.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐