Rocketmq使用常轮询的方式实现了push功能。主要包括几个组件:
类关系
(真想吐槽!)
执行过程
DefaultMQPushConsumerImpl
实现了消费者的接口。同时是个启动者,通过它直接或间接启动了拉消息服务,消费消息服务。
其中提供了一个重要的接口pullMessage
。该接口的流程如下:
在拉消息过程中,做了流控,防止拉的太快,消费的太慢。主要从三个方面检测:
DefaultMQPushConsumerImpl.pullThresholdForQueue
和DefaultMQPushConsumerImpl.pullThresholdForTopic
。DefaultMQPushConsumerImpl.pullThresholdSizeForQueue
和DefaultMQPushConsumerImpl.pullThresholdSizeForTopic
。几个考虑:
NO_NEW_MSG/NO_MATCHED_MSG
情况下,correctTagsOffset
的逻辑为什么需要考虑有没有消息?如果还有消息说明本地还没有消息没被消费,此时更新的offset是服务端返回的,存在比没有被消费的消息偏移量大的情况。
OFFSET_ILLEAGL
的情况下为什么要过10s以后才去更新offsetstore,保存offset,在reblance中移除process queue?出现这个问题是因为NO_MATCHED_LOGIC_QUEUE/NO_MESSAGE_IN_QUEUE/OFFSET_OVERFLOW_BADLY/OFFSET_TOO_SMALL
这四种情况,而这些情况可能发生在服务端在恢复数据的时候,因此考虑是暂停消费这个队列。如果drop之后不延迟,就会有可能又去拉取消息了。
保存push的消费者拉到的消息。同时,有序消费模式还记录了情况下正在消费的消息。
PullMessageService
只负责拉取消息,它会调用DefaultMQPushConsumerImpl.pullMessage
。
当ReblanceService
执行负载均衡的时候如果发现被分配了新的消息队列就会最终调用PullMessage.executePullRequestImmediately
执行拉取消息。代码执行路径:
ReblanceService.run
->MQClientInstance.doReblance
->MQConsumerInnter.doReblance[DefaultMQPushConsumerImpl.doReblance]
->ReblanceImpl.doReblance
->ReblanceImpl.dispatchPullRequest[ReblancePushImpl.dispatchPullRequest]
->DefaultMQPushConsumerImpl.executePullRequestImmediately
->PullMessage.executePullRequestImmediately
另外,在DefaultMQPushConsumerImpl.pullMessage
执行时,也会根据条件调用PullMessageService.executePullRequestImmediately
、PullMessageService.executeTaskLater
或者PullMessageService.executePullRequestLater
触发拉取消息。
消费服务分并发消费和顺序消费,主要区别在于提交消费任务逻辑,消费逻辑和处理消费结果的逻辑,以及对message queue的处理逻辑。另外,顺序消费是指在同一个消费队列里面的消息顺序消费。
并发消费:把消息分成多个批次并发处理,一批多少个消息是自定义的,默认是1。如果提交异常,则延迟5s后提交。
顺序消费:依赖于process queue是否正在被消费,这样避免同时消费多个不同的消息,不然就没法保证有序了。
下图中左边是并发消费,右边是顺序消费。
消费消息的时候,在可能停顿的执行点上面都加上了process queue是否已经drop的检查。
因为提交任务的方式不一样导致了不同模式下面消费逻辑的差别。
并发消费:只考虑当前的消息即可。
顺序消费:从process queue中取消息。消费的时候需要确保:
以上两个条件中只要一个条件不满足,就没法保证消息顺序消费。另外,第一个逻辑需要的锁,是因为消费慢,同时队列被分配别的消费者,在消费结束之前又分配回来了,就有可能导致1条件不满足,所以需要加锁。在代码层面第一个逻辑需要的锁已经确保了第二个逻辑。消费之前需要锁的原因是为了避免,用户还在消费的时候向broker解锁。
锁的逻辑
只有message queue被锁住了才能消费。客户端向服务端发送锁的请求成功以后才算锁成功。同时锁会有一个过期时间。在客户端这边定时向broker发送锁的请求,所得粒度是group+clientID,过期时间是30s。在服务端这边,锁了的过期时间是60s,这个时间以后能够接收其他锁的请求。
在负载均衡的时候,检查一个消费队列发现不属于自己或者长时间没有拉的时候就会把这个消费队列移除掉。移除的逻辑比较有意思,为了确保这个消费队列正在被消费不会被移除,这里使用了一个消费锁。移除的时候尝试获得这个锁,如果超过1s还没有获得就会等待下一次负载均衡的检查,如果获得了锁就会延迟20s再向broker发送解锁请求。这里的延迟,有个效果就是可能这时候已经向broker发送了拉消息的请求,如果在它返回之前又把队列分配给自己了,那么就有可能两个触发一个拉消息的请求,这个时候就会同时有两个拉消息的请求,那么拉出来重复的消息。
下图中左边是并发消费,右边是顺序消费。
处理消费结果的逻辑主要是处理消费失败的消息。
并发消费:如果是在广播模式下,直接丢弃了。如果是在集群模式下面会尝试把消息发回broker,如果发送失败的话,就会把这些发送失败的消息延迟提交消费。
顺序模式:如果是ROLLBACK
,把消息放回,再次消费。如果是SUSPEND_CURRENT_QUEUE_A_MOMENT
则会判断是否需要停止一段时间再消费。通过检查消费次数,当超过预定的值(默认是没有限制)就会把消息发回broker。如果消息都已经发回broker,就提交消息接下去消费,否则就停一会,把当前的消息延迟提交消费。
处理message queue
并发消费:定时清理长时间没法消费的消息,默认是15分钟。
顺序消费:在集群模式下面,定时向broker锁住message queue,锁的粒度是group+clientID。