上一篇文章 直播弹幕系统(三)- 直播在线人数统计 主要讲了利用Redis对一个直播间的在线用户做出统计。那么这篇文章,就要对发送弹幕的用户进行一个校验了。毕竟一个直播间,游客是可以正常访问的。但是发送弹幕的话,肯定就是需要登录了。这次使用JWT来完成校验。
JWT可以先看看这篇文章Java - JWT的简单介绍和使用
1.添加以下pom依赖:
org.springframework.boot spring-boot-starter-tomcat 2.3.2.RELEASE
io.jsonwebtoken jjwt 0.9.1
com.auth0 java-jwt 3.19.1
2.添加配置类AppConfig:
import kz.util.SpringBeanUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class AppConfig {@Value("${user.secretKey}")public String secretKey;public static String GetSecret() {AppConfig configBean = SpringBeanUtil.getBean(AppConfig.class);String secretKey = configBean.getSecretKey();return secretKey == null ? StringUtils.EMPTY : secretKey;}public String getSecretKey() {return secretKey;}public void setSecretKey(String secretKey) {this.secretKey = secretKey;}
}
3.添加JwtUtil:
import com.auth0.jwt.JWT;
import com.auth0.jwt.JWTVerifier;
import com.auth0.jwt.algorithms.Algorithm;
import io.jsonwebtoken.JwtBuilder;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import kz.config.AppConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.util.codec.binary.Base64;import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author Zong0915* @date 2022/11/11 下午7:16*/
public class JwtUtil {private static final SignatureAlgorithm ALGORITHM = SignatureAlgorithm.HS256;// 生成Jwt Tokenpublic static String buildToken(Long userId) {HashMap chaim = new HashMap();chaim.put("userId", userId);return JwtUtil.buildToken(userId, chaim);}// 生成Jwt Tokenpublic static String buildToken(Long userId, Map chaim) {Calendar expires = Calendar.getInstance();JwtBuilder jwtBuilder = Jwts.builder().setClaims(chaim)// JWT唯一标识.setId(UUID.randomUUID().toString())// 签发时间.setIssuedAt(expires.getTime())// Subject主体,存我们的userId.setSubject(userId + "")// 签名算法和对应的秘钥.signWith(ALGORITHM, "sdss@!##rerwe");// 设置过期时间expires.add(Calendar.HOUR, 24 * 30);jwtBuilder.setExpiration(expires.getTime());// 生成Tokenreturn jwtBuilder.compact();}// 根据Token 拿到我们的userIdpublic static Long getUerIdFromClaim(String token) {if (StringUtils.isBlank(token)) {return null;}Object o = Jwts.parser().setSigningKey(AppConfig.GetSecret()).parseClaimsJws(token).getBody().get("userId");if (o != null) {return Long.parseLong(o + "");}return null;}public static boolean isVerify(String jwtToken) {// 这里一定要经过Base64解码try {Algorithm algorithm = Algorithm.HMAC256(Base64.decodeBase64(AppConfig.GetSecret()));JWTVerifier verifier = JWT.require(algorithm).build();verifier.verify(jwtToken); // 校验不通过会抛出异常} catch (Exception e) {return false;}return true;}
}
第一步,我们先添加JWT相关的秘钥。在boostrap.yml文件中添加以下内容:(配合AppConfig)
user:secretKey: sdss@!##rerwe
我们以userId为10010为例:http://localhost:4396/zong/?userId=10010&roomId=1。写个UT,创建一条Token:

