博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitMQ 在spring boot的使用
阅读量:6419 次
发布时间:2019-06-23

本文共 15961 字,大约阅读时间需要 53 分钟。

hot3.png

RabbitMQ介绍

    RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种。

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

 

相关概念

    通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange).。

    这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

195500c393baf86d9418c13182ce81fe396.jpg

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
  • 中间即是 RabbitMQ,其中包括了 交换器 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

    其中比较重要的概念有 4 个,分别为:虚拟主机,交换器,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换器、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换器/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换器:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    这里有一个比较重要的概念:路由键 。消息到交换器的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换器需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换器(Exchange)

    交换器的功能主要是接收消息并且转发到绑定的队列,交换器不存储消息,在启用ack模式后,交换器找不到队列会返回错误。

    交换器有四种类型:Direct、topic、Headers、Fanout

  • Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置header attribute参数类型的交换器
  • Fanout:转发消息到所有绑定队列

 

    Direct Exchange是RabbitMQ默认的交换器模式,也是最简单的模式,根据key全文匹配去寻找队列。

0680bffa128223ce29f0cd13c64c5d3bfd8.jpg

    

    当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

    X - Q1 就一个 binding key,名字为 orange;

    X - Q2 有 2 个 binding key,名字为 black 和 green。

 

    Topic Exchange 转发消息主要是根据通配符。 

    在这种交换器下,队列和交换器的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换器才能转发消息。

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

    具体代码发送的时候还是一样,第一个参数表示交换器,第二个参数表示routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

    topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

 

    headers Exchange 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.

    在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

 

    Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列。即使 routing_key配置了,也会被忽略。

 

springboot集成RabbitMQ

 1.添加依赖:

org.springframework.boot
spring-boot-starter-amqp

2、配置文件

    配置rabbitmq的安装地址、端口以及账户信息:

spring.rabbitmq.host=192.168.0.86spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456

3.声明队列和交换器,并各自绑定。

package com;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;@SpringBootApplicationpublic class RabbitmqDemoApplication {    /***************************************队列***********************************************/    @Bean    public Queue helloQueue() {        return new Queue("helloQueue");    }    @Bean    public Queue topicMessage() {        return new Queue("topicMessage");    }    @Bean    public Queue topicMessages() {        return new Queue("topicMessages");    }    @Bean    public Queue fanoutA() {        return new Queue("fanoutA");    }    @Bean    public Queue fanoutB() {        return new Queue("fanoutB");    }    @Bean    public Queue fanoutC() {        return new Queue("fanoutC");    }    /***************************************两种exchange***********************************************/    @Bean    TopicExchange topicExchange() {        return new TopicExchange("topicExchange");    }    @Bean    FanoutExchange fanoutExchange() {        return new FanoutExchange("fanoutExchange");    }    /***************************************将队列和exchange绑定***********************************************/    /**     * 将队列topicMessage与topicExchange绑定,     * 只有路由键 为topic.Message才能匹配,     * 将消息转发到绑定的Queue     * @param topicMessage     * @param topicExchange     * @return     */    @Bean    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {        return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message");    }    /**     * 将队列topicMessages与topicExchange绑定,     * 以topic开头的路由键 均会模糊匹配,     * 将消息转发到绑定的Queue     * @param topicMessages     * @param topicExchange     * @return     */    @Bean    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {        return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#");    }    /**     * 将队列fanoutA与fanoutExchange绑定     *      * @param fanoutA     * @param fanoutExchange     * @return     */    @Bean    Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(fanoutA).to(fanoutExchange);    }    /**     * 将队列fanoutB与fanoutExchange绑定     *     * @param fanoutB     * @param fanoutExchange     * @return     */    @Bean    Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(fanoutB).to(fanoutExchange);    }    /**     * 将队列fanoutC与fanoutExchange绑定     *     * @param fanoutC     * @param fanoutExchange     * @return     */    @Bean    Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(fanoutC).to(fanoutExchange);    }    public static void main(String[] args) {        SpringApplication.run(RabbitmqDemoApplication.class, args);    }}

4.实现发送消息类:

    实体类(测试对象的发送和接收):

package com.demo.model;import java.io.Serializable;/** * @Description:实体类 */public class User implements Serializable {    private String userName;    private String password;    private String sex;    private String level;    public String getUserName() {        return userName;    }    public void setUserName(String userName) {        this.userName = userName;    }    public String getPassword() {        return password;    }    public void setPassword(String password) {        this.password = password;    }    public String getSex() {        return sex;    }    public void setSex(String sex) {        this.sex = sex;    }    public String getLevel() {        return level;    }    public void setLevel(String level) {        this.level = level;    }}

   生产者1:

