HTML5 WebSocket

WebSocket

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

本项目中使用webSocket连接前后端,实现弹幕实时传输功能。

应用—webSocket实现弹幕传输

WebSocket搭建

需要注意的是,应用服务器要为每个客户端创建一个webSocket连接,属于多例模式,因此需要声明全局上下文环境

1
2
3
4
5
6
7
//全局上下文环境设置,通过设置这个参数可以在多例下注入bean(在bilibiliApp.java里赋值)
//DanmuService danmuService = (DanmuService)APPLICATION_CONTEXT.getBean("danmuService");
private static ApplicationContext APPLICATION_CONTEXT;

public static void setApplicationContext(ApplicationContext applicationContext){
WebSocketService.APPLICATION_CONTEXT = applicationContext;
}

1.创建连接,使用WEBSOCKET_MAP记录连接信息,其中每个Entry都是一个连接实例,参数分别是sessionId,WebSocketService实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final Logger logger =  LoggerFactory.getLogger(this.getClass());

//保证线程安全
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
//线程安全
//每个客户端连接都需要一个WebSocketService/多例模式
public static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();

private Session session;

private String sessionId;

private Long userId;
@OnOpen
public void openConnection(Session session, @PathParam("token") String token){
try{
this.userId = TokenUtil.verifyToken(token);
}catch (Exception ignored){}
this.sessionId = session.getId();
this.session = session;
if(WEBSOCKET_MAP.containsKey(sessionId)){
WEBSOCKET_MAP.remove(sessionId);
WEBSOCKET_MAP.put(sessionId, this);
}else{
WEBSOCKET_MAP.put(sessionId, this);
//获取值并加一
ONLINE_COUNT.getAndIncrement();
}
logger.info("用户连接成功:" + sessionId + ",当前在线人数为:" + ONLINE_COUNT.get());
try{
this.sendMessage("0");
}catch (Exception e){
logger.error("连接异常");
}
}

2.接收弹幕并添加到数据库和缓存,使用RocketMQ进行弹幕分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//通讯时,如前端传来消息
//这段代码是一个Java程序,它的功能是接收一个字符串类型的message,然后将这个message发送给所有的WebSocketService对象。在这个过程中,它还会向producer发送一个消息,这个消息包含了message和sessionId两个属性。这个sessionId是WebSocketService对象的一个属性。
//如果这个对象的userId不为空,那么它还会将这个message保存到数据库和redis中。
@OnMessage
public void onMessage(String message){
logger.info("用户信息:" + sessionId + ",报文:" + message);
if(!StringUtil.isNullOrEmpty(message)){
try{
//群发消息
//Entry表示map的一个元素
for(Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()){
//拿到一个WebSocketService对象
WebSocketService webSocketService = entry.getValue();
DefaultMQProducer danmusProducer = (DefaultMQProducer)APPLICATION_CONTEXT.getBean("danmusProducer");
//要向producer发送的消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("message", message);
jsonObject.put("sessionId", webSocketService.getSessionId());
Message msg = new Message(UserMomentsConstant.TOPIC_DANMUS, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
RocketMQUtil.asyncSendMsg(danmusProducer, msg);
}
if(this.userId != null){
//保存弹幕到数据库
//先把message转成danmu类
Danmu danmu = JSONObject.parseObject(message, Danmu.class);
danmu.setUserId(userId);
danmu.setCreateTime(new Date());
DanmuService danmuService = (DanmuService)APPLICATION_CONTEXT.getBean("danmuService");
danmuService.asyncAddDanmu(danmu);
//保存弹幕到redis
danmuService.addDanmusToRedis(danmu);
}
}catch (Exception e){
logger.error("弹幕接收出现问题");
e.printStackTrace();
}
}
}

3.关闭WebSocket

1
2
3
4
5
6
7
8
9
@OnClose
public void closeConnection(){
if(WEBSOCKET_MAP.containsKey(sessionId)){
WEBSOCKET_MAP.remove(sessionId);
//获取值并自减一
ONLINE_COUNT.getAndDecrement();
}
logger.info("用户退出:" + sessionId + "当前在线人数为:" + ONLINE_COUNT.get());
}

Rocket MQ

功能中同过RocketMQ实现弹幕转发和负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Bean("danmusProducer")
public DefaultMQProducer danmusProducer() throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_DANMUS);
// 设置NameServer的地址
producer.setNamesrvAddr(nameServerAddr);
// 启动Producer实例
producer.start();
return producer;
}

@Bean("danmusConsumer")
public DefaultMQPushConsumer danmusConsumer() throws Exception{
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_DANMUS);
// 设置NameServer的地址
consumer.setNamesrvAddr(nameServerAddr);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(UserMomentsConstant.TOPIC_DANMUS, "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
byte[] msgByte = msg.getBody();
String bodyStr = new String(msgByte);
//把msg转为json对象
JSONObject jsonObject = JSONObject.parseObject(bodyStr);
//从jsonObject获取信息
String sessionId = jsonObject.getString("sessionId");
String message = jsonObject.getString("message");
WebSocketService webSocketService = WebSocketService.WEBSOCKET_MAP.get(sessionId);
if(webSocketService.getSession().isOpen()){
try {
webSocketService.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
return consumer;
}

获取弹幕

查询策略是优先查redis中的弹幕数据,如果没有的话查询数据库,然后把查询的数据写入redis当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 查询策略是优先查redis中的弹幕数据,
* 如果没有的话查询数据库,然后把查询的数据写入redis当中
*/
public List<Danmu> getDanmus(Long videoId,
String startTime, String endTime) throws Exception {

String key = DANMU_KEY + videoId;
String value = redisTemplate.opsForValue().get(key);
List<Danmu> list;
//redis不为空,调redis;为空,调数据库
if(!StringUtil.isNullOrEmpty(value)){
list = JSONArray.parseArray(value, Danmu.class);
if(!StringUtil.isNullOrEmpty(startTime)
&& !StringUtil.isNullOrEmpty(endTime)){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date startDate = sdf.parse(startTime);
Date endDate = sdf.parse(endTime);
//时间筛选
List<Danmu> childList = new ArrayList<>();
for(Danmu danmu : list){
Date createTime = danmu.getCreateTime();
if(createTime.after(startDate) && createTime.before(endDate)){
childList.add(danmu);
}
}
list = childList;
}
}else{
Map<String, Object> params = new HashMap<>();
params.put("videoId", videoId);
params.put("startTime", startTime);
params.put("endTime", endTime);
list = danmuDao.getDanmus(params);
//保存弹幕到redis
redisTemplate.opsForValue().set(key, JSONObject.toJSONString(list));
}
return list;
}