这篇文章主要讲解了“怎么用Kotlin+RocketMQ实现延时消息”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用Kotlin+RocketMQ实现延时消息”吧!
网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、成都小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了沈丘免费建站欢迎大家使用!
一. 延时消息
延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
使用延时消息的典型场景,例如:
在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。 在电商系统中,用户七天内没有评价商品,则默认好评。
这些场景对应的解决方案,包括:
轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 redis 的 zset 实现延时队列。
除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。
二. RocketMQ
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。
三. RocketMQ 实现延时消息
3.1 业务背景
我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。
3.2 生产者(Producer)
生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
首先,定义一个支持延时发送的 AbstractProducer。
abstract class AbstractProducer :ProducerBean() { var producerId: String? = null var topic: String? = null var tag: String?=null var timeoutMillis: Int? = null var delaySendTimeMills: Long? = null val log = LogFactory.getLog(this.javaClass) open fun sendMessage(messageBody: Any, tag: String) { val msgBody = JSON.toJSONString(messageBody) val message = Message(topic, tag, msgBody.toByteArray()) if (delaySendTimeMills != null) { val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!! message.startDeliverTime = startDeliverTime log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}") } val logMessageId = buildLogMessageId(message) try { val sendResult = send(message) log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody) } catch (e: Exception) { log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e) } } fun buildLogMessageId(message: Message): String { return "topic: " + message.topic + "\n" + "producer: " + producerId + "\n" + "tag: " + message.tag + "\n" + "key: " + message.key + "\n" }}
根据业务需要,增加一个支持重试机制的 Producer
@Component@ConfigurationProperties("mqs.ons.producers.xxx-producer")@Configuration@Dataclass CleanReportPushEventProducer :AbstractProducer() { lateinit var delaySecondList:List
在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。
每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。
通过 System.currentTimeMillis() + delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。
我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(
3.3 消费者(Consumer)
消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
定义 Push 类型的 AbstractConsumer:
@Dataabstract class AbstractConsumer ():MessageListener{ var consumerId: String? = null lateinit var subscribeOptions: List
再定义具体的消费者,并且在消费失败之后能够再发送一次消息。
@Configuration@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")@Dataclass CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() { val logger: Logger = LoggerFactory.getLogger(this.javaClass) override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action { if(obj is CleanReportPushEventMessage){ //清除事件 logger.info("consumer clean-report event report_id:${obj.id} ") //消费失败之后再发送一次消息 if(!cleanReportService.sendCleanReportEvent(obj.id)){ val times = obj.times+1 eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times)) } } return Action.CommitMessage } override fun getMessageBodyType(tag: String): Type? { return CleanReportPushEventMessage::class.java }}
其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)
最后,定义 ConsumerFactory
@Componentclass ConsumerFactory(val consumers: List
四. 总结
正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。
感谢各位的阅读,以上就是“怎么用Kotlin+RocketMQ实现延时消息”的内容了,经过本文的学习后,相信大家对怎么用Kotlin+RocketMQ实现延时消息这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!