
隨著 SWC、NAPI-RS、Rspack 等等 Rust 前端工具鏈的出現,Rust 正在逐步成為前端工程化的一種新的選擇,無論是在性能、安全性還是開發體驗上都有著很大的優勢。筆者在工作中也在使用 Rust 進行一些前端工具鏈的開發工作,對于 Rust 的一些特性也在不斷的學習和探索,最近也會不定期的分享一些 Rust 的相關內容,比如: 如何用 napi-rs 搭建一個 Node.js 可以調用的 Rust 庫、Rust 并發和異步模型、Rust 宏編程 等等話題。
這篇文章將會圍繞 Rust 的并發模型展開,首先會介紹并發的基本概念,然后會對 Rust 中一些重要的并發工具進行介紹,比如 Atomic、Mutex、Condvar 等等,最后會實現一個 channel 并發處理模型。
注: 關于基礎的環境搭建和語法內容不會進行講解,可以參考 《Rust 語言圣經》這本書,相信對于初學者是一個不錯的選擇,地址: https://course.rs/about-book.html。
什么是并發?
要理解并發,我們繞不開另外一個相似的概念——并行,這兩個概念也是計算機科學中經常被提到的兩個概念,它們之間到底有什么區別?
這里引入非常經典的解釋,來自 Golang 之父 Rob Pike 的一段話:
Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
翻譯過來就是: 并發是指同時處理很多事情,而并行是指同時做很多事情。
在并發的場景中,對于正在處理的一些任務,雖然看起來好像它們在同時執行,但實際上是通過在單個處理器上交替輪流運行,某個時刻只有一個任務在運行,而其他任務都處于等待狀態。
而在并行的場景中,對于正在處理的一些任務,它們是真正的同時執行。
而兩者也并不是相互排斥的,并發和并行可以同時存在,比如在多核的 CPU 中,我們可以同時運行多個并發的任務,這樣就可以充分利用多核 CPU 的優勢,提高程序的執行效率。
Rust 中的并發原語
我們通常可以通過把任務放到多線程,或者多個異步任務來實現并發,在這個過程中,其實真正的難點不在于如何創建多個線程或者異步任務,而在于如何處理這些并發任務的同步和競態問題。
在 Rust 中,提供了一些并發原語,來幫助我們處理并發任務的同步和競態問題,這些原語包括: Atomic、Mutex、Condvar、Arc 等等,下面我們來逐一介紹一下。
Atomic
Atomic 是原子操作,它提供了一些原子操作的方法,比如 fetch_add、fetch_sub 等等,這些方法都是原子化的,也就是說,這些方法在執行的過程中,不會被其他線程打斷,也不會被其他線程修改,這樣就可以保證這些方法的執行是安全的。比如:
use std::sync::atomic::{AtomicUsize, Ordering};
let a = AtomicUsize::new(0);
a.fetch_add(1, Ordering::SeqCst);
Ordering::SeqCst 代表嚴格控制操作順序的一致性,可以參考: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
上面的代碼中,我們創建了一個 AtomicUsize 類型的變量 a,然后調用了 fetch_add 方法,這個方法會將 a 的值加 1,這個過程是原子化的。
為什么這里要突出強調一下原子化呢?這里我們來舉個例子:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let counter = AtomicUsize::new(0);
let t1 = thread::spawn(|| {
for _ in 0..100 {
counter.fetch_add(1, Ordering::Relaxed);
}
});
let t2 = thread::spawn(|| {
for _ in 0..100 {
counter.fetch_add(1, Ordering::Relaxed);
}
});
t1.join().unwrap();
t2.join().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 200);
如果 fetch_add 方法執行不是原子化的,那么就可能出現競態問題。例如,當線程 t1 和 t2 同時運行時,它們可能讀取相同的計數器值,然后各自將其增加,并將結果存回計數器中,從而導致丟失一次增加的操作。這樣就會導致最終結果小于預期值 200。
所以所謂的原子化,實際上是將某些步驟合并成一個原子操作,不能中斷,拿這里的 fetch_add 來說:
- 讀取 counter 的值。
- 將 counter 的值加 1。
這兩個步驟不能中斷,如果中斷了,那么就會導致競態問題。
Mutex
Mutex 是常用的一種互斥鎖,它可以保證在同一時刻,只有一個線程可以訪問某個數據,其他線程必須等待,直到鎖被釋放。
Mutex 有兩種狀態: 鎖定和未鎖定,當 Mutex 處于鎖定狀態時,其他線程就無法再次獲取鎖,直到 Mutex 處于未鎖定狀態。
舉一個例子:
use std::sync::Mutex;
use std::thread;
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut value = counter.lock().unwrap();
*value += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
這段代碼會有編譯問題,后續會分析。
這里我們通過循環創建了 10 個線程來增加計數器的值。每個線程都獲取了 Mutex 鎖,并修改了計數器的值。當某個線程完成時,它會釋放互斥鎖,允許其他線程進行修改。
最后,我們使用 join() 方法等待所有線程完成,并打印出最終結果。
但這里的代碼涉及到所有權轉移的問題,我們知道,在 Rust 中,同一時間一個變量只能有一個所有者,當我們將 counter 傳遞給線程時,就會發生所有權轉移,這樣就會導致其它的線程無法獲取 counter 的所有權,導致編譯報錯。
我們需要使用 Arc 來解決這個問題。
Arc
Arc 是原子引用計數,它可以在多個線程之間共享數據,它的內部實現是通過原子操作來實現的,所以它是線程安全的。
我們可以通過 Arc::new 來創建一個 Arc 對象,然后通過 Arc::clone 來克隆一個 Arc 對象,這樣就可以在多個線程之間共享數據了。
use std::sync::{Arc, Mutex};
use std::thread;
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut value = counter.lock().unwrap();
*value += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
Condvar
Condvar 是一個條件變量,它可以讓線程等待某個條件滿足,然后再執行。比如:
use std::sync::{Arc, Condvar, Mutex};
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
let thread1 = std::thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
thread1.join().unwrap();
上面的代碼中,我們創建了一個 pair,它是一個元組,第一個元素是一個 Mutex,第二個元素是一個 Condvar。然后我們創建了一個線程 thread1,它會將 Mutex 中的值設置為 true,然后調用 Condvar 的 notify_one 方法,通知 Condvar 等待的線程。
而在主線程中,我們會調用 Condvar 的 wait 方法,等待 Condvar 的通知,當主線程收到通知后,就會繼續執行。
使用 Channel 處理并發
讀到這里,你可能會說了,我們使用 Mutex、Arc、Condvar 等方式來處理并發,看起來很麻煩呀?其實,Rust 中還有一種更簡單的方式來處理并發,那就是通過 Channel。
Channel 的本質是一個消息隊列,它可以讓多個線程之間進行消息通信,把讀者和寫者分離。根據讀者和寫者的數量,Channel 可以分為下面的幾個類型:
- 單生產者單消費者(Single Producer, Single Consumer, SPSC)
- 單生產者多消費者(Single Producer, Multiple Consumer, SPMC)
- 多生產者單消費者(Multiple Producer, Single Consumer, MPSC)
- 多生產者多消費者(Multiple Producer, Multiple Consumer, MPMC)
其中 MPSC 是最常用的,在 Rust 中,它是通過 std::sync::mpsc 模塊來實現的。我們來看看它是如何使用的。
use std::sync::mpsc;
let (s, r) = mpsc::channel();
let s1 = mpsc::Sender::clone(&s);
std::thread::spawn(move || {
let val = String::from("hi");
s1.send(val).unwrap();
});
let received = r.recv().unwrap();
println!("Got: {}", received);
上面的代碼中,我們創建了一個 Channel,它是一個元組,第一個元素是一個 Sender,第二個元素是一個 Receiver。Sender 用來發送消息,Receiver 用來接收消息。
我們通過 mpsc::Sender::clone 方法來克隆一個 Sender,然后將克隆的 Sender 傳遞給線程,線程中通過 Sender 的 send 發送消息。而在主線程中,我們通過 Receiver 的 recv 方法來接收消息。
實現一個 Channel
接下來我們基于 Arc、Mutex、Condvar 來實現一個 Channel,它的功能和 std::sync::mpsc 中的 channel 類似,支持多生產者單消費者。
1、創建項目
首先我們通過 cargo new my-channel --lib 來創建一個庫項目,然后在 Cargo.toml 中添加依賴:
[dependencies]
anyhow="1.0.40"
anyhow 是一個錯誤處理庫,它可以讓我們更方便的處理錯誤。
2、整體設計
對外暴露一個 channel 函數,它返回一個 Sender 和 Receiver,Sender 用來發送消息,Receiver 用來接收消息。
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
todo!()
}
因此關鍵的數據結構就是 Sender 和 Receiver,它們都需要持有一個共享的內部數據結構,我們將其命名為 Inner,它的定義如下:
// src/lib.rs
use anyhow::{anyhow, Ok, Result};
use std::{
collections::VecDeque,
sync::{atomic::AtomicUsize, Arc, Condvar, Mutex},
};
struct Inner<T> {
// 共享的數據
data: Mutex<VecDeque<T>>,
// 條件變量
condvar: Condvar,
// 發送者數量,使用原子操作
senders: AtomicUsize,
// 接收者數量,使用原子操作
receivers: AtomicUsize,
}
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
OK,確定了數據結構之后,我們來實現 Sender 和 Receiver 的行為。
3、實現 Sender
首先我們來實現 Sender:
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<()> {
todo!()
}
pub fn get_receivers_count(&self) -> usize {
todo!()
}
}
我們需要實現下面的方法:
- send 方法,用來發送消息。
- get_receivers_count 方法,用來獲取接收者的數量。
具體實現如下:
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<()> {
// 如果沒有接收者了,就拋錯
if self.get_receivers_count() == 0 {
return Err(anyhow!("no more receivers"));
}
let mut data = self.inner.data.lock().unwrap();
data.push_back(value);
// 通知接收者
self.inner.condvar.notify_one();
Ok(())
}
pub fn get_receivers_count(&self) -> usize {
self.inner
.receivers
.load(std::sync::atomic::Ordering::SeqCst)
}
}
上面的代碼中,我們通過 get_receivers_count 方法來獲取接收者的數量,如果沒有接收者了,就拋錯。然后我們通過 Mutex 的 lock 方法來獲取鎖,然后將消息放入隊列中,最后通過 Condvar 的 notify_one 方法來通知接收者。
4、實現 Receiver
接下來我們來實現 Receiver:
impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T> {
todo!()
}
pub fn get_senders_count(&self) -> usize {
todo!()
}
}
我們需要實現下面的方法:
- recv 方法,用來接收消息。
- get_senders_count 方法,用來獲取發送者的數量。
具體實現如下:
impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T> {
let mut data = self.inner.data.lock().unwrap();
loop {
// 如果沒有發送者了,就拋錯
if self.get_senders_count() == 0 {
return Err(anyhow!("no more senders"));
}
// 如果隊列中有消息,就返回
if let Some(value) = data.pop_front() {
return Ok(value);
}
// 如果隊列中沒有消息,就等待
data = self.inner.condvar.wait(data).unwrap();
}
}
pub fn get_senders_count(&self) -> usize {
self.inner
.senders
.load(std::sync::atomic::Ordering::SeqCst)
}
}
上面的代碼中,我們通過 get_senders_count 方法來獲取發送者的數量,如果沒有發送者了,就拋錯。
然后我們通過 Mutex 的 lock 方法來獲取鎖,通過 Condvar 的 wait 方法來等待消息,如果隊列中有消息,就返回,如果隊列中沒有消息,就繼續等待,直到有消息為止。
當然,我們還需要實現 Drop trait,當 Sender 或者 Receiver 被釋放時,我們需要更新發送者數量或者接收者數量:
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner
.senders
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.inner
.receivers
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}
5、實現 channel 函數
最后我們來實現 channel 函數:
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
data: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
senders: AtomicUsize::new(1),
receivers: AtomicUsize::new(1),
});
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}
我們通過 Arc 來包裝 Inner,然后創建一個 Sender 和一個 Receiver,最后返回。
6、測試
我們來測試一下目前的 channel 能否正常工作:
#[test]
fn test_channel() {
let (mut s, r) = channel();
let mut s1 = s.clone();
let mut s2 = s.clone();
let t = thread::spawn(move || {
s.send(1).unwrap();
});
let t1 = thread::spawn(move || {
s1.send(10).unwrap();
});
let t2 = thread::spawn(move || {
s2.send(100).unwrap();
});
for handle in [t, t1, t2] {
handle.join().unwrap();
}
let mut result = [r.recv().unwrap(), r.recv().unwrap(), r.recv().unwrap()];
// 保證順序的穩定
result.sort();
assert_eq!(result, [1, 10, 100]);
}
#[test]
fn with_no_senders() {
let (s, r) = channel::<i32>();
drop(s);
assert!(r.recv().is_err());
}
#[test]
fn with_no_receivers() {
let (mut s, _) = channel::<i32>();
assert!(s.send(1).is_err());
}
OK,目前的 channel 已經可以正常工作了。
總結
這篇文章中,我們介紹了 Rust 中并發的基礎概念,包括 Mutex、Condvar、Arc、Atomic 等,然后我們實現了一個簡單的 MPSC channel,即多生產者單消費者模型,理解了 channel 內部的實現原理,其內部也是基于 Mutex 和 Condvar 這些基礎的原語來實現的。