部分参考消息队列-RabbitMQ篇章- 专栏 -KuangStudy

安装

先下好erlang和rabbitmq安装包,对应目录下执行:

rpm -ivh erlang-21.3-1.el7.x86_64.rpm

yum install socat -y

rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

开机启动chkconfig rabbitmq-server on

启动服务/sbin/service rabbitmq-server start

查看状态/sbin/service rabbitmq-server status

开启网页插件rabbitmq-plugins enable rabbitmq_management

创建网络插件的用户:

rabbitmqctl add_user admin 123

rabbitmqctl set_user_tags admin administrator

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

运行流程

首先建立信道连接(channel)生产者将消息发送到交换机,并对信息的特征用routingkey描述,交换机根据routingkey将消息路由到绑定的队列,在特定模式下(direct和topic),消费者可以根据routingkey特征获取特定队列的信息。消费者通过监听队列获取消息(轮询,竞争)。

生产者仅仅将消息发送到交换机且仅与交换机联系,并不知道消息会被分配到哪个队列。

Springboot基础整合(广播fanout)

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

</dependencies>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.example.demo.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "direct_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.demo.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class WeixinService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("weixin-------------->" + message);
}
}

交换机绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.example.demo.configuration;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class Binding {
//队列 起名:TestDirectQueue
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
return new Queue("weixin.fanout.queue", true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
public DirectExchange directOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("direct_order_exchange", true, false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public org.springframework.amqp.core.Binding bindingDirect1() {
return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("");
}
@Bean
public org.springframework.amqp.core.Binding bindingDirect2() {
return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("");
}
@Bean
public org.springframework.amqp.core.Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("");
}
}

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.demo;

import com.example.demo.producer.OrderProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Demo1ApplicationTests {
@Autowired
OrderProducer orderProducer;
@Test
public void contextLoads() throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
Long userId = 100L + i;
Long productId = 10001L + i;
int num = 10;
orderProducer.makeOrder(userId, productId, num);
}
}
}

死信队列

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

面试题

1、Rabbitmq 为什么需要信道,为什么不是TCP直接通信

1、TCP的创建和销毁,开销大,创建要三次握手,销毁要4次分手。

2、如果不用信道,那应用程序就会TCP连接到Rabbit服务器,高峰时每秒成千上万连接就会造成资源的巨大浪费,而且==底层操作系统每秒处理tcp连接数也是有限制的,==必定造成性能瓶颈。

3、信道的原理是一条线程一条信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈。

2:queue队列到底在消费者创建还是生产者创建?

1: 一般建议是在rabbitmq操作面板创建。这是一种稳妥的做法。
2:按照常理来说,确实应该消费者这边创建是最好,消息的消费是在这边。这样你承受一个后果,可能我生产在生产消息可能会丢失消息。
3:在生产者创建队列也是可以,这样稳妥的方法,消息是不会出现丢失。
4:如果你生产者和消费都创建的队列,谁先启动谁先创建,后面启动就覆盖前面的

3: RabbitMQ与其他消息队列相比,具有以下的主要优点和缺点:

优点

  1. 可靠性:RabbitMQ使用AMQP(高级消息队列协议)作为底层协议,提供可靠的消息传递机制。它支持持久化消息、交换机和队列,确保消息不会丢失,并能在出现故障后进行恢复。

  2. 灵活的消息路由:RabbitMQ提供了灵活的消息路由机制,支持多种交换机类型和路由策略。它允许根据消息的路由键、交换机的类型和绑定配置,将消息按需发布到不同的队列中。

  3. 可扩展性:RabbitMQ支持分布式部署,可以通过添加更多的节点实现高可用性和可伸缩性。它允许在集群中分布消息处理的负载,提高吞吐量和处理能力。

  4. 多语言支持:RabbitMQ提供了广泛的客户端库,支持多种编程语言,包括Java、Python、JavaScript等。这使得它可以与不同技术栈的应用程序进行集成。

缺点:

  1. 复杂性:相对于一些简单的消息队列,RabbitMQ的配置和管理相对复杂。它需要一定的学习和理解,以正确地配置和使用其各种功能和组件。

  2. 高延迟:由于RabbitMQ保证消息的可靠性和持久化,它的传输和处理过程可能引入一定的延迟。对于对延迟非常敏感的应用程序,可能需要考虑其他更低延迟的消息队列方案。

  3. 较高的资源消耗:相对于一些轻量级的消息队列,RabbitMQ的资源消耗较高。它需要较多的内存和计算资源来支持高吞吐量和可靠性。