從架構(gòu)到底層:構(gòu)建高效的實(shí)時(shí)彈幕系統(tǒng)
彈幕系統(tǒng)是一種即時(shí)互動(dòng)機(jī)制,廣泛用于直播、短視頻等內(nèi)容平臺(tái)。在該系統(tǒng)中,用戶發(fā)送的消息需在極短時(shí)間內(nèi)被收集、處理,并同步分發(fā)給所有觀看者,要求高并發(fā)、高吞吐、低延遲。本文將從數(shù)據(jù)結(jié)構(gòu)、消息通道、風(fēng)控機(jī)制以及前端渲染四個(gè)層面,重構(gòu)該系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)方案。
彈幕數(shù)據(jù)結(jié)構(gòu)與消息緩沖機(jī)制
彈幕消息模型設(shè)計(jì)
文件路徑:
/src/main/java/com/icoderoad/danmaku/model/DanmakuMessage.java
package com.icoderoad.danmaku.model;
import lombok.Data;
/**
* 表示一條彈幕消息的實(shí)體類
*/
@Data
public class DanmakuMessage {
private String userId; // 用戶唯一標(biāo)識(shí)
private String content; // 彈幕內(nèi)容
private long timestamp; // 消息發(fā)送的時(shí)間戳(ms)
private String color; // 彈幕顏色(可選)
private String type; // 彈幕類型,如 scroll、top、bottom
}
彈幕緩沖隊(duì)列設(shè)計(jì)
文件路徑:
/src/main/java/com/icoderoad/danmaku/service/DanmakuBufferService.java
package com.icoderoad.danmaku.service;
import com.icoderoad.danmaku.model.DanmakuMessage;
import org.springframework.stereotype.Service;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 管理實(shí)時(shí)彈幕緩沖隊(duì)列的服務(wù)
*/
@Service
public class DanmakuBufferService {
// 彈幕接收緩沖區(qū),線程安全
private final Queue<DanmakuMessage> writeQueue = new ConcurrentLinkedQueue<>();
/**
* 添加彈幕消息
*/
public void addMessage(DanmakuMessage message) {
writeQueue.offer(message);
}
/**
* 批量獲取彈幕(用于調(diào)度器分發(fā))
*/
public Queue<DanmakuMessage> fetchAll() {
Queue<DanmakuMessage> result = new ConcurrentLinkedQueue<>();
DanmakuMessage msg;
while ((msg = writeQueue.poll()) != null) {
result.offer(msg);
}
return result;
}
}
WebSocket 實(shí)時(shí)通信實(shí)現(xiàn)
WebSocket 配置與通道建立
件路徑:
/src/main/java/com/icoderoad/danmaku/websocket/DanmakuWebSocketServer.java
package com.icoderoad.danmaku.websocket;
import com.icoderoad.danmaku.model.DanmakuMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* WebSocket 處理類,實(shí)現(xiàn)實(shí)時(shí)通信
*/
@Slf4j
@Component
@ServerEndpoint(value = "/ws/danmaku")
public class DanmakuWebSocketServer {
private static final Set<Session> clients = new CopyOnWriteArraySet<>();
private static final ObjectMapper mapper = new ObjectMapper();
@OnOpen
public void onOpen(Session session) {
clients.add(session);
log.info("新連接加入: {}", session.getId());
}
@OnMessage
public void onMessage(String message, Session session) {
try {
DanmakuMessage danmaku = mapper.readValue(message, DanmakuMessage.class);
for (Session client : clients) {
client.getAsyncRemote().sendText(mapper.writeValueAsString(danmaku));
}
} catch (Exception e) {
log.error("消息處理失敗", e);
}
}
@OnClose
public void onClose(Session session) {
clients.remove(session);
log.info("連接關(guān)閉: {}", session.getId());
}
@OnError
public void onError(Session session, Throwable throwable) {
log.error("連接異常: {}", session.getId(), throwable);
}
/**
* 主動(dòng)推送(用于后臺(tái)調(diào)度)
*/
public static void broadcast(DanmakuMessage message) throws Exception {
String msg = mapper.writeValueAsString(message);
for (Session client : clients) {
client.getAsyncRemote().sendText(msg);
}
}
}
風(fēng)控與限流邏輯(防刷屏、防攻擊)
Redis 限流邏輯實(shí)現(xiàn)
件路徑:
/src/main/java/com/icoderoad/danmaku/security/RateLimiterService.java
package com.icoderoad.danmaku.security;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* 限流服務(wù)(基于 Redis)
*/
@Service
public class RateLimiterService {
@Resource
private StringRedisTemplate redisTemplate;
/**
* 判斷是否允許發(fā)送彈幕
* 限制頻率為:每用戶 1 秒 1 條
*/
public boolean canSend(String userId) {
String key = "danmaku:limit:" + userId;
Boolean exist = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(exist)) {
return false; // 剛剛發(fā)過,限流
}
redisTemplate.opsForValue().set(key, "1", java.time.Duration.ofSeconds(1));
return true;
}
}
敏感詞過濾(簡(jiǎn)單正則)
可擴(kuò)展為 DFA 敏感詞算法。
public boolean containsIllegalContent(String content) {
String[] illegalWords = {"垃圾", "罵人詞"};
for (String word : illegalWords) {
if (content.contains(word)) return true;
}
return false;
}
前端彈幕展示邏輯實(shí)現(xiàn)
Canvas 動(dòng)畫渲染(多軌道)
文件路徑:
/web/static/js/danmaku.js
const canvas = document.getElementById('danmakuCanvas');
const ctx = canvas.getContext('2d');
let messages = [];
function Danmaku(text, color, speed, y) {
this.text = text;
this.color = color;
this.speed = speed;
this.x = canvas.width;
this.y = y;
}
Danmaku.prototype.draw = function () {
ctx.font = "20px Arial";
ctx.fillStyle = this.color || "#ffffff";
ctx.fillText(this.text, this.x, this.y);
this.x -= this.speed;
}
function render() {
ctx.clearRect(0, 0, canvas.width, canvas.height);
messages.forEach(msg => msg.draw());
messages = messages.filter(m => m.x + ctx.measureText(m.text).width > 0);
requestAnimationFrame(render);
}
render();
最終總結(jié)與優(yōu)化建議
模塊 | 技術(shù)選型 | 說(shuō)明 |
消息通道 | WebSocket (JDK + Spring Boot) | 支持毫秒級(jí)推送延遲 |
隊(duì)列結(jié)構(gòu) | ConcurrentLinkedQueue | 支持無(wú)鎖并發(fā)寫入 |
限流機(jī)制 | Redis TTL + Key 檢查 | 用戶級(jí)限速,低成本 |
前端渲染 | Canvas + requestAnimationFrame | 高性能動(dòng)畫,適配移動(dòng)端 |
風(fēng)控邏輯 | Redis 黑名單 + 敏感詞攔截 | 攔截非法信息與頻繁操作 |