RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

news/2024/7/8 7:05:41 标签: java-rabbitmq, rabbitmq, java

文章目录

  • RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)
    • 写法一、配置类配置方式
    • 写法二、注解方式(@RabbitListener)

RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

发布者/订阅者 模型如下:
在这里插入图片描述

他与前面两个小案例最大的区别就是,他的消息不是阅完即焚的。他允许将同一条消息发送给多个消费者。而实现此操作的原因是加入了我们的交换机(exchange)。

在发布者和订阅者的模型中,各个组件的功能如下

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Queue:消息队列也与以前一样,接收消息、缓存消息。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

注意:交换机他只负责消息的转发,并不存储消息,如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!!

OK,这么解释肯定是不够的,下面我们就来说一下第一种交换机类型——Fanout(广播)在Java中的具体使用方式

写法一、配置类配置方式

步骤一、在消费者服务中,利用代码声明队列、交换机,并将两者进行绑定。
SpringAMQP提供的**交换机(Exchange)、队列(Queue)、绑定(Binding)**的API如下:
在这里插入图片描述
要将我们的队列绑定到交换机,我们需要编写我们的配置类如下:

java">package com.demo.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     *  声明FanoutExchange(广播交换机)
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        //交换机的名称
        return new FanoutExchange("exchange.fanout");
    }

    /**
     *  声明第一个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     *  声明第二个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     *  绑定 队列1 到 交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     *  绑定 队列2 到 交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

写完配置类,我们重启一下我们的消费者服务类,然后我们到RabbitMQ上看一下我们的交换机和队列。

可以看到,确实多了一个交换机叫 exchange.fanout。
在这里插入图片描述

我们再看一下队列,可以看到,我们两个队列也都注册成功了。
在这里插入图片描述
点击我们刚才新增的交换机,打开它的Bindings,可以看到这个交换机他告诉我们,他的消息是会转发到 fanout.queue1 和 fanout.queue2中:
在这里插入图片描述

ok,我们接着往下写:

**步骤二、在消费者服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2。 **.

监听的方法,现在应该已经写得滚瓜烂熟了吧,这里就直接贴代码了。
1、编写的类记得加 @Component 将这个监听的类注册到 Spring容器中。
2、监听哪个queue,那么就写对应的方法,并在方法上方添加@RabbitListener注解,用queues属性标明要监听的queue即可。(如果有多个,那么用 @RabbitListener(queues = {“queueName1”, “queueName2”})表示即可。

java">@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
    }
}

步骤三、在发布者服务中,编写测试方法,向交换机 exchange.fanout 发送消息。

java">    @Test
    public void testFanoutExchange(){
        //交换机名称
        String exchangeName = "exchange.fanout";
        //消息
        String msg = "Hello,av8d!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }

这里的rabbitTemplate.convertAndSend接受三个参数,分别是

java">public void convertAndSend(String exchange,
                           String routingKey,
                           Object object)
  1. exchange:交换机的名称
  2. routeKey:routeKey值(还不需要用到,先不管他,给个"")
  3. object:发送的消息

写完测试方法,我们跑一下我们的测试方法,然后看一下我们消费者的控制台如下:
在这里插入图片描述
可以看到,只发布了一条消息,但是通过交换机发布给两个Queue后,我们消费者的两个方法都监听到了我们同一条消息。

写法二、注解方式(@RabbitListener)

如果以前尝试了上面的写法,记得把配置类的 @Configuration 注释掉

java">//@Configuration
public class FanoutConfig {
...
}

然后把刚才写的两个方法注释掉。

java">/**
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
    }
**/

接下来,我们就开始写我们使用注解声明队列的方法。

步骤一、配置我们的RabbitMQ。(只要使用RabbitMQ,都必须要配置)

spring:
  rabbitmq:
    host: 192.168.83.134
    port: 5672
    virtual-host: /
    username: admin
    password: root
    listener:
      simple:
        prefetch: 1

步骤二、直接写我们的监听方法。(使用@RabbitListener注解写我们的路由方式、路由名称以及我们的队列名即可)

java">@Component
public class SpringRabbitListener {
	@RabbitListener(bindings = @QueueBinding(
	        value = @Queue(name = "fanout.queue1"),
	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
	))
	public void listenFanoutQueue1(String msg){
	    System.out.println("监听到 fanout.queue1 的消息为:【" + msg+"】");
	}
	
	@RabbitListener(bindings = @QueueBinding(
	        value = @Queue(name = "fanout.queue2"),
	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
	))
	public void listenFanoutQueue2(String msg){
	    System.out.println("监听到 fanout.queue2 的消息为:【" + msg+"】");
	}
}

http://www.niftyadmin.cn/n/1733236.html

相关文章

linux中实现jboss自启动

在该目录/etc/rc.d/init.d/中 创建文件:jboss 内容如下 # cat /etc/rc.d/init.d/jboss #!/bin/bash # chkconfig: 2345 99 01 # description: JBoss 3.2.3 case "$1" in start) su - root -c /jboss/jboss/bin/run.sh & ;; stop) …

重启虚拟机启动Docker常见问题

文章目录重启虚拟机启动Docker常见问题一、Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?二、admin is not in the sudoers file. This incident will be reported.(没有这个问题请自觉跳过)三、…

ESP8266接温度传感器18B20时序校准

陈拓 chentuoms.xab.ac.cn 2020/06/22-2020/06/25 1. 概述 在《ESP8266_SDK发送温度数据到阿里云》 https://blog.csdn.net/chentuo2000/article/details/105592791 https://zhuanlan.zhihu.com/p/132582024 一文中我们将温度传感器DS18B20的温度数据传送到了阿里云物联网平…

RabbitMQ第四个实操小案例——DirectExchange

文章目录RabbitMQ第四个实操小案例——DirectExchangeRabbitMQ第四个实操小案例——DirectExchange DirectExchange:这种交换机的模式跟前面的Fouout(广播)不太一样,DirectExchange 会将接收到的消息根据规则路由到指定的Queue&a…

javascript面向对象技术基础(六)

http://sdcyst.javaeye.com/blog/296492作用域、闭包、模拟私有属性 先来简单说一下变量作用域,这些东西我们都很熟悉了,所以也不详细介绍。 Js代码 var sco "global"; //全局变量 function t() { var sco "local"; //函…

javascript面向对象技术基础(四)

http://sdcyst.javaeye.com/blog/288808类、构造函数、原型 先来说明一点:在上面的内容中提到,每一个函数都包含了一个prototype属性,这个属性指向了一个prototype对象(Every function has a prototype property that refers to a predefined prototype object --section8.6.…

树莓派安装Web服务器Boa和CGIC

树莓派安装Web服务器Boa和CGIC 陈拓 2020/08/01-2020/08/09 1. 树莓派换源 为了加快所需软件的下载,我们需要先换源。 首先查看系统版本:lsb_release -a修改软件更新源 /etc/apt/sources.list sudo nano /etc/apt/sources.list 在下面的语句前面加#注…

树莓派I2C通过Shell操作FDC2214

陈拓 chentuoms.xab.ac.cn 2020/07/21-2020/07/29 FDC2214是Ti公司的一款低功耗高精度的电容传感器芯片。本文讲述用树莓派Linux Shell配置和操作FDC2214,可以快速熟悉并进行原型开发。 1. 树莓派换源 为了加快所需软件的下载,我们需要先换源。 首先查…