store模块是rocketmq的核心模块。主要功能有:
负责消息存储,包括写消息,刷盘。
消息保存在默认值为${user.home}\store\commitlog
文件夹下,可以通过配置项storePathCommitLog
修改。所有的消息都写入一个逻辑文件,每个逻辑文件包含大小相等的物理文件。
写消息在不同的场景下面会有不同的逻辑。
每条消息要写到磁盘以后才算完成。
在同步刷盘的场景下,会有一个定期检查消息是否已经写入磁盘的线程:GroupCommitService
,除了检查还会进行刷盘的操作 。写消息的时候会生成一个GroupCommitRequest
提交到GroupCommitService
,并等待被唤醒或者超时。当GroupCommitService
发现已经刷盘的最后一个消息的索引大于等于本消息的索引时就会唤醒GroupCommitRequest
。
备注:以上的场景还依赖于消息的属性WAIT
,只有该属性为空或者为true
才会执行同步刷盘逻辑,默认是空的。
在异步刷盘的场景下,会有一个把数据刷到磁盘的辅助线程:FlushRealTimeService
。写消息仅仅唤醒该线程就结束了写盘操作。
每条消息要等一个从broker同步完才算完成。
在主从同步的场景下,会有一个定期检查消息是否已经被从broker同步的辅助线程:GroupTransferService
。写消息的时候会生成一个GroupCommitRequest
提交给GroupTransferService
,并等待被唤醒或者超时。当GroupTransferService
发现从broker已经同步的最后一个消息的索引大于本次消息的索引时就会唤醒GroupCommitRequest
。
使用了写buffer以后,写消息的全部逻辑就是把消息写入buffer。同时,系统会有一个线程CommitRealTimeService
定期把消息写入文件。
org.apache.rocketmq.store.CommitLog
每个topic对应多个消费队列,这个是提高消费并发度的前提。
每个消费队列对应一个逻辑文件,文件中对应每个消息的内容大小是固定的20个字节,包含消息的偏移量,大小以及tag哈希值。
数据保存在目录${rootpath}/consumequeue
下面,rootpath
通过配置项storePathRootDir
指定,默认的是${user.home}/store
。
${rootpath}/consumequeue
└── 0%default // topic
├── 0 // queue 0
│ └── 00000000000000000000
├── 1 // queue 1
│ └── 00000000000000000000
├── 2 // queue 2
│ └── 00000000000000000000
└── 3 // queue 3
└── 00000000000000000000
|<----- 8 byte ----->|<- 4 byte ->|<------ 8 byte ------>|
+--------------------+------------+----------------------+
| commitlog offset | size | message tag hash code|
+--------------------+------------+----------------------+
通过线程ReputMessageService
的分派消息的逻辑执行。
系统每隔1000ms(可配置)进行一次消费队列的写盘操作。
org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService
系统每隔10s(可以配置)执行尝试删除过期的消息。
storePathCommitLog
指定,默认是 $HOME/store/commitlog
。storePathRootDir
指定,默认是 $HOME/store
。rocketmq.broker.diskSpaceWarningLevelRatio
指定,默认是 0.9。rocketmq.broker.diskSpaceCleanForciblyRatio
指定,默认是 0.75。正常清理过期的消息,过期时间可以通过配置项 fileReservedTime
指定,默认是72小时。
清理上次没有清理成功的消息,这是因为消息被清理时,其他地方正在使用。每隔一段时间执行一次,同时如果距离上次被清理时间超过了一段时间会被强制清理。
redeleteHangedFileInterval
指定执行周期,默认120s。destroyMapedFileIntervalForcibly
指定强制清理的时间,默认120s。org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService
随着消息的清理,包含已经清理掉消息的消费队列以及索引就变得没有用处了,所以系统每隔100ms(可以配置)执行清理消费队列和索引。
获取当前消息的最小偏移量,这个偏移量随着消息的清理会不停的变化。
org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService
消息索引是方便用户查询消息的一个结构。系统可以通过配置项messageIndexEnable
开关消息索引,默认是打开的。索引允许重复构建,通过配置项duplicationEnable
指定。系统启动的时候,如果允许重复索引会重头构建,不然就从当前文件大小开始。
索引的key包含消息的两个属性:
KEYS
,支持多个值,每个值之间通过空格分割。UNIQ_KEY
。索引的内容是消息的偏移量和时间(秒的精度)。
数据保存在目录${rootpath}/index
下面,rootpath
通过配置项storePathRootDir
指定,默认的是${user.home}/store
。
index/
└── 20171225143756745
每个文件内容分成3部分,header, slot table和index linked list。组织如下:
|<-- 40 byte -->|<--- 500w --->|<--- 2000w --->|
+---------------+------------------+-------------------+
| header | slot table | index linked list |
+---------------+------------------+-------------------+
+---------------------+--0
| beginTimestampIndex | ----> 第一条消息的保存时间
+---------------------+--8
| endTimestampIndex | ----> 最后一条消息的保存时间
+---------------------+--16
| beginPhyoffsetIndex | ----> 第一条消息的在commitlog中的偏移量
+---------------------+--24
| endPhyoffsetIndex | ----> 最后一条消息的在commitlog中的偏移量
+---------------------+--32
| hashSlotcountIndex | ----> 哈希槽数量,保存添加到本槽列表的最新索引位置
+---------------------+--36
| indexCountIndex | ----> 索引数量,具体索引数据
+---------------------+--40
slot table总共有500w个位置,每个位置保存的是在这个slot上的索引列表中最新的那个索引。
index linked list保存每个消息的索引数据
- +-----+ <==== [slot table]
^ | 10 |
| +-----+
| | 200 |
| +-----+ +-----------+-------------+ +-----------+-------------+<=[index linked list]
| | 18 | --> | index data| next index |=>| index data| next index |
500w +-----+ +-----------+-------------+ +-----------+-------------+
| | ... | / \
| +-----+ / \------------------------------------\
| | 90 | |<--4 byte-->|<--- 8 byte --->|<--4 byte-->|<-----4 byte----->|
| +-----+ +------------+-------------------|------------+------------------+ <= [index]
v | 100 | | key hash | commit log offset | timestamp | next index offset|
- +-----+ +------------+-------------------|------------+------------------+
通过线程ReputMessageService
的分派消息的逻辑执行。
org.apache.rocketmq.store.index
写消息写到内存映射的文件,每次去新建一个文件同时会做内存映射操作,新建过程当中根据配置会执行比较耗时的预热操作。为了加快这个操作。系统通过一个线程预分配需要的MappedFile
。具体逻辑就是在获取新的文件的时候发送两个请求,分别对应当前的需要的文件以及这个文件写满以后需要的下一个文件,然后等待直到预分配线程分配完当前需要的文件,或者超时。
org.apache.rocketmq.store.AllocateMappedFileService
当消息写入完成以后,系统有一个线程对消息可以做其他一些逻辑。比如:构建索引,消费队列,通知long pull的客户端请求。线程会维护一个消息索引,根据这个索引跟当前最大已经写入的消息的最大偏移量进行比较得到是否有消息需要处理。
当系统重启的时候,会根据duplicationEnable
来决定是否从头开始处理消息还是只处理新来的消息。在duplicationEnable
是true
的情况下,还需要设置CommitLog.confirmOffset
才能从头开始处理消息,因为默认情况下系统启动以后CommitLog.confirmOffset
和ReputMessageService.reputFromOffset
是相等的,详见代码ReputMessageService.doReput
。
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
RocketMQ的HA是最朴素的一主多从同步,主broker挂了从broker可以读数据,但是不能写,也不会自动主从切换。
AcceptSocketService
,SLAVE会启动同步线程 HAClient
。HAConnection
之后,MASTER会为连接建立两个线程WriteSocketService
和ReadSocketService
分别负责这条连接的写和读。WriteSocketService线程
主结点向从结点推送消息线程。线程会记录当前同步的位置确保同步的数据不会重复。当线程启动的时候会等待从结点上报同步进度,如果上报的结果是0,从当前新消息文件中的第一个消息偏移量开始同步(不理解* V *)。消息写完主结点之后,会通知本线程进行写消息。另外,写完消息以后会主动向从结点发送已经同步的位置,像是一个保活机制。
ReadSocketService线程
接收从结点的同步进度。它的任务就是接收从结点同步进度,然后通知等待从结点写完的消息。
HAClient线程
从结点和主结点同步线程。主要工作
当开启写buffer的时候,主从同步的逻辑中使用到的当前消息的最大索引计算逻辑是不一样的。在这种情况下,系统会有一个线程CommitRealTimeService
负责把写buffer中的数据写入文件。只有写入以后数据,才会被同步到从broker。也就是说主从同步的实时性还会受到这个线程的影响。
当我们使用主从同步模式的时候,消息要等到主、从都写完才能返回。在这个过程中,除了主从同步逻辑以外还有消息等待从结点写完成的逻辑。这个逻辑是通过GroupTransferService
完成的。大致流程如下:消息写完主结点后向GroupTransferService
发送等待从结点写完请求,GroupTransferService
只做一件事情,就是不断对比当前从结点同步进度与当前接收到的消息物理偏移量,如果从结点的同步进度大,说明消息已经写入从结点,随即通知消息已经写完。另外,从结点的同步进度是通过ReadSocketService
接收到从结点主动发过来的;当消息写完主结点之后会通知主结点往从结点写消息服务WriteSocketService
。
org.apache.rocketmq.store.ha.HAService
org.apache.rocketmq.store.ha.HAConnection
实现了定时调度某个消息的功能。用户通过给消息设置DELAY
属性值来实现。
系统包含了一个名字为SCHEDULE_TOPIC_XXXX
的topic,当消息指定了DELALY
属性时,消息就会被发送到topic SCHEDULE_TOPIC_XXXX
中,同时会保存原来的topic、消费队列、以及其他属性值。这些值都作为消息的属性来保存。
系统通过配置项messageDelayLevel
预定义可以延迟多长时间,同时每个延迟的级别对应着的消费队列。
系统通过一个定时器,周期性从每个延迟级别对应的消费队列中拿取消息,并检查是否到期,如果到期就会把消息放入到原来的topic和队列中,同时会把先前用于保存原来消息的属性值删除,并设置投递时间。
org.apache.rocketmq.store.schedule.ScheduleMessageService
消息的统计。包含:
putMessageTimesTotal
putMessageDistributeTime
putMessageEntireTimeMax
putMessageSizeTotal
、平均大小putMessageAverageSize
putTps
getMessageEntireTimeMax
getFoundTps
、没有命中TPS getMissTps
以及总的请求TPSgetTotalTps
getTransferedTps
org.apache.rocketmq.store.StoreStatsService
如果开启了这个功能,系统启动时会向系统申请多块写buffer。每块buffer都会被锁在内存中。这个buffer只会被commitlog使用,写消息的时候写到这些buffer。
org.apache.rocketmq.store.TransientStorePool
rocketmq中的索引、消费队列、消息这些数据都通过内存映射进行读写。
org.apache.rocketmq.store.MappedFileQueue
表示org.apache.rocketmq.store.MappedFile
表示它们之间的关系:
|<- MappedFileQueue ->|
+-------------------+--------------------+-------------+-------------------+------------------+
|000000000000000000 | 000000000000000100 | ... |000000000000010000 |000000000000020000|
+-------------------+--------------------+-------------+-------------------+------------------+
|<- MappedFile-0 ->|<- MappedFile-1 ->| ... |<- MappedFile-n-1->|<- MappedFile-n ->|
+-------------------+--------------------+-------------+-------------------+------------------+
一个 MappedFileQueue
由多个 MappedFile
组成,每个 MappedFile
文件大小相等,文件名是32个字符,并且表示当前文件中第一个记录在 MappedFileQueue
所代表的逻辑文件中的偏移量。
MappedFile
在一个多线程环境里面,在使用的时候有可能已经被执行了删除操作,通过引用计数的方式进行安全管理MappedFile
的生命周期。
MappedFile
时通过先把文件映射的内存都写一遍,内核为文件分配物理页。mlock
锁住文件映射的物理内存,确保这部分内存不被交换出去。madvise
通知内核这部分数据将来会读到。建立一个buffer池,这些buffer是堆外内存。buffer所占用的内存是不会被交换出去的,同时也会通知内核这部分数据将来会读到。MappedFile
新建的时候可以通过这个池子获得buffer作为写的buffer。
org.apache.rocketmq.store.MappedFileQueue
org.apache.rocketmq.store.MappedFile
4.1.0
RocketMQ design