RocketMQ
RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

在仿B站后端系统中,使用RocketMQ作为消息中间件,实现用户动态发布,订阅消息接收,弹幕接收并批量分发的核心功能。
应用—动态订阅
简单来说,用户动态的实现流程是创建动态->创建生产者->消费者接收->消息分发 。其中MQ作为中间件,可以对消息进行监听,处理,缓存(redis),分发的操作。
controller层主要实现接口的调用,没啥好说的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @PostMapping("/user-moments") public JsonResponse<String> addUserMoments(@RequestBody UserMoment userMoment) throws Exception{ Long userId = userSupport.getCurrentUserId(); userMoment.setUserId(userId); userMomentsService.addUserMoments(userMoment); return JsonResponse.success(); }
@GetMapping("/user-subscribed-moments") public JsonResponse<List<UserMoment>> getUserSubscribedMoments(){ Long userId = userSupport.getCurrentUserId(); List<UserMoment> list = userMomentsService.getUserSubscribedMoments(userId); return new JsonResponse<>(list); }
|
service层实现了添加和获取两个方法,add方法先把数据存入数据库中,再创建生产者。生产者会将消息发到中间件当中,消费者接收并处理,为每个用户创建一个subscribed。当fans关注的用户更新动态时,会把更新的内容放到fans的subscribed列表中。当订阅的用户需要查询动态时,会从中间件请求信息。get方法中使用redis调用缓存返回信息,redis在MQ中录入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void addUserMoments(UserMoment userMoment) throws Exception {
userMoment.setCreateTime(new Date());
userMomentsDao.addUserMoments(userMoment);
DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");
Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8)); RocketMQUtil.syncSendMsg(producer, msg); }
public List<UserMoment> getUserSubscribedMoments(Long userId) {
String key = "subscribed-" + userId;
String listStr = redisTemplate.opsForValue().get(key);
return JSONArray.parseArray(listStr, UserMoment.class); }
|
在RocketMQconfig中,配置生产者和消费者的封装类,他们之间的关联便是GROUP_MOMENTS参数。下文中for循环的逻辑是,遍历每一位fans,将动态内容添加到他们的subscribed列表中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @Bean("momentsProducer") public DefaultMQProducer momentsProducer() throws Exception{ DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS); producer.setNamesrvAddr(nameServerAddr); producer.start(); return producer; }
消费者 @Bean("momentsConsumer") public DefaultMQPushConsumer momentsConsumer() throws Exception{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS); consumer.setNamesrvAddr(nameServerAddr); consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg == null){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } String bodyStr = new String(msg.getBody()); UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr),UserMoment.class); Long userId = userMoment.getUserId(); List<UserFollowing> fanList = userFollowingService.getUserFans(userId); for(UserFollowing fan : fanList){ String key = "subscribed-" + fan.getUserId(); String subscribedListStr = redisTemplate.opsForValue().get(key); List<UserMoment> subscribedList; if(StringUtil.isNullOrEmpty(subscribedListStr)){ subscribedList = new ArrayList<>(); }else{ subscribedList = JSONArray.parseArray(subscribedListStr,UserMoment.class); } subscribedList.add(userMoment); redisTemplate.opsForValue().set(key,JSONObject.toJSONString(subscribedList)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); return consumer; }
|