SpringBoot与消息

一、概述

1、在大多数应用中,我们系统之间需要进行异步通信,即异步消息异步消息回调机制
2、异步消息中两个重要概念:
消息代理(message broker)和目的地(destination)
消息代理: 是一种在数据源与目的地之间移动数据使信息处理流畅的软件技术,也是一种架构模式,用于消息验证、变换、路由。
目的地: 消息接受者。
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3、异步消息主要有两种形式(point-to-point)

  • 1)、队列(queue):点对点消息通信

    • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
    • 消息只有唯一的发送者和接收者,但并不是说只能有一个接收者。
  • 2)、主题(topic):发布(publish)/订阅(subscribe)消息通信

    • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。

4、JMS(Java Message Service) java消息服务
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS的实现
5、AMQP(Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
  • RabbitMQ是AMQP的实现

二、SpringBoot整合RabbitMQ

RabbitMQ简介:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ是AMQP的一个开源实现_。
_核心概念:

Producer&Consumer

  • producer指的是消息的生产者,consumer消息的消费者。

Broker

  • 提供一种传输服务,他的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

Queue

  • 消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
  • 设置为持久化队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失。
  • 设置为临时队列,queue中的数据在系统重启之后就会丢失。
  • 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除。

Exchange

  • 消息交换机,指定消息按什么规则,路由到那个队列。
  • Exchange有4中类型:direct(默认),fanout,topic和headers,不同类型的Exchange转发消息的策略有所区别:

Binding

  • 将一个特定的Exchange和一个特定的Queue绑定起来。
  • Exchange和Queue的绑定可以是多对多的关系。

Virtual host(vhosts)

  • 在 rabbitmq server上可以创建多个虚拟机的message broker,又叫做virtual hosts(vhosts)。
  • 每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange和bindings。
  • vhost相当于物理的server,可以为不同app提供边界隔离。
  • producer和consumer连接rabbit server需要指定一个vhost

三、RabbitMQ运行机制

RabbitMQ运行机制

四、SpringBoot整合RabbitMQ

1.引入spring-boot-starter-amqp
 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
</dependency>
2.在docker中下载rabbitmq的镜像

1.在docker hub上搜索rabbitmq镜像,尽量安装带有management标志的(附带有web管理界面)
在docker hub上搜索镜像
2.拉取rabbitmq镜像

docker pull rabbitmq:3-management

3.查看下载镜像

docker iamges                 //查看下载镜像

查看镜像
4.启动运行镜像

docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq a64a4ae7bc1f

-d:表示后台运行
-p:表示暴露端口
5672:是客户端和rabbitmq进行通信的端口
15672:管理界面访问web页面的端口
==我的已经运行了我就直接启动==

5.查看所有的容器(无论运行还是停止都有)

docker ps -a

查看所有容器
6.启动容器

docker start 37739ced79ed

7.查看运行的容器(只包含运行的容器)

docker ps

![查看正在运行的容器]https://img-blog.csdnimg.cn/20200222124719445.png)](https://img-blog.csdnimg.cn/2020022212480088.png)
输入192.168.43.186:15672(192.168.43.186虚拟机地址)访问rabbitmq的web管理页面 ** 默认密码和账号都是guest **访问rabbitmq的web管理页面
登陆界面

3.测试数据

1.创建三个交换器
创建三个交换器
2.添加四个消息队列
添加4个消息队列
我们创建的队列要是能工作我们就得与我们创建的交换器进行绑定(binding)
在次列举exchange.direct与消息队列的绑定其余同理可得
交换器与消息队列的绑定
测试:
1. exchange.direct(是完全匹配路由键的_)在exchange.direct交换器中发送消息
发送消息进行测试
所以只有atorg一个消息队列能收到此消息只有一个队列收到消息
点击atorg得到发送的消息得到我们发送的消息
2. exchange.fanout(
无论路由键是什么会发送给每一个消息队列)在exchange.fanout发送消息
发送消息
每个消息队列都收到消息
每个消息队列都收到消息
3. exchange.topic(
根据规则匹配_)在exchange.topic发送消息
topic交换器根据规则匹配
查看消息队列发现每个消息队列的消息都增加了
查看消息队列

4.整合

配置文件

spring.rabbitmq.addresses=192.168.43.186           #虚拟机地址
spring.rabbitmq.username=guest                     #rabbitmqweb页面账号和密码
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

在测试类进行发送数据测试
在exchange.direct发送消息我们知道direct是完全匹配路由键故只有atorg.news收到消息

@Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 1.单播(点对点)
     */
@Test
    void contextLoads() {
        //Message需要自己构造一个;定义消息内容和消息头
        //rabbitTemplate.send(exchange, routeKey, message);

        //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq
        //rabbitTemplate.convertAndSend(exchange, routeKey, object);

        Map<String,Object> map =new HashMap<>();
        map.put("msg", "这是第一个消息");
        map.put("data", Arrays.asList("helloworld",123,true));
        //对象被默认序列化以后发送出去(默认是以jdk序列化规则)
        rabbitTemplate.convertAndSend("exchange.direct","atorg.news",map);
    }

在web端查看消息队列接收消息(只有atorg.news收到消息)
我在次处理了序列化问题

package com.atorg.amqp.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

管理页面得到发送的数据只有atorg.news收到消息
在测试类进行接受数据测试

//接受数据,如何将数据自动转化为json发送出去
    @Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("atorg.news");//此处写路由键名
        System.out.println(o.getClass());
        System.out.println(o);
    }

控制台接收到数据
控制台接受数据
我们收到消息之后消息队列就没有此条消息了rabbitmqweb管理页面查看消息
我们可以利用AmqpAdmin创建、删除Queue,Exchange Binding

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void creatExchange(){
   amqpAdmin.declareExchange(newDirectExchange("amqbadmin.exchange"));//创建交换器
   System.out.println("创建完成");

   amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//创建消息队列
   amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqbadmin.exchange", "amqp.haha", null));//创建绑定规则
    }

查看web管理页面的交换器
创建的交换器
查看web管理页面的消息队列
创建的消息队列
查看web管理页面的绑定规则
创建的绑定规则
有问题希望大家能多多斧正


Author: Lelege
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source Lelege !
评论
 Previous
MySQL(完) MySQL(完)
14.流程控制的使用(实例)==存储过程和函数中可以使用流程控制来控制语句的执行。MySQL 中可以使用 IF 语句、CASE 语句、LOOP语句、LEAVE 语句、ITERATE 语句、REPEAT 语句和 WHILE 语句来进行流程控制
2020-02-24
Next 
SpringBoot与缓存 SpringBoot与缓存
一、SpringBoot与缓存1、基础概念缓存:缓存是指可以进行高速数据交换的存储器,它先于内存与CPU交换数据,因此速率很快。缓存作用:缓存的工作原理是当CPU要读取一个数据时,首先从CPU缓存中查找,找到就立即读取并送给CPU处理;没有
  TOC