SpringBoot與RSocket整合,實現在線聊天系統
作者:Java知識日歷
Socket 提供了多種通信模式(Request-Response、Fire-and-Forget、Request-Stream 和 Channel),非常適合實時通信場景。相比之下,HTTP/REST 通常用于請求-響應模式,不適合長時間的連接或頻繁的數據交換,可能導致較高的延遲和資源浪費。
RSocket 是一個高性能、雙向通信的二進制協議,適用于實時數據流和低延遲應用場景。
我們為什么選擇RSocket?
- RSocket 提供了多種通信模式(Request-Response、Fire-and-Forget、Request-Stream 和 Channel),非常適合實時通信場景。相比之下,HTTP/REST 通常用于請求-響應模式,不適合長時間的連接或頻繁的數據交換,可能導致較高的延遲和資源浪費。
- RSocket 內置對流的支持,可以高效地處理大量并發連接和數據流,適合高并發的聊天系統。而傳統的 HTTP/REST 需要為每個請求創建新的線程或連接,高并發情況下會導致資源耗盡和性能下降。
- RSocket 是一個二進制協議,提供了更高的效率和靈活性。而HTTP/REST 使用文本格式(通常是 JSON 或 XML),增加了額外的開銷,并且每個請求都需要單獨的連接。
- RSocket 的各個組件高度模塊化,可以根據需要進行替換和優化。
- RSocket 提供了內置的安全特性和可靠的消息傳遞機制。
- RSocket 使用長連接,減少了連接建立和銷毀的開銷。
- 相比于 HTTP/REST,RSocket 的協議更加輕量級,減少了不必要的頭部信息。
哪些公司在使用RSocket?
- Netflix 是 RSocket 的主要貢獻者之一。他們使用 RSocket 來實現微服務間的高效通信,特別是在需要實時數據流和低延遲的應用程序中。
- CERN (歐洲核子研究組織)使用 RSocket 來實現實時數據分析和監控系統。
- Capital One 在其金融應用程序中使用 RSocket 來實現實時交易處理和通知系統。
- PayPal 使用 RSocket 來實現實時支付處理和通知系統。
- Uber 使用 RSocket 來實現實時位置跟蹤和調度系統。
- Intel 使用 RSocket 來實現實時數據分析和機器學習模型部署。
- Samsung SDS 在其云服務和物聯網解決方案中使用 RSocket。
代碼實操
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
實現RSocket服務端用于處理傳入的消息
package com.example.chat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Controller
publicclass ChatController {
// 存儲所有連接客戶端的FluxSink實例,用于廣播消息
privatefinal ConcurrentHashMap<String, FluxSink<Message>> clients = new ConcurrentHashMap<>();
/**
* 處理發送消息請求
*
* @param message 要發送的消息對象
* @return 返回空Mono表示操作完成
*/
@MessageMapping("chat.sendMessage")
public Mono<Void> sendMessage(Message message) {
log.info("Received message: {}", message); // 記錄接收到的消息
// 將消息廣播給所有已連接的客戶端
clients.values().forEach(sink -> sink.next(message));
return Mono.empty(); // 操作完成
}
/**
* 處理客戶端連接請求
*
* @param username 客戶端用戶名
* @return 返回一個Flux流,包含來自服務器和其他客戶端的消息
*/
@MessageMapping("chat.connect")
public Flux<Message> connect(String username) {
log.info("User connected: {}", username); // 記錄用戶連接事件
// 創建一個新的Flux流,并將其存儲在clients集合中
return Flux.create(sink -> clients.put(username, sink))
.doOnCancel(() -> { // 當客戶端斷開連接時執行的操作
log.info("User disconnected: {}", username); // 記錄用戶斷開連接事件
clients.remove(username); // 從clients集合中移除該用戶的sink
})
.mergeWith(Flux.interval(Duration.ofSeconds(1)) // 合并一個定時消息流
.map(tick -> new Message("Server", "Ping"))); // 發送心跳消息
}
}
定義消息類用于傳輸數據
package com.example.chat;
import lombok.Data;
/**
* 消息類,用于在客戶端和服務端之間傳輸消息
*/
@Data
publicclass Message {
private String sender; // 發送者名稱
private String content; // 消息內容
public Message() {}
public Message(String sender, String content) {
this.sender = sender;
this.content = content;
}
}
配置RSocket服務器,接受來自客戶端的連接
package com.example.chat.config;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
@Configuration
publicclass RSocketConfig {
/**
* 配置RSocket消息處理器
*
* @param strategies RSocket策略
* @return RSocket消息處理器實例
*/
@Bean
public RSocketMessageHandler rsocketMessageHandler(RSocketStrategies strategies) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(strategies); // 設置RSocket策略
handler.route("chat.*") // 設置路由模式
.acceptMimeType(org.springframework.util.MimeTypeUtils.APPLICATION_JSON); // 設置支持的消息類型
return handler;
}
/**
* 配置TCP服務器傳輸方式
*
* @return TCP服務器傳輸實例
*/
@Bean
public TcpServerTransport tcpServerTransport() {
return TcpServerTransport.create(7000); // 監聽7000端口
}
}
測試
需要先安裝rsc
命令行工具。如果你本地沒有這個命令工具,請到GitHub (https://github.com/making/rsc/releases) 自行安裝。
第一個終端窗口A中訂閱消息
rsc --request-stream tcp://localhost:7000 chat.connect -d "Alice"
第二個終端窗口B中發送消息
rsc --fire-and-forget tcp://localhost:7000 chat.sendMessage -d '{"sender":"Bob","content":"Hello Alice!"}'
查看第一個終端窗口A的結果
{"sender":"Server","content":"Ping"}
{"sender":"Bob","content":"Hello Alice!"}
{"sender":"Server","content":"Ping"}
責任編輯:武曉燕
來源:
Java知識日歷