概述

当你需要一个简单的MQ时,不需要:分布式、消息持久化、事务、Pub/Sub分组等复杂的功能。基于redis来实现MQ,部署简单,使用简单,具有基本功能,可满足简单的MQ需求。

Redis MQ支持两种模式

  1. Producer/Consumer模式,生产者/消费者模式;
  2. Publish/Subscribe模式,发布/订阅模式;

公共

FDP采用java语言开发,传输的消息要实现serialize接口,用于序列化。FDP所有的实体类已实现serialize接口。

生产者/消费者模式

支持消息可靠传输(生产者confirm,消息持久化,消费者Ack)。

生产者confirm:是生产者确认,生产者发送一个消息会得到同步返结果,表示成功或失败。

消息持久化:是基本Redis的持久化,可靠性取决于Redis,Redis官方建议同时使用 AOF 和 RDB两种持久化方式互补,保证数据安全。

消费者Ack:是 消费者取出消息并办理业务成功后,通知MQ删除消息。是一种安全机制。

ACK机制:

ACK是消费者取到消息并执行成功后,才通知MQ删除此消息。可保证消费者发生异常时不丢失消息。基本原理是消费者从“等待队列”取出消息,同时把此消息放在“正在处理队列”中,费者在处理过程中出错了比如消费者挂掉,那么“正在处理队列”中还有这个消息,给予了恢复的机会。经过恢复的消息,消息的先后顺序就无法保证了。

恢复机制:(停用)

当一个消息放在“正在处理队列”超过30分钟还未处理完成,被判定为消费失败。有独立线程每5分钟检查一次“正在处理队列”,并将消息放回到“等待队列”,等待被再次消费。

使用了java的Timer定时器,单线程 + 最小堆 + 不断轮询的原理。是一个线程检查多个队列,未使用多线程,检查可以不那么及时。并使用了分布式锁防止并发检查。



发布/订阅模式

支持消息可靠传输(生产者confirm,消息持久化,不支持消费者Ack)。

生产者confirm:是生产者确认,生产者发送一个消息会得到同步返结果,表示成功或失败。

消息持久化:是基本Redis的持久化,可靠性取决于Redis,Redis官方建议同时使用 AOF 和 RDB两种持久化方式互补,保证数据安全。

消费者Ack:发布/订阅模式 无ACK机制

使用

发消息

com.sicheng.common.mqredis.PublisherService   ,Publish/Subscribe模式下的发布者;

com.sicheng.common.mqredis.ProducerService    ,Producer/Consumer模式下的生产者;

 @Autowired
 private PublisherService publisherService;

 @Autowired
 private ProducerService producerService;
//发消息,发布/订阅模式
String topic="topicName-xxx";
String msg="发消息,发布/订阅模式,123456," + Thread.currentThread().getId();
publisherService.sendMessage(topic,msg);

//发消息,生产者/消费者模式
String queue="queue-1";
String msg2="发消息,生产者/消费者模式,abcdef," + Thread.currentThread().getId();
producerService.sendMessage(queue, msg2);

接收消息

/shop-web-admin/src/main/resources/spring-context-mqredis.xml ,在些配置文件中,配置监听器,当有消息到达时回调。

消费者

当有消息到达时被回调。

/**
 * 消费者
 * Redis 实现的MQ,Producer/Consumer模式,生产者/消费者模式;
 * 
 * @author zhaolei
 */
public class ConsumerImpl extends Consumer {

 @Override
 public void onMessage(Message message, String queueName) {
  System.out.println("接收到消息:"+message.getValue()+",msgiId:"+message.getId()+",queueName:"+queueName);
 }
}

订阅者

当有消息到达时被回调。

/**
 * 订阅者
 * Redis 实现的MQ,发布/订阅模式,本类是订阅者
 * publish/subscribe模式
 * 
 * @author zhaolei
 */
public class SubscribeImpl implements MessageListener {

 @Autowired
 private RedisTemplate<Object, Object> redisTemplate;

    /**
     * 当有消息到达时,会回调本方法,用于接收消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<?> valueSerializer = redisTemplate.getDefaultSerializer();
        Object deserialize = valueSerializer.deserialize(message.getBody());
        String channelPattern="";
        if(pattern!=null) {
          channelPattern=new String(pattern);
        }
        String channelName=new String(message.getChannel());
        String s="channelName="+channelName+",channelPattern="+channelPattern+",ThreadId="+Thread.currentThread().getId();
        s+=",收到的mq消息:" + deserialize;
        System.out.println( s );
    }
}

不足之处

基于Redis实现的MQ,不支持分布式、消息持久化不能达到100%可靠、不支持事务、不支持Pub/Sub分组等。

生产者/消费者模式下的ACK机制不能保证100%恢复成功,恢复过程中发生异常会导致消息丢失。

他只适合一些简单的MQ应用场景,当你要求不高,不想额外搭建专业MQ服务器时,可使用Redis实现的MQ。

其它高可靠、高吞吐量的场景建议使用ActiveMQ,RabbitMQ,ZeroMQ,Kafka,RocketMQ。