概述
当你需要一个简单的MQ时,不需要:分布式、消息持久化、事务、Pub/Sub分组等复杂的功能。基于redis来实现MQ,部署简单,使用简单,具有基本功能,可满足简单的MQ需求。
Redis MQ支持两种模式
- Producer/Consumer模式,生产者/消费者模式;
- Publish/Subscribe模式,发布/订阅模式;
公共
FDP采用java语言开发,传输的消息要实现serialize接口,用于序列化。FDP所有的实体类已实现serialize接口。
生产者/消费者模式
支持消息可靠传输(生产者confirm,消息持久化,消费者Ack)。
生产者confirm:是生产者确认,生产者发送一个消息会得到同步返结果,表示成功或失败。
消息持久化:是基本Redis的持久化,可靠性取决于Redis,Redis官方建议同时使用 AOF 和 RDB两种持久化方式互补,保证数据安全。
消费者Ack:是 消费者取出消息并办理业务成功后,通知MQ删除消息。是一种安全机制。
ACK机制:
ACK是消费者取到消息并执行成功后,才通知MQ删除此消息。可保证消费者发生异常时不丢失消息。基本原理是消费者从“等待队列”取出消息,同时把此消息放在“正在处理队列”中,费者在处理过程中出错了比如消费者挂掉,那么“正在处理队列”中还有这个消息,给予了恢复的机会。经过恢复的消息,消息的先后顺序就无法保证了。
恢复机制:(停用)
当一个消息放在“正在处理队列”超过30分钟还未处理完成,被判定为消费失败。有独立线程每5分钟检查一次“正在处理队列”,并将消息放回到“等待队列”,等待被再次消费。
使用了java的Timer定时器,单线程 + 最小堆 + 不断轮询的原理。是一个线程检查多个队列,未使用多线程,检查可以不那么及时。并使用了分布式锁防止并发检查。
发布/订阅模式
支持消息可靠传输(生产者confirm,消息持久化,不支持消费者Ack)。
生产者confirm:是生产者确认,生产者发送一个消息会得到同步返结果,表示成功或失败。
消息持久化:是基本Redis的持久化,可靠性取决于Redis,Redis官方建议同时使用 AOF 和 RDB两种持久化方式互补,保证数据安全。
消费者Ack:发布/订阅模式 无ACK机制
使用
发消息
com.sicheng.common.mqredis.PublisherService ,Publish/Subscribe模式下的发布者;
com.sicheng.common.mqredis.ProducerService ,Producer/Consumer模式下的生产者;
@Autowired private PublisherService publisherService; @Autowired private ProducerService producerService; //发消息,发布/订阅模式 String topic="topicName-xxx"; String msg="发消息,发布/订阅模式,123456," + Thread.currentThread().getId(); publisherService.sendMessage(topic,msg); //发消息,生产者/消费者模式 String queue="queue-1"; String msg2="发消息,生产者/消费者模式,abcdef," + Thread.currentThread().getId(); producerService.sendMessage(queue, msg2);
接收消息
/shop-web-admin/src/main/resources/spring-context-mqredis.xml ,在些配置文件中,配置监听器,当有消息到达时回调。
消费者
当有消息到达时被回调。
/** * 消费者 * Redis 实现的MQ,Producer/Consumer模式,生产者/消费者模式; * * @author zhaolei */ public class ConsumerImpl extends Consumer { @Override public void onMessage(Message message, String queueName) { System.out.println("接收到消息:"+message.getValue()+",msgiId:"+message.getId()+",queueName:"+queueName); } }
订阅者
当有消息到达时被回调。
/** * 订阅者 * Redis 实现的MQ,发布/订阅模式,本类是订阅者 * publish/subscribe模式 * * @author zhaolei */ public class SubscribeImpl implements MessageListener { @Autowired private RedisTemplate<Object, Object> redisTemplate; /** * 当有消息到达时,会回调本方法,用于接收消息 */ @Override public void onMessage(Message message, byte[] pattern) { RedisSerializer<?> valueSerializer = redisTemplate.getDefaultSerializer(); Object deserialize = valueSerializer.deserialize(message.getBody()); String channelPattern=""; if(pattern!=null) { channelPattern=new String(pattern); } String channelName=new String(message.getChannel()); String s="channelName="+channelName+",channelPattern="+channelPattern+",ThreadId="+Thread.currentThread().getId(); s+=",收到的mq消息:" + deserialize; System.out.println( s ); } }
不足之处
基于Redis实现的MQ,不支持分布式、消息持久化不能达到100%可靠、不支持事务、不支持Pub/Sub分组等。
生产者/消费者模式下的ACK机制不能保证100%恢复成功,恢复过程中发生异常会导致消息丢失。
他只适合一些简单的MQ应用场景,当你要求不高,不想额外搭建专业MQ服务器时,可使用Redis实现的MQ。
其它高可靠、高吞吐量的场景建议使用ActiveMQ,RabbitMQ,ZeroMQ,Kafka,RocketMQ。