type
status
date
slug
summary
tags
category
icon
password
URL
文章来源说明
一、MQ简介
MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。 这个词可以分两个部分来看,一是Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。二是Queue:队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构,是用来缓存数据的。对于消息中间件产品来说,能不能保证FIFO特性,尚值得考量。但是,所有消息队列都是需要具备存储消息,让消息排队的能力。
广义上来说,只要能够实现消息跨进程传输以及队列数据缓存,就可以称之为消息队列。例如我们常用的QQ、微信、阿里旺旺等就都具备了这样的功能。只不过他们对接的使用对象是人,而我们这里讨论的MQ产品需要对接的使用对象是应用程序。
MQ的作用主要有以下三个方面:
- 异步
例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。
作用:异步能提高系统的响应速度、吞吐量。

- 解耦
例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。
作用:
1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

- 削峰
例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。引入三峡大坝后,可以把水储存起来,下游慢慢排水。
作用:以稳定的系统资源应对突发的流量冲击。

二、RocketMQ产品特点
1、RocketMQ介绍
RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。
早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时,由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。
2、RocketMQ特点
当今互联网MQ产品众多,其中,影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品,但是由于设计和实现上的一些差异,造成他们适合于不同的细分场景。
优点 | 缺点 | 适合场景 |
Apache Kafka | 吞吐量非常大,性能非常好,集群高可用。 | 会有丢数据的可能,功能比较单一 |
RabbitMQ | 消息可靠性高,功能全面。 | erlang语言不好定制。吞吐量比较低。 |
Apache Pulsar | 基于Bookeeper构建,消息可靠性非常高。 | 周边生态还有差距,目前使用的公司比较少。 |
Apache RocketMQ | 高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发,方便定制。 | 服务加载比较慢。 |
其中RocketMQ,孵化自阿里巴巴。历经阿里多年双十一的严格考验,RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品,也是少数几个在金融场景比较适用的MQ产品。从横向对比来看,RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。而RocketMQ最大的优势就是他天生就为金融互联网而生。他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。另外,RocketMQ的高级功能也越来越全面,广播消费、延迟队列、死信队列等等高级功能一应俱全,甚至某些业务功能比如事务消息,已经呈现出领先潮流的趋势。
三、RocketMQ快速实战
搭建
RocketMQ的官网地址: http://rocketmq.apache.org 。在下载页面可以获取RocketMQ的源码包以及运行包。下载页面地址:https://rocketmq.apache.org/download。
接下来,RocketMQ建议的运行环境需要至少12G的内存,这是生产环境比较理想的资源配置。但是,学习阶段,如果你的服务器没有这么大的内存空间,那么就需要做一下调整。进入bin目录,对其中的runserver.sh和runbroker.sh两个脚本进行一下修改。
使用vi runserver.sh指令,编辑这个脚本,找到下面的一行配置,调整Java进程的内存大小。
接下来,同样调整runbroker.sh中的内存大小。
调整完成后,就可以启动RocketMQ服务了。 RocketMQ服务基于Java开发,所以需要提前安装JDK。JDK建议采用1.8版本即可。
RocketMQ的后端服务分为nameserver和broker两个服务,关于他们的作用,后面会给你分享。接下来我们先将这两个服务启动起来。
第一步:启动nameserver服务。
指令执行后,会生成一个nohup.out的日志文件。在这个日志文件里如果看到下面这一条关键日志,就表示nameserver服务启动成功了。
接下来,可以通过jsp指令进行验证。使用jps指令后,可以看到有一个NamesrvStartup的进程运行,也表示nameserver服务启动完成。
第二步:启动broker服务。
启动broker服务之前,要做一个小小的配置。进入RocketMQ安装目录下的conf目录,修改broker.conf文件,在文件最后面加入一个配置:
这个选项是为了便于进行后续实验。他的作用是允许 broker 端自动创建新的 Topic。另外,如果你的服务器配置了多张网卡,比如阿里云,腾讯云这样的云服务器,他们通常有内网网卡和外网网卡两张网卡,那么需要增加配置brokerIP1属性,指向服务器的外网IP 地址,这样才能确保从其他服务器上访问到RocketMQ 服务。
然后也可以用之前的方式启动broker服务。启动broker服务的指令是mqbroker
启动完成后,同样检查nohup.out日志文件,有如下一条关键日志,就表示broker服务启动正常了。
2、快速实现消息收发
RocketMQ后端服务启动完成后,就可以启动客户端的消息生产者和消息消费者进行消息转发了。接下来,我们会先通过RocketMQ提供的命令行工具快速体验一下RocketMQ消息收发的功能。然后,再动手搭建一个Maven项目,在项目中使用RocketMQ进行消息收发。
1、命令行快速实现消息收发
第一步:需要配置一个环境变量NAMESRV_ADDR,只想我们之前启动的nameserver服务。
通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。
第二步:通过指令启动RocketMQ的消息生产者发送消息。
这个指令会默认往RocketMQ中发送1000条消息。在命令行窗口可以看到发送消息的日志:
这部分日志中,并没有打印出发送了什么消息。上面SendResult开头部分是消息发送到Broker后的结果。最后两行日志表示消息生产者发完消息后,服务正常关闭了。
第三步:可以启动消息消费者接收之前发送的消息
消费者启动完成后,可以看到消费到的消息
2、搭建Maven客户端项目
之前的步骤实际上是在服务器上快速验证RocketMQ的服务状态,接下来我们动手搭建一个RocketMQ的客户端应用,在实际应用中集成使用RocketMQ。
第一步:创建一个标准的maven项目,在pom.xml中引入以下核心依赖
第二步:就可以直接创建一个简单的消息生产者
运行其中的main方法,就会往RocketMQ中发送两条消息。在这个实现过程中,需要注意一下的是对于生产者,需要指定对应的nameserver服务的地址,这个地址需要指向你自己的服务器。
第三步:创建一个消息消费者接收RocketMQ中的消息。
运行其中的main方法后,就可以启动一个RocketMQ消费者,接收之前发到RocketMQ上的消息,并将消息内容打印出来。在这个实现过程中,需要重点关注的有两点。一是对于消费者,同样需要指定nameserver的地址。二是消费者需要在RocketMQ中订阅具体的Topic,只有发送到这个Topic上的消息才会被这个消费者接收到。
3、搭建RocketMQ可视化管理服务
在之前的简单实验中,RocketMQ都是以后台服务的方式在运行,我们并不很清楚RocketMQ是如何运行的。RocketMQ的社区就提供了一个图形化的管理控制台Dashboard,可以用可视化的方式直接观测并管理RocketMQ的运行过程。
Dashboard服务并不在RocketMQ的运行包中,需要到RocketMQ的官网下载页面单独下载。

