HTML5 WebSocket
WebSocket
WebSocket is a protocol, introduced alongside HTML5, for full-duplex communication over a single TCP connection.
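WebSocket simplifies data exchange between client and server and lets the server push data to the client on its own initiative. With the WebSocket API, the browser and the server complete a single handshake, after which they hold a persistent connection and can send data to each other in both directions.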
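To make the handshake a little more concrete: in the opening handshake defined by RFC 6455, the client sends a random Sec-WebSocket-Key header and the server replies with a Sec-WebSocket-Accept header derived from it. The sketch below computes that value with the JDK alone. The class and method names are made up for illustration and are not part of this project; in practice the WebSocket container performs this step for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Illustrative helper (hypothetical name): derives Sec-WebSocket-Accept from Sec-WebSocket-Key. */
final class HandshakeSketch {

    // Fixed GUID that RFC 6455 appends to the client's key before hashing.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            // The header value is the Base64 encoding of the SHA-1 digest.
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}
```

For the sample key given in RFC 6455, dGhlIHNhbXBsZSBub25jZQ==, the method returns s3pPLMBiTxaQ9kYGzzhZRbK+xOo=, which matches the value shown in the specification.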
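This project uses WebSocket to connect the front end and the back end and to deliver danmu (bullet comments) in real time.
Application: delivering danmu over WebSocket
Setting up WebSocket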
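Note that the application server creates a separate WebSocket endpoint instance for every client connection, so the endpoint is multi-instance rather than a singleton. These instances are typically created by the WebSocket container rather than by Spring, so they cannot rely on field injection. A static reference to the global application context is declared instead and used to look up beans: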
    private static ApplicationContext APPLICATION_CONTEXT;

    // Stores the Spring context in a static field so that every endpoint
    // instance can look up beans through it.
    public static void setApplicationContext(ApplicationContext applicationContext) {
        WebSocketService.APPLICATION_CONTEXT = applicationContext;
    }
1. Open the connection. WEBSOCKET_MAP records the live connections: each entry is one connection, keyed by sessionId, with the WebSocketService instance as its value.
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    public static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    private Session session;

    private String sessionId;

    private Long userId;

    @OnOpen
    public void openConnection(Session session, @PathParam("token") String token) {
        try {
            this.userId = TokenUtil.verifyToken(token);
        } catch (Exception ignored) {
            // A token that fails verification is tolerated: userId stays null
            // and the connection is still accepted.
        }
        this.sessionId = session.getId();
        this.session = session;
        if (WEBSOCKET_MAP.containsKey(sessionId)) {
            // Same session id seen again: replace the entry without recounting.
            WEBSOCKET_MAP.remove(sessionId);
            WEBSOCKET_MAP.put(sessionId, this);
        } else {
            WEBSOCKET_MAP.put(sessionId, this);
            ONLINE_COUNT.getAndIncrement();
        }
        logger.info("User connected: " + sessionId + ", users online: " + ONLINE_COUNT.get());
        try {
            // Send "0" as the first message on the new connection.
            this.sendMessage("0");
        } catch (Exception e) {
            logger.error("Connection error", e);
        }
    }
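The bookkeeping in the open and close handlers comes down to keeping a map of connections and a counter in step. As a minimal, framework-free sketch of that idea (the class ConnectionRegistrySketch and its methods are hypothetical, not part of the project), the return values of put() and remove() on a ConcurrentHashMap can decide whether the counter should change, which avoids a separate containsKey() check:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the WEBSOCKET_MAP / ONLINE_COUNT pair. */
final class ConnectionRegistrySketch<C> {

    private final ConcurrentHashMap<String, C> connections = new ConcurrentHashMap<>();
    private final AtomicInteger onlineCount = new AtomicInteger();

    /** Registers a connection and returns the online count afterwards. */
    int register(String sessionId, C connection) {
        // put() returns the previous value; null means the id is new.
        if (connections.put(sessionId, connection) == null) {
            return onlineCount.incrementAndGet();
        }
        return onlineCount.get();
    }

    /** Removes a connection and returns the online count afterwards. */
    int unregister(String sessionId) {
        // remove() returns null when the id was not registered, so a
        // repeated close does not decrement the counter twice.
        if (connections.remove(sessionId) != null) {
            return onlineCount.decrementAndGet();
        }
        return onlineCount.get();
    }

    int online() {
        return onlineCount.get();
    }
}
```

Registering the same session id twice leaves the count unchanged, and closing a session that is already gone is a no-op.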
2. Receive a danmu, save it to the database and the cache, and use RocketMQ to distribute it.
    @OnMessage
    public void onMessage(String message) {
        logger.info("Session: " + sessionId + ", payload: " + message);
        if (!StringUtil.isNullOrEmpty(message)) {
            try {
                // Look the producer up once rather than once per recipient.
                DefaultMQProducer danmusProducer = (DefaultMQProducer) APPLICATION_CONTEXT.getBean("danmusProducer");
                // Fan out: queue one MQ message for every connected session.
                for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {
                    WebSocketService webSocketService = entry.getValue();
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("message", message);
                    jsonObject.put("sessionId", webSocketService.getSessionId());
                    Message msg = new Message(UserMomentsConstant.TOPIC_DANMUS, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    RocketMQUtil.asyncSendMsg(danmusProducer, msg);
                }
                // Only danmu from users with a verified token are persisted.
                if (this.userId != null) {
                    Danmu danmu = JSONObject.parseObject(message, Danmu.class);
                    danmu.setUserId(userId);
                    danmu.setCreateTime(new Date());
                    DanmuService danmuService = (DanmuService) APPLICATION_CONTEXT.getBean("danmuService");
                    // Save to the database asynchronously, then to Redis.
                    danmuService.asyncAddDanmu(danmu);
                    danmuService.addDanmusToRedis(danmu);
                }
            } catch (Exception e) {
                logger.error("Failed to process incoming danmu", e);
            }
        }
    }
3. Close the WebSocket connection
    @OnClose
    public void closeConnection() {
        // remove() returns the previous value, so the existence check and
        // the removal happen in one atomic step.
        if (WEBSOCKET_MAP.remove(sessionId) != null) {
            ONLINE_COUNT.getAndDecrement();
        }
        logger.info("User disconnected: " + sessionId + ", users online: " + ONLINE_COUNT.get());
    }
RocketMQ
This feature relies on RocketMQ to forward danmu and to balance load.
    @Bean("danmusProducer")
    public DefaultMQProducer danmusProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_DANMUS);
        producer.setNamesrvAddr(nameServerAddr);
        producer.start();
        return producer;
    }

    @Bean("danmusConsumer")
    public DefaultMQPushConsumer danmusConsumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_DANMUS);
        consumer.setNamesrvAddr(nameServerAddr);
        consumer.subscribe(UserMomentsConstant.TOPIC_DANMUS, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Handle every message in the batch, not only the first one.
                for (MessageExt msg : msgs) {
                    // Decode with the same charset the producer used.
                    String bodyStr = new String(msg.getBody(), StandardCharsets.UTF_8);
                    JSONObject jsonObject = JSONObject.parseObject(bodyStr);
                    String sessionId = jsonObject.getString("sessionId");
                    String message = jsonObject.getString("message");
                    WebSocketService webSocketService = WebSocketService.WEBSOCKET_MAP.get(sessionId);
                    // The session may have closed since the message was queued.
                    if (webSocketService != null && webSocketService.getSession().isOpen()) {
                        try {
                            webSocketService.sendMessage(message);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }
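The flow through the queue can be hard to see among the RocketMQ boilerplate, so here is a small in-memory sketch of it. It is not RocketMQ: a ConcurrentLinkedQueue stands in for the topic, a map of inboxes stands in for open WebSocket sessions, and every name in it (FanOutSketch, Envelope, publish, drain) is invented for illustration. It shows the two halves of the design: the producer queues one envelope per connected session, and the consumer looks each session up again and skips those that have closed in the meantime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** In-memory illustration of the fan-out flow; not a RocketMQ client. */
final class FanOutSketch {

    /** One queued message addressed to one session. */
    static final class Envelope {
        final String sessionId;
        final String message;

        Envelope(String sessionId, String message) {
            this.sessionId = sessionId;
            this.message = message;
        }
    }

    // Stand-in for the MQ topic.
    private final Queue<Envelope> topic = new ConcurrentLinkedQueue<>();
    // Stand-in for open sessions: sessionId -> messages delivered so far.
    private final Map<String, List<String>> sessions = new ConcurrentHashMap<>();

    void connect(String sessionId) {
        sessions.put(sessionId, new ArrayList<>());
    }

    void disconnect(String sessionId) {
        sessions.remove(sessionId);
    }

    /** Producer side: queue one envelope for every connected session. */
    void publish(String message) {
        for (String sessionId : sessions.keySet()) {
            topic.add(new Envelope(sessionId, message));
        }
    }

    /** Consumer side: deliver queued envelopes and return how many were delivered. */
    int drain() {
        int delivered = 0;
        Envelope envelope;
        while ((envelope = topic.poll()) != null) {
            List<String> inbox = sessions.get(envelope.sessionId);
            if (inbox == null) {
                continue; // session closed between publish and consume
            }
            inbox.add(envelope.message);
            delivered++;
        }
        return delivered;
    }

    /** Returns the messages delivered to a session, or null if it is not connected. */
    List<String> inbox(String sessionId) {
        return sessions.get(sessionId);
    }
}
```

With two sessions connected, publishing one danmu queues two envelopes. If one session disconnects before the consumer runs, only one envelope is delivered and the other is dropped silently, which is why the lookup result needs a null check.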
Fetching danmu
The lookup checks Redis for the danmu data first. On a cache miss it queries the database and then writes the result back to Redis.
    public List<Danmu> getDanmus(Long videoId, String startTime, String endTime) throws Exception {
        String key = DANMU_KEY + videoId;
        String value = redisTemplate.opsForValue().get(key);
        List<Danmu> list;
        if (!StringUtil.isNullOrEmpty(value)) {
            // Cache hit: deserialize, then narrow to the requested window in memory.
            list = JSONArray.parseArray(value, Danmu.class);
            if (!StringUtil.isNullOrEmpty(startTime) && !StringUtil.isNullOrEmpty(endTime)) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date startDate = sdf.parse(startTime);
                Date endDate = sdf.parse(endTime);
                List<Danmu> childList = new ArrayList<>();
                for (Danmu danmu : list) {
                    Date createTime = danmu.getCreateTime();
                    // Both bounds are exclusive.
                    if (createTime.after(startDate) && createTime.before(endDate)) {
                        childList.add(danmu);
                    }
                }
                list = childList;
            }
        } else {
            // Cache miss: query the database, then populate the cache.
            Map<String, Object> params = new HashMap<>();
            params.put("videoId", videoId);
            params.put("startTime", startTime);
            params.put("endTime", endTime);
            list = danmuDao.getDanmus(params);
            // Caveat: if the DAO applies startTime/endTime, the list cached
            // under this per-video key covers only that time window.
            redisTemplate.opsForValue().set(key, JSONObject.toJSONString(list));
        }
        return list;
    }
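This read path is a cache-aside lookup. One thing to watch with the pattern: when the database query is itself filtered by a time window, the entry written to the cache holds only that window, and later requests for a different window read an incomplete list. The sketch below shows one possible variant, not the project's code, that caches the full per-video list and always filters in memory. A ConcurrentHashMap stands in for Redis, a Function stands in for the DAO, and the simplified Danmu class and all other names are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Cache-aside sketch with in-memory stand-ins for Redis and the DAO. */
final class CacheAsideSketch {

    /** Simplified, hypothetical danmu record. */
    static final class Danmu {
        final String content;
        final long createTimeMillis;

        Danmu(String content, long createTimeMillis) {
            this.content = content;
            this.createTimeMillis = createTimeMillis;
        }
    }

    // Stand-in for Redis: videoId -> all danmu of that video.
    private final Map<Long, List<Danmu>> cache = new ConcurrentHashMap<>();
    // Stand-in for the DAO: loads all danmu of a video, unfiltered.
    private final Function<Long, List<Danmu>> loadAllFromDb;
    // Counts database loads so the caching effect can be observed.
    int dbCalls = 0;

    CacheAsideSketch(Function<Long, List<Danmu>> loadAllFromDb) {
        this.loadAllFromDb = loadAllFromDb;
    }

    /** Returns danmu created strictly between startMillis and endMillis. */
    List<Danmu> getDanmus(long videoId, long startMillis, long endMillis) {
        List<Danmu> all = cache.get(videoId);
        if (all == null) {
            // Cache miss: load the full list and cache it unfiltered.
            all = loadAllFromDb.apply(videoId);
            dbCalls++;
            cache.put(videoId, all);
        }
        // Filter on every read, so the cached entry stays complete.
        List<Danmu> window = new ArrayList<>();
        for (Danmu danmu : all) {
            if (danmu.createTimeMillis > startMillis && danmu.createTimeMillis < endMillis) {
                window.add(danmu);
            }
        }
        return window;
    }
}
```

The trade-off is that the first request for a video loads every danmu for it, which may be too much for very popular videos; caching per time bucket would be another option.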