kafka进阶-延时队列
在kafka中要如何实现延时队列呢?很遗憾kafka官方到目前为止并没有给出延时队列的直接实现。也就是说我们需要自己来实现这一部分的功能。
延时队列
实现的原理描述如下:
根据业务需要,自定义几个特殊主题。比如:topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m。分别表示延迟1秒,延迟5秒,延迟5分钟,延迟30分钟。当然具体延迟多少需要根据你自己的业务来设定。但有一点就是延迟的时间间隔是需要固定的。
自己实现一个delay service。可以是java或者其它语言实现。delay service 同时消费上面定义的主题,然后等待延时时间的到达。时间到达后,把消息转发至业务主题。
业务处理正常消费业务主题,处理消息
举个例子,比如火车票订单,如果30分钟内未支付,则自动取消订单。那么用时序图描述延时处理过程就是正面这样的。
autonumber business -> kafka : 发送订单消息至 topic-delay-5m participant ds as "delay service" kafka -> ds : 从 topic-delay-5m 消费订单消息 ds -> ds : 等待5分钟 ds -> kafka : 把订单消息发送至 topic-business kafka -> business : 从 topic-business 消费订单消息,并处理
注意事项
实现delay service 里需要注意:
因为consumer是单线程的,所以理论上有多少个延迟主题,就需要创建多少个线程。像上面topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m 这4种主题,就需要4个线程来处理。延迟主题少问题不大,但延迟主题如果比较多的话,还是比较难受的。
需要手动提交偏移量,因为delay service 可能会因为升级或者故障,导致重启,这个时候消息是不能漏的,所以一定要消息已经转发到业务主题后,再提交偏移量,防止漏消息。
重复消息的处理,因为delay service 需要先把消息发送到业务 主题,再提交偏移量,就有可能出现 消息发送到业务主题后,还没来的及发送偏移量,delay service 就因电力故障无法正常服务了,下次重启后,就可能继续发送已经发送过的消息。所以业务上建议是要做到幂等,以实现容错。
重试队列 与 死信队列
我们在 延时队列的基础上实现重试队列就比较简单了,当某个消息处理失败时,就把这个消息发送到 延时队列。等时间到了就会重试处理。如果处理成功,则结束。如果处理失败则重试次数加1,再一次进入延时队列。而如果超过了重试次数,则写入死信队列,作为记录。
这里说的重试队列,死信队列都是概念上的东西,kafka本身并不提供。我们是可以在使用层实现这一类概念。下面的时序图,是一个示例。我们假设有一个订单支付成功了,有积分逻辑需要处理,但重试三次后依然失败了。时序过程描述如下:
autonumber participant os as "订单微服务" participant ds as "延时微服务" participant is as "积分微服务" os -> kafka : 发送支付成功消息至 topic-integral kafka -> is : 从 topic-integral 消费订单消息 is -> is : 积分处理第1次失败 is -> kafka : 发送重试消息至 topic-delay-1m kafka -> ds : 从 topic-delay-1m 消费订单消息 ds -> ds : 等待1分钟 ds -> kafka : 把订单消息发送至 topic-integral kafka -> is : 从 topic-integral 消费订单消息 is -> is : 积分处理第2次失败 is -> kafka : 发送重试消息至 topic-delay-1m kafka -> ds : 从 topic-delay-1m 消费订单消息 ds -> ds : 等待1分钟 ds -> kafka : 把订单消息发送至 topic-integral kafka -> is : 从 topic-integral 消费订单消息 is -> is : 积分处理第3次失败 is -> kafka : 发送消息至 topic-dead,记录死信消息
实现示例
我在github上面提交了,我的实现示例,这里简单说明一下。github地址: https://github.com/junfengliang/kafka-delay-queue
项目中的kafka-delay-queue是核心代码,kafka-delay-service是springboot集成的示例。
这里简单讲一下集成的过程: 文件目录:
1 | ├── pom.xml |
一共就三个文件。
pom.xml
1 | <dependency> |
关键就是添加依赖,说明一下,因为 kafka-delay-queue 这个包还没有传到 maven的中心仓库中。所以需要先本地到kafka-delay-queue目录下通过 mvn install编译这个jar包。
application.yml
1 | kafka: |
注意修改 kafka服务器地址,对应你自己的kafka服务器地址。topics的格式是 延时 + 逗号 + 延时主题。多个配置用 分号 分隔。相关的主题需要先手动在kafka上创建。
Application.java
1 | package cn.genlei.service; |
这里关键的部分就是run方法内的内容。我们创建一个DelayQueue实例,然后调用它的start方法,就会开始延时队列的处理了。
注间:上述代码并没有在生产环境长时间验证过,如果你需要用于生产,请谨慎使用。