RocketMQ push模式的实现细节

Rocketmq使用常轮询的方式实现了push功能。主要包括几个组件:

  1. DefaultMQPushConsumerImpl:拉消息的类型。
  2. ProcessQueue:保存拉出来的消息。
  3. PullMessageService:执行拉消息服务。
  4. ConsumeMessageService:消费消息服务。
  5. ReblanceService:负载均衡服务。

类关系

(真想吐槽!)

执行过程

DefaultMQPushConsumerImpl

DefaultMQPushConsumerImpl实现了消费者的接口。同时是个启动者,通过它直接或间接启动了拉消息服务,消费消息服务。

其中提供了一个重要的接口pullMessage。该接口的流程如下:

在拉消息过程中,做了流控,防止拉的太快,消费的太慢。主要从三个方面检测:

  1. 从某个消费队列拉取的等待消费的消息数量。如果超过阀值,延迟50ms后再次拉取消息。阀值默认是1000。如果设置了topic级别的阀值(默认没有限制),在队列负载均衡以后会重新计算,具体为topic级别的阀值除以当前负责的消费队列数量。主要配置变量:DefaultMQPushConsumerImpl.pullThresholdForQueueDefaultMQPushConsumerImpl.pullThresholdForTopic
  2. 从某个消费队列拉取的等待消费的消息大小(只考虑body)。同样,超过阀值就会延迟50ms后再次拉取消息。阀值默认是100M。如果topic设置了级别(默认没有限制),队列负载均衡以后会重新计算队列的限制,具体为topic级别的阀值除以当前负责的消费队列数量。主要配置变量:DefaultMQPushConsumerImpl.pullThresholdSizeForQueueDefaultMQPushConsumerImpl.pullThresholdSizeForTopic
  3. 在并发消费模式下,从某个消费队列拉取的等待消费的消息中,在消费队列中的最大位置和最小位置之间差别。如果超过阀值,也会延迟50ms后再拉取消息。默认是2000,这里可能会存在误判。因为,有条件拉取消息的时候,是有可能出现同一个消费队列中拉到的两个消息在队列中的位置距离很远。

几个考虑:

ProcessQueue

保存push的消费者拉到的消息。同时,有序消费模式还记录了情况下正在消费的消息。

PullMessageService

PullMessageService只负责拉取消息,它会调用DefaultMQPushConsumerImpl.pullMessage

ReblanceService执行负载均衡的时候如果发现被分配了新的消息队列就会最终调用PullMessage.executePullRequestImmediately执行拉取消息。代码执行路径:

ReblanceService.run
->MQClientInstance.doReblance
->MQConsumerInnter.doReblance[DefaultMQPushConsumerImpl.doReblance]
->ReblanceImpl.doReblance
->ReblanceImpl.dispatchPullRequest[ReblancePushImpl.dispatchPullRequest]
->DefaultMQPushConsumerImpl.executePullRequestImmediately
->PullMessage.executePullRequestImmediately

另外,在DefaultMQPushConsumerImpl.pullMessage执行时,也会根据条件调用PullMessageService.executePullRequestImmediatelyPullMessageService.executeTaskLater或者PullMessageService.executePullRequestLater触发拉取消息。

ConsumeMessageService

消费服务分并发消费和顺序消费,主要区别在于提交消费任务逻辑,消费逻辑和处理消费结果的逻辑,以及对message queue的处理逻辑。另外,顺序消费是指在同一个消费队列里面的消息顺序消费。

提交消费任务

并发消费:把消息分成多个批次并发处理,一批多少个消息是自定义的,默认是1。如果提交异常,则延迟5s后提交。

顺序消费:依赖于process queue是否正在被消费,这样避免同时消费多个不同的消息,不然就没法保证有序了。

消费逻辑

下图中左边是*并发消费*,右边是*顺序消费*。

消费消息的时候,在可能停顿的执行点上面都加上了process queue是否已经drop的检查。

因为提交任务的方式不一样导致了不同模式下面消费逻辑的差别。

并发消费:只考虑当前的消息即可。

顺序消费:从process queue中取消息。消费的时候需要确保:

  1. 每个消费队列某一时候只有一个消费请求被执行。
  2. 每个消费队列某一时刻只有一个地方在执行用户的消费逻辑。

以上两个条件中只要一个条件不满足,就没法保证消息顺序消费。另外,第一个逻辑需要的锁,是因为消费慢,同时队列被分配别的消费者,在消费结束之前又分配回来了,就有可能导致1条件不满足,所以需要加锁。在代码层面第一个逻辑需要的锁已经确保了第二个逻辑。消费之前需要锁的原因是为了避免,用户还在消费的时候向broker解锁。

锁的逻辑

只有message queue被锁住了才能消费。客户端向服务端发送锁的请求成功以后才算锁成功。同时锁会有一个过期时间。在客户端这边定时向broker发送锁的请求,所得粒度是group+clientID,过期时间是30s。在服务端这边,锁了的过期时间是60s,这个时间以后能够接收其他锁的请求。

在负载均衡的时候,检查一个消费队列发现不属于自己或者长时间没有拉的时候就会把这个消费队列移除掉。移除的逻辑比较有意思,为了确保这个消费队列正在被消费不会被移除,这里使用了一个消费锁。移除的时候尝试获得这个锁,如果超过1s还没有获得就会等待下一次负载均衡的检查,如果获得了锁就会延迟20s再向broker发送解锁请求。这里的延迟,有个效果就是可能这时候已经向broker发送了拉消息的请求,如果在它返回之前又把队列分配给自己了,那么就有可能两个触发一个拉消息的请求,这个时候就会同时有两个拉消息的请求,那么拉出来重复的消息。

处理消费结果

下图中左边是并发消费,右边是顺序消费。

处理消费结果的逻辑主要是处理消费失败的消息。

并发消费:如果是在广播模式下,直接丢弃了。如果是在集群模式下面会尝试把消息发回broker,如果发送失败的话,就会把这些发送失败的消息延迟提交消费。

顺序模式:如果是ROLLBACK,把消息放回,再次消费。如果是SUSPEND_CURRENT_QUEUE_A_MOMENT则会判断是否需要停止一段时间再消费。通过检查消费次数,当超过预定的值(默认是没有限制)就会把消息发回broker。如果消息都已经发回broker,就提交消息接下去消费,否则就停一会,把当前的消息延迟提交消费。

处理message queue

并发消费:定时清理长时间没法消费的消息,默认是15分钟。

顺序消费:在集群模式下面,定时向broker锁住message queue,锁的粒度是group+clientID。

← RocketMQ HA实现 RocketMQ offset管理 →
存档 关于