RabiitMQ安装及基本使用
1 RabiitMQ安装
RabbitMQ官网:https://rabbitmq.com/
适用于linux的安装版本:https://rabbitmq.com/install-rpm.html#downloads
当前最新稳定版本:rabbitmq-server-3.9.13-1.el8.noarch.rpm
#本地学习用的虚拟机是centos7系统,学习测试rabbit没有选择最新版本,本文的安装版本
# rabbitmq-server-3.6.5-1.noarch.rpm
# erlang-18.3-1.el7.centos.x86_64.rpm
# socat-1.7.3.2-5.el7.lux.x86_64.rpm
#我把上述3各安装包压在下面的压缩文件中,需要自取(更建议手撸官网最新稳定版,自行解决问题)
https://download.csdn.net/download/csfun1/82944178
# 1.上传rpm包至服务器
# 2.安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
# 3.安装RabbitMQ
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 4.开启管理界面及配置
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保 留guest
# 5.启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
#设置配置文件
cd /usr/share/doc/rabbitmq-server-3.6.5/ cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#windows浏览器登陆管理台
http://192.168.149.128:15672/
# 6.配置虚拟主机及用户
小插曲:安装 socat-1.7.3.2-1.1.el7.x86_64.rpm时,提示error,需要以来插件。解决办法:无视依赖,直接–force --nodeps继续安装。
开启管理界面:
修改默认配置及开启服务(开放guest账号,loopback_users中的删除<<>>即可):
小插曲:此时尝试登陆rabbitMQ管理台发现返回502服务不可用,但是通过在linux服务器上却能下载到首页面,考虑是防火墙问题或端口未开放。
尝试开放端口及关闭防火墙均无效:
firewall-cmd --zone=public --add-port=15672/tcp --permanent #开放指定端口号
firewall-cmd --zone=public --add-port=5672/tcp --permanent #开放指定端口号,后面接口调用需要
systemctl stop firewalld #关闭防火墙
firewall-cmd --reload #重启防火墙 添加完开放端口一样要重启防火墙
最终发现本地开启了全局代理访问,路由转出门再访问局域网肯定还不行的,关掉代理即可。
2 MQ概述
Message Queue消息队列,实在消息传输的过程中保存消息的容器,多用于分布式系统之间进行通信。
2.1 MQ的优势
- 应用解耦
系统的耦合度降低,容错率提高,可维护性越高。
当A、B、C服务间本不存在依赖关系,如果A服务异常,在同步调用时,将导致B、C服务无法继续执行。
- 异步提速
当服务提供的某些能力越来越强大、越来越复杂,势必无法稳定保证快速应答。此时将串行化程序,转为异步处理。
A、B、C服务内部处理MQ的任务,相互间不产生影响。
- 削峰填谷
MQ功能中极其重要的优势。将瞬时过量的请求,写入MQ进行限流,由消费端自行从MQ拉取任务进行处理。
现在后端服务架构中,常见时延要求不高,但处理任务量比较大的需求。且在无法预知某时间点用户侧并发量时,通过削峰填谷保证服务按照处理能力执行任务。
2.2 MQ的劣势
- 系统可用性降低
系统引入的外部依赖变多,系统的稳定性降低。一旦MQ宕机,会对业务功能造成影响。
解决办法:搭建集群,当单台MQ宕机时,保证其他机器正常可用。 - 系统复杂度提高
MQ的加入增加了系统复杂度,以前的同步远程调用,现在变成异步调用,需要保证消息不丢失。
3 常见的MQ产品
RabbitMQ -> RocketMQ -> kafka -> DMQ
RabbitMQ | ActiveMQ | RocketMQ | Kafka | DMQ | |
---|---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache | 华为 |
开发语言 | Erlang | Java | Java | Scala&Java | Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 | 基于kafka封装 |
客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 | Java、Python等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) | 不详 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 | 毫秒级 |
功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 | 使用zookeeper作为注册中心,实现配置下发、服务发现;kafka作为分布式发布订阅消息系统[0.8.2] |
4 控制台使用
4.1 新增用户及配置权限
用户密码自不必多说,来看看Tags的区别:
- 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息 - 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
4.2 Virtual Hosts 配置
- 创建 Virtual Hosts
- 设置 Virtual Hosts 权限
5 RabbitMQ的不同工作模式特性剖析
RabbitMQ消息队列都是基于消费者生产者模式组织的。其中的Bcoker、channel、Exchange、Queue等概念如下。
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的TCP 连接
- Channel:类似于Netty的channel,进程间通过TCP连接时,如果建立长连接,通过socket传输内容,可以大大地减少建立连接的开销。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
RabbitMQ的工作模式,在官网中讲的比较详细 https://www.rabbitmq.com/getstarted.html
5.1 RabbitMQ快速入门之生产与消费
//消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.149.128");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setVirtualHost("/asky");
Connection connection = connectionFactory.newConnection();
//创建通信管道,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("hello",false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume("hello", false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
//生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.149.128");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setVirtualHost("/asky");
Connection connection = connectionFactory.newConnection();
//创建通信管道,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("hello",false, false, false, null);
String message = "zwx";
//四个参数
//exchange 交换机,暂时用不到,发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("","hello", null,message.getBytes());
channel.close();
connection.close();
System.out.println("===发送成功===");
}
}
5.2RabbitMQ工作模式之workQueues模式
与简单模式的区别在于,工作队列下生产者与消费者可以是多个。而消费端没有topic、groupId一说,queue中的消息将依次被多个消费者消费,默认采用轮询模式。而如果为了区分机器性能和处理能力,可以设置如下设置,当机器处理完消息后再从Queue中拉取消息。
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
5.3 RabbitMQ工作模式之发布订阅模式
发布订阅模式,适用于类似公众号与粉丝之间的关系,当生产者发布消息时,所有的follower将接收到消息。其特点:
- Exchange:交换机。交换机只负责转发消息,不负责存储消息,当没有任何队列于Exchange绑定时,消息将丢失。
- Fanout:广播,将消息发送给所有绑定了当前交换机的队列中。(队列所绑定的交换机由消费者决定)
- Direct:定向路由模式。交换机将按照key所指定的patten规则,将消息转到指定的队列。
- Topic:主题发布订阅模式。与路由模式唯一的区别在于,路由key的patten规则可以通配。#可以通配所有内容、*可以通配单个字符串
//消费者 消息队列与交换器绑定 ,routeKey只在路由模式使用
//此时的最后一个参数是routeing key 此时routekey不配(配为"")即可使队列与交换机绑定而收到消息
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_FANOUT_PUBSUB,"");
5.4 RabbitMQ工作模式之Routing路由模式
routeing key路由模式下,需要生产者发送带有key,value的值给交换机。交换机再根据消费者端所绑定的消息队列中key的完整值进行匹配,当生产者发送到Exchange中的key于队列中的所绑定的key完全匹配时,才会进入到队列来,进而由消费者消费。
//生产者端 需要在不是消息时,携带key
channel.basicPublish(RabbitConstant.EXCHANGE_ROUTE_INFO,key,null,input.getBytes(StandardCharsets.UTF_8));
//消费段 消费者的队列需要绑定完整的key匹配规则
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_ROUTE_INFO,"china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_ROUTE_INFO,"china.hubei.wuhan.20201127");
5.5 RabbitMQ工作模式之Topics主题模式
topic模式与路由模式唯一的区别在于,匹配路由规则时,路径可以由通配符,其中*带表1位占位符,#代表通用多位匹配
//消费者 消息队列与交换器绑定 ,routeKey只在路由模式使用
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_TOPIC_INFO,"china.#");
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_TOPIC_INFO,"us.cal.lsj.*");
6 RabbitMQ基本使用源码
https://gitee.com/asky-cloud-service/Daily_study.git
文章评论