1 介绍
1.1 RabbitMQ简介
- MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
开发中消息队列通常有如下应用场景:
任务异步处理
- 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高应用的响应时间
应用程序解耦合
- MQ相当于一个中介,消息生产者通过MQ和消息消费者交互,将应用程序进行解耦合
常见的消息队列
- ActiveMQ
- ZeroMQ
- Kafka
- MetaMQ
- RocketMQ
- Redis
使用RabbitMQ的好处
- 使得简单,功能强大。
- 基于AMQP协议。
- 社区活跃,文档完善。
- 高并发性能好,这主要得益于Erlang语言。
- Spring Boot默认已集成RabbitMQ
1.2 相关知识补充
1.2.1 AMQP(Advanced Message Queuing Protocol)
- AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式,
为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
1.2.2 JMS(Java Message Service)
- JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。
2 快速入门
2.1 RabbitMQ的工作原理
- Broker:消息队列服务进程,此进程包括两个部分:
Exchange
和Queue
。 - Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
2.2 消息发布和接收流程
2.2.1 发送消息
- 生产者和Broker建立TCP连接。
- 生产者和Broker建立通道。
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
- Exchange将消息转发到指定的Queue(队列)
2.2.2 接收消息
- 消费者和Broker建立TCP连接
- 消费者和Broker建立通道
- 消费者监听指定的Queue(队列)
- 当有消息到达Queue时Broker默认将消息推送给消费者。
- 消费者接收到消息。
2.3 RabbitMQ及其依赖语言erlang的下载与安装
2.3.1 启动RabbitMQ成功但是访问localhost:15672无法访问解决方案
2.3 HelloWorld
2.3.1 搭建生产环境
使用
rabbit-java-client
从底层了解RabbitMQ的交互的认识- 创建Maven工程。分别创建生产者工程和消费者项目,分别加入RabbitMQ java client的依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--建议加上日志依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
2.3.2 生产者
- 创建连接
- 创建通道
- 声明队列
- 发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
//队列
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
//通过连接工厂创建新的连接,和mq建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//外部web管理端口时 15672,注意区分
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//1. 创建连接。建立新连接
connection = connectionFactory.newConnection();
//2. 创建通道。创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
//3. 声明队列。如果队列在mq中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细:
* 1.queue 队列名称
* 2.durable 是否持久化。如果持久化,mq重启后队列还在
* 3.exclusive 是否独占连接,队列只允许在该连接中访问,
* 如果连接关闭,队列自动删除;如果将此参数设置为true可用于临时队列的创建
* 4.autoDelete 自动删除,队列不再是哟个时是否自动删除此队列
* 如果将此参数和exclusive参数设置为true,就可以实现临时队列(队列不用了就自动删除)
* 5.arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间等
*/
channel.queueDeclare(QUEUE, true, false, false, null);
//4. 发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 1.exchange 交换机,如果不指定将使用mq的默认交换机,设置为""
* 2.routingKey 路由Key,交换机根据路由key来将消息转发到指定的队列
* 如果使用默认交换机,routingKey设置为队列的名称
* 3.props 消息的属性
* 4.body 消息内容
*/
String message = "hello rabbitmq";
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println("发送给mq的信息为:" + message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
//先关闭通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
//再关闭连接
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.3.3 消费者
- 创建连接
- 创建通道
- 声明队列
- 监听队列
- 接收消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
//队列
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
//通过连接工厂创建新的连接,和mq建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//外部web管理端口时 15672,注意区分
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost("/");
//1. 创建连接。建立新连接
Connection connection = connectionFactory.newConnection();
//2. 创建会话通道。生产者和mq服务所有通信都在channel中完成
Channel channel = connection.createChannel();
//3. 声明队列:如果队列在mq中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细:
* 1.queue 队列名称
* 2.durable 是否持久化。如果持久化,mq重启后队列还在
* 3.exclusive 是否独占连接,队列只允许在该连接中访问,
* 如果连接关闭,队列自动删除;如果将此参数设置为true可用于临时队列的创建
* 4.autoDelete 自动删除,队列不再是哟个时是否自动删除此队列
* 如果将此参数和exclusive参数设置为true,就可以实现临时队列(队列不用了就自动删除)
* 5.arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间等
*/
channel.queueDeclare(QUEUE, true, false, false, null);
//5. 接收消息。实现消费方法来决定如何接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来表示消费者,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope 可得到交换机和消息id等
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String message = new String(body, "utf-8");
System.out.println("接受到的消息:" + message);
}
};
//4. 监听队列
//参数:String queue, boolean autoAck, Consumer callback
/**
* 1.queue 队列名称
* 2.autoAck 自动回复
* 当消费者接收到消息后要告诉mq消息已经接收
* true表示自动回复mq
* false表示要通过编程实现回复
* 3.callback 消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE, true, defaultConsumer);
}
}