重点:加 二当家小D 讲师微信: xdclass6
有专属技术交流群,有问题直接留言给我即可,扫描下面二维码也可以
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:讲解高性能消息队列RabbitMQ适合人员和学后水平
x从0到1讲解高性能消息队列RabbitMQ,急速入门Docker部署RabbitMQ,掌握RabbitMQ核心概念和多种工作模式,不止讲解MQ多个工作模式,如简单队列、工作队列(轮训+公平策略)、发布订阅、路由模式、主题模式等,还整合当下新版热门框架SpringBoot2.X;
高级内容讲解消息可靠性投递、ACK消费确认模式,玩转死信队列,实现延迟队列功能+案例实战
搭建RabbitMQ普通集群、Mirror镜像集群,整合SpringBoot模拟多种异常情况
高级工程师+架构师必备大厂面试题+原理。
一套真正让你掌握RabbitMQ核心应用+内功的视频,总共15章近60集视频,全新版本录制
为什么要学习RabbitMQ消息队列
学后水平
零基础掌握MQ中间件应用场景、掌握多个业界主流中间件优缺点
掌握JMS、AMQP核心概念,各个优缺点和适合场景
零基础急速掌握Docker容器知识+Linux搭建Docker和部署RabbitMQ
掌握RabbitMQ多个核心概念交换机、队列、虚拟主机和Web管控台使用
玩转RabbitMQ多个工作队列、发布订阅模型、主题模型通配符实战
掌握新版SpringBoot+AMQP整合RabbitMQ并开发多个模式实战
高级篇幅玩转可靠性消息投递ConfirmCallback和returnCallback编码实战
高级篇幅掌握消息ACK确认机制+多种Reject编码实战
高级篇幅掌握RabbitMQ TTL死信队列 + 延迟队列开发【综合案例实战】
高级加餐内容
适合人群
高级后端工程师、高级前端/全栈工程师、运维工程师、CTO 更新必备技术栈
从传统软件公司过渡到互联网公司的人员
课程技术技术栈和环境说明
SpringBoot.2.4 + Maven + IDEA旗舰版 + JDK8 或 JDK11
学习形式
简介:讲解高性能消息队列RabbitMQ课程大纲和效果演示
课程效果演示
课程学前基础
目录大纲浏览
学习寄语
简介:课程开发环境准备和说明
必备基础环境:JDK8或者JDK11版本 + Maven3.5(采用默认) + IDEA旗舰版 + Mysql5.7以上版本
操作系统:Win10或者Mac苹果
创建新版SpringBoot2.X项目
注意: 有些包maven下载慢,等待下载如果失败
当前项目仓库地址修改
xxxxxxxxxx
<!-- 代码库 -->
<repositories>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:介绍什么是MQ消息中间件和应用场景
什么是MQ消息中间件
使用场景:
核心应用
跨平台 、多语言
分布式事务、最终一致性
RPC调用上下游对接,数据源变动->通知下属
简介:讲解什么是AMQP和JMS消息服务
什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
特性
常见概念
基础编程模型
简介:介绍什么是AMQP高级消息队列协议和MQTT科普
背景
什么是AMQP
特性
独立于平台的底层消息传递协议
消费者驱动消息传递
跨语言和平台的互用性、属于底层协议
有5种交换类型direct,fanout,topic,headers,system
面向缓存的、可实现高性能、支持经典的消息队列,循环,存储和转发
支持长周期消息传递、支持事务(跨消息队列)
AMQP和JMS的主要区别
科普:大家可能也听过MQTT
MQTT: 消息队列遥测传输(Message Queueing Telemetry Transport )
背景:
原因
特性
简介:对比当下主流的消息队列和选择问题
业界主流的消息队列:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ
ActiveMQ:http://activemq.apache.org/
Kafka:http://kafka.apache.org/
RocketMQ:http://rocketmq.apache.org/
RabbitMQ:http://www.rabbitmq.com/
什么我们讲RabbitMQ呢,只要你目标是高级工程师或者架构师,就要多学,才知道解决方案+适合场景
因为这个是rabbitmq专题课程
下集专门介绍rabbitmq
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:介绍RabbitMQ消息队列
RabbitMQ:http://www.rabbitmq.com/
核心概念, 了解了这些概念,是使用好RabbitMQ的基础
Broker
Producer生产者
Consumer消费者:
Message 消息
Queue 队列
Channel 信道
Connection连接
Exchange 交换器
RoutingKey 路由键
Binding 绑定
xxxxxxxxxx
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对应的队列中
Virtual host 虚拟主机
用于不同业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost 里面不能有相同名称的Exchange或Queue
默认是 /
简介:安装RabbitMQ相关Linux服务器
RabbitMQ安装方式
源码安装
docker镜像安装【推荐】
Linux服务器准备:CentOS7.x以上即可, 根据个人能力选择哪种
本地虚拟机
阿里云ECS服务器 【推荐,2核4g】
本地开发
总结
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:什么是云服务器及目前主要的几个厂商介绍
云厂商
阿里云新用户地址(如果地址失效,联系我或者客服即可,1折)
环境问题说明
简介:讲解阿里云服务器登录使用和常见终端工具
备注:(服务器、域名等使用你们自己购买的哈,上面有提供低价购买链接,失效找我)
阿里云新用户地址 https://www.aliyun.com/minisite/goods?userCode=r5saexap&share_source=copy_link
控制台修改阿里云远程连接密码
windows工具 putty,xshell, security CRT
苹果系统MAC : 通过终端登录
linux图形操作工具(用于远程连接上传文件)
mac: filezilla
windows: winscp
参考资料:https://jingyan.baidu.com/article/ed2a5d1f346fd409f6be179a.html
更多阿里云操作,可以尝试自己通过百度进行找文档, 安装mysql jdk nginx maven git redis等,也可以看我们的课程
简介:Docker介绍和使用场景
什么是Dokcer
百科:一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。
容器是完全使用沙箱机制,相互之间不会有任何接口,使用go语言编写,在LCX(linux容器)基础上进行的封装
简单来说:
为什么要用
简介:讲解阿里云ECS服务安装Docker实战
远程连接ECS实例 8.129.113.233
依次运行以下命令添加yum源。
xxxxxxxxxx
yum update
xxxxxxxxxx
yum install epel-release -y
xxxxxxxxxx
yum clean all
xxxxxxxxxx
yum list
安装并运行Docker。
xxxxxxxxxx
yum install docker-io -y
xxxxxxxxxx
systemctl start docker
检查安装结果。
xxxxxxxxxx
docker info
启动使用Docker
xxxxxxxxxx
systemctl start docker #运行Docker守护进程
xxxxxxxxxx
systemctl stop docker #停止Docker守护进程
xxxxxxxxxx
systemctl restart docker #重启Docker守护进程
更多文档
xxxxxxxxxx
https://help.aliyun.com/document_detail/51853.html?spm=a2c4g.11186623.6.820.RaToNY
简介:快速掌握Dokcer基础知识
概念:
xxxxxxxxxx
class User{
private String userName;
private int age;
}
xxxxxxxxxx
User user = new User()
xxxxxxxxxx
Dokcer 里面的镜像 : Java里面的类 Class
Docker 里面的容器 : Java里面的对象 Object
通过类创建对象,通过镜像创建容器
简介:掌握Docker容器常见命令
常用命令(安装部署好Docker后,执行的命令是docker开头),xxx是镜像名称
搜索镜像:docker search xxx
列出当前系统存在的镜像:docker images
拉取镜像:docker pull xxx
运行一个容器:
xxxxxxxxxx
docker run --name nginx-xd -p 8080:80 -d nginx
docker run - 运行一个容器
-d 后台运行
-p 端口映射
--name "xxx" 容器名称
列举当前运行的容器:docker ps
检查容器内部信息:docker inspect 容器名称
删除镜像:docker rmi IMAGE_NAME
停止某个容器:docker stop 容器名称
启动某个容器:docker start 容器名称
移除某个容器: docker rm 容器名称 (容器必须是停止状态)
列举全部 容器 : docker ps -a
查看容器启动日志
简介:常见系统安装Docker和一些坑
如果没使用阿里云,本地需要安装Docker,才可以进行打包,但是容易出现兼容性问题,大家自行解决
Win7~Win10
Mac
Linux(系统镜像不可能每个人都统一的,所以大家结合百度博文看看)
官方地址
问题
直接安装Docker不成功,或者下载Yum失败,这个只能根据报错百度检索信息
镜像下载慢
本地网络差,下载包容易超时或者慢(只能等)
常规的部署只是Docker的冰山一角,课程快速入门,如果想深入可以看我们的Docker专题
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:Docker安装RabbitMQ消息队列
登录个人的Linux服务器
Docker安装RabbitMQ
xxxxxxxxxx
#拉取镜像
docker pull rabbitmq:management
docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中
-e 参数
RABBITMQ_DEFAULT_USER 用户名
RABBITMQ_DEFAULT_PASS 密码
主要端口介绍
xxxxxxxxxx
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
访问管理界面
注意事项!!!!
xxxxxxxxxx
CentOS 7 以上默认使用的是firewall作为防火墙
查看防火墙状态
firewall-cmd --state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service
简介:RabbitMQ控制台介绍
管控台介绍
每个虚拟主机默认就有7个交换机
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:Java项目创建并整合RabbitMQ
项目创建需要点时间-大家静待就行
添加依赖
xxxxxxxxxx
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
简介:玩转RabbitMQ简单队列实战
xxxxxxxxxx
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置:mq重启后还在
* 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
* 自动删除: 当没有消费者的时候,自动删除掉,一般是false
* 其他参数
*
* 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
/**
* 参数说明:
* 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
* 路由健名称
* 配置信息
* 发送的消息数据:字节数组
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
xxxxxxxxxx
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
// DeliverCallback deliverCallback = (consumerTag, envelop, delivery,properties, msg) -> {
// String message = new String(msg, "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
//channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
简介:RabbitMQ工作队列 轮训策略讲解实战
工作队列
消息生产能力大于消费能力,增加多几个消费节点
和简单队列类似,增加多个几个消费节点,处于竞争关系
默认策略:round robin 轮训
xxxxxxxxxx
public class Send {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
try (
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置
* 排他配置
* 自动删除
* 其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//轮训发送 10个
for(int i=0;i<10;i++){
String message = "Hello World!"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
xxxxxxxxxx
public class Recv1 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*]Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
//手工确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
xxxxxxxxxx
public class Recv2 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//手工确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
轮训策略验证
简介:RabbitMQ工作队列 公平策略讲解实战
公平策略验证
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:RabbitMQ的Exchange交换机介绍
RabbitMQ的Exchange 交换机
生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系。
交换机只负责转发消息,不具备存储消息的能力,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不用
交换机类型
Direct Exchange 定向
Fanout Exchange 广播
Topic Exchange 通配符
Headers Exchanges(少用)
简介:RabbitMQ的发布订阅消息模型介绍
什么是rabbitmq的发布订阅模式
发布订阅模型应用场景
rabbitmq发布订阅模型
简介:RabbitMQ的发布订阅消息模型代码实战
xxxxxxxxxx
public class Send {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
/**
* 消息生产者不用过多操作,只需要和交换机绑定即可
*/
try (//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
String message = "Hello World pub !";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
xxxxxxxxxx
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:RabbitMQ的路由模式和应用场景
什么是rabbitmq的路由模式
例子:日志采集系统 ELK
简介:RabbitMQ路由模式代码实战
xxxxxxxxxx
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
/**
* 消息生产者不用过多操作,只需要和交换机绑定即可
*/
try (//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
String error = "我是错误日志";
String info = "我是info日志";
String debug = "我是debug日志";
channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
}
xxxxxxxxxx
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
简介:RabbitMQ的主题模式和应用场景
背景:
什么是rabbitmq的主题模式
文档 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等
交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order, 而 *.order ,只会匹配 info.order, 之间是使用. 点进行分割多个词的; 如果是 ., 则info.order、error.order都会匹配
注意
测试,下面的匹配规则是怎样的
xxxxxxxxxx
quick.orange.rabbit 只会匹配 *.orange.* 和 *.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)
quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理
lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2
例子:日志采集系统
简介:RabbitMQ的topic主题模式代码实战
例子:日志采集系统
生产者
xxxxxxxxxx
public class Send {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
/**
* 消息生产者不用过多操作,只需要和交换机绑定即可
*/
try (//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
String error = "我是订单错误日志";
String info = "我是订单info日志";
String debug = "我是商品debug日志";
channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
}
xxxxxxxxxx
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,第一个节点
//channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
//绑定队列和交换机,第二个节点
//channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
简介:RabbitMQ的多个工作模式总结
对照官网回顾总结
简单模式
工作队列模式
发布订阅模式
路由模式
通配符模式
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:介绍SpringAMQP+创建SpringBoot2.X
什么是Spring-AMQP
创建新版SpringBoot2.X项目
注意: 有些包maven下载慢,等待下载如果失败
xxxxxxxxxx
<!-- 代码库 -->
<repositories>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
xxxxxxxxxx
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
简介:介绍SpringBoot2.X整合Rabbitmq实战
xxxxxxxxxx
#消息队列
spring:
rabbitmq:
host: 10.211.55.13
port: 5672
virtual-host: /dev
password: password
username: admin
xxxxxxxxxx
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "order_exchange";
public static final String QUEUE_NAME = "order_queue";
/**
* 交换机
* @return
*/
@Bean
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
//return new TopicExchange(EXCHANGE_NAME, true, false);
}
/**
* 队列
* @return
*/
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
//return new Queue(QUEUE_NAME, true, false, false, null);
}
/**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
//return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
}
}
xxxxxxxxxx
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RabbitTemplate template;
@Test
void send() {
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
}
}
xxxxxxxxxx
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
/**
* RabbitHandler 会自动匹配 消息类型(消息自动确认)
* @param msg
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("监听到消息:消息内容:"+message.getBody());
}
}
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:Rabbitmq的消息可靠性投递讲解
什么是消息的可靠性投递
保证消息百分百发送到消息队列中去
详细
RabbitMQ消息投递路径
生产者-->交换机->队列->消费者
通过两个的点控制消息的可靠性投递
生产者到交换机
交换机到队列
建议
简介:Rabbitmq的消息可靠性投递confirmCallback实战
生产者到交换机
开启confirmCallback
xxxxxxxxxx
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
开发实战
xxxxxxxxxx
@Autowired
private RabbitTemplate template;
@Test
void testConfirmCallback() {
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack="+ack);
System.out.println("confirm==== cause="+cause);
//根据ACK状态做对应的消息更新操作 TODO
}
});
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+,"order.new","新订单来啦1");
}
模拟异常:修改投递的交换机名称
简介:Rabbitmq的消息可靠性投递returnCallback实战
交换机到队列
通过returnCallback
消息从交换器发送到对应队列失败时触发
两种模式
xxxxxxxxxx
//为true,则交换机处理消息到路由失败,则会返回给生产者
//或者配置文件 spring.rabbitmq.template.mandatory=true
template.setMandatory(true);
第一步 开启returnCallback配置
xxxxxxxxxx
#新版
spring.rabbitmq.publisher-returns=true
xxxxxxxxxx
#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true
xxxxxxxxxx
@Test
void testReturnCallback() {
//为true,则交换机处理消息到路由失败,则会返回给生产者
//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
int code = returned.getReplyCode();
System.out.println("code="+code);
System.out.println("returned="+returned.toString());
}
});
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
}
简介:Rabbitmq的消息确机制ACK讲解
RabbitMQ的ACK介绍
确认方式
xxxxxxxxxx
spring:
rabbitmq:
#开启手动确认消息,如果消息重新入队,进行重试
listener:
simple:
acknowledge-mode: manual
简介:Rabbitmq的消息确机制ACK实战+DeliveryTag介绍
代码实战
xxxxxxxxxx
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("body="+body);
//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
//channel.basicAck(msgTag,false);
//channel.basicNack(msgTag,false,true);
}
deliveryTag介绍
表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加
basicNack和basicReject介绍
人工审核异常消息
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:讲解RabbitMQ的的死信队列+ TTL《上》
什么是TTL
time to live 消息存活时间
如果消息在存活时间内未被消费,则会别清除
RabbitMQ支持两种ttl设置
什么是rabbitmq的死信队列
什么是rabbitmq的死信交换机
消息有哪几种情况成为死信
简介:讲解RabbitMQ的的死信队列+ TTL《下》
RabbitMQ管控台消息TTL测试
队列过期时间使用参数,对整个队列消息统一过期
消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,已经还在队列里面)
两者都配置的话,时间短的先触发
RabbitMQ Web控制台测试
新建死信队列 (和普通没区别)
死信交换机和队列绑定
新建普通队列,设置过期时间、指定死信交换机
测试:直接web控制台往product_qeueu发送消息即可
简介:讲解RabbitMQ的延迟队列和应用场景
什么是延迟队列
使用场景
Cloud微服务大课训练营里面的应用
业界的一些实现方式
定时任务高精度轮训
采用RocketMQ自带延迟消息功能
RabbitMQ本身是不支持延迟队列的,怎么办?
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:讲解RabbitMQ的案例实战业务逻辑介绍
xxxxxxxxxx
JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。
简介:讲解RabbitMQ的案例实战配置开发
xxxxxxxxxx
/**
* 死信队列
*/
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
/**
* 死信交换机
*/
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
/**
* 进入死信队列的路由key
*/
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
/**
* 创建死信交换机
* @return
*/
@Bean
public Exchange lockMerchantDeadExchange(){
return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
}
/**
* 创建死信队列
* @return
*/
@Bean
public Queue lockMerchantDeadQueue(){
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列
* @return
*/
@Bean
public Binding lockMerchantBinding(){
return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
}
xxxxxxxxxx
/**
* 普通队列,绑定的个死信交换机
*/
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
/**
* 普通的topic交换机
*/
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
/**
* 路由key
*/
public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
/**
* 创建普通交换机
* @return
*/
@Bean
public Exchange newMerchantExchange(){
return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue newMerchantQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl",10000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
/**
* 绑定交换机和队列
* @return
*/
@Bean
public Binding newMerchantBinding(){
return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
}
简介:讲解RabbitMQ的案例实战-消息生产和消费
消息生产
消息消费
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:玩转高性能消息队列RabbitMQ课程总结
加餐内容
简介:小滴课堂架构师学习路线和大课训练营介绍
小滴课堂永久会员 & 小滴课堂年度会员
适合IT后端人员,零基础就业、在职充电、架构师学习路线-专题技术方向
大课综合训练营
适合用于专项拔高,适合有一定工作经验的同学,架构师方向发展的训练营,包括多个方向
直播小班课(留意官网即可或者联系我即可)
技术人的导航站(涵盖我们主流的技术网站、社区、工具等等,即将上线)
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:讲解RabbitMQ高可用普通集群模式介绍
背景
RabbitMQ集群模式一介绍
xxxxxxxxxx
默认的集群模式, 比如有节点 node1和node2、node3,三个节点是普通集群,但是他们仅有相同的元数据,即交换机、队列的结构;
案例:
消息只存在其中的一个节点里面,假如消息A,存储在node1节点,
消费者连接node1个节点消费消息时,可以直接取出来;
但如果 消费者是连接的是其他节点
那rabbitmq会把 queue 中的消息从存储它的节点中取出,并经过连接节点转发后再发送给消费者
问题:
假如node1故障,那node2无法获取node1存储未被消费的消息;
如果node1持久化后故障,那需要等node1恢复后才可以正常消费
如果ndoe1没做持久化后故障,那消息将会丢失
这个情况无法实现高可用性,且节点间会增加通讯获取消息,性能存在瓶颈
项目中springboot+amqp里面需要写多个节点的配置,比如下面
spring.rabbitmq.addresses = 192.168.1.1:5672,192.168.1.2:5672,192.168.1.3:5672
该模式更适合于消息无需持久化的场景,如日志传输的队列
xxxxxxxxxx
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
简介:讲解RabbitMQ高可用mirror镜像集群模式介绍
xxxxxxxxxx
队列做成镜像队列,让各队列存在于多个节点中
和普通集群比较大的区别就是【队列queue的消息message 】会在集群各节点之间同步,且并不是在 consumer 获取数据时临时拉取,而普通集群则是临时从存储的节点里面拉取对应的数据
结论:
实现了高可用性,部分节点挂掉后,不影响正常的消费
可以保证100%消息不丢失,推荐3个奇数节点,结合LVS+Keepalive进行IP漂移,防止单点故障
缺点:由于镜像队列模式下,消息数量过多,大量的消息同步也会加大网络带宽开销,适合高可用要求比较高的项目
过多节点的话,性能则更加受影响
xxxxxxxxxx
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
简介:讲解RabbitMQ高可用普通集群搭建基础准备
清理单机和网络开发
准备3个节点安装好rabbitmq,形成集群 (记得每个节点间隔几十秒再启动,如果失败删除宿主机文件重新搭建)
xxxxxxxxxx
#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
xxxxxxxxxx
--hostname 自定义Docker容器的 hostname
--link 容器之间连接,link不可或缺,使得三个容器能互相通信
--privileged=true 使用该参数,container内的root拥有真正的root权限,否则容器出现permission denied
-v 宿主机和容器路径映射
参数 RABBITMQ_NODENAME,缺省 Unix*: rabbit@$HOSTNAME
参数 RABBITMQ_DEFAULT_USER=admin
参数 RABBITMQ_DEFAULT_PASS=xdclass.net168
Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
简介:讲解RabbitMQ高可用普通集群搭建
xxxxxxxxxx
节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
节点二加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit
节点三加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit
#查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点
rabbitmqctl cluster_status
到此为止,我们已经完成了RabbitMQ普通模式集群的建立,启动了3个节点,1个磁盘节点和2个内存节点
测试
简介:讲解RabbitMQ高可用普通集群项目配置
xxxxxxxxxx
#配置文件修改
#消息队列
spring:
rabbitmq:
addresses: 10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674
virtual-host: /dev
password: xdclass.net168
username: admin
#开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
#开启消息二次确认,交换机到队列的可靠性投递
publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
template:
mandatory: true
#消息手工确认ACK
listener:
simple:
acknowledge-mode: manual
其他不用变动,正常的发消息
简介:讲解RabbitMQ高可用mirror镜像集群配置策略配置
背景
策略policy介绍
xrabbitmq的策略policy是用来控制和修改集群的vhost队列和Exchange复制行为
就是要设置哪些Exchange或者queue的数据需要复制、同步,以及如何复制同步
创建一个策略来匹配队列
路径:rabbitmq管理页面 —> Admin —> Policies —> Add / update a policy
参数: 策略会同步同一个VirtualHost中的交换器和队列数据
xxxxxxxxxx
ha-mode: 指明镜像队列的模式,可选下面的其中一个
all:表示在集群中所有的节点上进行镜像同步(一般都用这个参数)
exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像同步,节点名称通过ha-params指定
ha-sync-mode:镜像消息同步方式 automatic(自动),manually(手动)
集群重启顺序
简介:使用SpringBoot项目整合RabbitMQ高可用集群
xxxxxxxxxx
spring:
rabbitmq:
host: 10.211.55.13
port: 5672
virtual-host: /
password: guest
username: guest
# 投递到交换机
publisher-confirm-type: correlated
# 交换机到队列
publisher-returns: true
##指定消息在没有被队列接收时是否强行退回还是直接丢弃,true是退回
template:
mandatory: true
#开启手动确认消息,如果消息重新入对则会一直重试,可以配置重试次数
listener:
simple:
acknowledge-mode: manual
xxxxxxxxxx
#消息队列
spring:
rabbitmq:
addresses: 10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674
virtual-host: /dev
password: xdclass.net168
username: admin
#开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
#开启消息二次确认,交换机到队列的可靠性投递
publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
template:
mandatory: true
#消息手工确认ACK
listener:
simple:
acknowledge-mode: manual
高可用集群测试
愿景:"让编程不再难学,让技术与生活更加有趣"
简介:RabbitMQ里面vhost虚拟主机的作用知道不
xvhost可以认为是一个虚拟的小型rabbitmq队列
内部均含有独立的,queue、exchange 和 binding 等
其拥有独立的权限系统,可以做到vhost范围的用户控制,更多用于做不同权限的隔离
用于不同业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost 里面不能有相同名称的Exchange或Queue
默认是 /
简介:项目中为什么使用消息队列
使用场景:
核心应用
跨平台 、多语言
分布式事务、最终一致性
RPC调用上下游对接,数据源变动->通知下属
缺点:
简介:项目中为什么使用消息队列,怎么选择哪个队列产品
业界主流的消息队列:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ
ActiveMQ:http://activemq.apache.org/
Kafka:http://kafka.apache.org/
RocketMQ:http://rocketmq.apache.org/
RabbitMQ:http://www.rabbitmq.com/
简介:讲解怎么样可以避免重复消费
任何消息队列产品不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重
kafka、rocketmq、rabbitmq等都是一样的
接口幂等性保障 ,消费端处理业务消息要保持幂等性
幂等性,通俗点说,就一个数据或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的
Redis
setNX() , 做消息id去重 java版本目前不支持设置过期时间
xxxxxxxxxx
//Redis中操作,判断是否已经操作过 TODO
boolean flag = jedis.setNX(key);
if(flag){
//消费
}else{
//忽略,重复消费
}
Incr 原子操作:key自增,大于0 返回值大于0则说明消费过
xxxxxxxxxx
int num = jedis.incr(key);
if(num == 1){
//消费
}else{
//忽略,重复消费
}
上述两个方式都可以,但是排重可以不考虑原子问题,数据量多需要设置过期时间,考虑原子问题,
数据库去重表
核心还是业务场景,不一定每个消息消费都需要加上述的操作,比如下面的场景
xxxxxxxxxx
update coupon_record set state='NEW' where id =#{id} and state='LOCK'
简介:Rabbitmq的消息可靠性投递讲解
什么是消息的可靠性投递
保证消息百分百发送到消息队列中去
详细
RabbitMQ消息投递路径
生产者-->交换机->队列->消费者
通过两个的点控制消息的可靠性投递
生产者到交换机
交换机到队列
建议
简介:Rabbitmq的死信队列和延迟队列介绍
什么是rabbitmq的死信队列
什么是rabbitmq的死信交换机
消息有哪几种情况成为死信
使用场景
Cloud微服务大课训练营里面的应用
简介:Rabbitmq多个面试题总结和延伸
RabbitMQ专题视频面试题贯穿整个专题视频
小滴课堂,愿景:让编程不在难学,让技术与生活更加有趣
相信我们,这个是可以让你学习更加轻松的平台,里面的课程绝对会让你技术不断提升
欢迎加小D讲师的微信: xdclass-lw
我们官方网站:https://xdclass.net
千人IT技术交流QQ群: 718617859
重点来啦:加讲师微信 免费赠送你干货文档大集合,包含前端,后端,测试,大数据,运维主流技术文档(持续更新)
https://mp.weixin.qq.com/s/qYnjcDYGFDQorWmSfE7lpQ