package com.demo.sender;import com.demo.model.User;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;/** * @Description: * 生产者1 */@Componentpublic class Sender1 {    @Autowired    private AmqpTemplate rabbitTemplate;    /**     * 测试发送普通消息,全文检索名称为 helloQueue 的队列     */    public void send() {        String sendMsg = "hello1 " + new Date();        System.out.println("Sender1:" + sendMsg);        rabbitTemplate.convertAndSend("helloQueue", sendMsg);    }    /**     * 测试发送实体类,全文检索名称为 helloQueue 的队列     * @param user     */    public void sendUser(User user){        System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword());        rabbitTemplate.convertAndSend("helloQueue", user);    }    /**     * 测试发送普通文本消息,     * 在topicExchange 交换器上,路由键:topic.Message、topic.Messages将依次寻找与绑定键 匹配的队列     */    public void testTopPicMessage() {        String msg = "sendTopPicMessage";        System.out.println("sendTopPicMessage1:" + msg);        //第一个参数:指定了exchange        //第二个参数:指定了接受消息的栏目名        //第三个参数:消息内容        //到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息        rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合        msg = "sendTopPicMessages";        System.out.println("sendTopPicMessages1:" + msg);        rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合    }    /**     * 测试发送普通文本消息到fanoutExchange 交换器绑定的队列上     */    public void testFanoutMessage(){        String sendMsg = "sendFanoutMessage";        System.out.println("fanout Sender1:" + sendMsg);        //第二个参数不会进行正则表达式的过滤        //但是必须要填,才能根据exchange找到相关Queue        rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg);    }}

    生产者2:

package com.demo.sender;import com.demo.model.User;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;/** * @Description: 生产者2 */@Componentpublic class Sender2 {    @Autowired    private AmqpTemplate rabbitTemplate;    /**     * 测试发送普通消息,全文检索名称为 helloQueue 的队列     */    public void send() {        String sendMsg = "hello2 " + new Date();        System.out.println("Sender2:" + sendMsg);        rabbitTemplate.convertAndSend("helloQueue", sendMsg);    }    /**     * 测试发送实体类,全文检索名称为 helloQueue 的队列     *     * @param user     */    public void sendUser(User user) {        System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword());        rabbitTemplate.convertAndSend("helloQueue", user);    }    /**     * 测试发送普通文本消息,     * 在topicExchange 交换器上,路由键:topic.Message、topic.Messages将依次寻找与绑定键 匹配的队列     */    public void testTopPicMessage() {        String msg = "sendTopPicMessage";        System.out.println("sendTopPicMessage2:" + msg);        //第一个参数:指定了exchange        //第二个参数:指定了接受消息的栏目名        //第三个参数:消息内容        //到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息        rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合        msg = "sendTopPicMessages";        System.out.println("sendTopPicMessages2:" + msg);        rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合    }    /**     * 测试发送普通文本消息到fanoutExchange 交换器绑定的队列上     */    public void testFanoutMessage() {        String sendMsg = "sendFanoutMessage";        System.out.println("fanout Sender2:" + sendMsg);        //第二个参数不会进行正则表达式的过滤        //但是必须要填,才能根据exchange找到相关Queue        rabbitTemplate.convertAndSend("fanoutExchange", "", sendMsg);    }}

5.实现消费者:

    Direct Exchange 消费者1:

package com.demo.receiver;import com.demo.model.User;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * helloQueue消费者1 */@Component@RabbitListener(queues = "helloQueue")public class HelloReceiver1 {    @RabbitHandler    public void process(String hello) {        System.out.println("Receiver1:" + hello);    }    @RabbitHandler    public void processUser(User user) {        System.out.println("user receive1:" + user.getUserName()+"/"+user.getPassword());    }}

    Direct Exchange 消费者2:

package com.demo.receiver;import com.demo.model.User;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * helloQueue消费者2 */@Component@RabbitListener(queues = "helloQueue")public class HelloReceiver2 {    @RabbitHandler    public void process(String mesg) {        System.out.println("Receiver2:" + mesg);    }    @RabbitHandler    public void processUser(User user) {        System.out.println("user receive2:" + user.getUserName()+"/"+user.getPassword());    }}

    Topic Exchange 消费者1:

package com.demo.receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * topicMessage消费者 */@Component//监听topicMessage队列@RabbitListener(queues = "topicMessage")public class TopMessageReceiver {    @RabbitHandler    public void process(String msg) {        System.out.println("topMessageReceiver:" +msg);    }}

    Topic Exchange 消费者2:

package com.demo.receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * topicMessages消费者 */@Component//监听topicMessages队列@RabbitListener(queues = "topicMessages")public class TopMessagesReceiver {    @RabbitHandler    public void process(String msg) {        System.out.println("topMessagesReceiver:" +msg);    }}

    Fanout Exchange 消费者1:

package com.demo.receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * fanoutA消费者 */@Component//监听fanoutA队列@RabbitListener(queues = "fanoutA")public class FanoutReceiverA {    @RabbitHandler    public void process(String msg) {        System.out.println("FanoutReceiverA:" + msg);    }}

    Fanout Exchange 消费者2:

package com.demo.receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * fanoutB消费者 */@Component//监听fanoutB队列@RabbitListener(queues = "fanoutB")public class FanoutReceiverB {    @RabbitHandler    public void process(String msg) {        System.out.println("FanoutReceiverB:" + msg);    }}

    Fanout Exchange 消费者3:

package com.demo.receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @Description: * fanoutC消费者 */@Component//监听fanoutC队列@RabbitListener(queues = "fanoutC")public class FanoutReceiverC {    @RabbitHandler    public void process(String msg) {        System.out.println("FanoutReceiverC:" + msg);    }}

6.测试类:

package com.demo.controller;import com.demo.model.User;import com.demo.sender.Sender1;import com.demo.sender.Sender2;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @Description: 测试类 */@RestControllerpublic class RabbitController {    @Autowired    private Sender1 helloSender1;    @Autowired    private Sender2 helloSender2;    /**     * 发送普通文本消息     *     * @return     */    @RequestMapping("/hello")    public String hello() {        //两个生产者        helloSender1.send();        helloSender2.send();        return "ok";    }    /**     * 发送Object     *     * @return     */    @RequestMapping("/user")    public String user() {        User user = new User();        user.setUserName("a");        user.setPassword("1");        user.setSex("m");        user.setLevel("1");        //两个生产者        helloSender1.sendUser(user);        helloSender2.sendUser(user);        return "ok";    }    /**     * 发送普通文本消息     *     * @return     */    @RequestMapping("/topMessage")    public String topMessage() {        //两个生产者        helloSender1.testTopPicMessage();        helloSender2.testTopPicMessage();        return "ok";    }    /**     * 发送普通文本消息     *     * @return     */    @RequestMapping("/fanoutMessage")    public String fanoutMessage() {        //两个生产者        helloSender1.testFanoutMessage();        helloSender2.testFanoutMessage();        return "ok";    }}

测试结果:

  • /hello的结果为:两个消费者分摊两个生产者的消息。因为使用的是默认的交换器,一条消息只能被一个队列接收到,而这两个生产者都监听了这一个队列,因此两个消费者分摊。
  • /user的结果为:和 /hello 一样。证明了 此处支持对象的发送和接收,只是实体类 必须实现序列化接口。
  • /topMessage的结果为:

        routing_key是“topic.message” 时,exchange绑定的binding_key:“topic.message”,topic.#都符合路由规则;

        所以发送的消息,两个队列都能接收到,而两个消费者各自监听了一个队列,因此消息不会被分摊;

        routing_key是“topic.messages”,exchange绑定的binding_key 只有 “topic.#”符合路由规则;

        发送的消息,只有队列topic.messages能收到,只有一个消费者监听了这个队列,因此消息不会被分摊。

  • /fanoutMessage的结果为:就算fanoutSender发送消息的时候,指定了routing_key,但所有绑定的队列都能接受到了消息,而这三个消费者都各自监听了一个队列,因此消息不会被分摊。

 

总结:

    一条消息只能被监听该队列的某一个消费者消费。

    默认的交换器:队列名称必须和routing_key 一致。

    topic 交换器:绑定的队列中,只要消息的routing_key满足队列的binding_key的路由规则,该队列即可接收到消息。

    fanout 交换器:绑定的队列均能收到消息,不会进行路由匹配这一过程。

e51ea5647b086104a6dae23c7bd9495c4f9.jpg

转载于:https://my.oschina.net/langwanghuangshifu/blog/3004747

你可能感兴趣的文章
《构建之法》阅读笔记及项目管理软件
查看>>
VC中动态分配的几种写法
查看>>
excel数据比对,查找差异
查看>>
python3 简单登录,注册测试代码
查看>>
MAC VMWare Fusion配置mac和win7虚拟机互相访问
查看>>
awk正则匹配和awk命令统计某程序的CPU总的利用率
查看>>
CentOS7安装grafana
查看>>
bzoj 1823: [JSOI2010]满汉全席
查看>>
实战部署openldap主从架构
查看>>
我的技术路还很长
查看>>
第二阶段团队进展报告(4)
查看>>
Nginx:强劲的Web引擎
查看>>
vector容器与find算法
查看>>
Socket粘包问题
查看>>
python 点滴记录15:MAC OS安装MySQLdb
查看>>
JAVA教程 第五讲 AWT图形用户界面设计
查看>>
03 Managing an Oracle Instance
查看>>
PART THREE VB选择语句
查看>>
【原创】ZeroClipboard的时代或许已经过去了
查看>>
初探Windows Server 2012存储
查看>>