Node.js子線程調(diào)試和診斷指南
調(diào)試、診斷子線程最直接的方式就是像調(diào)試、診斷主線程一樣,但是無論是動(dòng)態(tài)開啟還是靜態(tài)開啟,子線程都不可避免地需要內(nèi)置一些相關(guān)的非業(yè)務(wù)代碼,本文介紹另外一種對子線程代碼無侵入的調(diào)試方式,另外也介紹一下通過子線程調(diào)試主線程的方式。
1.初始化子線程的Inspector
在Node.js啟動(dòng)子線程的時(shí)候,會(huì)初始化Inspector。
- env_->InitializeInspector(std::move(inspector_parent_handle_));
在分析InitializeInspector之前,我們先看一下inspector_parent_handle_。
- std::unique_ptr<inspector::ParentInspectorHandle> inspector_parent_handle_;
inspector_parent_handle_是一個(gè)ParentInspectorHandle對象,這個(gè)對象是子線程和主線程通信的橋梁。我們看一下他的初始化邏輯(在主線程里執(zhí)行)。
- inspector_parent_handle_ = env->inspector_agent()->GetParentHandle(thread_id_, url);
調(diào)用agent的GetParentHandle獲取一個(gè)ParentInspectorHandle對象。
- std::unique_ptr<ParentInspectorHandle> Agent::GetParentHandle(int thread_id, const std::string& url) {
- return client_->getWorkerManager()->NewParentHandle(thread_id, url);
- }
內(nèi)部其實(shí)是通過client_->getWorkerManager()對象的NewParentHandle方法獲取ParentInspectorHandle對象,接下來我們看一下WorkerManager的NewParentHandle。
- std::unique_ptr<ParentInspectorHandle> WorkerManager::NewParentHandle(int thread_id, const std::string& url) {
- bool wait = !delegates_waiting_on_start_.empty();
- return std::make_unique<ParentInspectorHandle>(thread_id, url, thread_, wait);
- }
- ParentInspectorHandle::ParentInspectorHandle(
- int id, const std::string& url,
- std::shared_ptr<MainThreadHandle> parent_thread,
- bool wait_for_connect
- )
- : id_(id),
- url_(url),
- parent_thread_(parent_thread),
- wait_(wait_for_connect) {}
最終的架構(gòu)圖如下入所示。
分析完P(guān)arentInspectorHandle后繼續(xù)看一下env_->InitializeInspector(std::move(inspector_parent_handle_))的邏輯(在子線程里執(zhí)行)。
- int Environment::InitializeInspector(
- std::unique_ptr<inspector::ParentInspectorHandle> parent_handle) {
- std::string inspector_path;
- inspector_path = parent_handle->url();
- inspector_agent_->SetParentHandle(std::move(parent_handle));
- inspector_agent_->Start(inspector_path,
- options_->debug_options(),
- inspector_host_port(),
- is_main_thread());
- }
首先把ParentInspectorHandle對象保存到agent中,然后調(diào)用agent的Start方法。
- bool Agent::Start(...) {
- // 新建client對象
- client_ = std::make_shared<NodeInspectorClient>(parent_env_, is_main);
- // 調(diào)用agent中保存的ParentInspectorHandle對象的WorkerStarted
- parent_handle_->WorkerStarted(client_->getThreadHandle(), ...);
- }
Agent::Start創(chuàng)建了一個(gè)client對象,然后調(diào)用ParentInspectorHandle對象的WorkerStarted方法(剛才SetParentHandle的時(shí)候保存的),我們看一下這時(shí)候的架構(gòu)圖。
接著看parent_handle_->WorkerStarted。
- void ParentInspectorHandle::WorkerStarted(
- std::shared_ptr<MainThreadHandle> worker_thread, bool waiting) {
- std::unique_ptr<Request> request(
- new WorkerStartedRequest(id_, url_, worker_thread, waiting));
- parent_thread_->Post(std::move(request));
- }
WorkerStarted創(chuàng)建了一個(gè)WorkerStartedRequest請求,然后通過parent_thread_->Post提交,parent_thread_是MainThreadInterface對象。
- void MainThreadInterface::Post(std::unique_ptr<Request> request) {
- Mutex::ScopedLock scoped_lock(requests_lock_);
- // 之前是空則需要喚醒消費(fèi)者
- bool needs_notify = requests_.empty();
- // 消息入隊(duì)
- requests_.push_back(std::move(request));
- if (needs_notify) {
- // 獲取當(dāng)前對象的一個(gè)弱引用
- std::weak_ptr<MainThreadInterface>* interface_ptr = new std::weak_ptr<MainThreadInterface>(shared_from_this());
- // 請求V8執(zhí)行RequestInterrupt入?yún)?yīng)的回調(diào)
- isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) {
- // 把執(zhí)行時(shí)傳入的參數(shù)轉(zhuǎn)成MainThreadInterface
- std::unique_ptr<std::weak_ptr<MainThreadInterface>> interface_ptr {
- static_cast<std::weak_ptr<MainThreadInterface>*>(opaque)
- };
- // 判斷對象是否還有效,是則調(diào)用DispatchMessages
- if (auto iface = interface_ptr->lock()) iface->DispatchMessages();
- }, static_cast<void*>(interface_ptr));
- }
- // 喚醒消費(fèi)者
- incoming_message_cond_.Broadcast(scoped_lock);
- }
我們看看這時(shí)候的架構(gòu)圖。
接著看回調(diào)里執(zhí)行MainThreadInterface對象DispatchMessages方法的邏輯。
- void MainThreadInterface::DispatchMessages() {
- // 遍歷請求隊(duì)列
- requests_.swap(dispatching_message_queue_);
- while (!dispatching_message_queue_.empty()) {
- MessageQueue::value_type task;
- std::swap(dispatching_message_queue_.front(), task);
- dispatching_message_queue_.pop_front();
- // 執(zhí)行任務(wù)函數(shù)
- task->Call(this);
- }
- }
task是WorkerStartedRequest對象,看一下Call方法的代碼。
- void Call(MainThreadInterface* thread) override {
- auto manager = thread->inspector_agent()->GetWorkerManager();
- manager->WorkerStarted(id_, info_, waiting_);
- }
接著調(diào)用agent的WorkerManager的WorkerStarted。
- void WorkerManager::WorkerStarted(int session_id,
- const WorkerInfo& info,
- bool waiting) {
- children_.emplace(session_id, info);
- for (const auto& delegate : delegates_) {
- Report(delegate.second, info, waiting);
- }
- }
WorkerStarted記錄了一個(gè)id和上下文,因?yàn)閐elegates_初始化的時(shí)候是空的,所以不會(huì)執(zhí)行。至此,子線程Inspector初始化的邏輯就分析完了,結(jié)構(gòu)圖如下。
我們發(fā)現(xiàn),和主線程不一樣,主線程會(huì)啟動(dòng)一個(gè)WebSocket服務(wù)器接收客戶端的連接請求,而子線程只是初始化了一些數(shù)據(jù)結(jié)構(gòu)。下面我們看一下基于這些數(shù)據(jù)結(jié)構(gòu),主線程是如何動(dòng)態(tài)開啟調(diào)試子線程的。
2.主線程開啟調(diào)試子線程的能力
我們可以以以下方式開啟對子線程的調(diào)試。
- const { Worker, workerData } = require('worker_threads');
- const { Session } = require('inspector');
- // 新建一個(gè)新的通信通道
- const session = new Session();
- session.connect();
- // 創(chuàng)建子線程
- const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
- // 子線程啟動(dòng)成功后開啟調(diào)試子線程的能力
- worker.on('online', () => {
- session.post("NodeWorker.enable",
- {waitForDebuggerOnStart: false},
- (err) => {
- err && console.log("NodeWorker.enable", err);
- });
- });
- // 防止主線程退出
- setInterval(() => {}, 100000);
我們先來分析一下connect函數(shù)的邏輯。
- connect() {
- this[connectionSymbol] = new Connection((message) => this[onMessageSymbol](message));
- }
新建了一個(gè)Connection對象并傳入一個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)在收到消息時(shí)被回調(diào)。Connection是C++層導(dǎo)出的對象,由模版類JSBindingsConnection實(shí)現(xiàn)。
- template <typename ConnectionType>
- class JSBindingsConnection {}
我們看看導(dǎo)出的路邏輯。
- JSBindingsConnection<Connection>::Bind(env, target);
接著看Bind。
- static void Bind(Environment* env, Local<Object> target) {
- // class_name是Connection
- Local<String> class_name = ConnectionType::GetClassName(env);
- Local<FunctionTemplate> tmpl = env->NewFunctionTemplate(JSBindingsConnection::New);
- tmpl->InstanceTemplate()->SetInternalFieldCount(1);
- tmpl->SetClassName(class_name);
- tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
- env->SetProtoMethod(tmpl, "dispatch", JSBindingsConnection::Dispatch);
- env->SetProtoMethod(tmpl, "disconnect", JSBindingsConnection::Disconnect);
- target->Set(env->context(),
- class_name,
- tmpl->GetFunction(env->context()).ToLocalChecked())
- .ToChecked();
- }
當(dāng)我們在JS層執(zhí)行new Connection的時(shí)候,就會(huì)執(zhí)行JSBindingsConnection::New。
- static void New(const FunctionCallbackInfo<Value>& info) {
- Environment* env = Environment::GetCurrent(info);
- Local<Function> callback = info[0].As<Function>();
- new JSBindingsConnection(env, info.This(), callback);
- }
我們看看新建一個(gè)JSBindingsConnection對象時(shí)的邏輯。
- JSBindingsConnection(Environment* env,
- Local<Object> wrap,
- Local<Function> callback)
- : AsyncWrap(env, wrap, PROVIDER_INSPECTORJSBINDING),
- callback_(env->isolate(), callback) {
- Agent* inspector = env->inspector_agent();
- session_ = LocalConnection::Connect(
- inspector, std::make_unique<JSBindingsSessionDelegate>(env, this)
- );}static std::unique_ptr<InspectorSession> Connect(
- Agent* inspector,
- std::unique_ptr<InspectorSessionDelegate> delegate
- ) {
- return inspector->Connect(std::move(delegate), false);
- }
最終是傳入了一個(gè)JSBindingsSessionDelegate對象調(diào)用Agent的Connect方法。
- std::unique_ptr<InspectorSession> Agent::Connect(
- std::unique_ptr<InspectorSessionDelegate> delegate,
- bool prevent_shutdown) {
- int session_id = client_->connectFrontend(std::move(delegate),
- prevent_shutdown);
- // JSBindingsConnection對象的session_字段指向的對象
- return std::unique_ptr<InspectorSession>(
- new SameThreadInspectorSession(session_id, client_)
- );
- }
Agent的Connect方法繼續(xù)調(diào)用client_->connectFrontend。
- int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate,
- bool prevent_shutdown) {
- int session_id = next_session_id_++;
- channels_[session_id] = std::make_unique<ChannelImpl>(env_,
- client_,
- getWorkerManager(),
- std::move(delegate),
- getThreadHandle(),
- prevent_shutdown);
- return session_id;
- }
connectFrontend新建了一個(gè)ChannelImpl對象,在新建ChannelImpl時(shí),會(huì)初始化子線程處理的邏輯。
- explicit ChannelImpl(Environment* env,
- const std::unique_ptr<V8Inspector>& inspector,
- std::shared_ptr<WorkerManager> worker_manager,
- std::unique_ptr<InspectorSessionDelegate> delegate,
- std::shared_ptr<MainThreadHandle> main_thread_,
- bool prevent_shutdown)
- : delegate_(std::move(delegate)), prevent_shutdown_(prevent_shutdown),
- retaining_context_(false) {
- session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView());
- // Node.js拓展命令的處理分發(fā)器
- node_dispatcher_ = std::make_unique<protocol::UberDispatcher>(this);
- // trace相關(guān)
- tracing_agent_ = std::make_unique<protocol::TracingAgent>(env, main_thread_);
- tracing_agent_->Wire(node_dispatcher_.get());
- // 處理子線程相關(guān)
- if (worker_manager) {
- worker_agent_ = std::make_unique<protocol::WorkerAgent>(worker_manager);
- worker_agent_->Wire(node_dispatcher_.get());
- }
- // 處理runtime
- runtime_agent_ = std::make_unique<protocol::RuntimeAgent>();
- runtime_agent_->Wire(node_dispatcher_.get());
- }
我們這里只關(guān)注處理子線程相關(guān)的邏輯。看一下 worker_agent_->Wire。
- void WorkerAgent::Wire(UberDispatcher* dispatcher) {
- frontend_.reset(new NodeWorker::Frontend(dispatcher->channel()));
- NodeWorker::Dispatcher::wire(dispatcher, this);
- auto manager = manager_.lock();
- workers_ = std::make_shared<NodeWorkers>(frontend_, manager->MainThread());
- }
這時(shí)候的架構(gòu)圖如下
接著看一下NodeWorker::Dispatcher::wire(dispatcher, this)的邏輯。
- void Dispatcher::wire(UberDispatcher* uber, Backend* backend){
- std::unique_ptr<DispatcherImpl> dispatcher(new DispatcherImpl(uber->channel(), backend));
- uber->setupRedirects(dispatcher->redirects());
- uber->registerBackend("NodeWorker", std::move(dispatcher));
- }
首先新建了一個(gè)DispatcherImpl對象。
- DispatcherImpl(FrontendChannel* frontendChannel, Backend* backend)
- : DispatcherBase(frontendChannel)
- , m_backend(backend) {
- m_dispatchMap["NodeWorker.sendMessageToWorker"] = &DispatcherImpl::sendMessageToWorker;
- m_dispatchMap["NodeWorker.enable"] = &DispatcherImpl::enable;
- m_dispatchMap["NodeWorker.disable"] = &DispatcherImpl::disable;
- m_dispatchMap["NodeWorker.detach"] = &DispatcherImpl::detach;
- }
除了初始化一些字段,另外了一個(gè)kv數(shù)據(jù)結(jié)構(gòu),這個(gè)是一個(gè)路由配置,后面我們會(huì)看到它的作用。新建完DispatcherImpl后又調(diào)用了uber->registerBackend("NodeWorker", std::move(dispatcher))注冊該對象。
- void UberDispatcher::registerBackend(const String& name, std::unique_ptr<protocol::DispatcherBase> dispatcher){
- m_dispatchers[name] = std::move(dispatcher);
- }
這時(shí)候的架構(gòu)圖如下。
我們看到這里其實(shí)是建立了一個(gè)路由體系,后面收到命令時(shí)就會(huì)根據(jù)這些路由配置進(jìn)行轉(zhuǎn)發(fā),類似Node.js Express框架路由機(jī)制。這時(shí)候可以通過session的post給主線程發(fā)送NodeWorker.enable命令來開啟子線程的調(diào)試。我們分析這個(gè)過程。
- post(method, params, callback) {
- // 忽略參數(shù)處理
- // 保存請求對應(yīng)的回調(diào)
- if (callback) {
- this[messageCallbacksSymbol].set(id, callback);
- }
- // 調(diào)用C++的dispatch
- this[connectionSymbol].dispatch(JSONStringify(message));
- }
this[connectionSymbol]對應(yīng)的是JSBindingsConnection對象。
- static void Dispatch(const FunctionCallbackInfo<Value>& info) {
- Environment* env = Environment::GetCurrent(info);
- JSBindingsConnection* session;
- ASSIGN_OR_RETURN_UNWRAP(&session, info.Holder());
- if (session->session_) {
- session->session_->Dispatch(
- ToProtocolString(env->isolate(), info[0])->string());
- }
- }
session_是一個(gè)SameThreadInspectorSession對象。
- void SameThreadInspectorSession::Dispatch(
- const v8_inspector::StringView& message) {
- auto client = client_.lock();
- client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) {
- channels_[session_id]->dispatchProtocolMessage(message);
- }
最終調(diào)用了ChannelImpl的dispatchProtocolMessage。
- void dispatchProtocolMessage(const StringView& message) {
- std::string raw_message = protocol::StringUtil::StringViewToUtf8(message);
- std::unique_ptr<protocol::DictionaryValue> value =
- protocol::DictionaryValue::cast(protocol::StringUtil::parseMessage(
- raw_message, false));
- int call_id;
- std::string method;
- // 解析命令
- node_dispatcher_->parseCommand(value.get(), &call_id, &method);
- // 判斷命令是V8內(nèi)置命令還是Node.js拓展的命令
- if (v8_inspector::V8InspectorSession::canDispatchMethod(
- Utf8ToStringView(method)->string())) {
- session_->dispatchProtocolMessage(message);
- } else {
- node_dispatcher_->dispatch(call_id, method, std::move(value),
- raw_message);
- }
- }
因?yàn)镹odeWorker.enable是Node.js拓展的命令,所以會(huì)走到else里面的邏輯。根據(jù)路由配置找到該命令對應(yīng)的處理邏輯(NodeWorker.enable以.切分,對應(yīng)兩級路由)。
- void UberDispatcher::dispatch(int callId, const String& in_method, std::unique_ptr<Value> parsedMessage, const ProtocolMessage& rawMessage){
- // 找到一級路由配置
- protocol::DispatcherBase* dispatcher = findDispatcher(method);
- std::unique_ptr<protocol::DictionaryValue> messageObject = DictionaryValue::cast(std::move(parsedMessage));
- // 交給一級路由處理器處理
- dispatcher->dispatch(callId, method, rawMessage, std::move(messageObject));
- }
NodeWorker.enable對應(yīng)的路由處理器代碼如下
- void DispatcherImpl::dispatch(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr<protocol::DictionaryValue> messageObject){
- // 查找二級路由
- std::unordered_map<String, CallHandler>::iterator it = m_dispatchMap.find(method);
- protocol::ErrorSupport errors;
- // 找到處理函數(shù)
- (this->*(it->second))(callId, method, message, std::move(messageObject), &errors);
- }
dispatch繼續(xù)尋找命令對應(yīng)的處理函數(shù),最終找到NodeWorker.enable命令的處理函數(shù)為DispatcherImpl::enable。
- void DispatcherImpl::enable(...){
- std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr();
- DispatchResponse response = m_backend->enable(...);
- // 返回響應(yīng)給命令(類似請求/響應(yīng)模式)
- weak->get()->sendResponse(callId, response);
- }
根據(jù)架構(gòu)圖可以知道m(xù)_backend是WorkerAgent對象。
- DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) {
- auto manager = manager_.lock();
- std::unique_ptr<AgentWorkerInspectorDelegate> delegate(new AgentWorkerInspectorDelegate(workers_));
- event_handle_ = manager->SetAutoAttach(std::move(delegate));
- return DispatchResponse::OK();
- }
繼續(xù)調(diào)用WorkerManager的SetAutoAttach方法。
- std::unique_ptr<WorkerManagerEventHandle> WorkerManager::SetAutoAttach(
- std::unique_ptr<WorkerDelegate> attach_delegate) {
- int id = ++next_delegate_id_;
- // 保存delegate
- delegates_[id] = std::move(attach_delegate);
- const auto& delegate = delegates_[id];
- // 通知子線程
- for (const auto& worker : children_) {
- Report(delegate, worker.second, false);
- }
- ...
- }
SetAutoAttach遍歷子線程。
- void Report(const std::unique_ptr<WorkerDelegate>& delegate,
- const WorkerInfo& info, bool waiting) {
- if (info.worker_thread)
- delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread);
- }
info是一個(gè)WorkerInfo對象,該對象是子線程初始化和主線程建立關(guān)系的數(shù)據(jù)結(jié)構(gòu)。delegate是AgentWorkerInspectorDelegate對象。
- void WorkerCreated(const std::string& title,
- const std::string& url,
- bool waiting,
- std::shared_ptr<MainThreadHandle> target) override {
- workers_->WorkerCreated(title, url, waiting, target);
- }
workers_是一個(gè)NodeWorkers對象。
- void NodeWorkers::WorkerCreated(const std::string& title,
- const std::string& url,
- bool waiting,
- std::shared_ptr<MainThreadHandle> target) {
- auto frontend = frontend_.lock();
- std::string id = std::to_string(++next_target_id_);
- // 處理數(shù)據(jù)通信的delegate
- auto delegate = thread_->MakeDelegateThreadSafe(
- std::unique_ptr<InspectorSessionDelegate>(
- new ParentInspectorSessionDelegate(id, shared_from_this())
- )
- );
- // 建立和子線程V8 Inspector的通信通道
- sessions_[id] = target->Connect(std::move(delegate), true);
- frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting);
- }
WorkerCreated建立了一條和子線程通信的通道,然后通知命令的發(fā)送方通道建立成功。這時(shí)候架構(gòu)圖如下。
接著看attachedToWorker。
- void Frontend::attachedToWorker(const String& sessionId, std::unique_ptr<protocol::NodeWorker::WorkerInfo> workerInfo, bool waitingForDebugger){
- std::unique_ptr<AttachedToWorkerNotification> messageData = AttachedToWorkerNotification::create()
- .setSessionId(sessionId)
- .setWorkerInfo(std::move(workerInfo))
- .setWaitingForDebugger(waitingForDebugger)
- .build();
- // 觸發(fā)NodeWorker.attachedToWorker
- m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.attachedToWorker", std::move(messageData)));
- }
繼續(xù)看sendProtocolNotification
- void sendProtocolNotification(
- std::unique_ptr<Serializable> message) override {
- sendMessageToFrontend(message->serializeToJSON());
- }
- void sendMessageToFrontend(const StringView& message) {
- delegate_->SendMessageToFrontend(message);
- }
這里的delegate_是一個(gè)JSBindingsSessionDelegate對象。
- void SendMessageToFrontend(const v8_inspector::StringView& message)
- override {
- Isolate* isolate = env_->isolate();
- HandleScope handle_scope(isolate);
- Context::Scope context_scope(env_->context());
- MaybeLocal<String> v8string = String::NewFromTwoByte(isolate,
- message.characters16(),
- NewStringType::kNormal, message.length()
- );
- Local<Value> argument = v8string.ToLocalChecked().As<Value>();
- // 收到消息執(zhí)行回調(diào)
- connection_->OnMessage(argument);
- }
- // 執(zhí)行JS層回調(diào)
- void OnMessage(Local<Value> value) {
- MakeCallback(callback_.Get(env()->isolate()), 1, &value);
- }
JS層回調(diào)邏輯如下。
- [onMessageSymbol](message) {
- const parsed = JSONParse(message);
- // 收到的消息如果是某個(gè)請求的響應(yīng),則有個(gè)id字段記錄了請求對應(yīng)的id,否則則觸發(fā)事件
- if (parsed.id) {
- const callback = this[messageCallbacksSymbol].get(parsed.id);
- this[messageCallbacksSymbol].delete(parsed.id);
- if (callback) {
- callback(null, parsed.result);
- }
- } else {
- this.emit(parsed.method, parsed);
- this.emit('inspectorNotification', parsed);
- }
- }
主線程拿到Worker Session對一個(gè)的id,后續(xù)就可以通過命令NodeWorker.sendMessageToWorker加上該id和子線程通信。大致原理如下,主線程通過自己的channel和子線程的channel進(jìn)行通信,從而達(dá)到控制子線程的目的。
我們分析一下NodeWorker.sendMessageToWorker命令的邏輯,對應(yīng)處理函數(shù)為DispatcherImpl::sendMessageToWorker。
- void DispatcherImpl::sendMessageToWorker(...){
- std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr();
- DispatchResponse response = m_backend->sendMessageToWorker(in_message, in_sessionId);
- // 響應(yīng)
- weak->get()->sendResponse(callId, response);
- return;
- }
繼續(xù)分析m_backend->sendMessageToWorker。
- DispatchResponse WorkerAgent::sendMessageToWorker(const String& message,
- const String& sessionId) {
- workers_->Receive(sessionId, message);
- return DispatchResponse::OK();
- }
- void NodeWorkers::Receive(const std::string& id, const std::string& message) {
- auto it = sessions_.find(id);
- it->second->Dispatch(Utf8ToStringView(message)->string());
- }
sessions_對應(yīng)的是和子線程的通信的數(shù)據(jù)結(jié)構(gòu)CrossThreadInspectorSession。看一下該對象的Dispatch方法。
- void Dispatch(const StringView& message) override {
- state_.Call(&MainThreadSessionState::Dispatch,
- StringBuffer::create(message));
- }
再次調(diào)了MainThreadSessionState::Dispatch
- void Dispatch(std::unique_ptr<StringBuffer> message) {
- session_->Dispatch(message->string());
- }
session_是SameThreadInspectorSession對象。繼續(xù)看它的Dispatch方法。
- void SameThreadInspectorSession::Dispatch(
- const v8_inspector::StringView& message) {
- auto client = client_.lock();
- client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) {
- channels_[session_id]->dispatchProtocolMessage(message);
- }
通過層層調(diào)用,最終拿到了一個(gè)合子線程通信的channel,dispatchProtocolMessage方法剛才已經(jīng)分析過,該方法會(huì)根據(jù)命令做不同的處理,因?yàn)槲覀冞@里發(fā)送的是V8內(nèi)置的命令,所以會(huì)交給V8 Inspector處理。當(dāng)V8 Inspector處理完后,會(huì)通過ChannelImpl的sendResponse返回結(jié)果。
- void sendResponse(
- int callId,
- std::unique_ptr<v8_inspector::StringBuffer> message) override {
- sendMessageToFrontend(message->string());
- }
- void sendMessageToFrontend(const StringView& message) {
- delegate_->SendMessageToFrontend(message);
- }
這里的delegate_是ParentInspectorSessionDelegate對象。
- void SendMessageToFrontend(const v8_inspector::StringView& msg) override {
- std::string message = protocol::StringUtil::StringViewToUtf8(msg);
- workers_->Send(id_, message);
- }
- void NodeWorkers::Send(const std::string& id, const std::string& message) {
- auto frontend = frontend_.lock();
- if (frontend)
- frontend->receivedMessageFromWorker(id, message);
- }
- void Frontend::receivedMessageFromWorker(const String& sessionId, const String& message){
- std::unique_ptr<ReceivedMessageFromWorkerNotification> messageData = ReceivedMessageFromWorkerNotification::create()
- .setSessionId(sessionId)
- .setMessage(message)
- .build();
- // 觸發(fā)NodeWorker.receivedMessageFromWorker
- m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.receivedMessageFromWorker", std::move(messageData)));
- }
m_frontendChannel是主線程的ChannelImpl對象。
- void sendProtocolNotification(
- std::unique_ptr<Serializable> message) override {
- sendMessageToFrontend(message->serializeToJSON());
- }
- void sendMessageToFrontend(const StringView& message) {
- delegate_->SendMessageToFrontend(message);
- }
delegate_是C++層傳入的JSBindingsSessionDelegate對象。最終通過JSBindingsSessionDelegate對象回調(diào)JS層,之前已經(jīng)分析過就不再贅述。至此,主線程就具備了控制子線程的能力,但是控制方式有很多種。
2.1 使用通用的V8命令
通過下面代碼收集子線程的CPU Profile信息。
- const { Worker, workerData } = require('worker_threads');
- const { Session } = require('inspector');
- const session = new Session();
- session.connect();
- let id = 1;
- function post(sessionId, method, params, callback) {
- session.post('NodeWorker.sendMessageToWorker', {
- sessionId,
- message: JSON.stringify({ id: id++, method, params })
- }, callback);
- }
- session.on('NodeWorker.attachedToWorker', (data) => {
- post(data.params.sessionId, 'Profiler.enable');
- post(data.params.sessionId, 'Profiler.start');
- // 收集一段時(shí)間后提交停止收集命令
- setTimeout(() => {
- post(data.params.sessionId, 'Profiler.stop');
- }, 10000)
- });
- session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {
- const data = JSON.parse(message);
- console.log(data);
- });
- const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
- worker.on('online', () => {
- session.post("NodeWorker.enable",{waitForDebuggerOnStart: false}, (err) => { console.log(err, "NodeWorker.enable");});
- });
- setInterval(() => {}, 100000);
通過這種方式可以通過命令控制子線程的調(diào)試和數(shù)據(jù)收集。
2.2 在子線程中動(dòng)態(tài)執(zhí)行腳本
可以通過執(zhí)行腳本開啟子線程的WebSocket服務(wù),像調(diào)試主線程一樣。
- const { Worker, workerData } = require('worker_threads');
- const { Session } = require('inspector');
- const session = new Session();
- session.connect();
- let workerSessionId;
- let id = 1;
- function post(method, params) {
- session.post('NodeWorker.sendMessageToWorker', {
- sessionId: workerSessionId,
- message: JSON.stringify({ id: id++, method, params })
- });
- }
- session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {
- const data = JSON.parse(message);
- console.log(data);
- });
- session.on('NodeWorker.attachedToWorker', (data) => {
- workerSessionId = data.params.sessionId;
- post("Runtime.evaluate", {
- includeCommandLineAPI: true,
- expression: `const inspector = process.binding('inspector');
- inspector.open();
- inspector.url();
- `
- }
- );
- });
- const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
- worker.on('online', () => {
- session.post("NodeWorker.enable",{waitForDebuggerOnStart: false}, (err) => { err && console.log("NodeWorker.enable", err);});
- });
- setInterval(() => {}, 100000);
執(zhí)行上面的代碼就拿到以下輸出
- {
- id: 1,
- result: {
- result: {
- type: 'string',
- value: 'ws://127.0.0.1:9229/c0ca16c8-55aa-4651-9776-fca1b27fc718'
- }
- }
- }
通過該地址,客戶端就可以對子線程進(jìn)行調(diào)試了。上面代碼里使用process.binding而不是require加載inspector,因?yàn)閯偛磐ㄟ^NodeWorker.enable命令為子線程創(chuàng)建了一個(gè)到子線程Inspector的channel,而JS模塊里判斷如果channel非空則報(bào)錯(cuò)Inspector已經(jīng)打開。所以這里需要繞過這個(gè)限制,直接加載C++模塊開啟WebSocket服務(wù)器。
3.子線程調(diào)試主線程
不僅可以通過主線程調(diào)試子線程,還可以通過子線程調(diào)試主線程。Node.js在子線程暴露了connectToMainThread方法連接到主線程的Inspector(只能在work_threads中使用),實(shí)現(xiàn)的原理和之前分析的類似,主要是子線程連接到主線程的V8 Inspector,通過和該Inspector完成對主線程的控制。看下面一個(gè)例子。主線程代碼
- const { Worker, workerData } = require('worker_threads');const http = require('http');const worker = new Worker('./worker.js', {workerData: {port: 80}});
- http.createServer((_, res) => {
- res.end('main');
- }).listen(8000);
worker.js代碼如下:
- const fs = require('fs');
- const { workerData: { port } } = require('worker_threads');
- const { Session } = require('inspector');
- const session = new Session();
- session.connectToMainThread();
- session.post('Profiler.enable');
- session.post('Profiler.start');
- setTimeout(() => {
- session.post('Profiler.stop', (err, data) => {
- if (data.profile) {
- fs.writeFileSync('./profile.cpuprofile', JSON.stringify(data.profile));
- }
- });
- }, 5000)