一、概述
1、在大多数应用中,我们系统之间需要进行异步通信,即异步消息
2、异步消息中两个重要概念:
消息代理(message broker)和目的地(destination)
消息代理: 是一种在数据源与目的地之间移动数据使信息处理流畅的软件技术,也是一种架构模式,用于消息验证、变换、路由。
目的地: 消息接受者。
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3、异步消息主要有两种形式(point-to-point)
1)、队列(queue):点对点消息通信
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接收者,但并不是说只能有一个接收者。
2)、主题(topic):发布(publish)/订阅(subscribe)消息通信
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。
4、JMS(Java Message Service) java消息服务
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS的实现
5、AMQP(Advanced Message Queuing Protocol)
- 高级消息队列协议,也是一个消息代理的规范,兼容JMS
- RabbitMQ是AMQP的实现
二、SpringBoot整合RabbitMQ
RabbitMQ简介:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ是AMQP的一个开源实现_。
_核心概念:
Producer&Consumer
- producer指的是消息的生产者,consumer消息的消费者。
Broker
- 提供一种传输服务,他的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。
Queue
- 消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
- 设置为持久化队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失。
- 设置为临时队列,queue中的数据在系统重启之后就会丢失。
- 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除。
Exchange
- 消息交换机,指定消息按什么规则,路由到那个队列。
- Exchange有4中类型:direct(默认),fanout,topic和headers,不同类型的Exchange转发消息的策略有所区别:
Binding
- 将一个特定的Exchange和一个特定的Queue绑定起来。
- Exchange和Queue的绑定可以是多对多的关系。
Virtual host(vhosts)
- 在 rabbitmq server上可以创建多个虚拟机的message broker,又叫做virtual hosts(vhosts)。
- 每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange和bindings。
- vhost相当于物理的server,可以为不同app提供边界隔离。
- producer和consumer连接rabbit server需要指定一个vhost
三、RabbitMQ运行机制
四、SpringBoot整合RabbitMQ
1.引入spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.在docker中下载rabbitmq的镜像
1.在docker hub上搜索rabbitmq镜像,尽量安装带有management标志的(附带有web管理界面)
2.拉取rabbitmq镜像
docker pull rabbitmq:3-management
3.查看下载镜像
docker iamges //查看下载镜像
4.启动运行镜像
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq a64a4ae7bc1f
-d:表示后台运行
-p:表示暴露端口
5672:是客户端和rabbitmq进行通信的端口
15672:管理界面访问web页面的端口
==我的已经运行了我就直接启动==
5.查看所有的容器(无论运行还是停止都有)
docker ps -a
6.启动容器
docker start 37739ced79ed
7.查看运行的容器(只包含运行的容器)
docker ps
![查看正在运行的容器]https://img-blog.csdnimg.cn/20200222124719445.png)](https://img-blog.csdnimg.cn/2020022212480088.png)
输入192.168.43.186:15672(192.168.43.186虚拟机地址)访问rabbitmq的web管理页面 ** 默认密码和账号都是guest **
3.测试数据
1.创建三个交换器
2.添加四个消息队列
我们创建的队列要是能工作我们就得与我们创建的交换器进行绑定(binding)
在次列举exchange.direct与消息队列的绑定其余同理可得
测试:
1. exchange.direct(是完全匹配路由键的_)在exchange.direct交换器中发送消息
所以只有atorg一个消息队列能收到此消息
点击atorg得到发送的消息
2. exchange.fanout(无论路由键是什么会发送给每一个消息队列)在exchange.fanout发送消息
每个消息队列都收到消息
3. exchange.topic(根据规则匹配_)在exchange.topic发送消息
查看消息队列发现每个消息队列的消息都增加了
4.整合
配置文件
spring.rabbitmq.addresses=192.168.43.186 #虚拟机地址
spring.rabbitmq.username=guest #rabbitmqweb页面账号和密码
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
在测试类进行发送数据测试
在exchange.direct发送消息我们知道direct是完全匹配路由键故只有atorg.news收到消息
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 1.单播(点对点)
*/
@Test
void contextLoads() {
//Message需要自己构造一个;定义消息内容和消息头
//rabbitTemplate.send(exchange, routeKey, message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq
//rabbitTemplate.convertAndSend(exchange, routeKey, object);
Map<String,Object> map =new HashMap<>();
map.put("msg", "这是第一个消息");
map.put("data", Arrays.asList("helloworld",123,true));
//对象被默认序列化以后发送出去(默认是以jdk序列化规则)
rabbitTemplate.convertAndSend("exchange.direct","atorg.news",map);
}
在web端查看消息队列接收消息(只有atorg.news收到消息)
我在次处理了序列化问题
package com.atorg.amqp.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
管理页面得到发送的数据
在测试类进行接受数据测试
//接受数据,如何将数据自动转化为json发送出去
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("atorg.news");//此处写路由键名
System.out.println(o.getClass());
System.out.println(o);
}
控制台接收到数据
我们收到消息之后消息队列就没有此条消息了
我们可以利用AmqpAdmin创建、删除Queue,Exchange Binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void creatExchange(){
amqpAdmin.declareExchange(newDirectExchange("amqbadmin.exchange"));//创建交换器
System.out.println("创建完成");
amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//创建消息队列
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqbadmin.exchange", "amqp.haha", null));//创建绑定规则
}
查看web管理页面的交换器
查看web管理页面的消息队列
查看web管理页面的绑定规则
有问题希望大家能多多斧正