如何構建一個永不掉鏈的JavaScript實時數據同步引擎
從微服務到本地腳本,本文將揭秘我如何在Node.js中設計一套健壯的發布-訂閱架構,實現多源實時數據的同步、記錄與轉換。
問題根源:數據孤島危機
- 應用A將數據存入Firebase
- 應用B使用MongoDB
- 命令行工具將數據轉儲到本地JSON文件
- 而我的儀表盤?對這三個數據源一無所知
我陷入了數據混亂的泥潭:
- 系統間缺乏實時通信
- 視圖數據陳舊
- 狀態不一致
- 承諾失效
- 手動刷新成了家常便飯
于是,我構建了自己的JavaScript實時發布-訂閱引擎,它能:
- 監控多個數據源
- 跨通道觸發事件
- 實時同步數據
- 記錄每次事務
- 確保全局一致性
下面讓我展示如何將這些碎片化工具整合成協調運作的系統。
基礎架構:發布-訂閱核心
首先創建所有工具都能通信的事件管理器:
// core/EventBus.js
const EventEmitter = require('events');
class EventBus extends EventEmitter {}
const eventBus = new EventBus();
module.exports = eventBus;
簡潔而優雅。這個eventBus成為整個引擎的中樞神經,任何數據源現在都能發布和訂閱特定事件。
實時監控文件系統變化
對于命令行工具更新的JSON文件,系統需要實時檢測文件變更并作出響應:
// watchers/FileWatcher.js
const fs = require('fs');
const path = require('path');
const eventBus = require('../core/EventBus');
functionwatchFile(filePath) {
fs.watch(filePath, (eventType, filename) => {
if (filename) {
const fullPath = path.resolve(filePath);
const data = JSON.parse(fs.readFileSync(fullPath));
eventBus.emit('file:update', { source: filename, data });
console.log(`[FileWatcher] 檢測到 ${filename} 變更`);
}
});
}
module.exports = watchFile;
現在每次文件更新都會向系統發送結構化事件。
實時監聽Firebase變更
Firebase更新迅速,但對其他系統卻不可見。通過Firebase SDK監聽器解決這個問題:
// watchers/FirebaseWatcher.js
const { initializeApp } = require('firebase/app');
const { getDatabase, ref, onValue } = require('firebase/database');
const eventBus = require('../core/EventBus');
const firebaseConfig = {
apiKey: "xxx",
authDomain: "xxx",
databaseURL: "https://your.firebaseio.com",
projectId: "xxx"
};
const app = initializeApp(firebaseConfig);
const db = getDatabase(app);
functionwatchFirebase(path) {
const dbRef = ref(db, path);
onValue(dbRef, (snapshot) => {
const data = snapshot.val();
eventBus.emit('firebase:update', { path, data });
console.log(`[FirebaseWatcher] ${path} 數據更新`);
});
}
module.exports = watchFirebase;
現在Firebase更新直接進入JavaScript引擎,無需輪詢。
通過變更流實現MongoDB實時同步
MongoDB作為另一個微服務的后端,通過變更流功能保持同步:
// watchers/MongoWatcher.js
const { MongoClient } = require('mongodb');
const eventBus = require('../core/EventBus');
asyncfunctionwatchMongo(uri, dbName, collectionName) {
const client = newMongoClient(uri);
await client.connect();
const collection = client.db(dbName).collection(collectionName);
const changeStream = collection.watch();
changeStream.on('change', (next) => {
eventBus.emit('mongo:update', {
operation: next.operationType,
document: next.fullDocument
});
console.log(`[MongoWatcher] 檢測到變更: ${next.operationType}`);
});
}
module.exports = watchMongo;
任何插入、更新或刪除操作都會觸發自定義事件,MongoDB就此加入同步網絡。
數據標準化處理
不僅需要同步原始數據,還要標準化和記錄所有通道的數據:
// core/Transformer.js
const eventBus = require('./EventBus');
const logger = require('../utils/logger');
functiontransformPayload(payload, source) {
return {
id: Date.now(),
source,
data: payload,
timestamp: newDate().toISOString()
};
}
functioninitTransformer() {
eventBus.on('file:update', (payload) => {
const result = transformPayload(payload.data, 'file');
logger.log(result);
eventBus.emit('data:ready', result);
});
eventBus.on('firebase:update', (payload) => {
const result = transformPayload(payload.data, 'firebase');
logger.log(result);
eventBus.emit('data:ready', result);
});
eventBus.on('mongo:update', (payload) => {
const result = transformPayload(payload.document, 'mongo');
logger.log(result);
eventBus.emit('data:ready', result);
});
}
module.exports = initTransformer;
所有數據源現在都輸出統一格式,消除了混亂,確保一致性。
通過Socket.io實現前端實時同步
為了讓網頁儀表盤實時顯示數據流,添加WebSocket支持:
npm install socket.io
// server.js
const http = require('http');
const socketIo = require('socket.io');
const express = require('express');
const eventBus = require('./core/EventBus');
const initTransformer = require('./core/Transformer');
const app = express();
const server = http.createServer(app);
const io = socketIo(server);
initTransformer();
eventBus.on('data:ready', (payload) => {
io.emit('sync', payload);
});
app.get('/', (_, res) => res.send('<h1>同步引擎運行中</h1>'));
server.listen(3000, () =>console.log('服務運行于 http://localhost:3000'));
前端通過以下方式連接:
const socket = io("http://localhost:3000");
socket.on("sync", (data) => {
console.log("實時更新:", data);
});
現在,本地文件、Firebase和MongoDB的實時同步數據都匯聚到一個數據流中。
完整記錄每次同步
添加文件記錄器持久化每次同步更新:
// utils/logger.js
const fs = require('fs');
function log(data) {
const logEntry = `[${data.timestamp}] (${data.source}) ${JSON.stringify(data.data)}\n`;
fs.appendFileSync('sync.log', logEntry);
}
module.exports = { log };
這提供了回放能力和完整的同步歷史記錄,便于調試。
引擎啟動腳本
最終將所有組件整合:
// index.js
const watchFile = require('./watchers/FileWatcher');
const watchFirebase = require('./watchers/FirebaseWatcher');
const watchMongo = require('./watchers/MongoWatcher');
const initTransformer = require('./core/Transformer');
watchFile('./data/myfile.json');
watchFirebase('/live/updates');
watchMongo('mongodb://localhost:27017', 'mydb', 'records');
initTransformer();
實現效果:
- 所有數據源互聯
- 所有事件可追溯
- 所有同步標準化
- 所有更新實時推送至網頁
為何不再需要手動同步
這個引擎為我節省了大量時間,替代了:
- 手動導出Firebase數據
- 運行bash腳本同步MongoDB
- 刷新JSON文件
- 合并日志
現在擁有:
- 實時同步系統
- 單一事件流
- 實時儀表盤
- 完整日志追蹤
- 可插拔架構
全部由Node.js、EventEmitters和清晰的模塊化代碼驅動。
無需復雜框架,不用Kubernetes,純粹發揮JavaScript跨平臺運行的優勢。
如果你正面臨數據系統碎片化的問題,這就是將它們統一起來的一勞永逸之法。
原文鏈接:https://medium.com/javascript-in-plain-english/how-i-built-a-realtime-data-sync-engine-in-javascript-that-never-misses-a-beat-1359e6f2dc72作者:Ai Panda