部分参考消息队列-RabbitMQ篇章- 专栏 -KuangStudy
安装
先下好erlang和rabbitmq安装包,对应目录下执行:
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
开机启动chkconfig rabbitmq-server on
启动服务/sbin/service rabbitmq-server start
查看状态/sbin/service rabbitmq-server status
开启网页插件rabbitmq-plugins enable rabbitmq_management
创建网络插件的用户:
rabbitmqctl add_user admin 123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
运行流程
首先建立信道连接(channel)生产者将消息发送到交换机,并对信息的特征用routingkey描述,交换机根据routingkey将消息路由到绑定的队列,在特定模式下(direct和topic),消费者可以根据routingkey特征获取特定队列的信息。消费者通过监听队列获取消息(轮询,竞争)。
生产者仅仅将消息发送到交换机且仅与交换机联系,并不知道消息会被分配到哪个队列。
Springboot基础整合(广播fanout)
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
</dependencies>
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.example.demo.producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class OrderProducer { @Autowired private RabbitTemplate rabbitTemplate; private String exchangeName = "direct_order_exchange"; private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) { System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.example.demo.consumer;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;
@RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class WeixinService { @RabbitHandler public void messagerevice(String message){ System.out.println("weixin-------------->" + message); } }
|
交换机绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.example.demo.configuration; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class Binding { @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.fanout.queue", true); } @Bean public DirectExchange directOrderExchange() { return new DirectExchange("direct_order_exchange", true, false); } @Bean public org.springframework.amqp.core.Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with(""); } @Bean public org.springframework.amqp.core.Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with(""); } @Bean public org.springframework.amqp.core.Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with(""); } }
|
启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.example.demo;
import com.example.demo.producer.OrderProducer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest class Demo1ApplicationTests { @Autowired OrderProducer orderProducer; @Test public void contextLoads() throws Exception { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderProducer.makeOrder(userId, productId, num); } } }
|
死信队列
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
面试题
1、Rabbitmq 为什么需要信道,为什么不是TCP直接通信
1、TCP的创建和销毁,开销大,创建要三次握手,销毁要4次分手。
2、如果不用信道,那应用程序就会TCP连接到Rabbit服务器,高峰时每秒成千上万连接就会造成资源的巨大浪费,而且==底层操作系统每秒处理tcp连接数也是有限制的,==必定造成性能瓶颈。
3、信道的原理是一条线程一条信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈。
2:queue队列到底在消费者创建还是生产者创建?
1: 一般建议是在rabbitmq操作面板创建。这是一种稳妥的做法。
2:按照常理来说,确实应该消费者这边创建是最好,消息的消费是在这边。这样你承受一个后果,可能我生产在生产消息可能会丢失消息。
3:在生产者创建队列也是可以,这样稳妥的方法,消息是不会出现丢失。
4:如果你生产者和消费都创建的队列,谁先启动谁先创建,后面启动就覆盖前面的
3: RabbitMQ与其他消息队列相比,具有以下的主要优点和缺点:
优点
可靠性:RabbitMQ使用AMQP(高级消息队列协议)作为底层协议,提供可靠的消息传递机制。它支持持久化消息、交换机和队列,确保消息不会丢失,并能在出现故障后进行恢复。
灵活的消息路由:RabbitMQ提供了灵活的消息路由机制,支持多种交换机类型和路由策略。它允许根据消息的路由键、交换机的类型和绑定配置,将消息按需发布到不同的队列中。
可扩展性:RabbitMQ支持分布式部署,可以通过添加更多的节点实现高可用性和可伸缩性。它允许在集群中分布消息处理的负载,提高吞吐量和处理能力。
多语言支持:RabbitMQ提供了广泛的客户端库,支持多种编程语言,包括Java、Python、JavaScript等。这使得它可以与不同技术栈的应用程序进行集成。
缺点:
复杂性:相对于一些简单的消息队列,RabbitMQ的配置和管理相对复杂。它需要一定的学习和理解,以正确地配置和使用其各种功能和组件。
高延迟:由于RabbitMQ保证消息的可靠性和持久化,它的传输和处理过程可能引入一定的延迟。对于对延迟非常敏感的应用程序,可能需要考虑其他更低延迟的消息队列方案。
较高的资源消耗:相对于一些轻量级的消息队列,RabbitMQ的资源消耗较高。它需要较多的内存和计算资源来支持高吞吐量和可靠性。