微服务技术 RabbitMQ SpringAMQP P61-P76

news/2024/6/15 20:16:51 标签: 微服务, rabbitmq, 架构

B站学习视频icon-default.png?t=N7T8https://www.bilibili.com/video/BV1LQ4y127n4?p=61&vd_source=8665d6da33d4e2277ca40f03210fe53a

文档资料:

链接:https://pan.baidu.com/s/1P_Ag1BYiPaF52EI19A0YRw?pwd=d03r 
提取码:d03r

一 初始MQ

1. 同步通讯

2. 异步通讯

3. MQ常见架构

二 RabbitMQ 快速入门

1. RabbitMQ概述和安装 --单机部署

我们在Centos7虚拟机中使用Docker来安装

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

1.2. RabbitMQ概述和安装 --集群部署

2.常见消息模型

3.快速入门 --Basic Queue 简单队列模型

三 SpringAMQP

1. Basic Queue 简单队列模型

  • 结构

1.1 消息发送 

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

1.2 消息接收

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2. SpringAMQP入门案例

2.1 介绍

2.2 案例 -- 发送消息

spring:
  rabbitmq:
    host: 127.0.0.1 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 123
    virtual-host: /
@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

2.3 案例 --接收消息

spring:
  rabbitmq:
    host: 127.0.0.1 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 123
    virtual-host: /
    listener:
      simple:
        prefetch: 1
@Component  //声明成 bean
public class SpringRabbitListener {

    // @RabbitListener(queues = "simple.queue")
    // public void listenSimpleQueue(String msg) {
    //     System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
    // }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
}

3. Work Queue 工作队列模型

3.1 介绍

  • 接收信息
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;
import java.util.Map;

@Component  //声明成 bean
public class SpringRabbitListener {

   /*  @RabbitListener(queues = "simple.queue")
     public void listenSimpleQueue(String msg) {
         System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
     }
*/
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
}
  • 发送信息

@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, message__";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
}
  • application.yml

    logging:
      pattern:
        dateformat: MM-dd HH:mm:ss:SSS
    spring:
      rabbitmq:
        host: 127.0.0.1 # rabbitMQ的ip地址
        port: 5672 # 端口
        username: admin
        password: 123
        virtual-host: / #虚拟主机
        listener:
          simple:
            prefetch: 1 # 预取 每次只取一条消息,处理完才能获取下一条消息

4. 发布模型介绍

5. 发布、订阅模型-Fanout

5.1 利用SpringAMQP演示FanoutExchange的使用 ( 发布、订阅模型-Direct)

5.2 步骤一 在consumer服务生命Exchange、QUEUE、Binding

6. 发布、订阅模型-Direct

6.1 案例

7. 发布、订阅模型-Topic

8. 消息转换器

推荐使用json


http://www.niftyadmin.cn/n/5263128.html

相关文章

YashanDB 携智慧政务方案亮相数字政府建设与数字湾区发展成果博览会

由广东省人民政府主办的第二届数字政府建设峰会暨数字湾区发展成果博览会于 12月8日-10日在广州举办。作为数字政府、智慧城市建设的核心支撑力量&#xff0c;深算院携单机/主备、共享集群、空间数据库等 YashanDB系列产品亮相本次博览会&#xff0c;展示最新的研发成果、场景应…

JupyterHub 如何切换 conda 小环境

JupyterHub 如何切换 conda 小环境 服务器已经部署好 JupyterHub &#xff0c;相关端口请看对应答疑群群公告。在Jupyterhub 中使用 conda 创建的小环境&#xff0c;首先 ssh 登录上服务器或者在 JupyterHub 网页端打开终端 terminal。然后安装 conda &#xff0c;方法请见 Q4&…

用队列实现栈,力扣

题目地址&#xff1a; 225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 难度&#xff1a;简单 今天刷用队列实现栈&#xff0c;大家有兴趣可以点上看看题目要求&#xff0c;试着做一下。 题目&#xff1a; 我们直接看题解吧&#xff1a; 解题方法&#xff1a; 方法…

ArcGIS导入excel中的经纬度信息,绘制矢量

1.首先整理坐标信息 2.其次转成2003格式的excel文件 3.导入arcgis&#xff0c;点击右键添加excel数据 4.显示xy数据 5.显示经度和纬度信息 6&#xff1a;点击【地理坐标系】->【World】->【WGS 1984】->【确定】 7.投影带的确定方式&#xff1a; 因为自己一直…

Java中使用EasyExcel写excel文件

1、公式 package com.web.report.handler;import com.alibaba.excel.context.WriteContext; import com.alibaba.excel.metadata.csv.CsvCellStyle; import com.alibaba.excel.metadata.data.WriteCellData; import com.alibaba.excel.write.handler.CellWriteHandler; import…

微信小程序uniapp记住密码

记住密码功能 在请求登录接口成功后&#xff0c;我们需要判断用户是否勾选记住密码&#xff0c;如果是&#xff0c;则将记住密码状态、账号信息存入本地。 下次登录时&#xff0c;获取本地的记住密码状态&#xff0c;如果为true则获取本地存储的账号信息&#xff0c;将信息回填…

【Qt】点击QTreeWidget空白处,使当前选择的Item失效

原因 有时在开发中&#xff0c;可能会对QTreeWidget进行操作&#xff0c;当点击feiQTreeWidgetItem时&#xff0c;需要焦点取消&#xff0c;无Item选中。 解决方案 可以通过设置事件过滤器进行实现。 1.QtreeWidget安装事件过滤器 ui->treeWidget->viewport()->i…

2023年OceanBase开发者大会-核心PPT资料下载

一、峰会简介 2023年OceanBase开发者大会主要涵盖了OceanBase的最新技术进展、产品更新以及开发者工具的发布。大会发布了OceanBase 4.1版本&#xff0c;公布了两大友好工具&#xff0c;升级了文档的易用性&#xff0c;并统一了企业版和社区版的代码分支。这些举措全面呈现了O…