这里只提供了源码,并没有提供直接运行的jar包。将源码下载下来后,需要解压并进入对应的目录,使用maven进行编译。(需要提前安装maven客户端)
编译完成后,在源码的target目录下会生成可运行的jar包rocketmq-dashboard-1.0.1-SNAPSHOT.jar。接下来可以将这个jar包上传到服务器上。我们上传到/app/rocketmq/rocketmq-dashboard目录下
接下来我们需要在jar包所在的目录下创建一个application.yml配置文件,在配置文件中做如下配置:
接下来就可以通过java指令执行这个jar包,启动管理控制台服务。
应用启动完成后,会在服务器上搭建起一个web服务,我们就可以通过访问http://192.168.232.128:8080查看到管理页面。

这个管理控制台的功能非常全面。驾驶舱页面展示RocketMQ近期的运行情况。运维页面主要是管理nameserver服务。集群页面主要管理RocketMQ的broker服务。
升级高可用集群(略)
总结RocketMQ的运行架构

1、nameServer 命名服务
在我们之前的实验过程中,你会发现,nameServer不依赖于任何其他的服务,自己独立就能启动。并且,不管是broker还是客户端,都需要明确指定nameServer的服务地址。以一台电脑为例,nameServer可以理解为是整个RocketMQ的CPU,整个RocketMQ集群都要在CPU的协调下才能正常工作。
2、broker 核心服务
从之前的实验过程中你会发现,broker是RocketMQ中最为娇贵的一个组件。RockeMQ提供了各种各样的重要设计来保护broker的安全。同时broker也是RocketMQ中配置最为繁琐的部分。同样以电脑为例,broker就是整个RocketMQ中的硬盘、显卡这一类的核心硬件。RocketMQ最核心的消息存储、传递、查询等功能都要由broker提供。
3、client 客户端
Client包括消息生产者和消息消费者。同样以电脑为例,Client可以认为是RocketMQ中的键盘、鼠标、显示器这类的输入输出设备。鼠标、键盘输入的数据需要传输到硬盘、显卡等硬件才能进行处理。但是键盘、鼠标是不能直接将数据输入到硬盘、显卡的,这就需要CPU进行协调。通过CPU,鼠标、键盘就可以将输入的数据最终传输到核心的硬件设备中。经过硬件设备处理完成后,再通过CPU协调,显示器这样的输出设备就能最终从核心硬件设备中获取到输出的数据。
五、理解RocketMQ的消息模型

生产者和消费者都可以指定一个Topic发送消息或者拉取消息。而Topic是一个逻辑概念。Topic中的消息会分布在后面多个MessageQueue当中。这些MessageQueue会分布到一个或者多个broker中。
在RocketMQ的这个消息模型当中,最为核心的就是Topic。对于客户端,Topic代表了一类有相同业务规则的消息。对于Broker,Topic则代表了系统中一系列存储消息的资源。所以,RocketMQ对于Topic是需要做严格管理的。如果任由客户端随意创建Topic,那么服务端的资源管理压力就会非常大。默认情况下,Topic都需要由管理员在RocketMQ的服务端手动进行创建,然后才能给客户端使用的。而我们之前在broker.conf中手动添加的autoCreateTopic=true,就是表示可以由客户端自行创建Topic。这种配置方式显然只适用于测试环境,在生产环境不建议打开这个配置项。如果需要创建 Topic,可以交由运维人员提前创建 Topic。
而对于业务来说,最为重要的就是消息Message了。生产者发送到某一个Topic下的消息,最终会保存在Topic下的某一个MessageQueue中。而消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录一个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每一条消息,在一个消费者组当中只被处理一次。
致谢:
有关Notion安装或者使用上的问题,欢迎您在底部评论区留言,一起交流~
- 作者:卷神
- 链接:https://blog.952712.xyz/article/126029c1-d761-469c-a4a6-1648984b6bae
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章




