借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题
扫描二维码
随时随地手机看文章
背景
我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。
MQ的优势和缺点
MQ是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。
但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。
消息可靠性的应对
消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker的实时刷盘持久化,消费端的手动ACK 。
这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动ACK机制。
手动ACK的问题
手动ACK可以保证消息一定被消费,但是需要确保手动ACK的顺序和消息顺序一致,为什么?
消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的offset进行拉取的,如果commit offset的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。
因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的ACK操作是顺序的,怎么办,难道只能同步拉消费取然后ACK么。
解决方案
最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。
某次看JUC中的AQS的时候,启发了我。
我们平时用的类似CountDownLauch这些并发工具类,不也是处理的多线程协作的问题么。
我们的场景完全没有AQS复杂,借鉴它的思路,应该是没有问题的。
- 创建双端队列,队列节点中需要维护自身处理状态state,和对应msg的offset。
- 服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。
- 消息消费完之后,通知队列中对应的节点,更新状态为完成。
- 队列头被更新后出队列,提交offset,并判断新的队列头的状态,直到遇到state是未完成的head时阻塞。undefined
方案解析
该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终commit offset时有序。
在最差情况下(即head节点对应的msg最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。
异步通知的实现
public class MSGFuture { /*全局变量,存放msg对应的future对象*/ private static final MapFUTURES = new ConcurrentHashMap (); /*全局不变唯一标识*/ private final long id; /*最长等待时间*/ private final int timeout; /*并发锁*/ private final Lock lock = new ReentrantLock(); /*通知条件*/ private final Condition done = lock.newCondition(); /*开始时间*/ private final long start = System.currentTimeMillis(); /*业务结果*/ private volatile Object response; }
//构造函数 public MSGFuture(Request request, int timeout) { /*全局自增ID*/ this.id = request.getrId(); /*超时时间*/ this.timeout = timeout > 0 ? timeout : 1000; /*放入全局变量*/ FUTURES.put(id, this); }
//业务处理结果更新 public static void received(long id, Object response) { MSGFuture future = FUTURES.remove(id); if (future != null) { future.doReceived(response); } else { logger.warn("response return timeout,id:"+id); } }
//结果更新,通知等待条件 private void doReceived(Object res) { lock.lock(); try { response = res; done.signal(); } finally { lock.unlock(); } }
//异步等待获取结果 public Object get(int timeout) throws TimeoutException { if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(); } } return returnFromResponse(); }
总结
看到这里,有同学会说,这个和AQS有啥关系呀~
其实,只是处理思路的一种借鉴,比如state状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。
总之一句话,别说背八股文没用,多多了解会有大帮助~
免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!