目录

消息队列


消息队列的应用场景

场景一:解耦

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dc4ba8c003d142c4bb07ba9902e22932~tplv-k3u1fbpfcp-watermark.image?

如上所述,如果记录用户行为的这个请求过程,和后台服务的记录过程直接耦合,将会产生很严重的后果,如果后台服务宕机或者直接删库跑路等等状况,那么前端的请求也不会得到正确的响应,这时前端的请求服务将会一直阻塞,这会严重影响用户体验。

此时如果我们在记录存储的前面引入消息队列,那么每次前端发送的请求只要到了消息队列就能正常返回了,这会大大提高响应的速度,后续的存储过程就不需要再关心了,这就是解耦的应用

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/64db17bd8f5c40f1a7ffdc2952c92457~tplv-k3u1fbpfcp-watermark.image?

场景二:削峰

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8c7a1750d7ca4e49a6f91a30c98f8ab4~tplv-k3u1fbpfcp-watermark.image?

具体到生活中的业务就是,如果有一个抢购秒杀活动,这个时候,肯定会导致在某个时间段流量达到很大的峰值,而我们最终只需要寥寥10个名额甚至更少,那么我们该如何解决?

这个时候可以利用到消息队列,可以在后端正式处理之前加上一个消息队列,便可达到以下效果:

  • 可以控制活动的人数
  • 可以缓解短时间内高流量压垮应用
  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
  • 后台只需消费队列中的内容即可

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/971b249d08d64e79a2ac0fb836e630cd~tplv-k3u1fbpfcp-watermark.image?

场景三:异步

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/576482769825417d82bd3c29db196542~tplv-k3u1fbpfcp-watermark.image?

如上图所示,这是一个串行的过程,很明显可以优化为并行过程,但如果还是像图中的直接将发起订单后的三个过程并行,那最终还是得等30s才能有结果!那么如何解决呢?

