从零手搓一个【消息队列】项目设计、需求分析、模块划分、目录结构

news/2024/6/16 12:58:40 标签: 消息队列, RabbitMQ, 中间件, 生产者消费者

文章目录

  • 一、需求分析
    • 1, 项目简介
    • 2, BrokerServer 核心概念
    • 3, BrokerServer 提供的核心 API
    • 4, 交换机类型
    • 5, 持久化存储
    • 6, 网络通信
    • 7, TCP 连接的复用
    • 8, 需求分析小结
  • 二、模块划分
  • 三、目录结构


提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

一、需求分析

1, 项目简介

之前我的 这篇文章 中介绍过 JUC 中的阻塞队列 BlockingQueue , 介绍过"生产者消费者模型", 在实际开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求

因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能, 这样的程序我们就称为 消息队列 (Message Queue, MQ)

目前市场上比较成熟的消息队列有: kafka, RabbitMQ, RocketMQ…, 本项目中部分参考 RabbitMQ 和 “AMQP 协议” 进行设计

项目描述 : 可跨主机的生产者消费者服务器程序,用于服务器之间的解耦,流量削峰;使用RPC模式,由客户端发送网络请求,远程调用服务器以操作交换机、队列、发布消息、订阅消息等;由服务器实现相关逻辑、持久化存储、异步转发消息等

所以这个项目中有一些比较核心的概念和模块 :

  • 生产者(Producer) 负责往消息队列中投入消息
  • 消费者(Consumer) 负责从消息队列中消费消息
  • 中间人(Broker) 负责消息的存储和转发
  • 发布(Publish) 投入消息的过程
  • 订阅(Subscribe) 消费者注册一个队列的过程(我想要这个队列里的消息)

消费者消费数据由两种模式 :
1, 消费者从队列里取, 这种模式称为"拉"
2, 队列里有消息时, Broker (中间人服务器) 给消费者推送消息, 这种模式称为"推"
我们的项目中实现 “推” 这种模式

Producer, Consumer, Broker 都是服务器程序, 需要编写代码支持这三者之间的业务逻辑

但 Producer, Consumer 又是广义上的消费者, Producer, Consumer 是发起请求的一方, 所以是客户端, Broker 是返回响应的一方, 所以是服务器
在这里插入图片描述
在这里插入图片描述


2, BrokerServer 核心概念

  • 虚拟机 (VirtualHost): 是⼀个逻辑上的集合, ⼀个 BrokerServer 上可以存在多个 VirtualHost , 如果一个 BrokerServer 有多个业务线, 就可以用不同的 VirtualHost 在分别组织不同类别的数据(我们的项目中暂时只设定一个VirtualHost)
  • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue
  • 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上取消息
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系.
  • 消息 (Message): 传递的内容.

用数据库做对比:
VirtualHost 相当于 database, Exchange 和 Queue 都是 table, Exchange 和 Queue 可理解成 “多对多” 关系. 所以 Binding 就是一个中间表, 可以把这两张表联系起来

在这里插入图片描述

AMQP 协议就是按照上述格式组织


3, BrokerServer 提供的核心 API

