RocketMQ
1、概述
MQ概述
限流削峰(请求存在队列中)
异步解耦
数据采集
MQ常见协议
JMS
STOMP
AMQP
MQTT
2、RocketMQ
基本概念
消息、主题、标签、队列、消息标识(消息唯一id:生产者、broker)
系统架构
生产者
消费者
(一个消费者组,消费者消费队列不同)
Name Server(注册中心):broker管理、路由信息管理
路由注册、路由剔除(心跳包)、路由发现、选择策略:轮询
broker
组成:实体、客户端管理、存储、高可用(集群数据同步)、索引(ID)
集群部署:主备集群
主从、主备
工作流程
1、启动server、监听端口
2、Broker与serve长连接
3、创建topic
4、生产者发送消息:和server建立连接,获取路由(topic、broker)。
与broker长连接,向broker发送消息。路由信息每30s缓存在本地。
5、消费者,server连接,获取路由。与broker长连接,消费。
Consumer发送心跳,确认broker存活状态。
Topic的创建模式
集群、broker模式
读写队列
一般情况数量一样
设计目的:方便topic的缩容
写队列先缩容,读队列消费完后再缩容,不丢失任何消息。
集群搭建理论
数据复制与刷盘策略
生产者、消费者:groupname相同即可
Nameserver集群:相互不通信,启动即可
Broker集群:多个主备小集群
数据复制:master向slave复制
复制策略
刷盘:由内存写入到磁盘
同步、异步复制(ACK时机)
同步:slave成功后ack
异步:master写入后ack
刷盘策略
同步、异步刷盘
同步:broker写入磁盘ack
异步:broker写入内存ack
多master集群:宕机后无法消费
多master多slave集群
一个master一个slave:主备模式
Master处理消息读写
Slave负责消息备份和宕机后的自动切换
最好使用RAID阵列
同步双写:同步复制
安全性最高,RT响应时间长
Slave不能自动切换
RAID10磁盘阵列 + 异步复制
磁盘阵列RAID
集群搭建
3、RocketMQ的工作原理
消息的生产
1、生产者向servre发送请求,获取topic路由
2、Server返回topic路由表及broker列表
(1)路由表:map<topic,queuedata>,queuedata:broker——queue
找到queue对应的broker
(2)Broker列表
3、选择queue
4、生产者对消息特殊处理
5、生产者向broker发出RPC请求,发送到queue(底层netty)
选择算法
轮询(问题:性能)、最小投递延迟(问题:分配不均)
消息的存储
store目录
刷盘、消息、配置、队列、消息索引、全局资源锁
commitlog
第n个文件名:前n-1个文件大小之和
消息单元
consumequeue
消费队列
索引条目:
是commitlog的索引文件——定位具体消息
消息条目——偏移量、消息长度、消息tag
对文件的读写
消息写入
1、根据queueid,得到comsumequeue要写入的偏移量
2、封装消息单元
3、写入commitlog
4、形成索引条目
消息拉取
1、获取所在queue的偏移量
2、发送broker拉取请求,queue等信息
3、得到consumequeue中位置
4、查找指定tag索引
5、定位commitlog
6、读取其中的消息单元
性能提升:
文件读写:通过mmap零拷贝
Comsuequeue读取:
引入pagecache预读取机制,接近内存读取(顺序读写)
本质:缓存
写:先写入pagecache,再异步写入磁盘
读:首先读pagecache,若没有命中,将磁盘数据加载到pacache,同时对相邻数据预读取
Commitlog读取:随机访问,会影响性能
与kafka对比:
RocketMQ中的commitlog目录与consumequeue的结合就类似于Kafka中的partition分区目录。
mappedFile文件就类似于Kafka中的segment段
indexfile
根据key进行消息查询(前提:消息包含key)
Indexheader:该索引索引相关信息(数量,偏移量等)
Slot
Index
创建
indexfile查询流程:
1、根据key、time找到indexfile
2、传入时间与文件名差值
3、计算key的hash——Slot序号——读取slot——最新的indexNo
4、根据indexN找到index位置
5、相比Index中的diff
(1)若时间差小,往前找
(2)若时间差大,index中定位位置,找到消息
消息的消费
两种消息获取方式
Push(consumer主动拉取),注意拉取时间间隔
pull(broker主动推动,实时性高),向queue注册监听器,基于长连接
两种消息消费模式
广播模式:每个消费者都接收topic全部消息
集群模式:消费者平分消息
消息进度保存
广播模式:消费者保存
集群模式:保存broker中共享,消息进度参与负载均衡
Rebalance
Queue和consumer的之间重新分配
增加减少消费者,提升消息并行
消费者要小于队列
危害:
消息暂停:重分配后才能再消费
消费重复:异步提交,导致可能重复消费
一次性读取消息数量需要均衡:重复消费、性能
消息突刺:重复消费过多消息、积压时间过长
产生的原因和过程
1、Queue数量变化
(1)broker扩缩容
(2)Broker升级运维
(3)网络异常
(4)Queue扩缩容
2、消费者数量变化
(1)consumergroup扩缩容
(2)Consumer升级运维
(3)网络异常
过程:
Broker发现变化,向consumergroup发出Rebalance通知
Consumer采用分配算法,自主进行Rebalance
Broker中group coordinate选举consumer leader
由lerder完成分区的再分配,上报coordinate,coordinate同步给consumer实例
Kafka的Rebalance由选举的comsumer leader完成。
Rocket的Rebalance由每个consumer完成
Queue分配算法
平均分配
环形平均策略:类似轮询
一致性hash策略:先把consumer放到环上,再把queue的hash值放环上,顺时针找consumer (缺点:分配不均)
同机房策略
一致性hash:减少扩缩容的rebalance
扩缩容需要rebalance,变化较大
一致性hash扩缩容变化较小:应用场景,consumer变化较多
至少一次原则
订阅关系的一致性
offset管理
Consumer的消费进度offset
Offset本地管理
广播,消费进度保存在consumer中
Offset远程管理
集群消费,保存在broker文件中
所有consumer共享queue的消费进度
集群模式下offset采用远程管理模式,主要是为了保证Rebalance机制
Rebalance后,新的consumer可以读取相应消费进度继续消费
NextBeginOffset:下次消费起始
消费异常:异常消息提交broker的重试队列
同步、异步提交offset
异步:提交offset、无需等待broker响应,直接从broker获取next
消息幂等
消费幂等
重复消费影响相同
1、发送重复:Broker对生产者ack失败,生产者重复发送相同消息(msgid相同)
2、消费重复:consumer对broker的ack失败,broker再次投递相同消息
3、Rebalance:consumer、queue变化,没提交ack前Rebalance可能重复消费
解决方案
幂等令牌:具备唯一业务表示的字符串,唯一id
唯一性处理:服务端采用算法,保证成功执行一次
1、通过缓存去重,缓存是否命中幂等令牌
2、数据库查询是否有幂等令牌(缓存可能过期)
3、同一事务中:唯一性处理、幂等令牌写入缓存,幂等令牌写入DB
支付场景中的解决方案
1、首先redis获取流水号
(1)不为空,调用重复逻辑
2、无缓存,查询DB
(1)不为空,调用重复逻辑
3、为空,在分布式事务完成
(1)完成支付
(2)流水号作为key,存入redis(过期时间)
(3)流水号作为主键存入DB
消费幂等的实现
最好以业务唯一标识作为依据
消息堆积与消费延迟
原因
消费速度跟不上生产速度
1、上下游能力不匹配
2、实时性要求较高,延迟造成堆积
长轮询PULL模式
1、消息拉取:批量拉取,缓存到本地缓存队列
2、消息消费:取决于消费并发、消费耗时度
本地缓存队列达到上限,停止从服务端拉取消息
消费耗时的深入分析
主要耗时:外部IO代码
1、mysql 、redis访问
2、下游系统调用:dubbo的RPC远程调用、springcloud的http接口调用
下游系统服务异常等原因
消息堆积——消息消费——消息耗时——外部IO——下游DB、RPC、http调用——服务异常、容量限制——网络宽带等
消息并发度的深入分析
普通消息 并发度 = 单节点线程数 * 节点数(consumer)
顺序消息:topic的queue分区数量
全局顺序消息
分区顺序消息
单机线程数的计算
避免消息堆积与消费延迟
消费耗时
1、避免循环、递归
2、IO操作是否可以本地缓存
3、是否可以异步化处理
设置消费并发度
消息的清理
以commitlog为单位进行清理
文件过期、磁盘占用率
4、RocketMQ的应用
普通消息
同步发送消息:收到ack后继续发送
1、创建生产者、group名称
2、Nameserver配置
3、开启生产者,发送消息
4、关闭生产者
SendResult:状态、msgid
异步发送消息:无需等待mq的ack消息,发送的回调接口异步响应
指定回调函数
单向发送消息:不接收ack
顺序消息
顺序消息:
严格按发送顺序进行消费——只有一条queue——一个消费者消费
相同订单号消息放到同一queue中
消息有序性的分类
全局有序:只有一个queue
分区有序
生产者可以指定queue选择器
延时消息
指定时长后才可被处理:实现定时任务
发送延迟消息:过时且判断未支付,放回票池
修改消息
1、先发给延迟topic延迟队列
2、根据定时管理器,时间到了——发送给commitlog——正常消息
写入commitqueue
原本Tage的hashcode改为消息投递时间(定时结束时间)
再次投递
1、定时管理器中有相应timertask——负责消息消费和投递
2、检测相应queue中第一条消息是否到期
3、若到期,投递
管理器作为生产者,重新写入commitlog
事务消息
分布式事务
1、TM向TC发指令,开启全局事务
2、工行给TC发送事务消息M
3、TC给broker发送半事务消息(预提交)
4、Broker预提交返回执行结果
(1)失败——TC——TM,全局事务结束
5、成功——调用——预扣款——返回执行结果(本地事务)——TC——TM
(1)预扣款成功——commit——TC——broker(branch commit)——M可被建行看到
(2)预扣款失败——rollback——TC——broker(branch rollback)
事务消息
事务消息
半事务消息:暂不能被消费者看到
本地事务状态:生产者回调操作执行的结果——TC——TM——全局事务确认
消息回查:重新查看本地事务的执行状态
(1)状态(UNKNOWN)
(2)TC未收到TM的最终全局事务确认指令
XA模式
XA:分布式事务处理模式
三剑客
(1)TC:事务协调者(broker)维护全局事务
(2)TM:事务管理器(生产者)全局事务发起者
(3)RM:资源管理器(生产者、broker)报告事务状态
架构
1、TM向TC发指令开启全局事务
2、RM向TC注册分支事务,TC——RM——预执行
3、RM执行结果返回,TC汇总结果——TM
(1)global commit——TC——RM
(2)Global rollback——TC——RM
事务消息:同步的,先broker再生产者
批量消息
发送
Topic、刷盘策略一样
不能是延时、事务消息
消息——字符串:topic、body、日志、properties相关属性
消费
默认一次拉取32条消息
默认最大一次消费32条
分割器
消息过滤
tag过滤
SQL过滤:Properties中的属性,支持运算符
消息重试机制
发送
1、同步异步会重试,单向无法重试(只管发送)
2、顺序消息没有重试
3、可能消息重复、重复消息(负载变化、网络原因)
4、发送重试策略:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
同步发送失败
失败隔离:选择不同broker
异步发送失败
消息刷盘失败策略:刷盘失败,默认不会发送到其他broker
消费
顺序消息消费重试:不断重试直至消费成功
广播消费:消费失败——无消费重试
若重复完仍然失败——死信队列
需要重试消费的消息——>重试队列(基于延时消息实现)
消息监听接口配置
死信队列
死信队列:处理异常消息
详见RabbitMQ