1、概述

MQ概述

限流削峰(请求存在队列中)

异步解耦

数据采集

image-20220127213121974

MQ常见协议

JMS

STOMP

AMQP

MQTT

2、RocketMQ

基本概念

消息、主题、标签、队列、消息标识(消息唯一id:生产者、broker)

image-20220127213610678

系统架构

image-20220127213633953

生产者

消费者

(一个消费者组,消费者消费队列不同)

Name Server(注册中心):broker管理、路由信息管理

路由注册、路由剔除(心跳包)、路由发现、选择策略:轮询

broker

组成:实体、客户端管理、存储、高可用(集群数据同步)、索引(ID)

集群部署:主备集群

主从、主备

image-20220127213923298

工作流程

1、启动server、监听端口

2、Broker与serve长连接

3、创建topic

4、生产者发送消息:和server建立连接,获取路由(topic、broker)。

与broker长连接,向broker发送消息。路由信息每30s缓存在本地。

5、消费者,server连接,获取路由。与broker长连接,消费。

Consumer发送心跳,确认broker存活状态。

Topic的创建模式

集群、broker模式

读写队列

一般情况数量一样

设计目的:方便topic的缩容

写队列先缩容,读队列消费完后再缩容,不丢失任何消息。

集群搭建理论

image-20220127214305521

数据复制与刷盘策略

生产者、消费者:groupname相同即可

Nameserver集群:相互不通信,启动即可

Broker集群:多个主备小集群

数据复制:master向slave复制

复制策略

刷盘:由内存写入到磁盘

同步、异步复制(ACK时机)

同步:slave成功后ack

异步:master写入后ack

刷盘策略

同步、异步刷盘

同步:broker写入磁盘ack

异步:broker写入内存ack

image-20220127214313364

多master集群:宕机后无法消费

多master多slave集群

一个master一个slave:主备模式

Master处理消息读写

Slave负责消息备份和宕机后的自动切换

最好使用RAID阵列

同步双写:同步复制

安全性最高,RT响应时间长

Slave不能自动切换

RAID10磁盘阵列 + 异步复制

磁盘阵列RAID

集群搭建

3、RocketMQ的工作原理

消息的生产

1、生产者向servre发送请求,获取topic路由

2、Server返回topic路由表及broker列表

(1)路由表:map<topic,queuedata>,queuedata:broker——queue

找到queue对应的broker

(2)Broker列表

3、选择queue

4、生产者对消息特殊处理

5、生产者向broker发出RPC请求,发送到queue(底层netty)

选择算法

轮询(问题:性能)、最小投递延迟(问题:分配不均)

消息的存储

store目录

刷盘、消息、配置、队列、消息索引、全局资源锁

commitlog

第n个文件名:前n-1个文件大小之和

消息单元

image-20220127214852355

consumequeue

消费队列

image-20220127214940291

索引条目:

是commitlog的索引文件——定位具体消息

消息条目——偏移量、消息长度、消息tag

image-20220127215015213

对文件的读写

消息写入

1、根据queueid,得到comsumequeue要写入的偏移量

2、封装消息单元

3、写入commitlog

4、形成索引条目

消息拉取

1、获取所在queue的偏移量

2、发送broker拉取请求,queue等信息

3、得到consumequeue中位置

4、查找指定tag索引

5、定位commitlog

6、读取其中的消息单元

image-20220127215053467

性能提升:

文件读写:通过mmap零拷贝

Comsuequeue读取:

引入pagecache预读取机制,接近内存读取(顺序读写)

本质:缓存

写:先写入pagecache,再异步写入磁盘

读:首先读pagecache,若没有命中,将磁盘数据加载到pacache,同时对相邻数据预读取

Commitlog读取:随机访问,会影响性能

与kafka对比:

RocketMQ中的commitlog目录与consumequeue的结合就类似于Kafka中的partition分区目录。

mappedFile文件就类似于Kafka中的segment段

indexfile

根据key进行消息查询(前提:消息包含key)

image-20220127215235908

Indexheader:该索引索引相关信息(数量,偏移量等)

image-20220127215241833

Slot

image-20220127215249106

Index

image-20220127215257689

创建

indexfile查询流程:

1、根据key、time找到indexfile