修改BulletScreenServer类:主要修改如下:
token属性。WebSocket地址变更:/websocket/live/{roomId}/{token}Token进行合法校验。import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import kz.util.JwtUtil;
import kz.util.RedisUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{token}")
@Slf4j
@Getter
public class BulletScreenServer {/*** 多例模式下的赋值方式*/private static OriginMessageSender originMessageSender;/*** 多例模式下的赋值方式*/@Autowiredprivate void setOriginMessageSender(OriginMessageSender originMessageSender) {BulletScreenServer.originMessageSender = originMessageSender;}private static final AtomicLong count = new AtomicLong(0);private Session session;private String sessionId;private String token;private String roomId;private String userId;/*** 打开连接** @param session* @OnOpen 连接成功后会自动调用该方法*/@OnOpenpublic void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("token") String token) {// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用trycount.incrementAndGet();log.info("*************WebSocket连接次数: {} *************", count.longValue());this.token = token;this.roomId = roomId;// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;RedisUtil.increment(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);SocketCache.put(sessionId, this);// 初始化操作,通知客户端已经建立链接成功,并且初始化直播间在线人数个数originMessageSender.send(buildMessage("", 1));}/*** 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接*/@OnClosepublic void closeConnection() {SocketCache.remove(sessionId);RedisUtil.decrement(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);// 客户端退出,通知其他页面,在线人数-1originMessageSender.send(buildMessage("", 1));}/*** 客户端发送消息给服务端** @param message*/@OnMessagepublic void onMessage(String message) {if (StringUtils.isBlank(message)) {return;}// 校验当前Token,不合法或者失效,重新登录if (!JwtUtil.isVerify(token)) {// 定点通知客户端sendMessageSingle("TOKEN_ERROR", 4);return;}Long userIdFromToken = JwtUtil.getUerIdFromClaim(token);if (userIdFromToken == null || userIdFromToken == 0L) {// 定点通知客户端,发送弹幕需要进行登录sendMessageSingle("NO_LOGIN", 3);return;}this.userId = String.valueOf(userIdFromToken);// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的originMessageSender.send(buildMessage(message, 2));}private void sendMessageSingle(String message, Integer operateType) {OriginMessage buildMessage = buildMessage(message, operateType);try {this.session.getBasicRemote().sendText(JSONObject.toJSONString(buildMessage));} catch (IOException e) {e.printStackTrace();}}private OriginMessage buildMessage(String message, Integer operateType) {OriginMessage originMessage = new OriginMessage();originMessage.setMessage(message);originMessage.setRoomId(roomId);originMessage.setSessionId(sessionId);originMessage.setUserId(userId);originMessage.setOperateType(operateType);originMessage.setCount(RedisUtil.get(SocketConstants.LIVE_COUNT_HASH_KEY, roomId));return originMessage;}
}
前端代码做出修改index.tsx:
import React, { useEffect, useState } from 'react';
import { Button, Row, Col, Input, Modal } from 'antd';
import { getValueByParam } from '../utils/pageHelper';
const { warning } = Modal;
// 生成的JWT的Token,我这里是写死的
const token = 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMDAxMCIsImV4cCI6MTY3Mzc3NTYyMSwidXNlcklkIjoxMDAxMCwiaWF0IjoxNjcxMTgzNjIxLCJqdGkiOiJmOTY2OTlhYy00ODRhLTRiNDUtYjBlMi04YjVkY2U4ZjVmZGYifQ.Ie0obuzOksJL0a63bpoDfvILFrujnj-kAL0z_3tdwMI';
const errorToken = '123';
// userId为0的,当没登录
const emptyToken = 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIwIiwiZXhwIjoxNjczNzc3MzMwLCJ1c2VySWQiOjAsImlhdCI6MTY3MTE4NTMzMCwianRpIjoiYjZkOWUyYjUtZDVkYy00MjVhLWE3NjMtOTNjNzFhNDQ2YWIwIn0.pw1hIt-bK-1ctjIGBrINuAIzADYLy1058qMHv3nxr4w';const ws = new WebSocket(`ws://localhost:80/websocket/live/${getValueByParam('roomId')}/${token}`);const UserPage = () => {const [ message, setMessage ] = useState('');const [ bulletList, setBulletList ] = useState([]);const [ onlineCount, setOnlineCount ] = useState(0);useEffect(() => {ws.onopen = () => {ws.onmessage = (msg: any) => {const entity: any = JSON.parse(msg.data);if (entity?.operateType === 2) {const arr :any = [ `用户[${entity.userId}]: ${entity.message}` ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));} else if (entity?.operateType === 3) {warning({ content: '请先登录在发送弹幕!' });} else if (entity?.operateType === 4) {warning({ content: '登录已失效,请重新登录!' });}setOnlineCount(entity?.count ?? 0);};};ws.onclose = () => {console.log('断开连接');};}, []);const sendMsg = () => {ws?.send(message);};return <>{ width: 2000, marginTop: 200 }}> 6}>event => setMessage(event.target.value)} />sendMsg}type='primary'>发送弹幕 { marginLeft: 100 }}>{'在线人数: ' + onlineCount} { marginLeft: 10 }}>{ border: '1px solid', width: 500, height: 500 }}>{bulletList.map((item: string, index: number) => {return index}>{item}
;})}
>;
};export default UserPage;
生成Token的UT代码:
public class Test {public static void main(String[] args) {String token = JwtUtil.buildToken(0L);// String token = JwtUtil.buildToken(10010L);System.out.println(token);}
}
使用正常的Token:token(都写在代码里面了,读者需要生成一下然后替换)

使用错误Token(比如过期):errorToken(都写在代码里面了,读者需要生成一下然后替换)

使用未登录的Token:emptyToken(都写在代码里面了,读者需要生成一下然后替换)

其实,如果没登录的话,或者Token不合法,可以给他一个统一的状态给前端。都当做Token不合法即可。也可以对于游客这种身份,给个默认的userId,例如0。或者是随便传一个错误的Token。
而使用JWT这种技术,一旦实现登录了,Token就可以丢到浏览器里面了,那么用户在打开这个直播间的那一刻就已经知晓当前用户是否登录了。后端只需要拿到这个Token进行校验即可。
但是这种写法其实还是有一些问题:
URL上传Token来完成参数传递的。而非请求头。原生的WebSocket对这方面的支持比较差。@OnMessage修饰的函数中那就是最好的。但很可惜,一般拦截器只针对于HTTP请求,对WebSocket则不支持。因此后续还是学习下Netty这方面的框架,探究Netty对于WebSocket写法的替代性。