golang整合rabbitmq-实现创建生产者绑定交换机-创建消费者消费完整代码

news/2024/6/15 20:16:38 标签: golang, rabbitmq, 开发语言

1,在生产者端初始化mq连接
在这里插入图片描述

package rabbitmq

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

var (
	conn *amqp.Config
)

func InitRabbitMq() {
	// 连接RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@你的mq服务器地址:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"my_exchange", // 交换机名称
		"direct",      // 交换机类型
		true,          // 是否持久化
		false,         // 是否自动删除
		false,         // 是否内部使用
		false,         // 是否等待确认
		nil,           // 其他属性
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 声明一个队列
	q, err := ch.QueueDeclare(
		"my_queue", // 队列名称
		true,       // 是否持久化
		false,      // 是否自动删除
		false,      // 是否排他
		false,      // 是否等待确认
		nil,        // 其他属性
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,           // 队列名称
		"my_routing_key", // 路由键
		"my_exchange",    // 交换机名称
		false,            // 是否等待确认
		nil,              // 其他属性
	)
	if err != nil {
		log.Fatalf("Failed to bind a queue: %v", err)
	}

	fmt.Println("Exchange and queue created and bound successfully!")
}

2,创建生产者

package api

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/streadway/amqp"
	"log"
)

func RabbitMqPublish(c *gin.Context) {
	name := c.Param("name")
	fmt.Println(">>>>>>>", name)
	// 建立与RabbitMQ服务器的连接
	conn, err := amqp.Dial("amqp://guest:guest@你的mq服务器地址:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个队列
	queue, err := ch.QueueDeclare(
		"my_queue", // 队列名称
		true,       // 是否持久化
		false,      // 是否自动删除
		false,      // 是否具有排他性
		false,      // 是否阻塞
		nil,        // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
		fmt.Println(queue.Name)
	}

	//绑定队列
	//_ = ch.QueueBind("my_queue", "my_routing_key", "my_exchange", false, nil)

	// 发布消息到队列
	//message := "Hello, RabbitMQ!"
	if len(name) < 1 {
		name = "空消息"
	}
	err = ch.Publish(
		"my_exchange",    // 交换机名称
		"my_routing_key", // 路由key
		false,            // 是否强制
		false,            // 是否立即
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(name),
		},
	)
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}

	log.Println("消息发送成功!!!!!!!!!!")
}

3,另起一个go服务进行消费者消费
在这里插入图片描述

package mq

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func MqSubContext() {
	// 连接RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@你的mq服务器地址:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个队列
	queue, err := ch.QueueDeclare(
		"my_queue", // 队列名称
		true,       // 是否持久化
		false,      // 是否自动删除
		false,      // 是否具有排他性
		false,      // 是否阻塞
		nil,        // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		queue.Name,       // 队列名称
		"my_routing_key", // routing key
		"my_exchange",    // 交换机名称
		false,            // 是否阻塞
		nil,              // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to bind a queue: %v", err)
	}

	// 消费消息
	msgs, err := ch.Consume(
		queue.Name, // 队列名称
		"",         // 消费者标识符
		true,       // 是否自动应答
		false,      // 是否具有排他性
		false,      // 是否阻塞
		false,      // 是否等待
		nil,        // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to consume messages: %v", err)
	}

	// 处理接收到的消息
	for msg := range msgs {

		fmt.Printf("消费的消息是: %s\n", msg.Body)
	}
}

后面将会发布golang整合es操作的文章


http://www.niftyadmin.cn/n/5345670.html

相关文章

HCIP OSPF实验

任务&#xff1a; 1.使用三种解决ospf不规则区域的方法 2.路由器5、6、7、8、15使用mgre 3.使用各种优化 4.全网可达 5.保证更新安全 6.使用地址为172.16.0.0/16合理划分 7.每个路由器都有环回 拓扑图&IP划分如下&#xff1a; 第一步&#xff0c;配置IP&环回地址…

Unity——FSM有限状态机

有限状态机就是有限个切换状态的条件&#xff0c;要制作有限状态机&#xff0c;有几个必要点&#xff1a;状态抽象类、FSMSystem类、FSMSystem实现类、FSM状态实现类。 每一个控制者都有一个状态机&#xff0c;每一个状态机都有其包含的状态&#xff0c;每一个状态都有能转换的…

可解释性AI(XAI)

可解释性AI&#xff08;XAI&#xff09;旨在提高人工智能系统的透明度和可理解性&#xff0c;使人们更好地理解AI的决策过程和原理。随着AI技术的广泛应用&#xff0c;XAI成为了一个备受关注的重要领域。它不仅有助于建立人们对AI的信任&#xff0c;还可以帮助解决AI伦理和偏见…

华为机考入门python3--(0)测试题1-句子平均重量

分类&#xff1a;字符串 知识点&#xff1a; 获取输入 input().strip().split(" ") 拼接列表 " ".join(list) 输出指定位数的浮点数 print("%.2f" % value) len() 函数对于很多内置的数据类型都适用&#xff0c;它返回对象的元素个数或长度。…

【AI】深度学习在编码中的应用(6)

目录 一、熵模型和自适应熵编码 1.1 区别 1.2 联系 二、关于自适应熵模型 前面我们讨论了基础架构设计、分析合成变换&#xff0c;本文来梳理和学习编码的第三步&#xff0c;自适应熵模型。 一、熵模型和自适应熵编码 自适应熵模型和熵编码在概念和应用上有一些区别&…

26.删除排序数组中的重复项(力扣LeetCode)

26.删除排序数组中的重复项 题目描述 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nu…

中移(苏州)软件技术有限公司面试问题与解答(4)—— virtio所创建的设备1

接前一篇文章&#xff1a;中移&#xff08;苏州&#xff09;软件技术有限公司面试问题与解答&#xff08;0&#xff09;—— 面试感悟与问题记录 本文参考以下文章&#xff1a; VirtIO实现原理——PCI基础 VirtIO实现原理——virtblk设备初始化 特此致谢&#xff01; 本文对…

【Linux】进程间通信——信号量

让大家久等啦&#xff0c;本期我们来讲讲Linux系统中的信号量 目录 一、引入 二、认识信号量 2.1 信号量的概念 2.2 信号量的内核结构 三、关于信号量的接口 3.1 semget 3.2 ipcs -s 3.3 ipcrm -s 3.4 semctl 3.5 semop 四、理解IPC 一、引入 在开始之前我们先来认…