高性能PHP框架Webman服務端實現流式輸出有哪些解決方案?
概念
Stream流式輸出是一種數據處理方式,它將數據以流的形式進行傳輸和處理。在這種處理方式中,數據不再是集中存儲在某個地方,而是以分散的方式存儲在各個節點上,并不斷流動。數據流的處理是在流動的過程中完成的,因此能夠實時地處理數據,提高了數據處理效率。
流式輸出優點
- 實時性:Stream流式輸出能夠實時地處理數據,減少了數據處理的延遲,使得數據處理的結果更加及時。
- 高效性:由于數據是分散存儲的,因此可以并行處理數據,提高了數據處理效率。同時,由于數據處理是在流動的過程中完成的,可以避免數據的重復傳輸和處理。
- 可擴展性:Stream流式輸出具有良好的可擴展性,當數據量增加時,可以通過增加節點來擴展系統的處理能力。
- 靈活性:Stream流式輸出可以靈活地處理各種類型的數據,包括結構化數據、半結構化數據和非結構化數據。
流式輸出應用場景
- 實時數據分析:通過Stream流式輸出,可以對海量數據進行實時分析,從而得到實時的分析結果。例如,在金融領域中,可以對股票交易數據進行實時分析,得到實時的股票走勢預測。
- 實時推薦系統:通過Stream流式輸出,可以根據用戶的實時行為數據,推薦個性化的內容。例如,在電商平臺上,可以根據用戶的瀏覽和購買行為,推薦相關的商品和活動。
- 實時監控系統:通過Stream流式輸出,可以對各種類型的實時數據進行監控,如網絡流量、設備運行狀態等。這種系統可以及時發現異常情況,并采取相應的措施進行處理。
- 語音識別和自然語言處理:通過Stream流式輸出,可以對語音數據進行實時識別和處理,實現語音轉文字、機器翻譯等功能。這種技術可以大大提高語音識別的準確性和實時性。
- 物聯網數據處理:物聯網設備會產生大量的實時數據,通過Stream流式輸出可以對這些數據進行實時處理和分析,從而更好地了解設備的運行狀況和環境情況。
流式輸出實現解決方案
webman SSE
SSE也就是Server-sent Events,是一種服務端推送技術。它的本質是客戶端發送一個攜帶Accept: text/event-stream 頭的http請求后,連接不關閉,服務端可以在這個連接上不斷的給客戶端推送數據。
相關文檔:https://www.workerman.net/doc/workerman/http/SSE.html
process/EventStreamProcess.php自定義進程
<?php
/**
* @desc EventStreamProcess.php
* @author Tinywan(ShaoBo Wan)
* @date 2024/10/31 下午9:12
*/
declare(strict_types=1);
namespace process;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\ServerSentEvents;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
class EventStreamProcess
{
/**
* @desc onMessage
* @param TcpConnection $connection
* @param Request $request
* @return void
* @author Tinywan(ShaoBo Wan)
*/
public function onMessage(TcpConnection $connection, Request $request)
{
// 如果Accept頭是text/event-stream則說明是SSE請求
if ($request->header('accept') === 'text/event-stream') {
// 首先發送一個 Content-Type: text/event-stream 頭的響應
$connection->send(new Response(200, [
'Content-Type' => 'text/event-stream',
'Access-Control-Allow-Origin' => '*'
]));
// 定時向客戶端推送數據
$timerId = Timer::add(2, function () use ($connection, &$timerId){
// 連接關閉的時候要將定時器刪除,避免定時器不斷累積導致內存泄漏
if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
Timer::del($timerId);
return;
}
// 發送message事件,事件攜帶的數據為hello,消息id可以不傳
$connection->send(new ServerSentEvents(['event' => 'message', 'data' => '開源技術小棧 '.date('Y-m-d H:i:s'), 'id'=>time()]));
});
return;
}
$connection->send('ok');
}
}
config/process.php 加入配置。
return [
// ... 其它配置 ...
'event-stream' => [
'listen' => 'http://0.0.0.0:8288',
'handler' => \process\EventStreamProcess::class
]
];
啟動webman。
php start.php start
Workerman[start.php] start in DEBUG mode
-------------------------------------------------- WORKERMAN --------------------------------------------------
Workerman version:4.1.15 PHP version:8.2.10 Event-Loop:\Workerman\Events\Event
--------------------------------------------------- WORKERS ---------------------------------------------------
proto user worker listen processes status
tcp root webman http://0.0.0.0:8217 24 [OK]
tcp root monitor none 1 [OK]
tcp root event-stream http://0.0.0.0:8288 1 [OK]
---------------------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
前端測試代碼;
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
</head>
<body>
<h2>使用webman創建text/eventstream響應</h2>
</body>
<script>
var source = new EventSource('http://127.0.0.1:8288');
source.addEventListener('message', function (event) {
var data = event.data;
console.log(data); // 輸出:開源技術小棧 2024-10-31 21:40:57
}, false);
</script>
</html>
效果圖;
圖片
圖片
webman Http Chunk
process/HttpChunkStreamProcess.php自定義進程;
<?php
/**
* @desc webman Http Chunk 方案
* @author Tinywan(ShaoBo Wan)
*/
declare(strict_types=1);
namespace process;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Chunk;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
class HttpChunkStreamProcess
{
/**
* @param TcpConnection $connection
* @param Request $request
* @return void
* @author Tinywan(ShaoBo Wan)
*/
public function onMessage(TcpConnection $connection, Request $request)
{
// 首先發送一個帶Transfer-Encoding: chunked頭的Response響應
$total_count = 10;
$connection->send(new Response(200, [
'Transfer-Encoding' => 'chunked',
'Access-Control-Allow-Origin' => '*'
], "開源技術小棧。共【{$total_count}】段數據<br>"));
$timer_id = Timer::add(2, function () use ($connection, &$timer_id, $total_count){
static $count = 0;
// 連接關閉的時候要將定時器刪除,避免定時器不斷累積導致內存泄漏
if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
Timer::del($timer_id);
return;
}
if ($count++ >= $total_count) {
// 發送一個空的''代表結束響應
$connection->send(new Chunk(''));
return;
}
// 發送chunk數據
$connection->send(new Chunk("開源技術小棧。第【{$count}】段數據<br>"));
});
}
}
config/process.php 加入配置;
return [
// ... 其它配置 ...
'event-stream' => [
'listen' => 'http://0.0.0.0:8289',
'handler' => \process\HttpChunkStreamProcess::class
]
];
瀏覽器訪問 http://127.0.0.1:8289 頁面會定時輸出以下數據;
圖片
相關文檔:https://www.workerman.net/doc/workerman/http/response.html#%E5%8F%91%E9%80%81http%20chunk%E6%95%B0%E6%8D%AE
webman push插件
官方文檔:https://www.workerman.net/plugin/2