2、传入时间与文件名差值

3、计算key的hash——Slot序号——读取slot——最新的indexNo

4、根据indexN找到index位置

5、相比Index中的diff

(1)若时间差小,往前找

(2)若时间差大,index中定位位置,找到消息

image-20220127215406692

消息的消费

两种消息获取方式

Push(consumer主动拉取),注意拉取时间间隔

pull(broker主动推动,实时性高),向queue注册监听器,基于长连接

两种消息消费模式

广播模式:每个消费者都接收topic全部消息

集群模式:消费者平分消息

消息进度保存

广播模式:消费者保存

集群模式:保存broker中共享,消息进度参与负载均衡

Rebalance

Queue和consumer的之间重新分配

增加减少消费者,提升消息并行

消费者要小于队列

危害:

消息暂停:重分配后才能再消费

消费重复:异步提交,导致可能重复消费

一次性读取消息数量需要均衡:重复消费、性能

消息突刺:重复消费过多消息、积压时间过长

产生的原因和过程

1、Queue数量变化

(1)broker扩缩容

(2)Broker升级运维

(3)网络异常

(4)Queue扩缩容

2、消费者数量变化

(1)consumergroup扩缩容

(2)Consumer升级运维

(3)网络异常

过程:

Broker发现变化,向consumergroup发出Rebalance通知

Consumer采用分配算法,自主进行Rebalance

Broker中group coordinate选举consumer leader

由lerder完成分区的再分配,上报coordinate,coordinate同步给consumer实例

Kafka的Rebalance由选举的comsumer leader完成。

Rocket的Rebalance由每个consumer完成

Queue分配算法

平均分配

环形平均策略:类似轮询

image-20220127215711998

一致性hash策略:先把consumer放到环上,再把queue的hash值放环上,顺时针找consumer (缺点:分配不均)

image-20220127215721201

同机房策略

一致性hash:减少扩缩容的rebalance

扩缩容需要rebalance,变化较大

一致性hash扩缩容变化较小:应用场景,consumer变化较多

至少一次原则

订阅关系的一致性

offset管理

Consumer的消费进度offset

Offset本地管理

广播,消费进度保存在consumer中

Offset远程管理

集群消费,保存在broker文件中

所有consumer共享queue的消费进度

集群模式下offset采用远程管理模式,主要是为了保证Rebalance机制

Rebalance后,新的consumer可以读取相应消费进度继续消费

NextBeginOffset:下次消费起始

消费异常:异常消息提交broker的重试队列

同步、异步提交offset

异步:提交offset、无需等待broker响应,直接从broker获取next

消息幂等

消费幂等

重复消费影响相同

1、发送重复:Broker对生产者ack失败,生产者重复发送相同消息(msgid相同)

2、消费重复:consumer对broker的ack失败,broker再次投递相同消息

3、Rebalance:consumer、queue变化,没提交ack前Rebalance可能重复消费

解决方案

幂等令牌:具备唯一业务表示的字符串,唯一id

唯一性处理:服务端采用算法,保证成功执行一次

1、通过缓存去重,缓存是否命中幂等令牌

2、数据库查询是否有幂等令牌(缓存可能过期)

3、同一事务中:唯一性处理、幂等令牌写入缓存,幂等令牌写入DB

支付场景中的解决方案

1、首先redis获取流水号

(1)不为空,调用重复逻辑

2、无缓存,查询DB

(1)不为空,调用重复逻辑

3、为空,在分布式事务完成

(1)完成支付

(2)流水号作为key,存入redis(过期时间)

(3)流水号作为主键存入DB

消费幂等的实现

最好以业务唯一标识作为依据

消息堆积与消费延迟

image-20220127220321282

原因

消费速度跟不上生产速度

1、上下游能力不匹配

2、实时性要求较高,延迟造成堆积

长轮询PULL模式

1、消息拉取:批量拉取,缓存到本地缓存队列

2、消息消费:取决于消费并发、消费耗时度

本地缓存队列达到上限,停止从服务端拉取消息

消费耗时的深入分析

主要耗时:外部IO代码

1、mysql 、redis访问

2、下游系统调用:dubbo的RPC远程调用、springcloud的http接口调用

下游系统服务异常等原因