这个时候可以加个消息队列,前端和消息队列接触后,直接返回,然后后台负责去消费这个消息队列即可(实际上这个过程更像是解耦的过程

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ca23aef5edd449fab9b1dc3419d70f04~tplv-k3u1fbpfcp-watermark.image?

场景四:日志处理

https://s2.loli.net/2022/06/07/HZmRxyT9CNf8EeX.png

同样,如果是通过消息队列作为中间的过程来传递日志,那么不用担心真正后台记录日志的服务器宕机且后台日志丢失等严重问题。

消费队列的定义

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3b83c09486fe405ca8c6be1475c34768~tplv-k3u1fbpfcp-watermark.image?

其中的高并发和高吞吐都很清楚,而这里的高可用,指的就是不会随便发生异常行为而导致服务不可用!

业界的消息队列

业界常用消息队列对比

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/72322cd5263642a0b93370574e9d4140~tplv-k3u1fbpfcp-watermark.image?

Kafka

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0339e2721358493282f9dd0738ae4810~tplv-k3u1fbpfcp-zoom-1.image

以上为Kafka的使用流程。

  1. 创建集群。
  2. 需要在这个集群中创建一个Topic,并且设置好分片数量。
  3. 引入对应语言的SDK,配置好集群和Topic等参数,初始化一个生产者,调用Send方法,将你的Hello World发送出去。
  4. 引入对应语言的SDK,配置好集群和Topic等参数,初始化一个消费者,调用Poll方法,你将收到你刚刚发送的Hello World。

基本名词

Cluster

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ee329ec092054ded9bf6e8bf171a1415~tplv-k3u1fbpfcp-watermark.image?

Topic:Kakfa中的逻辑队列,可以理解成每一个不同的业务场景就是一个不同的topic,对于这个业务来说,所有的数据都存储在这个topic中。

Cluster:Kafka的物理集群,每个集群中可以新建多个不同的topic。

Producer:顾名思义,也就是消息的生产端,负责将业务消息发送到Topic当中。

Consumer:消息的消费端,负责消费已经发送到topic中的消息。

Partition:通常topic会有多个分片,不同分片直接消息是可以并发来处理的,这样提高单个Topic的吞吐。

Offset

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/18d20eaa80df4220a18efd57bb0c9eb8~tplv-k3u1fbpfcp-watermark.image?

对于每一个Partition来说,每一条消息都有一个唯一的Offset,消息在partition内的相对位置信息,并且严格递增。

Replica

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8089621dc7c9488089fc4544763bf024~tplv-k3u1fbpfcp-watermark.image?

Replica:分片的副本,分布在不同的机器上,可用来容灾,Leader对外服务,Follower异步去拉取leader的数据进行一个同步,如果leader挂掉了,可以将Follower提升成leader再堆外进行服务 ISR:意思是同步中的副本,对于Follower来说,始终和leader是有一定差距的,但当这个差距比较小的时候,我们就可以将这个follower副本加入到ISR中,不在ISR中的副本是不允许提升成Leader的。

实际架构过程分析

Broker

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d43aa230616146b68038a748adff48bb~tplv-k3u1fbpfcp-watermark.image?

注意看上图,每个broker代表一个节点,实际上对应一台机器,所有的Broker组成了一个集群,我们不要被每个框框束缚住,只需关注一共有哪些Topic和它对应的Partition即可

整个图表示,图中整个集群,包含了4个Broker机器节点,集群有两个Topic,分别是Topic1和Topic2,Topic1有两个分片,Topic2有1个分片,每个分片都是三副本的状态。这里中间有一个Broker同时也扮演了Controller的角色,Controller是整个集群的大脑,负责对副本和Broker进行分配。

Zookeeper

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8a1c35a8389f43cd9401dcf02e3e298e~tplv-k3u1fbpfcp-watermark.image?

而在集群的基础上,还有一个模块是ZooKeeper,这个模块其实是存储了集群的元数据信息,比如副本的分配信息等等,Controller计算好的方案都会放到这个地方。

Kafka高吞吐和稳定的秘诀

Producer

通过批量发送减少IO次数。

为了防止批量发送的数据包过大,使用压缩算法进行压缩。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5ad3745cd6c24d819b79ebc604825be1~tplv-k3u1fbpfcp-watermark.image?

Broker

采用以下三点进行优化:

graph TB
a[顺序写]
b[消息索引]
c[零拷贝]
d[优化方式]
d-->a
d-->b
d-->c

先来看看消息队列的的文件结构:

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/13deb9e5fef94e4e9cd67dbebeb0ca80~tplv-k3u1fbpfcp-watermark.image?

在每一个Broker,都分布着不同Topic的不同分片。

顺序写

顺序写:每条消息都是紧凑排列的顺序写入。

消息索引

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/913a3206f66548f385b4594c105fa5d3~tplv-k3u1fbpfcp-watermark.image?

由于文件名和第一条存储数据的索引相同,故可先通过二分查找找到小于offset的最大索引位置,然后再遍历这个文件内的索引记录,便可找到目标消息。

同理,也可通过时间戳二分查找。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cbee7425015a4fb69570ca0bbc234570~tplv-k3u1fbpfcp-watermark.image?

零拷贝

通过操作系统的API,直接实现内核空间直接发送到网络。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b6425ca09baa4287942ee48f94a0f6ce~tplv-k3u1fbpfcp-watermark.image?

Consumer

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cd14be19f7ad443d9ad07d330962281f~tplv-k3u1fbpfcp-watermark.image?

对于一个Consumer Group来说,多个分片可以并发的消费,这样可以大大提高消费的效率,但需要解决的问题是,Consumer和Partition的分配问题,也就是对于每一个Partition来讲,该由哪一个Consumer来消费的问题。对于这个问题,我们一般有两种解决方法,手动分配和自动分配。

手动分配

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9969593e0a43403884c2f30cabd315b7~tplv-k3u1fbpfcp-watermark.image?

第一,手动分配,也就是Kafka中所说的Low Level消费方式进行消费,这种分配方式的一个好处就是启动比较快,因为对于每一个Consumer来说,启动的时候就已经知道了自己应该去消费哪个消费方式,就好比图中的Consumer Group1来说,Consumer1去消费Partition1,2,3 Consumer2,去消费456, Consumer3去消费78。这些Consumer再启动的时候就已经知道分配方案了,但这样这种方式的缺点又是什么呢,想象一下,如果我们的Consumer3挂掉了,我们的7,8分片是不是就停止消费了。又或者,如果我们新增了一台Consumer4,那是不是又需要停掉整个集群,重新修改配置再上线,保证Consumer4也可以消费数据,其实上面两个问题,有时候对于线上业务来说是致命的。

自动分配

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7ca130359abc4d32be4089e839006272~tplv-k3u1fbpfcp-watermark.image?

所以Kafka也提供了自动分配的方式,这里也叫做High Level的消费方式,简单的来说,就是在我们的Broker集群中,对于不同的Consumer Group来讲,都会选取一台Broker当做Coordinator,而Coordinator的作用就是帮助Consumer Group进行分片的分配,也叫做分片的rebalance,使用这种方式,如果ConsumerGroup中有发生宕机,或者有新的Consumer加入,整个partition和Consumer都会重新进行分配来达到一个稳定的消费状态。

Kafka存在的问题

运维成本高

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ee30448053984787b3b90626218386df~tplv-k3u1fbpfcp-watermark.image?

举个例子来说,如果我们对一个机器进行重启 首先,我们会关闭一个Broker,此时如果该Broker上存在副本的Leader,那么该副本将发生leader切换,切换到其他节点上面并且在ISR中的Follower副本,可以看到图中是切换到了第二个Broker上面 而此时,因为数据在不断的写入,对于刚刚关闭重启的Broker来说,和新Leader之间一定会存在数据的滞后,此时这个Broker会追赶数据,重新加入到ISR当中 当数据追赶完成之后,我们需要回切leader,这一步叫做prefer leader,这一步的目的是为了避免,在一个集群长期运行后,所有的leader都分布在少数节点上,导致数据的不均衡 通过上面的一个流程分析,我们可以发现对于一个Broker的重启来说,需要进行数据复制,所以时间成本会比较大,比如一个节点重启需要10分钟,一个集群有1000个节点,如果该集群需要重启升级,则需要10000分钟,那差不多就是一个星期,这样的时间成本是非常大的。 你可能会说,可以不可以并发多台重启呀,问的好,不可以。为什么呢,在一个两副本的集群中,重启了两台机器,对某一分片来讲,可能两个分片都在这台机器上面(可能这几个机器包含所有分片,则会导致该集群处于不可用的状态。这是更不能接受的。

负载不均衡场景解决方案复杂

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1ec5932674dd4537b4fab9c741c6a050~tplv-k3u1fbpfcp-watermark.image?

这个场景当中,同一个Topic有4个分片,两副本,可以看到,对于分片1来说,数据量是明显比其他分片要大的,当我们机器IO达到瓶颈的时候,可能就需要把第一台Broker上面的Partition3迁移到其他负载小的Broker上面,但我们的数据复制又会引起Broker1的IO升高,所以问题就变成了,我为了去解决IO升高,但解决问题的过程又会带来更高的IO,所以就需要权衡IO设计出一个极其复杂的负载均衡策略

没有自己的缓存

Kafka没有自己的缓存,在进行数据读取的时候,只有Page Cache可以用,所以不是很灵活。

Controller、Coordinator、Broker处于同一进程

Kafka的Controller和Coordinator都是和Broker部署在一起的,Broker因为承载大量IO的原因,会导致Controller和Coordinator的性能下降,如果到一定程度,可能会影响整个集群的可用性。


BMQ

架构模型(解决Kafka存在的问题

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/941f82314427421080013eb850e3fa31~tplv-k3u1fbpfcp-watermark.image?

解决运维成本问题

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1f4c636788124314b0a9e22bd19aca63~tplv-k3u1fbpfcp-watermark.image?

实际上对于所有节点变更的操作,都仅仅只是集群元数据的变化,通常情况下都能秒级完成,而真正的数据已经移到下层分布式文件存储去了,所以运维操作不需要额外关心数据复制所带来的时间成本。

分布式系统的具体文件写入操作:

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/64cc0cd6cd1444938b8919c1a96b3cdf~tplv-k3u1fbpfcp-watermark.image?

通过前面的介绍,我们知道了,同一个副本是由多个segment组成,我们来看看BMQ对于单个文件写入的机制是怎么样的,首先客户端写入前会选择一定数量的DataNode,这个数量是副本数,然后将一个文件写入到这三个节点上,切换到下一个segment之后,又会重新选择三个节点进行写入。这样一来,对于单个副本的所有segment来讲,会随机的分配到分布式文件系统的整个集群中。

解决负载均衡问题

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3bf7587bc3b54d738f6f393d91c3c5e4~tplv-k3u1fbpfcp-watermark.image?

对于Kafka分片数据的写入,是通过先在Leader上面写好文件,然后同步到Follower上,所以对于同一个副本的所有Segment都在同一台机器上面。就会存在之前我们所说到的单分片过大导致负载不均衡的问题,但在BMQ集群中,因为对于单个副本来讲,是随机分配到不同的节点上面的(分布式存储,因此不会存在Kafka的负载不均问题。

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/bd2a10b4fe494efab75d09ade5c08042~tplv-k3u1fbpfcp-watermark.image?

其实对于写入的逻辑来说,我们还有一个状态机的机制,用来保证不会出现同一个分片在两个Broker上同时启动的情况,另外也能够保证一个分片的正常运行。首先,Controller做好分片的分配之后,如果在该Broker分配到了Broker,首先会start这个分片,然后进入Recover状态,这个状态主要有两个目的获取分片写入权利,也就是说,对于hdfs来讲,只会允许我一个分片进行写入,只有拿到这个权利的分片我才能写入,第二一个目的是如果上次分片是异常中断的,没有进行save checkpoint,这里会重新进行一次save checkpoint,然后就进入了正常的写流程状态,创建文件,写入数据,到一定大小之后又开始建立新的文件进行写入。

读写流程

文件写入流程

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1201bcfef90c432ca825fa061aa3a86f~tplv-k3u1fbpfcp-watermark.image?

数据校验:CRC , 参数是否合法 校验完成后,会把数据放入Buffer中 通过一个异步的Write Thread线程将数据最终写入到底层的存储系统当中。

这里有一个地方需要注意一下,就是对于业务的写入来说,可以配置返回方式,可以在写完缓存之后直接返回,另外我也可以数据真正写入存储系统后再返回,对于这两个来说前者损失了数据的可靠性,带来了吞吐性能的优势,因为只写入内存是比较快的,但如果在下一次flush前发生宕机了,这个时候数据就有可能丢失了,后者的话,因为数据已经写入了存储系统,这个时候也不需要担心数据丢失,相应的来说吞吐就会小一些 我们再来看看Thread的具体逻辑,首先会将Buffer中的数据取出来,调用底层写入逻辑,在一定的时间周期上去flush,flush完成后开始建立Index,也就是offset和timestamp对于消息具体位置的映射关系 Index建立好以后,会save一次checkpoint,也就表示,checkpoint后的数据是可以被消费的,我们想一下,如果没有checkpoint的情况下会发生什么问题,如果flush完成之后宕机,index还没有建立,这个数据是不应该被消费的 最后当文件到达一定大小之后,需要建立一个新的segment文件来写入。

文件获取流程

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/deefd72e5fe441a9b594defbab4be87a~tplv-k3u1fbpfcp-watermark.image?

首先Consumer发送一个Fetch Request,然后会有一个Wait流程,那么他的左右是什么呢,想象一个Topic,如果一直没有数据写入,那么,此时consumer就会一直发送Fetch Request,如果Consumer数量过多,BMQ的server端是扛不住这个请求的,因此,我们设置了一个等待机制,如果没有fetch到指定大小的数据,那么proxy会等待一定的时间,再返回给用户侧,这样也就降低了fetch请求的IO次数,经过我们的wait流程后,我们会到我们的Cache里面去找到是否有存在我们想要的数据,如果有直接返回,如果没有,再开始去存储系统当中寻找,首先会Open这个文件,然后通过Index找到数据所在的具体位置,从这个位置开始读取数据。

高级特性

泳道

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/81be0489c5cb47b1b79bd582a5064473~tplv-k3u1fbpfcp-watermark.image?

Databus

在直接的消息队列的客户端操作之中又封装了一层,客户端代码可用得到简化,可缓解集群压力。

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/429fe97d32ea454382ac5c63bf095f80~tplv-k3u1fbpfcp-watermark.image?

Mirror

主要用于解决跨区域(不同国家)的读写问题。

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/515b851cfb9547668c0c1ed23790fde7~tplv-k3u1fbpfcp-watermark.image?

Index

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0155131910724379814aaffd258087c2~tplv-k3u1fbpfcp-watermark.image?

通过索引构建类似于数据库的表结构。

Parquet

  • Parquet 是一种支持嵌套结构的列式存储格式
  • 非常适用于 OLAP 场景,按列存储和扫描

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cd5cb988f1dc40e7af69107596de0a37~tplv-k3u1fbpfcp-watermark.image?

RocketMQ

基本概念

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/69b59d5f72a84b9ea4106defb96c841a~tplv-k3u1fbpfcp-watermark.image?

https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8ad65276e33948a982e6fad1507f1d10~tplv-k3u1fbpfcp-watermark.image?

Broker节点有Master和Slave的概念 NameServer为集群提供轻量级服务发现和路由。

https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7d538307694f4c3595a81e9e08784fdb~tplv-k3u1fbpfcp-watermark.image?

根据我们刚刚的介绍,可以看到Producer,Consumer,Broker这三个部分,Kafka和RocketMQ是一样的,而Kafka中的Partition概念在这里叫做ConsumerQueue。

底层原理

存储模型

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/31f6c31cfc3a4bff9e7112518e73c7fa~tplv-k3u1fbpfcp-watermark.image?

接下来我们来看看RocketMQ消息的存储模型,对于一个Broker来说所有的消息的会append到一个CommitLog上面,然后按照不同的Queue,重新Dispatch到不同的Consumer中,这样Consumer就可以按照Queue进行拉取消费,但需要注意的是,这里的ConsumerQueue所存储的并不是真实的数据,真实的数据其实只存在CommitLog中,这里存的仅仅是这个Queue所有消息在CommitLog上面的位置,相当于是这个Queue的一个密集索引。

高级特性

事务消息

https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3553178b6214f98adedb22f87dc429f~tplv-k3u1fbpfcp-watermark.image?

先看一下我们最开始说的这个场景,正常情况下,这个下单的流程应该是这个样子,首先我保证库存足够能够顺利-1,这个时候再消息队列让我其他系统来处理,比如订单系统和商家系统,但这里有个比较重要的点,我库存服务和消息队列必须要是在同一个事务内的,大家还记不记得事务的基本特性是什么。ACID(原子性、一致性、隔离性、持久性),这里库存记录和往消息队列里面发的消息这两个事情,是需要有事务保证的,这样不至于发生,库存已经-1了,但我的订单没有增加,或者商家也没有收到通知要发货。因此RocketMQ提供事务消息来保证类似的场景。

ACID:

原子性(Atomicity) 原子性是指事务是一个不可分割的工作单位,事务中的操作要么都发生,要么都不发生。 一致性(Consistency) 事务前后数据的完整性必须保持一致。 隔离性(Isolation) 事务的隔离性是多个用户并发访问数据库时,数据库为每一个用户开启的事务,不能被其他事务的操作数据所干扰,多个并发事务之间要相互隔离。 持久性(Durability) 持久性是指一个事务一旦被提交,它对数据库中数据的改变就是永久性的,接下来即使数据库发生故障也不应该对其有任何影响。

延迟队列

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/36f5522333ae481ea0e80dd179cf14bd~tplv-k3u1fbpfcp-watermark.image?

执行原理:

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3c166b67813a45dd9042789fff7ef8b9~tplv-k3u1fbpfcp-watermark.image?

重试和死信队列

对于处理失败的情况下,用充实和死信队列的方式处理。

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/604cffa3f8e54c9694df9f09b901b8e2~tplv-k3u1fbpfcp-watermark.image?

特性说明

死信消息具有以下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为3天,3天后会被自动删除。因此,请在死信消息产生后的3天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
  • 如果一个Group ID未产生死信消息,消息队列RocketMQ版不会为其创建相应的死信队列。
  • 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。

消息队列RocketMQ版控制台提供对死信消息的查询、导出和重发的功能。