Producer, Consumer 是发起请求的一方, 所以是客户端, Broker 是返回响应的一方, 所以是服务器
因此 BrokerServer 需要给 Producer, Consumer 提供一些核心 API (对外暴露的), 这些 API 用来实现整个消息队列支持的基本功能

  • 1, 创建队列 queueDeclare (Declare 表示不存在则创建, 存在则不创建)
  • 2, 销毁队列 queueDelete
  • 3, 创建交换机 exchangeDeclare
  • 4, 销毁交换机 exchageDelete
  • 5, 创建绑定 queueBind
  • 6, 解除绑定 queueUnbind
  • 7, 发布消息 basicPublish
  • 8, 订阅消息 basicSubscribe (只是告知服务器有个消费者需要这个队列的数据)
  • 9, 确认消息 basicAck (让消费者显式的告诉服务器, 收到并处理了消息

我们的项目给消费者在订阅消息时, 提供 “自动应答” 和 “确认应答” 两种方式

还有 “拒绝应答” 这种方式, 本项目暂未实现

没有 “消费消息” 这个 API , 上文已经说明, 我们的项目中不支持消费者主动从队列中取消息, 而是由服务器推送给消费者**(这个过程有些复杂, 后续会详细介绍)**

在这里插入图片描述


4, 交换机类型

  • 直接交换机 Direct
  • 扇出交换机 Fanout
  • 主题交换机 Topic
  • 消息头交换机 Header

咱们的项目不实现第四种, 实现方式复杂且比较少见

生产者在发布消息时, 有三个参数: 1, 消息 2, 交换机 3, routingKey (下面介绍), 不同的交换机有不同的转发规则

主题交换机: 不需要和队列建立绑定, 生产者发布消息时, routingKey 作为队列名(唯一标识), 如果队列存在, 交换机直接把消息投入到该队列中

扇出交换机: 需要和队列建立绑定, 生产者发布消息时什么都不用做 routingKey 为 null, 交换机会把消息给已绑定的队列都投送一份

主题交换机: 需要和队列建立绑定, 并且绑定的时候指定一个交换机和队列之间的 “暗号” bindingKey, 生产者发布消息的时候指定一个 “暗号” routingKey, 交换机会把消息投送给已绑定的队列中, 暗号能够对上(规则匹配)的那个队列

Binding 表中就记录了 交换机和队列的绑定关系, 以及 bindingKey
目前不需要全部看懂 Binding 表, 大概熟悉这三种绑定规则即可

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


5, 持久化存储

上述提到的交换机, 队列, 绑定, 消息 需要在硬盘和内存上各存储一份, 内存为主, 硬盘为辅

因为内存的访问速度快, 而这对消息队列这个项目, 效率更重要, 而硬盘存储是为了服务器重启之后数据不丢失

硬盘存储使用 数据库 + 文件的方式, 数据库中存储 交换机, 队列, 绑定, 文件中存储消息

因为数据库提供的 增, 删, 查 比较方便, 交换机, 队列, 绑定正需要这些操作, 而消息不需要大量, 复杂的增删查, 基本只涉及到存储, 所以使用数据库略显"多余", 而且一旦消息很多之后, 数据库的性能也有限


6, 网络通信

生产者消费者作为客户端程序, 和 BrokerServer 通过网络进行通信和交互

后续会自定义应用层协议, 传输层使用 TCP 的 Socket API

上述介绍过 BrokerServer 提供的一些核心 API , 在生产者消费者程序上也需要提供的相应的 API

以创建队列这个操作为例: 客户端调用 queueDeclare(), 客户端的这个方法只是实现了向服务器发送请求, 让服务器调用服务器的 queueDeclare() , 服务器的这个方法实现了真正创建交换机, 并持久化存储, 然后返回给客户端一个响应, 告知客户端执行成功了没有

在这里插入图片描述

由于客户端也有 queueDeclare() , 客户端看起来是在调用自己的 queueDeclare() 从而创建出来了队列, 实际上是通过网络请求, 调用了服务器的 queueDeclare() , 而服务器是怎么做的, 客户端并不知道细节

这种方式称作: 远程过程调用(RPC)


7, TCP 连接的复用

上面说到要在生产者消费者客户端提供 BrokerServer 的核心 API :

  • 1, 创建队列 queueDeclare (Declare 表示不存在则创建, 存在则不创建)
  • 2, 销毁队列 queueDelete
  • 3, 创建交换机 exchangeDeclare
  • 4, 销毁交换机 exchageDelete
  • 5, 创建绑定 queueBind
  • 6, 解除绑定 queueUnbind
  • 7, 发布消息 basicPublish
  • 8, 订阅消息 basicSubscribe (只是告知服务器有个消费者需要这个队列的数据)
  • 9, 确认消息 basicAck (让消费者显式的告诉服务器, 收到并处理了消息

由于一次 TCP 连接与断开 需要经过三次握手四次挥手, 一个客户端有可能短时间内持续使用 RPC 模式远程调用 BrokerServer 的 API, 这就需要多次 TCP 的连接和断开

所以引入 Channel (信道) 的概念, 建立 TCP 连接之后, 可以使用一个 Channel表示一次逻辑上的连接(一次请求和响应), 一个 Connection 中包含多个 Channel , 各个 Channel 中的数据互不相干, 实现一次 TCP 连接的复用

所以生产者客消费者(客户端)还需要提供四个 API

  • 10, 创建 Connection (建立 TCP 连接)
  • 11, 关闭 Connection (关闭 TCP 连接)
  • 12, 创建 Channel (建立逻辑上的连接)
  • 13, 关闭 Channel (关闭逻辑上的连接)

8, 需求分析小结

  • 1, 生产者客户端, 消费者客户端, BrokerServer 服务器这三个板块, 这三者都是服务器程序
  • 2, 生产者消费者 的代码主要围绕 “网络通信” 展开
  • 3, BrokerServer 的代码主要围绕 “核心概念” , "核心 API " , “数据管理” 展开

在这里插入图片描述


二、模块划分

在这里插入图片描述

三、目录结构

这里各位大概率是一眼看不懂的, 只需要结合注释大致了解各个包和类是用来做什么的即可

由于项目略微复杂, 先对整体结构有个大致认知, 从下篇开始会详细介绍代码编写
在这里插入图片描述


http://www.niftyadmin.cn/n/5059961.html

相关文章

[HD2006.X1] 打印图形(菱形换壳)——海淀区赛

题目描述 由键盘输入 N N N ,按一定的规律打印图形(见输出样例)。 输入格式 一个整数 N N N(其中 3 ≤ N ≤ 21 3≤N≤21 3≤N≤21 ), N N N 为奇数。 输出格式 如题中所描述的图形 样例 #1 样例…

1 论文笔记:Efficient Trajectory Similarity Computation with ContrastiveLearning

2022CIKM 1 intro 1.1 背景 轨迹相似度计算是轨迹分析任务(相似子轨迹搜索、轨迹预测和轨迹聚类)最基础的组件之一现有的关于轨迹相似度计算的研究主要可以分为两大类: 传统方法 DTW、EDR、EDwP等二次计算复杂度O(n^2)缺乏稳健性 会受到非…

【Spring Cloud】深入理解 Nacos 的统一配置管理,配置热更新,多环境配置共享,集群搭建

文章目录 前言:为什么要统一配置管理一、Nacos 的配置管理1.1 在 Nacos 中添加配置文件1.2 微服务获取配置1.2.1 没有 Nacos 配置的情况下1.2.2 有 Nacos 配置的情况下 1.3 本地配置文件的修改1.4 代码获取配置信息 二、配置文件的热更新2.1 修改配置文件2.2 设置配…

Python+Yolov8路面桥梁墙体裂缝识别

程序示例精选 PythonYolov8路面桥梁墙体裂缝识别 如需安装运行环境或远程调试,见文章底部个人QQ名片,由专业技术人员远程协助! 前言 这篇博客针对《PythonYolov8路面桥梁墙体裂缝识别》编写代码,代码整洁,规则&#…

2023-09-27 Cmake 编译 OpenCV+Contrib 源码通用设置

Cmake 编译 OpenCV 通用设置 特点: 包括 Contrib 模块关闭了 Example、Test、OpenCV_AppLinux、Windows 均只生成 OpenCV_World 需要注意: 每次把 Cmake 缓存清空,否则,Install 路径可能被设置为默认路径Windows 需要注意编译…

LeNet网络复现

文章目录 1. LeNet历史背景1.1 早期神经网络的挑战1.2 LeNet的诞生背景 2. LeNet详细结构2.1 总览2.2 卷积层与其特点2.3 子采样层(池化层)2.4 全连接层2.5 输出层及激活函数 3. LeNet实战复现3.1 模型搭建model.py3.2 训练模型train.py3.3 测试模型test…

【JavaEE】CAS(Compare And Swap)操作

文章目录 什么是 CASCAS 的应用如何使用 CAS 操作实现自旋锁CAS 的 ABA 问题CAS 相关面试题 什么是 CAS CAS(Compare and Swap)是一种原子操作,用于在无锁情况下保证数据一致性的问题。它包含三个操作数——内存位置、预期原值及更新值。在执…

基于SSM的实习管理系统

基于SSM的实习管理系统、前后端分离 开发语言:Java数据库:MySQL技术:SpringSpringMVCMyBatisVue工具:IDEA/Ecilpse、Navicat、Maven 系统展示 管理员界面 教师 学生 研究背景 基于SSM的实习管理系统是一个基于Spring、Spring…