使用Rust構建可以并發執行多個任務的線程池
在這篇文章中讓我們探討一下如何使用Rust構建線程池來并發地管理多個任務。
在開始實際的編碼之前,讓我們首先了解線程池是什么以及它是如何工作的。
線程池
線程池是工作線程的集合,創建這些線程是為了同時執行多個任務并等待新任務的到來。這意味著一開始創建了多個線程,并且所有線程都處于空閑狀態。
每當你的系統獲得任務時,它可以快速地將任務分配給這些線程,從而節省大量時間,而無需多次創建和刪除線程。
圖片
正如圖所看到的,線程池是等待從主線程接收任務以執行的多個線程的集合。
在該圖中,主線程中總共有15個任務,所有這些任務都被轉發給不同的工作線程并發執行。了解了線程池的概念后,讓我們來理解線程池的內部工作原理。
線程池是如何工作的?
在線程池體系結構中,主線程只有兩個任務:
1,接收所有的任務并將它們存儲在一個地方。
2,創建多個線程,并定期為它們分配不同的任務。
在接收任務之前創建線程集,并使用ID存儲在某個地方,以便我們可以通過ID識別它們。
然后每個線程都在等待接收任務,如果它們得到任務,就開始處理任務。完成任務后,他們再次等待下一個任務。
當該線程忙于執行任務時,主線程將更多的任務分配給其他線程,這樣在主線程結束任務之前沒有線程空閑。在完成所有任務后,主線程終止所有線程并關閉線程池。
現在我們了解了線程池是如何工作的。接下來,讓我們使用Rust實現一個線程池。
使用Rust實現線程池
1. 創建線程
我們需要一個函數來生成一個線程并返回它的JoinHandle。
此外,我們需要知道線程的ID,如果我們搞砸了,就可以用線程ID記錄錯誤,這樣我們就可以知道哪個線程出錯了。
可以看出,如果兩個相互關聯的數據需要組合,需要一個結構體。我們來創建一個:
struct Worker {
id: usize,
thread: JoinHandle<()>
}
現在我們實現一個可以返回新Worker的構造函數:
impl Worker {
fn new(id: usize) -> Self {
let thread = thread::spawn(|| {});
Self {id, thread}
}
}
現在,我們的函數已經準備好創建線程并將它們返回給調用者。
2. 存放線程
我們需要一個結構來保存所有線程的所有JoinHandles,我們還想控制線程池可以擁有多少線程。
這意味著,我們需要一個帶有構造函數的結構體,該函數指定一個數字來指示線程的數量,并且必須調用Worker來創建線程。
struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
fn new(size: usize) -> Self {
assert!(size > 0, "Need at least 1 worker!");
let mut workers = Vec::with_capacity(size);
for i in 0..size {
workers.push(Worker::new(i));
}
Self { workers }
}
}
我們有了創建線程和管理線程的函數,現在是時候創建一個可以將任務分配給不同線程的函數了。
3. 給線程分配任務
我們的線程池結構體必須有一個函數,該函數可以在線程內部分配和執行任務。但是有一個問題,我們如何將任務發送給線程,以便線程能夠執行任務?
為此,我們需要一個task類型來表示我們需要完成的任務:
type task = Box<dyn FnOnce() + Send + 'static>;
在這里,意味著我們的任務必須實現Box<dyn>里的這些Trait:
1,實現FnOnce()意味著我們的任務是一個只能運行一次的函數。
2,實現Send,因為我們將任務從主線程發送到工作線程,所以將任務設置為Send類型,以便它可以在線程之間安全地傳輸。
3,實現'static,意味著我們的任務必須和程序運行的時間一樣長。
現在是時候將任務發送給工作線程了,但要做到這一點,我們必須在主線程和所有工作線程之間建立一個通道,因此我們需要使用Arc<Mutex<()>>。
讓我們來更新這兩個構造函數:
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>
}
impl ThreadPool {
fn new(size: usize) -> Self {
assert!(size > 0, "Need at least 1 worker!");
let (sender, reciever) = mpsc::channel();
let reciever = Arc::new(Mutex::new(reciever));
let mut workers = Vec::with_capacity(size);
for i in 0..size {
workers.push(Worker::new(i, Arc::clone(&reciever)));
}
Self {
workers,
sender: Some(sender)
}
}
}
impl Worker {
fn new(id: usize, reciever: Arc<Mutex<Receiver<Task>>>) -> Self {
let thread = thread::spawn(move || {});
Self {
id,
thread
}
}
}
在ThreadPool構造函數中,我們創建了一個新的通道,并在Arc<Mutex<()>>中封裝了接收器,我們把接收器發送給工作線程,以便主線程可以發送任務,工作線程可以接收任務。
此外,我們必須更新ThreadPool結構體,以包含一個發送者,它將被主線程用來向不同的線程發送任務。
現在,讓我們實現在工作線程中執行任務的邏輯:
fn new(id: usize, reciever: Arc<Mutex<Receiver<task>>>) -> Self {
let thread = thread::spawn(move || {
loop {
let receiver = reciever.lock()
.expect("Failed to grab the lock!")
.recv();
match receiver {
Ok(task) => {
println!("Thread {} got the task& executing.", id);
task();
thread::sleep(Duration::from_millis(10));
},
Err(_) => {
println!("No got the task");
break;
}
}
}
});
Self {
id,
thread
}
}
這里,在每個循環中,我們都試圖獲得鎖并調用鎖上的recv(),以便我們可以獲得主線程發送的任務。
接下來,我們在ThreadPool中實現一個函數,將任務發送到不同的線程。
impl ThreadPool {
fn new(size: usize) -> Self {
// snip
}
fn execute<F>(&self, job: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(job);
self.sender.send(job)
.expect("Failed to send the job to workers!");
}
}
我們還需要創建一個函數,在ThreadPool結束時動態終止所有線程。簡單地說,我們必須手動實現ThreadPool的Drop特性,在那里我們將終止所有線程。
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Thread {} is shutting down.", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join()..unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));}
}
}
}
這里我們還必須刪除發送方,因為如果我們不這樣做,那么接收方將永遠循環。如果刪除發送者,那么接收者也會自動刪除,我們就可以成功地退出這個程序。
測試
main函數代碼如下:
fn main() {
let pool = ThreadPool::new(5);
for _ in 0..10 {
pool.execute(|| println!("Doing something"));
}
}
運行結果:
Thread 0 is shutting down.
Thread 0 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 1 got the job & executing.
Thread 2 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Doing something
Thread 0 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 2 got the job & executing.
Doing something
Thread 1 got the job & executing.
Doing something
No got the job
Thread 1 is shutting down.
No got the job
No got the job
No got the job
No got the job
Thread 2 is shutting down.
Thread 3 is shutting down.
Thread 4 is shutting down.