RocketMQ

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

RocketMQ保姆级教程

在仿B站后端系统中,使用RocketMQ作为消息中间件,实现用户动态发布,订阅消息接收,弹幕接收并批量分发的核心功能。

应用—动态订阅

简单来说,用户动态的实现流程是创建动态->创建生产者->消费者接收->消息分发 。其中MQ作为中间件,可以对消息进行监听,处理,缓存(redis),分发的操作。

controller层主要实现接口的调用,没啥好说的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@PostMapping("/user-moments")
public JsonResponse<String> addUserMoments(@RequestBody UserMoment userMoment) throws Exception{
Long userId = userSupport.getCurrentUserId();
userMoment.setUserId(userId);
userMomentsService.addUserMoments(userMoment);
return JsonResponse.success();
}

//查询订阅的动态
@GetMapping("/user-subscribed-moments")
public JsonResponse<List<UserMoment>> getUserSubscribedMoments(){
Long userId = userSupport.getCurrentUserId();
List<UserMoment> list = userMomentsService.getUserSubscribedMoments(userId);
return new JsonResponse<>(list);
}

service层实现了添加和获取两个方法,add方法先把数据存入数据库中,再创建生产者。生产者会将消息发到中间件当中,消费者接收并处理,为每个用户创建一个subscribed。当fans关注的用户更新动态时,会把更新的内容放到fans的subscribed列表中。当订阅的用户需要查询动态时,会从中间件请求信息。get方法中使用redis调用缓存返回信息,redis在MQ中录入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void addUserMoments(UserMoment userMoment) throws Exception {
// 设置动态创建时间
userMoment.setCreateTime(new Date());
// 添加用户动态
userMomentsDao.addUserMoments(userMoment);
// 全局上下文实现bean调用
DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");
// 将动态转换为JSON格式并发送到RocketMQ
Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));
RocketMQUtil.syncSendMsg(producer, msg);
}

public List&lt;UserMoment&gt; getUserSubscribedMoments(Long userId) {
// 获取用户订阅动态的Redis缓存键
String key = "subscribed-" + userId;
// 从Redis中获取用户订阅动态
String listStr = redisTemplate.opsForValue().get(key);
// 将JSON格式的动态列表转换为Java对象列表
return JSONArray.parseArray(listStr, UserMoment.class);
}

在RocketMQconfig中,配置生产者和消费者的封装类,他们之间的关联便是GROUP_MOMENTS参数。下文中for循环的逻辑是,遍历每一位fans,将动态内容添加到他们的subscribed列表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Bean("momentsProducer")
public DefaultMQProducer momentsProducer() throws Exception{
//新建生产者
DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);
producer.setNamesrvAddr(nameServerAddr);
producer.start();
return producer;
}

消费者
@Bean("momentsConsumer")
public DefaultMQPushConsumer momentsConsumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);
consumer.setNamesrvAddr(nameServerAddr);
//订阅 参数:主题,副主题
consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS,"*");
//新建监听,接收msg
//生产者发布动态了,监听接口接收到消息,要查询当前用户的粉丝,从redis中找出粉丝的订阅动态列表,把这次新增的动态填进去
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg == null){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//接收到的是String类型的moment类(service里转的),这里再转回去
String bodyStr = new String(msg.getBody());
UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr),UserMoment.class);
//查找订阅了该id的订阅者id(粉丝id)
Long userId = userMoment.getUserId();
List<UserFollowing> fanList = userFollowingService.getUserFans(userId);
//*从redis中查询当前粉丝的订阅列表,转为str
//如果为空则新建,不为空则把信息转为List<UserMoment>以方便把新接收到的动态传进去
//*把新的信息添加进列表
//*调用redis,把列表重新转为字符串存进去
for(UserFollowing fan : fanList){
String key = "subscribed-" + fan.getUserId();
String subscribedListStr = redisTemplate.opsForValue().get(key);
List<UserMoment> subscribedList;
if(StringUtil.isNullOrEmpty(subscribedListStr)){
subscribedList = new ArrayList<>();
}else{
subscribedList = JSONArray.parseArray(subscribedListStr,UserMoment.class);
}
subscribedList.add(userMoment);
redisTemplate.opsForValue().set(key,JSONObject.toJSONString(subscribedList));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
return consumer;
}