消息堆积——消息消费——消息耗时——外部IO——下游DB、RPC、http调用——服务异常、容量限制——网络宽带等

消息并发度的深入分析

普通消息 并发度 = 单节点线程数 * 节点数(consumer)

顺序消息:topic的queue分区数量

全局顺序消息

分区顺序消息

单机线程数的计算

避免消息堆积与消费延迟

消费耗时

1、避免循环、递归

2、IO操作是否可以本地缓存

3、是否可以异步化处理

设置消费并发度

消息的清理

以commitlog为单位进行清理

文件过期、磁盘占用率

4、RocketMQ的应用

普通消息

同步发送消息:收到ack后继续发送

1、创建生产者、group名称

2、Nameserver配置

3、开启生产者,发送消息

4、关闭生产者

SendResult:状态、msgid

异步发送消息:无需等待mq的ack消息,发送的回调接口异步响应

指定回调函数

单向发送消息:不接收ack

image-20220127220358071

顺序消息

顺序消息:

严格按发送顺序进行消费——只有一条queue——一个消费者消费

相同订单号消息放到同一queue中

image-20220127220514365

消息有序性的分类

全局有序:只有一个queue

分区有序

生产者可以指定queue选择器

image-20220127220623422

延时消息

指定时长后才可被处理:实现定时任务

发送延迟消息:过时且判断未支付,放回票池

修改消息

image-20220127220640775

1、先发给延迟topic延迟队列

2、根据定时管理器,时间到了——发送给commitlog——正常消息

写入commitqueue

原本Tage的hashcode改为消息投递时间(定时结束时间)

再次投递

1、定时管理器中有相应timertask——负责消息消费和投递

2、检测相应queue中第一条消息是否到期

3、若到期,投递

管理器作为生产者,重新写入commitlog

事务消息

分布式事务

image-20220127220802767

1、TM向TC发指令,开启全局事务

2、工行给TC发送事务消息M

3、TC给broker发送半事务消息(预提交)

4、Broker预提交返回执行结果

(1)失败——TC——TM,全局事务结束

5、成功——调用——预扣款——返回执行结果(本地事务)——TC——TM

(1)预扣款成功——commit——TC——broker(branch commit)——M可被建行看到

(2)预扣款失败——rollback——TC——broker(branch rollback)

事务消息

事务消息

半事务消息:暂不能被消费者看到

本地事务状态:生产者回调操作执行的结果——TC——TM——全局事务确认

image-20220127220933007

消息回查:重新查看本地事务的执行状态

(1)状态(UNKNOWN)

(2)TC未收到TM的最终全局事务确认指令

XA模式

XA:分布式事务处理模式

三剑客

(1)TC:事务协调者(broker)维护全局事务

(2)TM:事务管理器(生产者)全局事务发起者

(3)RM:资源管理器(生产者、broker)报告事务状态

架构

image-20220127221048552

1、TM向TC发指令开启全局事务

2、RM向TC注册分支事务,TC——RM——预执行

3、RM执行结果返回,TC汇总结果——TM

(1)global commit——TC——RM

(2)Global rollback——TC——RM

事务消息:同步的,先broker再生产者

批量消息

发送

Topic、刷盘策略一样

不能是延时、事务消息

消息——字符串:topic、body、日志、properties相关属性

image-20220127221147787

消费

默认一次拉取32条消息

默认最大一次消费32条

分割器

消息过滤

tag过滤

image-20220127221233544

SQL过滤:Properties中的属性,支持运算符

消息重试机制

发送

1、同步异步会重试,单向无法重试(只管发送)

2、顺序消息没有重试

3、可能消息重复、重复消息(负载变化、网络原因)

4、发送重试策略:同步发送失败策略、异步发送失败策略、消息刷盘失败策略

同步发送失败

失败隔离:选择不同broker

异步发送失败

消息刷盘失败策略:刷盘失败,默认不会发送到其他broker

消费

顺序消息消费重试:不断重试直至消费成功

广播消费:消费失败——无消费重试

若重复完仍然失败——死信队列

需要重试消费的消息——>重试队列(基于延时消息实现)

消息监听接口配置

死信队列

死信队列:处理异常消息

详见RabbitMQ