我们使用MQ作为消息中间件,传输一些消息的时候,必须考虑到消息丢失的可能。因为有的时候消息丢失了,会产生很严重的后果,比如消息计费数据,跟钱有关的消息。
这篇文章我们以Ro
cketMQ为例来讲解,如何设计一套全链路消息不丢失的方案。
接下来我们分别讲下生产者、broker、消费者,如何确保消息不丢失的。
1、生产者如何确保消息不丢失?
发送消息的时候,可能存在消息的丢失,就是说可能消息根本就没有进入到MQ就丢了,我们看下面的图。
图1 生产者丢失消息
解决生产者丢失消息,一般有两种方法。
(1)重试发消息
RocketMQ生产者发送消息一般有三种api:
同步发送,就是生产者向broker发送消息,阻塞当前线程等待broker响应发送结果。
异步发送,就是生产者首先创建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
Oneway发送,就是生产者只负责发送请求,不等待应答,生产者只负责把请求发出去,而不处理响应结果。
为了确保消息一定发送到了broker,我们可以采用同步发送的方式,然后等待发送的结果。一直等待,如果消息发送失败,或者MQ内部异常,我们肯定会收到一个异常,比如请求超时,或者网络错误。
如果我们在收到异常之后,就认为消息到MQ发送失败了,然后再次重试尝试发送消息到MQ,接着再次同步等待MQ返回响应给我们,这样反复重试,是否可以确保消息一定会到达MQ?
理论上一些短暂网络异常的场景下,我们是可以通过不停的重试去保证消息到达MQ的,因为如果短时间网络异常了消息一直没法发送,我们只要不停的重试,网络一旦恢复了,消息就可以发送到MQ了。
如果要是反复重试多次还是没法把消息投递到MQ,此时我们就可以直接当作消息发送失败了。
其代码就像是这样的:
try { doSomething(); // 发送消息到RocketMQ producer.sendMessage();} catch (Exception e) { for (int i = 0; i < 3; i) { // 重试发消息 producer.sendMessage(); } // 如果重试3次还是发送失败,那么此次消息就发送失败了。} 另外,如果你是本地先执行一些数据库操作,再把消息发送到RocketMQ,那么就需要注意把本地事务与发送消息到RocketMQ放在一个事务里,保证执行本地事务和发送消息要么一起成功,要么一起失败。
@Transactional(rollbackFor = Exception.class)public void payOrderSuccess() // 執行本地事务 try { doSomething(); // 发送消息到RocketMQ producer.sendMessage(); } catch (Exception e) { for (int i = 0; i < 3; i) { // 重试发消息 producer.sendMessage(); } // 如果重试3次还是发送失败,那么此次消息就发送失败了。 throw new Exception(); }} 不过使用这种方式,要考虑到接口耗时问题,如果网络异常,发送消息到RocketMQ的请求每次都到超时才返回,那么多次重试可能耗时很久,导致调用payOrderSuccess方法的接口超时异常。
(2)RocketMQ事务
RocketMQ支持事务消息机制,用事务机制保证生产者消息发送成功,这个方案在业内还是比较常用的。这个方案落地之后,他可以保证你的本地事务一旦成功,那么消息必然会被投递到MQ中去,业务系统的数据也是一致的。
MQ事务机制原理还是有一点复杂的,放着这里讲,文章篇幅会过长,所以会单独起一篇文章讲解MQ事务机制。
不管是重试发消息的方法,还是事务机制,都会大大影响系统的吞吐量。
2、broker如何确保消息不丢失?
假如现在消息提交到MQ里去了,就一定不会丢失吗?
消息进入MQ后会先落到磁盘上,但写磁盘的过程,并不是一下子就写到磁盘上的,而是先进入os cache,再由操作系统的线程不定时刷到磁盘上去。
假如此时这台机器突然宕机了,os cache里的数据就全部丢失了,此时必然导致你的消息丢失。
那怎么去确保消息写入MQ之后,MQ自己不要随便丢失数据呢?
解决这个问题的第一个关键点,就是要知道broker的刷盘策略。broker的刷盘策略有两种:异步刷盘,同步刷盘。
异步刷盘,就是你的消息即使成功写入了MQ,它也就在机器的os cache中,没有进入磁盘里,要过一会儿等操作系统自己把os cache里的数据实际刷入磁盘文件中去。
所以异步刷盘模式,写入消息的吞吐量肯定是非常高的,毕竟消息只需要进入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丢失的风险。
所以如果一定要确保数据零丢失的话,可以调整MQ的刷盘策略为同步刷盘。
RocketMQ broker的默认刷盘策略为异步刷盘,即ASYNC_FLUSH。可以将broker的配置文件中的flushDiskType配置设置为:SYNC_FLUSH同步刷盘。
同步刷盘之后,我们写入MQ的每条消息,只要MQ告诉我们写入成功了,那么就表示已经进入了磁盘文件了。
同步刷盘,broker就一定不会丢失数据吗?如果broker磁盘损坏了呢?
接着我们就要讲下,如何避免磁盘故障导致数据丢失。
其实也很简单,我们必须要对Broker使用主从架构的模式
也就是说,必须让一个Master Broker有一个Slave Broker去同步它的数据,而且你一条消息写入成功,必须是让slave Broker也写入成功,保证数据有多个冗余的副本。
这样一来,你一条消息只要写入成功了,此时主从master Broker和slave broker上都有这条数据了,此时如果你的Master Broker的磁盘损坏了,但是Slave Broker上至少还是有数据的,数据是不会因为磁盘故障而丢失的。
RocketMQ从4.5.0版本开始使用Dledger技术和基于Raft协议实现,自动故障转移,有兴趣的同学可以自行去查阅相关资料。
3 如何保证消费者消息不丢失?
假如消费者拿到了消息,就一定可以成功处理吗?
如果消费者从broker拿到一条信息了,但是消息目前还在它的内存里,还没执行具体的业务逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了。
接着消费者系统就直接崩溃了,内存里的消息就没了,业务逻辑也没执行,结果Broker已经收到他提交的消息offset了,还以为他已经处理完这条消息了。
等消费者系统重启的时候,就不会再次消费这条消息了,因为已经提交过offset,broker认为你已经成功消费过这条消息了。
所以我们在这里,我们要明确一点,即使你保证发送消息到MQ的时候绝对不会丢失,而且MQ收到消息之后一定不会把消息搞丢失,但是你的消费者系统在获取到消息之后还是可能会搞丢。
一般RocketMQ的消费者中会注册一个监听器,当你的消费者获取到一批消息之后,就会回调你的这个监听器函数,让你来处理这一批消息。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { // 执行业务逻辑 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 处理完毕后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的标志,告诉RocketMQ,这批消息我已经处理完毕了。
所以对于RocketMQ而言,只要你的消费者系统是在这个监听器的函数中先处理一批消息,基于这批消息都执行完了业务逻辑,然后返回了那个消费成功的状态,接着才会去提交这批消息的offset到broker去。
所以在这个情况下,如果你对一批消息都处理完毕了,然后再提交消息的offset给broker,接着消费者系统崩溃了,此时是不会丢失消息的。
但是,如果是消费者系统获取到一批消息之后,还没处理完,也就是还没返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态,自然没提交这批消息的offset给broker呢,此时消费者系统突然挂了,会怎么样?
在这种情况下,你对一批消息都没提交他的offset给broker,broker不会认为你已经处理完了这批消息,此时你的消费者系统的一台机器宕机了,它其实会感知到你的消费者系统的一台机器作为一个Consumer挂了,它会把你没处理完的那批消息交给生产者系统的其他机器去进行处理,所以在这种情况下,消息也绝对是不会丢失的。
在默认的Consumer的消费模式之下,必须是你处理完一批消息了,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态,表示消息都处理结束了,去提交offset到broker去。在这种情况下,一般来说是不会丢失消息的,即使你一个Consumer宕机了,他会把你没处理完的消息交给其他Consumer去处理。
但是这里我们要注意一点,就是我们不能在代码中对消息进行异步的处理,假如我们开启了一个线程去处理这批消息,然后启动线程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { new Thread() { public void run() { // 执行业务逻辑 } }.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 如果要是用这种方式来处理消息的话,那可能就会出现你开启的线程还没处理完消息呢,已经返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了,就可能提交这批消息的offset给broker了,认为已经处理结束了。
然后此时你消费者系统突然宕机,必然会导致你的消息丢失了!
因此在使用RocketMQ的场景下,我们如果要保证消费数据的时候别丢消息,你就老老实实的在回调函数里处理消息,处理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态表明你处理完毕了。
总结:
基于RocketMQ设计一套全链路消息不丢失方案,需要确保生产者、broker、消费者三者都不丢失数据。
(1)生产者不丢失消息
方案1:同步发送消息 失败重试;
方案2:事务消息机制;
(2)broker不丢失消息,开启同步刷盘策略 主从架构同步机制。
只要让一个master Broker收到消息之后同步写入磁盘,同时同步复制给其他slave Broker,再返回成功响应给生产者,此时就可以保证MQ自己不会弄丢消息
(3)消费者不丢失消息,采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,不过别采用多线程异步处理消息的方式。
虽然这一整套消息不丢失方案,可以确保消息流转过程中不丢失。但显而易见的是,你用了这套方案之后,会让你整个从头到尾的消息流转链路的性能大幅度下降,让你的MQ的吞吐量大幅度的下降。
所以一般大家不要随便一个业务里就上如此重的一套方案,要明白这背后的成本!
一般我们建议,对于跟金钱、交易以及核心数据相关的系统和核心链路,可以上这套消息零丢失方案。
而对于其他大部分没那么核心的场景和系统,其实即使丢失一些数据,也不会导致太大的问题,此时可以不采取这些方案,或者说你可以在其他的场景里做一些简化。
ckname="架构师社区" data-alias="devabc" data-signature="架构师社区,专注分享架构师技术干货,架构师行业秘闻,汇集各类奇妙好玩的架构师话题和流行的架构师动向!" data-from="0">
本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。